Routine Load是一种例行导入方式,StarRocks通过该方式支持从Kafka持续不断的导入数据,并且支持通过SQL控制导入任务的暂停、重启和停止。本文为您介绍Routine Load导入的基本原理、导入示例以及常见问题。
基本概念
- RoutineLoadJob:提交的一个例行导入任务。
- JobScheduler:例行导入任务调度器,用于调度和拆分一个RoutineLoadJob为多个Task。
- Task:RoutineLoadJob被JobScheduler根据规则拆分的子任务。
- TaskScheduler:任务调度器,用于调度Task的执行。
基本原理
- 用户通过支持MySQL协议的客户端向FE提交一个Kafka导入任务。
- FE将一个导入任务拆分成若干个Task,每个Task负责导入指定的一部分数据。
- 每个Task被分配到指定的BE上执行。在BE上,一个Task被视为一个普通的导入任务,通过Stream Load的导入机制进行导入。
- BE导入完成后,向FE汇报。
- FE根据汇报结果,继续生成后续新的Task,或者对失败的Task进行重试。
- FE会不断的产生新的Task,来完成数据不间断的导入。
导入流程
环境要求
支持访问无认证或使用SSL方式认证的Kafka集群。
支持的消息格式如下:
CSV文本格式,每一个message为一行,且行尾不包含换行符。
JSON文本格式。
不支持Array类型。
仅支持Kafka 0.10.0.0及以上版本。
创建导入任务
语法
CREATE ROUTINE LOAD <database>.<job_name> ON <table_name> [COLUMNS TERMINATED BY "column_separator" ,] [COLUMNS (col1, col2, ...) ,] [WHERE where_condition ,] [PARTITION (part1, part2, ...)] [PROPERTIES ("key" = "value", ...)] FROM [DATA_SOURCE] [(data_source_properties1 = 'value1', data_source_properties2 = 'value2', ...)]
相关参数描述如下表所示。
参数 是否必填 描述 job_name 是 导入任务的名称,前缀可以携带导入数据库名称,常见命名方式为时间戳+表名。 一个DataBase内,任务名称不可重复。 table_name 是 导入的目标表的名称。 COLUMNS TERMINATED子句 否 指定源数据文件中的列分隔符,分隔符默认为\t。 COLUMNS子句 否 指定源数据中列和表中列的映射关系。 - 映射列:例如,目标表有三列col1、col2和col3,源数据有4列,其中第1、2、4列分别对应col2、col1和col3,则书写为
COLUMNS (col2, col1, temp, col3)
,其中temp列为不存在的一列,用于跳过源数据中的第三列。 - 衍生列:除了直接读取源数据的列内容之外,StarRocks还提供对数据列的加工操作。例如,目标表后加入了第四列col4,其结果由col1 + col2产生,则可以书写为
COLUMNS (col2, col1, temp, col3, col4 = col1 + col2)
。
WHERE子句 否 指定过滤条件,可以过滤掉不需要的行。过滤条件可以指定映射列或衍生列。 例如,只导入k1大于100并且k2等于1000的行,则书写为
WHERE k1 > 100 and k2 = 1000
。PARTITION子句 否 指定导入目标表的Partition。如果不指定,则会自动导入到对应的Partition中。 PROPERTIES子句 否 指定导入任务的通用参数。 desired_concurrent_number 否 导入并发度,指定一个导入任务最多会被分成多少个子任务执行。必须大于0,默认值为3。 max_batch_interval 否 每个子任务的最大执行时间。范围为5~60,单位是秒。默认值为10。 1.15版本后,该参数表示子任务的调度时间,即任务多久执行一次。任务的消费数据时间为fe.conf中的routine_load_task_consume_second,默认为3s。任务的执行超时时间为fe.conf中的routine_load_task_timeout_second,默认为15s。
max_batch_rows 否 每个子任务最多读取的行数。必须大于等于200000。默认值为200000。 1.15版本后,该参数只用于定义错误检测窗口范围,窗口的范围是10 * max-batch-rows。
max_batch_size 否 每个子任务最多读取的字节数。单位为字节,范围是100 MB到1 GB。默认值为100 MB。 1.15版本后,废弃该参数,任务消费数据的时间为fe.conf中的routine_load_task_consume_second,默认为3s。
max_error_number 否 采样窗口内,允许的最大错误行数。必须大于等于0。默认是0,即不允许有错误行。 重要 被WHERE条件过滤掉的行不算错误行。strict_mode 否 是否开启严格模式,默认为开启。 如果开启后,非空原始数据的列类型变换为NULL,则会被过滤。关闭方式为设置该参数为false。
timezone 否 指定导入任务所使用的时区。 默认为使用Session的timezone参数。该参数会影响所有导入涉及的和时区有关的函数结果。
DATA_SOURCE 是 指定数据源,请使用KAFKA。 data_source_properties 否 指定数据源相关的信息。包括以下参数: - kafka_broker_list:Kafka的Broker连接信息,格式为
ip:host
。多个Broker之间以逗号(,)分隔。 - kafka_topic:指定待订阅的Kafka的Topic。说明 如果指定数据源相关的信息,则kafka_broker_list和kafka_topic必填。
- kafka_partitions和kafka_offsets:指定需要订阅的Kafka Partition,以及对应的每个Partition的起始offset。
- property:Kafka相关的属性,功能等同于Kafka Shell中
"--property"
参数。创建导入任务更详细的语法可以通过执行HELP ROUTINE LOAD;
命令查看。
- 映射列:例如,目标表有三列col1、col2和col3,源数据有4列,其中第1、2、4列分别对应col2、col1和col3,则书写为
示例:向StarRocks提交一个无认证的Routine Load导入任务example_tbl2_ordertest,持续消费Kafka集群中Topic ordertest2的消息,并导入至example_tbl2表中,导入任务会从此Topic所指定分区的最早位点开始消费。
CREATE ROUTINE LOAD load_test.example_tbl2_ordertest ON example_tbl COLUMNS(commodity_id, customer_name, country, pay_time, price, pay_dt=from_unixtime(pay_time, '%Y%m%d')) PROPERTIES ( "desired_concurrent_number"="5", "format" ="json", "jsonpaths" ="[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]" ) FROM KAFKA ( "kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>", "kafka_topic" = "ordertest2", "kafka_partitions" ="0,1,2,3,4", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );
示例:使用SSL安全协议访问Kafka,具体的配置示例如下。
-- 指定安全协议为SSL。 "property.security.protocol" = "ssl", -- CA证书的位置。 "property.ssl.ca.location" = "FILE:ca-cert", -- 如果Kafka Server端开启了Client认证,则还需设置以下三个参数: -- Client的公钥位置。 "property.ssl.certificate.location" = "FILE:client.pem", -- Client的私钥位置。 "property.ssl.key.location" = "FILE:client.key", -- Client私钥的密码。 "property.ssl.key.password" = "******"
关于创建文件的详细信息,请参见CREATE FILE。
说明在使用CREATE FILE时,请使用OSS的HTTP访问地址作为
url
。具体的使用方式,请参见OSS访问域名使用规则。
查看任务状态
显示load_test下,所有的例行导入任务(包括已停止或取消的任务)。结果为一行或多行。
USE load_test; SHOW ALL ROUTINE LOAD;
显示load_test下,名称为example_tbl2_ordertest的当前正在运行的例行导入任务。
SHOW ROUTINE LOAD FOR load_test.example_tbl2_ordertest;
在EMR StarRocks Manager控制台,单击元数据管理,单击待查看的数据库名称,单击任务,即可在Kafka导入页签查看任务的执行状态。
StarRocks只能查看当前正在运行中的任务,已结束和未开始的任务无法查看。
执行SHOW ALL ROUTINE LOAD
命令,可以查看当前正在运行的所有Routine Load任务,返回如下类似信息。
*************************** 1. row ***************************
Id: 14093
Name: routine_load_wikipedia
CreateTime: 2020-05-16 16:00:48
PauseTime: N/A
EndTime: N/A
DbName: default_cluster:load_test
TableName: routine_wiki_edit
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,"taskExecuteTimeMs":12043}
Progress: {"0":"13634667"}
ReasonOfStateChanged:
ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_53/error_log_insert_stmt_47e8a1d107ed4932-8f1ddf7b01ad2fee_47e8a1d107ed4932_8f1ddf7b01ad2fee, http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8
OtherMsg:
1 row in set (0.00 sec)
本示例创建名为routine_load_wikipedia的导入任务,相关参数描述如下表。
参数 | 描述 |
State | 导入任务状态。RUNNING表示该导入任务处于持续运行中。 |
Statistic | 进度信息,记录了从创建任务开始后的导入信息。 |
receivedBytes | 接收到的数据大小,单位是Byte。 |
errorRows | 导入错误行数。 |
committedTaskNum | FE提交的Task数。 |
loadedRows | 已导入的行数。 |
loadRowsRate | 导入数据速率,单位是行每秒(row/s)。 |
abortedTaskNum | BE失败的Task数。 |
totalRows | 接收的总行数。 |
unselectedRows | 被WHERE条件过滤的行数。 |
receivedBytesRate | 接收数据速率,单位是Bytes/s。 |
taskExecuteTimeMs | 导入耗时,单位是ms。 |
ErrorLogUrls | 错误信息日志,可以通过URL看到导入过程中的错误信息。 |
暂停导入任务
使用PAUSE语句后,此时导入任务进入PAUSED状态,数据暂停导入,但任务未终止,可以通过RESUME语句重启任务。
PAUSE ROUTINE LOAD FOR <job_name>;
暂停导入任务后,任务的State
变更为PAUSED
,Statistic
和Progress
中的导入信息停止更新。此时,任务并未终止,通过SHOW ROUTINE LOAD
语句可以看到已经暂停的导入任务。
恢复导入任务
使用RESUME语句后,任务会短暂的进入NEED_SCHEDULE状态,表示任务正在重新调度,一段时间后会重新恢复至RUNNING状态,继续导入数据。
RESUME ROUTINE LOAD FOR <job_name>;
停止导入任务
使用STOP语句让导入任务进入STOP状态,数据停止导入,任务终止,无法恢复数据导入。
STOP ROUTINE LOAD FOR <job_name>;
停止导入任务后,任务的State
变更为STOPPED
,Statistic
和Progress
中的导入信息再也不会更新。此时,通过SHOW ROUTINE LOAD
语句无法看到已经停止的导入任务。
最佳实践案例
本案例是创建了一个Routine Load导入任务,持续不断地消费Kafka集群的CSV格式的数据,然后导入至StarRocks中。
在Kafka集群中执行以下操作。
创建测试的topic。
kafka-topics.sh --create --topic order_sr_topic --replication-factor 3 --partitions 10 --bootstrap-server "core-1-1:9092,core-1-2:9092,core-1-3:9092"
执行如下命令,生产数据。
kafka-console-producer.sh --broker-list core-1-1:9092 --topic order_sr_topic
输入测试数据。
2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895 2020050802,2020-05-08,Julien Sorel,France,male,893 2020050803,2020-05-08,Dorian Grey,UK,male,1262 2020051001,2020-05-10,Tess Durbeyfield,US,female,986 2020051101,2020-05-11,Edogawa Conan,japan,male,8924
在StarRocks集群中执行以下操作。
执行以下命令,创建目标数据库和数据表。
根据CSV数据中需要导入的几列(例如除第五列性别外的其余五列需要导入至StarRocks), 在StarRocks集群的目标数据库load_test 中创建表routine_load_tbl_csv。
CREATE TABLE load_test.routine_load_tbl_csv ( `order_id` bigint NOT NULL COMMENT "订单编号", `pay_dt` date NOT NULL COMMENT "支付日期", `customer_name` varchar(26) NULL COMMENT "顾客姓名", `nationality` varchar(26) NULL COMMENT "国籍", `price`double NULL COMMENT "支付金额" ) ENGINE=OLAP PRIMARY KEY (order_id,pay_dt) DISTRIBUTED BY HASH(`order_id`) BUCKETS 5;
执行以下命令,创建导入任务。
CREATE ROUTINE LOAD load_test.routine_load_tbl_ordertest_csv ON routine_load_tbl_csv COLUMNS TERMINATED BY ",", COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price) PROPERTIES ( "desired_concurrent_number" = "5" ) FROM KAFKA ( "kafka_broker_list" ="192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092", "kafka_topic" = "order_sr_topic", "kafka_partitions" ="0,1,2,3,4", "property.kafka_default_offsets" = "OFFSET_BEGINNING" )
执行以下命令,查看名称为routine_load_tbl_ordertest_csv的导入任务的信息。
SHOW ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;
如果状态为RUNNING,则说明作业运行正常。
执行以下命令,查询目标表中的数据,您会发现数据已经同步完成。
SELECT * FROM routine_load_tbl_csv;
您还可以任务进行以下操作:
暂停导入任务
PAUSE ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;
恢复导入任务
RESUME ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;
修改导入任务
说明仅支持修改状态为PAUSED的任务。
例如:修改desired_concurrent_number为6。
ALTER ROUTINE LOAD FOR routine_load_tbl_ordertest_csv PROPERTIES ( "desired_concurrent_number" = "6" )
停止导入任务
STOP ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;