例行导入(Routine Load)功能,支持用户提交一个常驻的导入任务,通过不断地从指定的数据源读取数据,将数据导入到Doris中。本文主要介绍Routine Load功能的实现原理、使用方式以及最佳实践。
使用限制
当前仅支持从Kafka进行例行导入。
支持无认证的Kafka访问,以及通过SSL方式认证的Kafka集群。
支持的消息格式为CSV或JSON文本格式。CSV每一个message为一行,且行尾不包含换行符。
默认支持Kafka 0.10.0.0及以上版本。如果要使用Kafka 0.10.0.0以下版本 (0.9.0、0.8.2、0.8.1、0.8.0),需要修改BE配置,将 kafka_broker_version_fallback参数值设置为要兼容的旧版本,或者在创建Routine Load时直接设置property.broker.version.fallback参数值为要兼容的旧版本。
说明使用旧版本可能会导致Routine Load的部分新特性无法使用,例如根据时间设置Kafka分区的offset。
基本原理
Client向FE提交一个Routine Load作业的原理如下:
+---------+
| Client |
+----+----+
|
+-----------------------------+
| FE | |
| +-----------v------------+ |
| | | |
| | Routine Load Job | |
| | | |
| +---+--------+--------+--+ |
| | | | |
| +---v--+ +---v--+ +---v--+ |
| | task | | task | | task | |
| +--+---+ +---+--+ +---+--+ |
| | | | |
+-----------------------------+
| | |
v v v
+---+--+ +--+---+ ++-----+
| BE | | BE | | BE |
+------+ +------+ +------+
FE通过JobScheduler将一个导入作业拆分成若干个Task。每个Task负责导入指定的一部分数据。Task被TaskScheduler分配到指定的BE 上执行。
在BE上,一个Task被视为一个普通的导入任务,通过Stream Load的导入机制进行导入。导入完成后,向FE汇报。
FE中的JobScheduler根据汇报结果,继续生成后续新的Task,或者对失败的Task进行重试。
整个Routine Load作业通过不断地产生新的Task,来完成数据不间断的导入。
Kafka例行导入
下面详细介绍Kafka例行导入的使用方式和最佳实践。
创建任务
创建例行导入任务的详细语法可以查看CREATE ROUTINE LOAD
命令手册或执行HELP ROUTINE LOAD;
查看语法帮助。下面以几个示例说明如何创建Routine Load任务。
为example_db的example_tbl创建一个名为test1的Kafka例行导入任务。指定列分隔符和group.id和client.id,并且自动默认消费所有分区,且从有数据的位置(OFFSET_BEGINNING)开始订阅。
CREATE ROUTINE LOAD example_db.test1 ON example_tbl COLUMNS TERMINATED BY ",", COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "property.group.id" = "xxx", "property.client.id" = "xxx", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );
以严格模式为example_db的example_tbl创建一个名为test1的Kafka例行导入任务。
CREATE ROUTINE LOAD example_db.test1 ON example_tbl COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100), WHERE k1 > 100 and k2 like "%doris%" PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "true" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2,3", "kafka_offsets" = "101,0,0,200" );
导入JSON格式数据使用示例。
Routine Load导入的JSON格式仅支持以下两种:
第一种为只有一条记录,且为JSON对象。
{"category":"a9jadhx","author":"test","price":895}
第二种为JSON数组,数组中可含多条记录。
[ { "category":"11", "author":"4avc", "price":895, "timestamp":1589191587 }, { "category":"22", "author":"2avc", "price":895, "timestamp":1589191487 }, { "category":"33", "author":"3avc", "price":342, "timestamp":1589191387 } ]
创建待导入的Doris数据表,示例如下。
CREATE TABLE `example_tbl` ( `category` varchar(24) NULL COMMENT "", `author` varchar(24) NULL COMMENT "", `timestamp` bigint(20) NULL COMMENT "", `dt` int(11) NULL COMMENT "", `price` double REPLACE ) ENGINE=OLAP AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`) COMMENT "OLAP" PARTITION BY RANGE(`dt`) ( PARTITION p0 VALUES [("-2147483648"), ("20200509")), PARTITION p20200509 VALUES [("20200509"), ("20200510")), PARTITION p20200510 VALUES [("20200510"), ("20200511")), PARTITION p20200511 VALUES [("20200511"), ("20200512")) ) DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4 PROPERTIES ( "replication_num" = "1" );
以简单模式导入JSON数据,示例如下。
CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1 COLUMNS(category,price,author) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false", "format" = "json" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2", "kafka_offsets" = "0,0,0" );
精准导入JSON格式数据,示例如下。
CREATE ROUTINE LOAD example_db.test1 ON example_tbl COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d')) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false", "format" = "json", "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]", "strip_outer_array" = "true" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2", "kafka_offsets" = "0,0,0" );
说明表中的分区字段dt在示例的数据里并没有,而是在Routine load语句里通过
dt=from_unixtime(timestamp, '%Y%m%d')
转换得来的。strict mode与source data的导入关系
列类型为TinyInt,且表中的列允许导入空值时:
source data
source data example
string to int
strict_mode
result
空值
\N
N/A
true or false
NULL
not null
aaa or 2000
NULL
true
invalid data(filtered)
not null
aaa
NULL
true
NULL
not null
1
1
true or false
correct data
列类型为 Decimal(1,0),且表中的列允许导入空值时:
source data
source data example
string to int
strict_mode
result
空值
\N
N/A
true or false
NULL
not null
aaa
NULL
true
invalid data(filtered)
not null
aaa
NULL
false
NULL
not null
1 or 10
1
true or false
correct data
说明10虽然是一个超过范围的值,但是因为其类型符合decimal的要求,所以strict mode对其不产生影响。10最后会在其他ETL处理流程中被过滤,但不会被strict mode过滤。
访问SSL认证的Kafka集群
访问SSL认证的Kafka集群需要您提供用于认证Kafka Broker公钥的证书文件(ca.pem)。如果Kafka集群同时开启了客户端认证,则还需提供客户端的公钥(client.pem)、密钥文件(client.key)、以及密钥密码。文件需要先通过CREATE FILE命令上传到Doris中,并且catalog名称为kafka。CREATE FILE命令详情可使用
HELP CREATE FILE;
查看。示例如下。上传文件。
CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka"); CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka"); CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
创建例行导入作业。
CREATE ROUTINE LOAD db1.job1 on tbl1 PROPERTIES ( "desired_concurrent_number"="1" ) FROM KAFKA ( "kafka_broker_list"= "broker1:9091,broker2:9091", "kafka_topic" = "my_topic", "property.security.protocol" = "ssl", "property.ssl.ca.location" = "FILE:ca.pem", "property.ssl.certificate.location" = "FILE:client.pem", "property.ssl.key.location" = "FILE:client.key", "property.ssl.key.password" = "abcd***" );
Doris通过Kafka的C++ API librdkafka来访问Kafka集群,librdkafka所支持的参数详情请参见librdkafka。
查看作业状态
查看作业状态的具体命令和示例可以通过HELP SHOW ROUTINE LOAD;
命令查看。
查看任务运行状态的具体命令和示例可以通过HELP SHOW ROUTINE LOAD TASK;
命令查看。
只能查看当前正在运行中的任务,已结束和未开始的任务无法查看。
修改作业属性
您可以使用ALTER ROUTINE LOAD
来修改已经创建的作业,或者通过HELP ALTER ROUTINE LOAD;
命令查看关于此命令的帮助文档。
作业控制
您可以通过STOP、PAUSE或RESUME三个命令来控制作业的停止、暂停和重启。可以通过HELP STOP ROUTINE LOAD;
、HELP PAUSE ROUTINE LOAD;
以及 HELP RESUME ROUTINE LOAD;
三个命令查看帮助和示例。
其他说明
例行导入作业和ALTER TABLE操作的关系
例行导入不会阻塞SCHEMA CHANGE和ROLLUP操作。
但需注意的是,如果SCHEMA CHANGE完成后,列映射关系无法匹配,则会导致作业的错误数据激增,最终导致作业暂停。建议通过在例行导入作业中显式指定列映射关系,以及通过增加Nullable列或带Default值的列来减少这类问题。
删除表的Partition可能会导致导入数据无法找到对应的Partition,使得作业暂停。
例行导入作业和其他导入作业的关系(LOAD、DELETE、INSERT)
例行导入和其他LOAD作业以及INSERT操作没有冲突。
当执行DELETE操作时,对应表分区不能有任何正在执行的导入任务。所以在执行DELETE操作前,需要先暂停例行导入作业,并等待已下发的task全部完成后,才可以执行DELETE。
例行导入作业和DROP DATABASE/TABLE操作的关系:当例行导入对应的DATABASE或TABLE被删除后,作业会自动CANCEL。
Kafka类型的例行导入作业和Kafka topic的关系
当您在创建例行导入声明的kafka_topic在Kafka集群中不存在时:
如果您的Kafka集群的broker设置了auto.create.topics.enable = true,则kafka_topic会先被自动创建,自动创建的partition个数是由您的Kafka集群中的broker配置num.partitions决定的。例行作业会正常的不断读取该topic的数据。
如果您的Kafka集群的broker设置了auto.create.topics.enable = false, 则kafka_topic不会被自动创建,例行作业会在没有读取任何数据之前就被暂停,状态为PAUSED。
因此,如果您希望当Kafka topic不存在的时候,被例行作业可以自动创建,只需要将您的kafka集群中的broker设置auto.create.topics.enable = true即可。
在网络隔离的环境中可能出现的问题
在有些环境中存在网段和域名解析的隔离措施,所以需要注意:
创建Routine load任务中指定的Broker list必须能够被Doris服务访问。
Kafka中如果配置了advertised.listeners,advertised.listeners中的地址必须能够被Doris服务访问。
关于指定消费的Partition和Offset
Doris支持指定Partition和Offset开始消费,新版中还支持了指定时间点进行消费的功能。有三个相关参数:
kafka_partitions:指定待消费的partition列表,如:"0, 1, 2, 3"。
kafka_offsets:指定每个分区的起始offset,必须和kafka_partitions列表个数对应,如:"1000, 1000, 2000, 2000"。
property.kafka_default_offset:指定分区默认的起始offset。
在创建导入作业时,这三个参数可以有以下组合:
组合
kafka_partitions
kafka_offsets
property.kafka_default_offset
行为
1
No
No
No
系统会自动查找topic对应的所有分区并从 OFFSET_END开始消费。
2
No
No
Yes
系统会自动查找topic对应的所有分区并从default offset指定的位置开始消费。
3
Yes
No
No
系统会从指定分区的OFFSET_END开始消费。
4
Yes
Yes
No
系统会从指定分区的指定offset处开始消费。
5
Yes
No
Yes
系统会从指定分区,default offset指定的位置开始消费。
STOP和PAUSE的区别
FE会自动定期清理STOP状态的ROUTINE LOAD,而PAUSE状态的则可以再次被恢复启用。
相关参数
一些系统配置参数会影响例行导入的使用,具体说明如下。
配置项 | FE/BE | 默认值 | 说明 |
max_routine_load_task_concurrent_num | FE | 5 | 该参数限制了一个例行导入作业最大的子任务并发数。可以运行时修改,建议维持默认值。设置过大,可能导致同时并发的任务数过多,占用集群资源。 |
max_routine_load_task_num_per_be | FE | 5 | 该参数限制了每个BE节点最多并发执行的子任务个数。可以运行时修改,建议维持默认值。如果设置过大,可能导致并发任务数过多,占用集群资源。 |
max_routine_load_job_num | FE | 100 | 该参数限制了例行导入作业的总数,包括 NEED_SCHEDULED、RUNNING、PAUSE这些状态。可以运行时修改。超过后,不能再提交新的作业。 |
max_consumer_num_per_group | BE | 3 | 该参数表示一个子任务中最多生成几个consumer进行数据消费。对于Kafka数据源,一个consumer可能消费一个或多个Kafka partition。假设一个任务需要消费6个Kafka partition,则会生成3个 consumer,每个consumer消费2个partition。如果只有2个partition,则只会生成2个consumer,每个 consumer消费1个partition。 |
push_write_mbytes_per_sec | BE | 10,即10MB/s | 该参数是导入通用参数,不限于例行导入作业。该参数限制了导入数据写入磁盘的速度。对于SSD等高性能存储设备,可以适当增加这个限速。 |
max_tolerable_backend_down_num | FE | 0 | 在满足某些条件下,Doris可PAUSED的任务重新调度,即变成RUNNING。该参数为0代表只有所有BE节点是alive状态才允许重新调度。 |
period_of_auto_resume_min | FE | 5分钟 | Doris重新调度,只会在5分钟这个周期内,最多尝试3次。如果3次都失败则锁定当前任务,后续不再进行调度。但可通过人为干预,进行手动恢复。 |