文档

Routine Load

更新时间:

例行导入(Routine Load)功能,支持用户提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到Doris中。本文主要介绍Routine Load功能的实现原理、使用方式以及最佳实践。

使用限制

当前仅支持从Kafka进行例行导入。

  • 支持无认证的Kafka访问,以及通过SSL方式认证的Kafka集群。

  • 支持的消息格式为CSV或JSON文本格式。CSV每一个message为一行,且行尾不包含换行符。

  • 默认支持Kafka 0.10.0.0及以上版本。如果要使用Kafka 0.10.0.0以下版本 (0.9.0、0.8.2、0.8.1、0.8.0),需要修改BE配置,将 kafka_broker_version_fallback参数值设置为要兼容的旧版本,或者在创建Routine Load时直接设置property.broker.version.fallback参数值为要兼容的旧版本。

    说明

    使用旧版本可能会导致Routine Load的部分新特性无法使用,例如根据时间设置Kafka分区的offset。

基本原理

Client向FE提交一个Routine Load作业的原理如下:

+---------+
         |  Client |
         +----+----+
              |
+-----------------------------+
| FE          |               |
| +-----------v------------+  |
| |                        |  |
| |   Routine Load Job     |  |
| |                        |  |
| +---+--------+--------+--+  |
|     |        |        |     |
| +---v--+ +---v--+ +---v--+  |
| | task | | task | | task |  |
| +--+---+ +---+--+ +---+--+  |
|    |         |        |     |
+-----------------------------+
     |         |        |
     v         v        v
 +---+--+   +--+---+   ++-----+
 |  BE  |   |  BE  |   |  BE  |
 +------+   +------+   +------+
  1. FE通过JobScheduler将一个导入作业拆分成若干个Task。每个Task负责导入指定的一部分数据。Task被TaskScheduler分配到指定的BE 上执行。

  2. 在BE上,一个Task被视为一个普通的导入任务,通过Stream Load的导入机制进行导入。导入完成后,向FE汇报。

  3. FE中的JobScheduler根据汇报结果,继续生成后续新的Task,或者对失败的Task进行重试。

  4. 整个Routine Load作业通过不断地产生新的Task,来完成数据不间断的导入。

Kafka例行导入

下面详细介绍Kafka例行导入的使用方式和最佳实践。

创建任务

创建例行导入任务的详细语法可以查看CREATE ROUTINE LOAD命令手册或执行HELP ROUTINE LOAD; 查看语法帮助。下面以几个示例说明如何创建Routine Load任务。

  • 为example_db的example_tbl创建一个名为test1的Kafka例行导入任务。指定列分隔符和group.id和client.id,并且自动默认消费所有分区,且从有数据的位置(OFFSET_BEGINNING)开始订阅。

    CREATE ROUTINE LOAD example_db.test1 ON example_tbl
            COLUMNS TERMINATED BY ",",
            COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
            PROPERTIES
            (
                "desired_concurrent_number"="3",
                "max_batch_interval" = "20",
                "max_batch_rows" = "300000",
                "max_batch_size" = "209715200",
                "strict_mode" = "false"
            )
            FROM KAFKA
            (
                "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
                "kafka_topic" = "my_topic",
                "property.group.id" = "xxx",
                "property.client.id" = "xxx",
                "property.kafka_default_offsets" = "OFFSET_BEGINNING"
            );
  • 严格模式为example_db的example_tbl创建一个名为test1的Kafka例行导入任务。

    CREATE ROUTINE LOAD example_db.test1 ON example_tbl
            COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
            WHERE k1 > 100 and k2 like "%doris%"
            PROPERTIES
            (
                "desired_concurrent_number"="3",
                "max_batch_interval" = "20",
                "max_batch_rows" = "300000",
                "max_batch_size" = "209715200",
                "strict_mode" = "true"
            )
            FROM KAFKA
            (
                "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
                "kafka_topic" = "my_topic",
                "kafka_partitions" = "0,1,2,3",
                "kafka_offsets" = "101,0,0,200"
            );
  • 导入JSON格式数据使用示例。

    Routine Load导入的JSON格式仅支持以下两种:

    第一种为只有一条记录,且为JSON对象。

    {"category":"a9jadhx","author":"test","price":895}

    第二种为JSON数组,数组中可含多条记录。

    [
        {
            "category":"11",
            "author":"4avc",
            "price":895,
            "timestamp":1589191587
        },
        {
            "category":"22",
            "author":"2avc",
            "price":895,
            "timestamp":1589191487
        },
        {
            "category":"33",
            "author":"3avc",
            "price":342,
            "timestamp":1589191387
        }
    ]

    创建待导入的Doris数据表,示例如下。

    CREATE TABLE `example_tbl` (
       `category` varchar(24) NULL COMMENT "",
       `author` varchar(24) NULL COMMENT "",
       `timestamp` bigint(20) NULL COMMENT "",
       `dt` int(11) NULL COMMENT "",
       `price` double REPLACE
    ) ENGINE=OLAP
    AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
    COMMENT "OLAP"
    PARTITION BY RANGE(`dt`)
    (
      PARTITION p0 VALUES [("-2147483648"), ("20200509")),
        PARTITION p20200509 VALUES [("20200509"), ("20200510")),
        PARTITION p20200510 VALUES [("20200510"), ("20200511")),
        PARTITION p20200511 VALUES [("20200511"), ("20200512"))
    )
    DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4
    PROPERTIES (
        "replication_num" = "1"
    );

    以简单模式导入JSON数据,示例如下。

    CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
    COLUMNS(category,price,author)
    PROPERTIES
    (
        "desired_concurrent_number"="3",
        "max_batch_interval" = "20",
        "max_batch_rows" = "300000",
        "max_batch_size" = "209715200",
        "strict_mode" = "false",
        "format" = "json"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
        "kafka_topic" = "my_topic",
        "kafka_partitions" = "0,1,2",
        "kafka_offsets" = "0,0,0"
     );

    精准导入JSON格式数据,示例如下。

    CREATE ROUTINE LOAD example_db.test1 ON example_tbl
    COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
    PROPERTIES
    (
        "desired_concurrent_number"="3",
        "max_batch_interval" = "20",
        "max_batch_rows" = "300000",
        "max_batch_size" = "209715200",
        "strict_mode" = "false",
        "format" = "json",
        "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
        "strip_outer_array" = "true"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
        "kafka_topic" = "my_topic",
        "kafka_partitions" = "0,1,2",
        "kafka_offsets" = "0,0,0"
    );
    说明

    表中的分区字段dt在示例的数据里并没有,而是在Routine load语句里通过dt=from_unixtime(timestamp, '%Y%m%d')转换得来的。

    • strict mode与source data的导入关系

      • 列类型为TinyInt,且表中的列允许导入空值时:

        source data

        source data example

        string to int

        strict_mode

        result

        空值

        \N

        N/A

        true or false

        NULL

        not null

        aaa or 2000

        NULL

        true

        invalid data(filtered)

        not null

        aaa

        NULL

        true

        NULL

        not null

        1

        1

        true or false

        correct data

      • 列类型为 Decimal(1,0),且表中的列允许导入空值时:

        source data

        source data example

        string to int

        strict_mode

        result

        空值

        \N

        N/A

        true or false

        NULL

        not null

        aaa

        NULL

        true

        invalid data(filtered)

        not null

        aaa

        NULL

        false

        NULL

        not null

        1 or 10

        1

        true or false

        correct data

        说明

        10虽然是一个超过范围的值,但是因为其类型符合decimal的要求,所以strict mode对其不产生影响。10最后会在其他ETL处理流程中被过滤,但不会被strict mode过滤。

    • 访问SSL认证的Kafka集群

      访问SSL认证的Kafka集群需要您提供用于认证Kafka Broker公钥的证书文件(ca.pem)。如果Kafka集群同时开启了客户端认证,则还需提供客户端的公钥(client.pem)、密钥文件(client.key)、以及密钥密码。文件需要先通过CREATE FILE命令上传到Doris中,并且catalog名称为kafka。CREATE FILE命令详情可使用HELP CREATE FILE;查看。示例如下。

      1. 上传文件。

        CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
        CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");
        CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
      2. 创建例行导入作业。

        CREATE ROUTINE LOAD db1.job1 on tbl1
        PROPERTIES
        (
            "desired_concurrent_number"="1"
        )
        FROM KAFKA
        (
            "kafka_broker_list"= "broker1:9091,broker2:9091",
            "kafka_topic" = "my_topic",
            "property.security.protocol" = "ssl",
            "property.ssl.ca.location" = "FILE:ca.pem",
            "property.ssl.certificate.location" = "FILE:client.pem",
            "property.ssl.key.location" = "FILE:client.key",
            "property.ssl.key.password" = "abcd***"
        );

      Doris通过Kafka的C++ API librdkafka来访问Kafka集群,librdkafka所支持的参数详情请参见librdkafka

查看作业状态

查看作业状态的具体命令和示例可以通过HELP SHOW ROUTINE LOAD;命令查看。

查看任务运行状态的具体命令和示例可以通过HELP SHOW ROUTINE LOAD TASK; 命令查看。

说明

只能查看当前正在运行中的任务,已结束和未开始的任务无法查看。

修改作业属性

您可以修改已经创建的作业。详情请参见ALTER ROUTINE LOAD或通过HELP ALTER ROUTINE LOAD; 命令查看。

作业控制

您可以通过STOP、PAUSE或RESUME三个命令来控制作业的停止、暂停和重启。可以通过HELP STOP ROUTINE LOAD; HELP PAUSE ROUTINE LOAD;以及 HELP RESUME ROUTINE LOAD; 三个命令查看帮助和示例。

其他说明

  • 例行导入作业和ALTER TABLE操作的关系

    • 例行导入不会阻塞SCHEMA CHANGE和ROLLUP操作。

      但需注意的是,如果SCHEMA CHANGE完成后,列映射关系无法匹配,则会导致作业的错误数据激增,最终导致作业暂停。建议通过在例行导入作业中显式指定列映射关系,以及通过增加Nullable列或带Default值的列来减少这类问题。

    • 删除表的Partition可能会导致导入数据无法找到对应的Partition,使得作业暂停。

  • 例行导入作业和其他导入作业的关系(LOAD、DELETE、INSERT)

    • 例行导入和其他LOAD作业以及INSERT操作没有冲突。

    • 当执行DELETE操作时,对应表分区不能有任何正在执行的导入任务。所以在执行DELETE操作前,需要先暂停例行导入作业,并等待已下发的task全部完成后,才可以执行DELETE。

  • 例行导入作业和DROP DATABASE/TABLE操作的关系:当例行导入对应的DATABASE或TABLE被删除后,作业会自动CANCEL。

  • Kafka类型的例行导入作业和Kafka topic的关系

    当您在创建例行导入声明的kafka_topic在Kafka集群中不存在时:

    • 如果您的Kafka集群的broker设置了auto.create.topics.enable = true,则kafka_topic会先被自动创建,自动创建的partition个数是由您的Kafka集群中的broker配置num.partitions决定的。例行作业会正常的不断读取该topic的数据。

    • 如果您的Kafka集群的broker设置了auto.create.topics.enable = false, 则kafka_topic不会被自动创建,例行作业会在没有读取任何数据之前就被暂停,状态为PAUSED。

    因此,如果您希望当Kafka topic不存在的时候,被例行作业可以自动创建,只需要将您的kafka集群中的broker设置auto.create.topics.enable = true即可。

  • 在网络隔离的环境中可能出现的问题

    在有些环境中存在网段和域名解析的隔离措施,所以需要注意:

    • 创建Routine load任务中指定的Broker list必须能够被Doris服务访问。

    • Kafka中如果配置了advertised.listeners,advertised.listeners中的地址必须能够被Doris服务访问。

  • 关于指定消费的Partition和Offset

    Doris支持指定Partition和Offset开始消费,新版中还支持了指定时间点进行消费的功能。有三个相关参数:

    • kafka_partitions:指定待消费的partition列表,如:"0, 1, 2, 3"。

    • kafka_offsets:指定每个分区的起始offset,必须和kafka_partitions列表个数对应,如:"1000, 1000, 2000, 2000"。

    • property.kafka_default_offset:指定分区默认的起始offset。

    在创建导入作业时,这三个参数可以有以下组合:

    组合

    kafka_partitions

    kafka_offsets

    property.kafka_default_offset

    行为

    1

    No

    No

    No

    系统会自动查找topic对应的所有分区并从 OFFSET_END开始消费。

    2

    No

    No

    Yes

    系统会自动查找topic对应的所有分区并从default offset指定的位置开始消费。

    3

    Yes

    No

    No

    系统会从指定分区的OFFSET_END开始消费。

    4

    Yes

    Yes

    No

    系统会从指定分区的指定offset处开始消费。

    5

    Yes

    No

    Yes

    系统会从指定分区,default offset指定的位置开始消费。

  • STOP和PAUSE的区别

    FE会自动定期清理STOP状态的ROUTINE LOAD,而PAUSE状态的则可以再次被恢复启用。

相关参数

一些系统配置参数会影响例行导入的使用,具体说明如下。

配置项

FE/BE

默认值

说明

max_routine_load_task_concurrent_num

FE

5

该参数限制了一个例行导入作业最大的子任务并发数。可以运行时修改,建议维持默认值。设置过大,可能导致同时并发的任务数过多,占用集群资源。

max_routine_load_task_num_per_be

FE

5

该参数限制了每个BE节点最多并发执行的子任务个数。可以运行时修改,建议维持默认值。如果设置过大,可能导致并发任务数过多,占用集群资源。

max_routine_load_job_num

FE

100

该参数限制了例行导入作业的总数,包括 NEED_SCHEDULED、RUNNING、PAUSE这些状态。可以运行时修改。超过后,不能再提交新的作业。

max_consumer_num_per_group

BE

3

该参数表示一个子任务中最多生成几个consumer进行数据消费。对于Kafka数据源,一个consumer可能消费一个或多个Kafka partition。假设一个任务需要消费6个Kafka partition,则会生成3个 consumer,每个consumer消费2个partition。如果只有2个partition,则只会生成2个consumer,每个 consumer消费1个partition。

push_write_mbytes_per_sec

BE

10,即10MB/s

该参数是导入通用参数,不限于例行导入作业。该参数限制了导入数据写入磁盘的速度。对于SSD等高性能存储设备,可以适当增加这个限速。

max_tolerable_backend_down_num

FE

0

在满足某些条件下,Doris可PAUSED的任务重新调度,即变成RUNNING。该参数为0代表只有所有BE节点是alive状态才允许重新调度。

period_of_auto_resume_min

FE

5分钟

Doris重新调度,只会在5分钟这个周期内,最多尝试3次。如果3次都失败则锁定当前任务,后续不再进行调度。但可通过人为干预,进行手动恢复。