Routine Load continuously imports data from Apache Kafka into StarRocks on EMR. After a load job starts, StarRocks consumes messages from the Kafka topic automatically. You manage the job lifecycle (pause, resume, or stop) with SQL statements.
Terms
- RoutineLoadJob: A submitted routine import task.
- JobScheduler: The routine import task scheduler that splits a RoutineLoadJob into multiple Tasks based on rules.
- Task: A subtask created when JobScheduler splits a RoutineLoadJob according to defined rules.
- TaskScheduler: The task scheduler that manages Task execution.
How it works

- Submit a Kafka import task to the FE using a client that supports the MySQL protocol.
- The FE splits the import task into multiple Tasks. Each Task imports a specific portion of the data.
- Each Task runs on a designated BE. On the BE, a Task functions like a standard import task and uses the Stream Load mechanism to import data.
- After completing the import, the BE reports back to the FE.
- Based on the report, the FE either creates new Tasks or retries failed Tasks.
- The FE continuously generates new Tasks to ensure uninterrupted data import.
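The following Python sketch is a simplified, hypothetical model of the scheduling loop described above. It is not StarRocks source code. The helper names split_job and next_task_offsets are illustrative. The rule that concurrency equals the minimum of the partition count, desired_concurrent_number, and the number of alive BEs is an assumption based on common StarRocks behavior, not a statement from this document.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Task:
    """One subtask: a set of Kafka partitions assigned to one BE (illustrative)."""
    task_id: int
    be: str
    partitions: list[int] = field(default_factory=list)


def split_job(partitions: list[int], desired_concurrent_number: int,
              alive_bes: list[str]) -> list[Task]:
    """Split a RoutineLoadJob into Tasks (hypothetical model of the JobScheduler)."""
    # Assumption: concurrency is capped by partitions, the desired value, and alive BEs.
    concurrency = min(len(partitions), desired_concurrent_number, len(alive_bes))
    tasks = [Task(task_id=i, be=alive_bes[i % len(alive_bes)]) for i in range(concurrency)]
    # Round-robin assignment, so every partition belongs to exactly one Task.
    for index, partition in enumerate(partitions):
        tasks[index % concurrency].partitions.append(partition)
    return tasks


def next_task_offsets(committed: dict[int, int], task: Task,
                      consumed: dict[int, int], succeeded: bool) -> dict[int, int]:
    """Return the start offsets of the Task the FE creates after a BE report."""
    if succeeded:
        # Successful report: advance the committed offset of each partition.
        for partition in task.partitions:
            committed[partition] += consumed.get(partition, 0)
    # On failure the offsets stay unchanged, so the retried Task re-reads the same data.
    return {partition: committed[partition] for partition in task.partitions}


if __name__ == "__main__":
    for task in split_job([0, 1, 2, 3, 4], 3, ["be-1", "be-2", "be-3"]):
        print(task)
```

With five partitions, desired_concurrent_number set to 3, and three BEs, this model creates three Tasks that cover partitions [0, 3], [1, 4], and [2].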
Import process
Environment requirements
- Kafka clusters with no authentication or with SSL authentication are supported.
- Supported message formats:
  - CSV text format, where each message is one line without a trailing line feed.
  - JSON text format.
- Array types are not supported.
- Only Kafka 0.10.0.0 and later versions are supported.
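As a rough illustration of the CSV requirement, the following Python sketch splits one Kafka message into column values. The function name parse_csv_message is hypothetical. The sketch assumes UTF-8 payloads and performs a plain split on the separator with no quote handling, which may differ from how StarRocks actually parses CSV.

```python
def parse_csv_message(message: bytes, column_separator: str = "\t") -> list:
    """Split one CSV-format Kafka message into column values (illustrative only)."""
    text = message.decode("utf-8")  # Assumption: messages are UTF-8 encoded.
    # Each message must be exactly one line, without a trailing line feed.
    if "\n" in text or "\r" in text:
        raise ValueError("a CSV message must be a single line without a line feed")
    return text.split(column_separator)


if __name__ == "__main__":
    print(parse_csv_message(b"2020050803,2020-05-08,Dorian Grey,UK,male,1262", ","))
```

The default separator here is \t, which matches the default of the COLUMNS TERMINATED BY clause.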
Create an import task
Syntax
CREATE ROUTINE LOAD [<database>.]<job_name> ON <table_name>
    [COLUMNS TERMINATED BY "column_separator",]
    [COLUMNS (col1, col2, ...),]
    [WHERE where_condition,]
    [PARTITION (part1, part2, ...)]
    [PROPERTIES ("key" = "value", ...)]
FROM DATA_SOURCE
    [(data_source_properties1 = 'value1', data_source_properties2 = 'value2', ...)]
The following table describes the parameters.
| Parameter | Required | Description |
| --- | --- | --- |
| job_name | Yes | The name of the import task. You can prefix it with the database name. A common naming convention combines a timestamp and the table name. Job names must be unique within a database. |
| table_name | Yes | The name of the destination table. |
| COLUMNS TERMINATED clause | No | The column delimiter in the source data. The default delimiter is \t. |
| COLUMNS clause | No | The mapping between source data columns and table columns. Mapped columns: if the destination table has three columns (col1, col2, and col3) and the source data has four columns where the 1st, 2nd, and 4th map to col2, col1, and col3 respectively, write COLUMNS (col2, col1, temp, col3). The temp column is a placeholder that skips the third source column. Derived columns: StarRocks can also transform source columns. For example, if a fourth column col4 is added to the destination table and its value equals col1 + col2, write COLUMNS (col2, col1, temp, col3, col4 = col1 + col2). |
| WHERE clause | No | Filter conditions that exclude unwanted rows. The condition can reference mapped or derived columns. For example, to import only rows where k1 > 100 and k2 = 1000, write WHERE k1 > 100 and k2 = 1000. |
| PARTITION clause | No | The partitions of the destination table to load into. If omitted, data is automatically routed to the matching partitions. |
| PROPERTIES clause | No | General parameters of the import task, described in the following rows. |
| desired_concurrent_number | No | The import concurrency, that is, the maximum number of subtasks that the job can be split into. Must be greater than 0. Default: 3. |
| max_batch_interval | No | The maximum execution time of each subtask, in seconds. Valid range: 5 to 60. Default: 10. In version 1.15 and later, this parameter instead sets the scheduling interval, that is, how often a task runs. The data consumption duration is set by routine_load_task_consume_second in fe.conf (default: 3s), and the task timeout is set by routine_load_task_timeout_second in fe.conf (default: 15s). |
| max_batch_rows | No | The maximum number of rows each subtask reads. Must be at least 200000. Default: 200000. In version 1.15 and later, this parameter defines the size of the error detection window, which equals 10 × max_batch_rows. |
| max_batch_size | No | The maximum number of bytes each subtask reads. Valid range: 100 MB to 1 GB. Default: 100 MB. This parameter is deprecated in version 1.15 and later, where the data consumption duration is controlled by routine_load_task_consume_second in fe.conf (default: 3s). |
| max_error_number | No | The maximum number of error rows allowed in the sampling window. Must be ≥ 0. Default: 0 (no error rows allowed). Important: rows filtered out by the WHERE clause do not count as error rows. |
| strict_mode | No | Strict mode is enabled by default. When it is enabled, rows whose non-null source values convert to NULL are filtered out. To disable it, set this parameter to false. |
| timezone | No | The time zone of the import task. Defaults to the session's timezone setting. It affects all time-zone-dependent functions used during import. |
| DATA_SOURCE | Yes | The data source. Set it to KAFKA. |
| data_source_properties | No | Details of the data source. kafka_broker_list: the Kafka broker addresses in ip:port format; separate multiple brokers with commas (,). kafka_topic: the Kafka topic to subscribe to. kafka_partitions and kafka_offsets: the Kafka partitions to subscribe to and the starting offset of each. property: Kafka-related properties, equivalent to the "--property" parameter in the Kafka shell. Note: if you specify data source properties, both kafka_broker_list and kafka_topic are required. For the full syntax, run HELP ROUTINE LOAD;. |
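To make the COLUMNS and WHERE semantics in the table concrete, the following Python sketch applies a mapping to one source row. It is a hypothetical model, not StarRocks code: map_row and the dictionary-of-lambdas representation of derived columns and the WHERE condition are illustrative assumptions. Source values are matched to the COLUMNS names by position, derived columns are computed from the mapped values, a row that fails the WHERE condition returns None (such rows appear as unselectedRows, not as error rows), and names that are not destination columns, such as temp, are dropped.

```python
from typing import Callable, Dict, List, Optional

Row = Dict[str, object]


def map_row(values: List[object], columns: List[str], table_columns: List[str],
            derived: Optional[Dict[str, Callable[[Row], object]]] = None,
            where: Optional[Callable[[Row], bool]] = None) -> Optional[Row]:
    """Apply a COLUMNS mapping and a WHERE filter to one source row (illustrative)."""
    # Source values are assigned to the COLUMNS names by position.
    row: Row = dict(zip(columns, values))
    # Derived columns, such as col4 = col1 + col2, are computed from mapped values.
    for name, expression in (derived or {}).items():
        row[name] = expression(row)
    # Rows rejected by WHERE are filtered out (unselected, not errors).
    if where is not None and not where(row):
        return None
    # Placeholders like temp are not table columns, so they are dropped here.
    return {name: row[name] for name in table_columns}


if __name__ == "__main__":
    print(map_row([1, 2, "skip-me", 3], ["col2", "col1", "temp", "col3"],
                  ["col1", "col2", "col3", "col4"],
                  derived={"col4": lambda r: r["col1"] + r["col2"]}))
```

For the source row [1, 2, "skip-me", 3] and COLUMNS (col2, col1, temp, col3, col4 = col1 + col2), the sketch yields col1 = 2, col2 = 1, col3 = 3, and col4 = 3.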
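The interaction between max_batch_rows and max_error_number (version 1.15 and later) can be sketched as follows. This is a hypothetical Python model: the class ErrorWindow and its method are illustrative. Two behaviors are assumptions rather than statements from this document: that the window resets once it is full and within the limit, and that StarRocks pauses the job when the limit is exceeded. The error_rows passed in should exclude rows filtered out by WHERE, which never count as errors.

```python
class ErrorWindow:
    """Track error rows within a window of 10 x max_batch_rows (illustrative model)."""

    def __init__(self, max_batch_rows: int = 200000, max_error_number: int = 0) -> None:
        self.window_size = 10 * max_batch_rows
        self.max_error_number = max_error_number
        self.rows_in_window = 0
        self.errors_in_window = 0

    def record(self, rows: int, error_rows: int) -> bool:
        """Add one batch; return False if the error limit is exceeded."""
        self.rows_in_window += rows
        self.errors_in_window += error_rows
        if self.errors_in_window > self.max_error_number:
            return False  # Assumption: StarRocks would pause the job here.
        if self.rows_in_window >= self.window_size:
            # Assumption: a full window within the limit starts a new window.
            self.rows_in_window = 0
            self.errors_in_window = 0
        return True


if __name__ == "__main__":
    window = ErrorWindow(max_batch_rows=200000, max_error_number=0)
    print(window.window_size, window.record(100, 0), window.record(100, 1))
```

With the defaults (max_batch_rows = 200000, max_error_number = 0), the window covers 2,000,000 rows and a single error row exceeds the limit.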
Example: Submit a Routine Load task named example_tbl2_ordertest that accesses Kafka without authentication. The task continuously consumes messages from the ordertest2 topic in a Kafka cluster and imports them into the example_tbl2 table, starting from the earliest offset of the specified partitions.
CREATE ROUTINE LOAD load_test.example_tbl2_ordertest ON example_tbl2
COLUMNS(commodity_id, customer_name, country, pay_time, price, pay_dt=from_unixtime(pay_time, '%Y%m%d'))
PROPERTIES (
    "desired_concurrent_number" = "5",
    "format" = "json",
    "jsonpaths" = "[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]"
)
FROM KAFKA (
    "kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
    "kafka_topic" = "ordertest2",
    "kafka_partitions" = "0,1,2,3,4",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
Example: Access Kafka over the SSL security protocol. Configuration example:
-- Set the security protocol to SSL.
"property.security.protocol" = "ssl",
-- Path to the CA certificate.
"property.ssl.ca.location" = "FILE:ca-cert",
-- If the Kafka server requires client authentication, also set the following three parameters.
-- Path to the client's public key.
"property.ssl.certificate.location" = "FILE:client.pem",
-- Path to the client's private key.
"property.ssl.key.location" = "FILE:client.key",
-- Password of the client's private key.
"property.ssl.key.password" = "******"
For details about creating files, see CREATE FILE.
Note: When you use CREATE FILE, set url to the HTTP endpoint of OSS. For usage details, see Access OSS over IPv6.
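For the JSON example above (example_tbl2_ordertest), the following Python sketch approximates how jsonpaths and the derived column pay_dt produce one destination row. It is illustrative only: extract_json_row and UTC_PLUS_8 are hypothetical names, only simple top-level paths of the form $.field are handled, from_unixtime(pay_time, '%Y%m%d') is approximated with Python's datetime in an explicitly passed time zone (standing in for the timezone property), and the sample message is made up.

```python
import json
from datetime import datetime, timedelta, timezone

# jsonpaths from the example PROPERTIES clause, parsed as a JSON array.
JSONPATHS = json.loads('["$.commodity_id","$.customer_name","$.country","$.pay_time","$.price"]')
COLUMNS = ["commodity_id", "customer_name", "country", "pay_time", "price"]
UTC_PLUS_8 = timezone(timedelta(hours=8))  # Hypothetical task time zone.


def extract_json_row(message: str, jsonpaths: list = JSONPATHS,
                     columns: list = COLUMNS, tz: timezone = timezone.utc) -> dict:
    """Map one JSON message to columns via jsonpaths (illustrative, top-level only)."""
    document = json.loads(message)
    values = []
    for path in jsonpaths:
        # Only simple paths such as $.price are supported in this sketch.
        key = path[2:] if path.startswith("$.") else path
        values.append(document.get(key))
    # jsonpaths are matched to the COLUMNS list by position.
    row = dict(zip(columns, values))
    # Approximates pay_dt = from_unixtime(pay_time, '%Y%m%d') in the given time zone.
    row["pay_dt"] = datetime.fromtimestamp(row["pay_time"], tz).strftime("%Y%m%d")
    return row


if __name__ == "__main__":
    sample = ('{"commodity_id": "1", "customer_name": "Alice", "country": "US", '
              '"pay_time": 1588895999, "price": 10.5}')
    print(extract_json_row(sample), extract_json_row(sample, tz=UTC_PLUS_8)["pay_dt"])
```

The same pay_time of 1588895999 yields 20200507 in UTC but 20200508 in UTC+8, which is why the timezone property matters for time-zone-dependent functions.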
View task status
- Show all routine import tasks (including stopped and cancelled ones) in the load_test database. One or more rows are returned.
  USE load_test; SHOW ALL ROUTINE LOAD;
- Show the running routine import task named example_tbl2_ordertest in the load_test database.
  SHOW ROUTINE LOAD FOR load_test.example_tbl2_ordertest;
- In the EMR StarRocks Manager console, click Metadata Management, click the name of the target database, and then click Tasks. View the execution status of the task on the Kafka Import tab.
  Note: Only currently running tasks can be viewed here. Completed tasks and tasks that have not started are not displayed.
The following is sample output of SHOW ALL ROUTINE LOAD:
*************************** 1. row ***************************
Id: 14093
Name: routine_load_wikipedia
CreateTime: 2020-05-16 16:00:48
PauseTime: N/A
EndTime: N/A
DbName: default_cluster:load_test
TableName: routine_wiki_edit
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,"taskExecuteTimeMs":12043}
Progress: {"0":"13634667"}
ReasonOfStateChanged:
ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_53/error_log_insert_stmt_47e8a1d107ed4932-8f1ddf7b01ad2fee_47e8a1d107ed4932_8f1ddf7b01ad2fee, http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8
OtherMsg:
1 row in set (0.00 sec)
This sample output describes an import task named routine_load_wikipedia. The following table describes the key fields.
| Parameter | Description |
| --- | --- |
| State | Task status. RUNNING means the import task is actively running. |
| Statistic | Progress metrics accumulated since the task was created. It contains the following fields. |
| receivedBytes | Amount of data received, in bytes. |
| errorRows | Number of rows with import errors. |
| committedTaskNum | Number of Tasks committed by the FE. |
| loadedRows | Number of rows successfully imported. |
| loadRowsRate | Data import rate, in rows per second (row/s). |
| abortedTaskNum | Number of Tasks that failed on BEs. |
| totalRows | Total number of rows received. |
| unselectedRows | Number of rows filtered out by the WHERE clause. |
| receivedBytesRate | Data reception rate, in bytes per second (Bytes/s). |
| taskExecuteTimeMs | Import duration, in milliseconds (ms). |
| ErrorLogUrls | URLs of the error logs that record import errors. |
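If you monitor jobs from a script, the Statistic field can be parsed as JSON. The following Python sketch uses the Statistic value from the sample output above. The helper summarize_statistic and the metrics it derives are illustrative and are not part of StarRocks. In this sample, totalRows equals loadedRows + errorRows + unselectedRows (2399878 + 122 + 0 = 2400000).

```python
import json

# Statistic value copied from the sample SHOW ALL ROUTINE LOAD output.
STATISTIC = ('{"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,'
             '"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,'
             '"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,'
             '"taskExecuteTimeMs":12043}')


def summarize_statistic(statistic_json: str) -> dict:
    """Derive simple health indicators from the Statistic field (illustrative)."""
    stats = json.loads(statistic_json)
    total = stats["totalRows"]
    return {
        # Fraction of received rows that failed to import.
        "error_ratio": stats["errorRows"] / total if total else 0.0,
        # Rows accounted for as loaded, errored, or filtered out by WHERE.
        "accounted_rows": stats["loadedRows"] + stats["errorRows"] + stats["unselectedRows"],
    }


if __name__ == "__main__":
    print(summarize_statistic(STATISTIC))
```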
Pause an import task
After you run the PAUSE statement, the import task enters the PAUSED state. Data import stops, but the task is retained and can be resumed with the RESUME statement.
PAUSE ROUTINE LOAD FOR <job_name>;
While the task is paused, the Statistic and Progress fields stop updating, and the task remains visible in the output of SHOW ROUTINE LOAD.
Resume an import task
After you run the RESUME statement, the task briefly enters the NEED_SCHEDULE state while it is rescheduled, and then returns to the RUNNING state to continue importing data.
RESUME ROUTINE LOAD FOR <job_name>;
Stop an import task
Run the STOP statement to move the import task to the STOPPED state. Data import stops permanently, and the task cannot be resumed.
STOP ROUTINE LOAD FOR <job_name>;
After the task stops, its State is STOPPED, and the Statistic and Progress fields no longer update. Stopped tasks are not returned by SHOW ROUTINE LOAD; use SHOW ALL ROUTINE LOAD to list them. For example, querying a stopped task returns the following error:
MySQL [load_test]> SHOW ROUTINE LOAD FOR example_tbl2_ordertest2 \G;
ERROR 1064 (HY000): There is no running job named example_tbl2_ordertest2 in db load_test. Include history? false, you can try `show all routine load job for job_name` if you want to list stopped and cancelled jobs
ERROR: No query specified
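The lifecycle in the preceding sections can be summarized as a small state machine. The following Python sketch is a hypothetical model, not StarRocks code, and the class RoutineLoadJobModel is illustrative. The RUNNING, PAUSED, NEED_SCHEDULE, and STOPPED transitions come from this document. Pausing a job in NEED_SCHEDULE and stopping a paused job are assumptions. The sketch also encodes two rules stated in this topic: only PAUSED jobs can be modified, and stopped jobs are hidden from SHOW ROUTINE LOAD but listed by SHOW ALL ROUTINE LOAD.

```python
class RoutineLoadJobModel:
    """Illustrative model of the PAUSE / RESUME / STOP lifecycle."""

    def __init__(self) -> None:
        self.state = "RUNNING"

    def pause(self) -> None:
        # Assumption: a job waiting to be scheduled can also be paused.
        if self.state not in ("RUNNING", "NEED_SCHEDULE"):
            raise RuntimeError(f"cannot pause a job in state {self.state}")
        self.state = "PAUSED"

    def resume(self) -> None:
        if self.state != "PAUSED":
            raise RuntimeError(f"cannot resume a job in state {self.state}")
        # The job is rescheduled first and then returns to RUNNING.
        self.state = "NEED_SCHEDULE"

    def scheduled(self) -> None:
        if self.state == "NEED_SCHEDULE":
            self.state = "RUNNING"

    def stop(self) -> None:
        # STOPPED is final: a stopped job can never be resumed.
        if self.state == "STOPPED":
            raise RuntimeError("job is already stopped")
        self.state = "STOPPED"

    def alter(self) -> None:
        # ALTER ROUTINE LOAD is only allowed in the PAUSED state.
        if self.state != "PAUSED":
            raise RuntimeError("only PAUSED jobs can be modified")

    def visible_in_show(self, show_all: bool = False) -> bool:
        # SHOW ROUTINE LOAD hides stopped jobs; SHOW ALL ROUTINE LOAD lists them.
        return show_all or self.state != "STOPPED"
```

A typical sequence is pause(), alter(), resume(), and then scheduled() once the FE has rescheduled the job.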
Best practices
This example creates a Routine Load task that continuously consumes CSV-formatted data from a Kafka cluster and imports it into StarRocks.
- On the Kafka cluster:
  - Create a test topic.
    kafka-topics.sh --create --topic order_sr_topic --replication-factor 3 --partitions 10 --bootstrap-server "core-1-1:9092,core-1-2:9092,core-1-3:9092"
  - Run the following command to start a console producer that sends the data you type to the topic.
    kafka-console-producer.sh --broker-list core-1-1:9092 --topic order_sr_topic
  - Enter the test data, one message per line.
    2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895
    2020050802,2020-05-08,Julien Sorel,France,male,893
    2020050803,2020-05-08,Dorian Grey,UK,male,1262
    2020051001,2020-05-10,Tess Durbeyfield,US,female,986
    2020051101,2020-05-11,Edogawa Conan,japan,male,8924
- Perform the following operations in the StarRocks cluster.
  - Create the destination database and table.
    Based on the CSV data, create the table routine_load_tbl_csv in the load_test database. All columns except the fifth column (gender) are imported.
    CREATE DATABASE IF NOT EXISTS load_test;
    CREATE TABLE load_test.routine_load_tbl_csv (
        `order_id` bigint NOT NULL COMMENT "Order ID",
        `pay_dt` date NOT NULL COMMENT "Payment date",
        `customer_name` varchar(26) NULL COMMENT "Customer name",
        `nationality` varchar(26) NULL COMMENT "Nationality",
        `price` double NULL COMMENT "Payment amount"
    )
    ENGINE=OLAP
    PRIMARY KEY (order_id, pay_dt)
    DISTRIBUTED BY HASH(`order_id`) BUCKETS 5;
  - Create the import task. Because the topic has 10 partitions, list all of them in kafka_partitions so that no test messages are skipped.
    CREATE ROUTINE LOAD load_test.routine_load_tbl_ordertest_csv ON routine_load_tbl_csv
    COLUMNS TERMINATED BY ",",
    COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
    PROPERTIES (
        "desired_concurrent_number" = "5"
    )
    FROM KAFKA (
        "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092",
        "kafka_topic" = "order_sr_topic",
        "kafka_partitions" = "0,1,2,3,4,5,6,7,8,9",
        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );
  - Run the following command to view information about the import task named routine_load_tbl_ordertest_csv.
    SHOW ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;
    If State is RUNNING, the job is operating normally.
  - Query the destination table to confirm that the data has been imported.
    SELECT * FROM routine_load_tbl_csv;
You can also perform these operations:
- Pause the import task
  PAUSE ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;
- Resume the import task
  RESUME ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;
- Modify the import task
  Note: You can only modify tasks that are in the PAUSED state.
  Example: Change desired_concurrent_number to 6.
  ALTER ROUTINE LOAD FOR routine_load_tbl_ordertest_csv PROPERTIES ( "desired_concurrent_number" = "6" );
- Stop the import task
  STOP ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;
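Before running the SELECT in the best-practice steps, you can reason about what routine_load_tbl_csv should contain. The following Python sketch is a hypothetical model, and expected_table is an illustrative name. It drops temp_gender as the COLUMNS clause does and assumes Primary Key table upsert semantics, where a later row with the same (order_id, pay_dt) replaces the earlier one. It also assumes that every topic partition is consumed. Because the first two test messages share the key (2020050802, 2020-05-08), the model yields four rows rather than five. Which of the two rows survives depends on load order, which Kafka does not guarantee across partitions.

```python
# Test messages from the best-practice steps, one CSV message per line.
MESSAGES = [
    "2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895",
    "2020050802,2020-05-08,Julien Sorel,France,male,893",
    "2020050803,2020-05-08,Dorian Grey,UK,male,1262",
    "2020051001,2020-05-10,Tess Durbeyfield,US,female,986",
    "2020051101,2020-05-11,Edogawa Conan,japan,male,8924",
]
COLUMNS = ["order_id", "pay_dt", "customer_name", "nationality", "temp_gender", "price"]


def expected_table(messages: list) -> dict:
    """Model the Primary Key table contents after loading (illustrative)."""
    table = {}
    for message in messages:
        row = dict(zip(COLUMNS, message.split(",")))
        row.pop("temp_gender")  # Placeholder column: not stored in the table.
        key = (int(row["order_id"]), row["pay_dt"])
        # Assumed upsert: a later row with the same primary key replaces the earlier one.
        table[key] = {**row, "order_id": key[0], "price": float(row["price"])}
    return table


if __name__ == "__main__":
    for key, row in expected_table(MESSAGES).items():
        print(key, row)
```

In this model the Julien Sorel row is kept only because it is processed second.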