例行导入(Routine Load)功能,支持用户提交一个常驻的导入任务,通过不断地从指定的数据源读取数据,将数据导入到Doris中。本文主要介绍Routine Load功能的实现原理、使用方式以及最佳实践。
使用限制
当前仅支持从Kafka进行例行导入。
- 支持无认证的Kafka访问,以及通过SSL方式认证的Kafka集群。 
- 支持的消息格式为CSV或JSON文本格式。CSV每一个message为一行,且行尾不包含换行符。 
- 默认支持Kafka 0.10.0.0及以上版本。如果要使用Kafka 0.10.0.0以下版本 (0.9.0、0.8.2、0.8.1、0.8.0),需要修改BE配置,将 kafka_broker_version_fallback参数值设置为要兼容的旧版本,或者在创建Routine Load时直接设置property.broker.version.fallback参数值为要兼容的旧版本。 说明- 使用旧版本可能会导致Routine Load的部分新特性无法使用,例如根据时间设置Kafka分区的offset。 
基本原理
Client向FE提交一个Routine Load作业的原理如下:
+---------+
         |  Client |
         +----+----+
              |
+-----------------------------+
| FE          |               |
| +-----------v------------+  |
| |                        |  |
| |   Routine Load Job     |  |
| |                        |  |
| +---+--------+--------+--+  |
|     |        |        |     |
| +---v--+ +---v--+ +---v--+  |
| | task | | task | | task |  |
| +--+---+ +---+--+ +---+--+  |
|    |         |        |     |
+-----------------------------+
     |         |        |
     v         v        v
 +---+--+   +--+---+   ++-----+
 |  BE  |   |  BE  |   |  BE  |
 +------+   +------+   +------+- FE通过JobScheduler将一个导入作业拆分成若干个Task。每个Task负责导入指定的一部分数据。Task被TaskScheduler分配到指定的BE 上执行。 
- 在BE上,一个Task被视为一个普通的导入任务,通过Stream Load的导入机制进行导入。导入完成后,向FE汇报。 
- FE中的JobScheduler根据汇报结果,继续生成后续新的Task,或者对失败的Task进行重试。 
- 整个Routine Load作业通过不断地产生新的Task,来完成数据不间断的导入。 
Kafka例行导入
下面详细介绍Kafka例行导入的使用方式和最佳实践。
创建任务
创建例行导入任务的详细语法可以查看CREATE ROUTINE LOAD命令手册或执行HELP ROUTINE LOAD; 查看语法帮助。下面以几个示例说明如何创建Routine Load任务。
- 为example_db的example_tbl创建一个名为test1的Kafka例行导入任务。指定列分隔符和group.id和client.id,并且自动默认消费所有分区,且从有数据的位置(OFFSET_BEGINNING)开始订阅。 - CREATE ROUTINE LOAD example_db.test1 ON example_tbl COLUMNS TERMINATED BY ",", COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "property.group.id" = "xxx", "property.client.id" = "xxx", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );
- 以严格模式为example_db的example_tbl创建一个名为test1的Kafka例行导入任务。 - CREATE ROUTINE LOAD example_db.test1 ON example_tbl COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100), WHERE k1 > 100 and k2 like "%doris%" PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "true" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2,3", "kafka_offsets" = "101,0,0,200" );
- 导入JSON格式数据使用示例。 - Routine Load导入的JSON格式仅支持以下两种: - 第一种为只有一条记录,且为JSON对象。 - {"category":"a9jadhx","author":"test","price":895}- 第二种为JSON数组,数组中可含多条记录。 - [ { "category":"11", "author":"4avc", "price":895, "timestamp":1589191587 }, { "category":"22", "author":"2avc", "price":895, "timestamp":1589191487 }, { "category":"33", "author":"3avc", "price":342, "timestamp":1589191387 } ]- 创建待导入的Doris数据表,示例如下。 - CREATE TABLE `example_tbl` ( `category` varchar(24) NULL COMMENT "", `author` varchar(24) NULL COMMENT "", `timestamp` bigint(20) NULL COMMENT "", `dt` int(11) NULL COMMENT "", `price` double REPLACE ) ENGINE=OLAP AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`) COMMENT "OLAP" PARTITION BY RANGE(`dt`) ( PARTITION p0 VALUES [("-2147483648"), ("20200509")), PARTITION p20200509 VALUES [("20200509"), ("20200510")), PARTITION p20200510 VALUES [("20200510"), ("20200511")), PARTITION p20200511 VALUES [("20200511"), ("20200512")) ) DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4 PROPERTIES ( "replication_num" = "1" );- 以简单模式导入JSON数据,示例如下。 - CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1 COLUMNS(category,price,author) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false", "format" = "json" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2", "kafka_offsets" = "0,0,0" );- 精准导入JSON格式数据,示例如下。 - CREATE ROUTINE LOAD example_db.test1 ON example_tbl COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d')) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false", "format" = "json", "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]", "strip_outer_array" = "true" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2", "kafka_offsets" = "0,0,0" );说明- 表中的分区字段dt在示例的数据里并没有,而是在Routine load语句里通过 - dt=from_unixtime(timestamp, '%Y%m%d')转换得来的。- strict mode与source data的导入关系 - 列类型为TinyInt,且表中的列允许导入空值时: - source data - source data example - string to int - strict_mode - result - 空值 - \N - N/A - true or false - NULL - not null - aaa or 2000 - NULL - true - invalid data(filtered) - not null - aaa - NULL - true - NULL - not null - 1 - 1 - true or false - correct data 
- 列类型为 Decimal(1,0),且表中的列允许导入空值时: - source data - source data example - string to int - strict_mode - result - 空值 - \N - N/A - true or false - NULL - not null - aaa - NULL - true - invalid data(filtered) - not null - aaa - NULL - false - NULL - not null - 1 or 10 - 1 - true or false - correct data 说明- 10虽然是一个超过范围的值,但是因为其类型符合decimal的要求,所以strict mode对其不产生影响。10最后会在其他ETL处理流程中被过滤,但不会被strict mode过滤。 
 
- 访问SSL认证的Kafka集群 - 访问SSL认证的Kafka集群需要您提供用于认证Kafka Broker公钥的证书文件(ca.pem)。如果Kafka集群同时开启了客户端认证,则还需提供客户端的公钥(client.pem)、密钥文件(client.key)、以及密钥密码。文件需要先通过CREATE FILE命令上传到Doris中,并且catalog名称为kafka。CREATE FILE命令详情可使用 - HELP CREATE FILE;查看。示例如下。- 上传文件。 - CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka"); CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka"); CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
- 创建例行导入作业。 - CREATE ROUTINE LOAD db1.job1 on tbl1 PROPERTIES ( "desired_concurrent_number"="1" ) FROM KAFKA ( "kafka_broker_list"= "broker1:9091,broker2:9091", "kafka_topic" = "my_topic", "property.security.protocol" = "ssl", "property.ssl.ca.location" = "FILE:ca.pem", "property.ssl.certificate.location" = "FILE:client.pem", "property.ssl.key.location" = "FILE:client.key", "property.ssl.key.password" = "abcd***" );
 - Doris通过Kafka的C++ API librdkafka来访问Kafka集群,librdkafka所支持的参数详情请参见librdkafka。 
 
查看作业状态
查看作业状态的具体命令和示例可以通过HELP SHOW ROUTINE LOAD;命令查看。
查看任务运行状态的具体命令和示例可以通过HELP SHOW ROUTINE LOAD TASK; 命令查看。
只能查看当前正在运行中的任务,已结束和未开始的任务无法查看。
修改作业属性
您可以使用ALTER ROUTINE LOAD来修改已经创建的作业,或者通过HELP ALTER ROUTINE LOAD; 命令查看关于此命令的帮助文档。
作业控制
您可以通过STOP、PAUSE或RESUME三个命令来控制作业的停止、暂停和重启。可以通过HELP STOP ROUTINE LOAD; 、HELP PAUSE ROUTINE LOAD;以及 HELP RESUME ROUTINE LOAD; 三个命令查看帮助和示例。
其他说明
- 例行导入作业和ALTER TABLE操作的关系 - 例行导入不会阻塞SCHEMA CHANGE和ROLLUP操作。 - 但需注意的是,如果SCHEMA CHANGE完成后,列映射关系无法匹配,则会导致作业的错误数据激增,最终导致作业暂停。建议通过在例行导入作业中显式指定列映射关系,以及通过增加Nullable列或带Default值的列来减少这类问题。 
- 删除表的Partition可能会导致导入数据无法找到对应的Partition,使得作业暂停。 
 
- 例行导入作业和其他导入作业的关系(LOAD、DELETE、INSERT) - 例行导入和其他LOAD作业以及INSERT操作没有冲突。 
- 当执行DELETE操作时,对应表分区不能有任何正在执行的导入任务。所以在执行DELETE操作前,需要先暂停例行导入作业,并等待已下发的task全部完成后,才可以执行DELETE。 
 
- 例行导入作业和DROP DATABASE/TABLE操作的关系:当例行导入对应的DATABASE或TABLE被删除后,作业会自动CANCEL。 
- Kafka类型的例行导入作业和Kafka topic的关系 - 当您在创建例行导入声明的kafka_topic在Kafka集群中不存在时: - 如果您的Kafka集群的broker设置了auto.create.topics.enable = true,则kafka_topic会先被自动创建,自动创建的partition个数是由您的Kafka集群中的broker配置num.partitions决定的。例行作业会正常的不断读取该topic的数据。 
- 如果您的Kafka集群的broker设置了auto.create.topics.enable = false, 则kafka_topic不会被自动创建,例行作业会在没有读取任何数据之前就被暂停,状态为PAUSED。 
 - 因此,如果您希望当Kafka topic不存在的时候,被例行作业可以自动创建,只需要将您的kafka集群中的broker设置auto.create.topics.enable = true即可。 
- 在网络隔离的环境中可能出现的问题 - 在有些环境中存在网段和域名解析的隔离措施,所以需要注意: - 创建Routine load任务中指定的Broker list必须能够被Doris服务访问。 
- Kafka中如果配置了advertised.listeners,advertised.listeners中的地址必须能够被Doris服务访问。 
 
- 关于指定消费的Partition和Offset - Doris支持指定Partition和Offset开始消费,新版中还支持了指定时间点进行消费的功能。有三个相关参数: - kafka_partitions:指定待消费的partition列表,如:"0, 1, 2, 3"。 
- kafka_offsets:指定每个分区的起始offset,必须和kafka_partitions列表个数对应,如:"1000, 1000, 2000, 2000"。 
- property.kafka_default_offset:指定分区默认的起始offset。 
 - 在创建导入作业时,这三个参数可以有以下组合: - 组合 - kafka_partitions - kafka_offsets - property.kafka_default_offset - 行为 - 1 - No - No - No - 系统会自动查找topic对应的所有分区并从 OFFSET_END开始消费。 - 2 - No - No - Yes - 系统会自动查找topic对应的所有分区并从default offset指定的位置开始消费。 - 3 - Yes - No - No - 系统会从指定分区的OFFSET_END开始消费。 - 4 - Yes - Yes - No - 系统会从指定分区的指定offset处开始消费。 - 5 - Yes - No - Yes - 系统会从指定分区,default offset指定的位置开始消费。 
- STOP和PAUSE的区别 - FE会自动定期清理STOP状态的ROUTINE LOAD,而PAUSE状态的则可以再次被恢复启用。 
相关参数
一些系统配置参数会影响例行导入的使用,具体说明如下。
| 配置项 | FE/BE | 默认值 | 说明 | 
| max_routine_load_task_concurrent_num | FE | 5 | 该参数限制了一个例行导入作业最大的子任务并发数。可以运行时修改,建议维持默认值。设置过大,可能导致同时并发的任务数过多,占用集群资源。 | 
| max_routine_load_task_num_per_be | FE | 5 | 该参数限制了每个BE节点最多并发执行的子任务个数。可以运行时修改,建议维持默认值。如果设置过大,可能导致并发任务数过多,占用集群资源。 | 
| max_routine_load_job_num | FE | 100 | 该参数限制了例行导入作业的总数,包括 NEED_SCHEDULED、RUNNING、PAUSE这些状态。可以运行时修改。超过后,不能再提交新的作业。 | 
| max_consumer_num_per_group | BE | 3 | 该参数表示一个子任务中最多生成几个consumer进行数据消费。对于Kafka数据源,一个consumer可能消费一个或多个Kafka partition。假设一个任务需要消费6个Kafka partition,则会生成3个 consumer,每个consumer消费2个partition。如果只有2个partition,则只会生成2个consumer,每个 consumer消费1个partition。 | 
| push_write_mbytes_per_sec | BE | 10,即10MB/s | 该参数是导入通用参数,不限于例行导入作业。该参数限制了导入数据写入磁盘的速度。对于SSD等高性能存储设备,可以适当增加这个限速。 | 
| max_tolerable_backend_down_num | FE | 0 | 在满足某些条件下,Doris可PAUSED的任务重新调度,即变成RUNNING。该参数为0代表只有所有BE节点是alive状态才允许重新调度。 | 
| period_of_auto_resume_min | FE | 5分钟 | Doris重新调度,只会在5分钟这个周期内,最多尝试3次。如果3次都失败则锁定当前任务,后续不再进行调度。但可通过人为干预,进行手动恢复。 |