Routine Load

Routine Load continuously ingests data from Apache Kafka into StarRocks on EMR. Once a load job is running, StarRocks polls the Kafka topic automatically — you control the job lifecycle with SQL statements (pause, resume, or stop).

Terms

  • RoutineLoadJob: A submitted routine import task.
  • JobScheduler: The routine import task scheduler that splits a RoutineLoadJob into multiple Tasks based on rules.
  • Task: A subtask created when JobScheduler splits a RoutineLoadJob according to defined rules.
  • TaskScheduler: The task scheduler that manages Task execution.

How it works

The following figure shows the Routine Load import process.

[Figure: Routine Load import process]
The import process works as follows:
  1. Submit a Kafka import task to the FE using a client that supports the MySQL protocol.
  2. The FE splits the import task into multiple Tasks. Each Task imports a specific portion of the data.
  3. Each Task runs on a designated BE. On the BE, a Task functions like a standard import task and uses the Stream Load mechanism to import data.
  4. After completing the import, the BE reports back to the FE.
  5. Based on the report, the FE either creates new Tasks or retries failed Tasks.
  6. The FE continuously generates new Tasks to ensure uninterrupted data import.
Note The images and some content in this topic are sourced from the open source StarRocks documentation: Continuously import from Apache Kafka.
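The following Python sketch illustrates steps 2 and 3 under a simplifying assumption: the FE creates no more Tasks than there are Kafka partitions, than desired_concurrent_number allows, or than there are available BEs, and it assigns partitions to Tasks round-robin. The Task class, the split_job function, and the round-robin rule are hypothetical illustrations, not StarRocks scheduler code, and the real scheduler may apply additional limits.

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    """One subtask of a RoutineLoadJob (hypothetical model)."""
    task_id: int
    backend: str  # the BE that runs this Task
    partitions: list = field(default_factory=list)  # Kafka partitions it reads


def split_job(partitions, desired_concurrent_number, backends):
    """Split a job's Kafka partitions into Tasks.

    Assumption for illustration: the Task count is capped by the number of
    partitions, by desired_concurrent_number, and by the number of BEs.
    """
    if desired_concurrent_number <= 0:
        raise ValueError("desired_concurrent_number must be greater than 0")
    task_num = min(len(partitions), desired_concurrent_number, len(backends))
    if task_num == 0:
        return []  # nothing to consume, or no BE available
    tasks = [Task(task_id=i, backend=backends[i]) for i in range(task_num)]
    # Round-robin assignment: every partition is owned by exactly one Task.
    for index, partition in enumerate(partitions):
        tasks[index % task_num].partitions.append(partition)
    return tasks


if __name__ == "__main__":
    for task in split_job([0, 1, 2, 3, 4], 3, ["be-1", "be-2", "be-3", "be-4"]):
        print(task)
```

With five partitions, the default desired_concurrent_number of 3, and four BEs, the sketch creates three Tasks that read partitions [0, 3], [1, 4], and [2]. Steps 4 to 6 (BE reports, retries, and continuous Task generation) are not modeled.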

Import process

Environment requirements

  • Supports Kafka clusters with no authentication or SSL-based authentication.

  • Supported message formats:

    • CSV text format, where each message is one line without a trailing line feed.

    • JSON text format.

  • Does not support Array types.

  • Only supports Kafka version 0.10.0.0 or later.
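Two of these requirements can be checked before a job is created. The following Python sketch, whose helper names are hypothetical, compares a dotted Kafka version string against the 0.10.0.0 minimum and checks that a CSV message is a single line without a line feed. It does not check authentication, JSON validity, or column types.

```python
def parse_version(version):
    """Parse a dotted version such as "2.8.1" and pad it to four parts."""
    parts = [int(part) for part in version.split(".")]
    return tuple(parts + [0] * (4 - len(parts)))


def kafka_version_supported(version):
    # Requirement from this topic: Kafka 0.10.0.0 or later.
    return parse_version(version) >= (0, 10, 0, 0)


def is_valid_csv_message(message):
    # Requirement from this topic: one line, no (trailing) line feed.
    return "\n" not in message


if __name__ == "__main__":
    print(kafka_version_supported("2.8.1"), kafka_version_supported("0.9.0.1"))
    print(is_valid_csv_message("2020050802,2020-05-08,Julien Sorel,France,male,893"))
```

Padding to four parts makes a short version such as "0.10" compare equal to 0.10.0.0 instead of sorting below it.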

Create an import task

  • Syntax

    CREATE ROUTINE LOAD [<database>.]<job_name> ON <table_name>
        [COLUMNS TERMINATED BY "column_separator" ,]
        [COLUMNS (col1, col2, ...) ,]
        [WHERE where_condition ,]
        [PARTITION (part1, part2, ...)]
        [PROPERTIES ("key" = "value", ...)]
        FROM DATA_SOURCE
        [(data_source_properties1 = 'value1',
        data_source_properties2 = 'value2',
        ...)]

    The following list describes the parameters.

    • job_name (required): The name of the import job. You can prefix it with the database name. A common naming convention combines a timestamp and the table name. Job names must be unique within a database.
    • table_name (required): The name of the destination table.
    • COLUMNS TERMINATED BY clause (optional): The column separator in the source data. The default separator is \t.
    • COLUMNS clause (optional): The mapping between source data columns and table columns.
      • Mapped columns: For example, if the destination table has three columns (col1, col2, and col3) and the source data has four columns, of which the 1st, 2nd, and 4th map to col2, col1, and col3 respectively, write COLUMNS (col2, col1, temp, col3). The temp column is a placeholder that skips the third source column.
      • Derived columns: StarRocks can also transform source columns. For example, if a fourth column col4 is added to the destination table and its value equals col1 + col2, write COLUMNS (col2, col1, temp, col3, col4 = col1 + col2).
    • WHERE clause (optional): Filter conditions that exclude unwanted rows. The conditions can reference mapped or derived columns. For example, to import only rows where k1 > 100 and k2 = 1000, write WHERE k1 > 100 and k2 = 1000.
    • PARTITION clause (optional): The partitions of the destination table to load data into. If omitted, data is automatically routed to the appropriate partitions.
    • PROPERTIES clause (optional): General parameters for the import job. All of the following parameters are optional:
      • desired_concurrent_number: The import concurrency, that is, the maximum number of subtasks the job can be split into. Must be greater than 0. Default: 3.
      • max_batch_interval: The maximum execution time of each subtask, in seconds. Valid range: 5 to 60. Default: 10.
        In version 1.15 and later, this parameter defines the scheduling interval, that is, how often the task runs. The data consumption duration is set by routine_load_task_consume_second in fe.conf (default: 3 s), and the task timeout is set by routine_load_task_timeout_second in fe.conf (default: 15 s).
      • max_batch_rows: The maximum number of rows each subtask reads. Must be at least 200000. Default: 200000.
        In version 1.15 and later, this parameter defines the size of the error detection window, which is 10 × max_batch_rows rows (2,000,000 rows at the default value).
      • max_batch_size: The maximum amount of data each subtask reads, in bytes. Valid range: 100 MB to 1 GB. Default: 100 MB.
        In version 1.15 and later, this parameter is deprecated, and the data consumption duration is controlled by routine_load_task_consume_second in fe.conf (default: 3 s).
      • max_error_number: The maximum number of error rows allowed in the sampling window (the error detection window described under max_batch_rows). Must be greater than or equal to 0. Default: 0, which means no error rows are allowed.
        Important: Rows filtered out by the WHERE clause do not count as error rows.
      • strict_mode: Specifies whether to enable strict mode. Strict mode is enabled by default. When it is enabled, rows in which a non-null source value is converted to NULL are filtered out. To disable strict mode, set this parameter to false.
      • timezone: The time zone of the import job. By default, the time zone of the current session is used. This setting affects all time zone-dependent functions used during the import.
    • DATA_SOURCE (required): The data source. Set it to KAFKA.
    • data_source_properties (optional): Details of the data source:
      • kafka_broker_list: Kafka broker connection information in ip:port format. Separate multiple brokers with commas (,).
      • kafka_topic: The Kafka topic to subscribe to.
        Note: If you specify data source properties, both kafka_broker_list and kafka_topic are required.
      • kafka_partitions and kafka_offsets: The Kafka partitions to subscribe to and the starting offset of each partition.
      • property: Kafka-related properties, equivalent to the "--property" parameter in the Kafka shell. For the full syntax, run HELP ROUTINE LOAD;.
  • Example: Submit a Routine Load job named example_tbl2_ordertest to StarRocks for a Kafka cluster that requires no authentication. The job continuously consumes JSON messages from partitions 0 to 4 of the ordertest2 topic, starting from the earliest offset, and imports them into the example_tbl2 table.

    CREATE ROUTINE LOAD load_test.example_tbl2_ordertest ON example_tbl2
    COLUMNS(commodity_id, customer_name, country, pay_time, price, pay_dt=from_unixtime(pay_time, '%Y%m%d'))
    PROPERTIES
    (
        "desired_concurrent_number"="5",
        "format" ="json",
        "jsonpaths" ="[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]"
     )
    FROM KAFKA
    (
        "kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
        "kafka_topic" = "ordertest2",
        "kafka_partitions" ="0,1,2,3,4",
        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );
  • Example: Access Kafka over SSL. Add the following properties to the data source properties in the FROM KAFKA clause:

    -- Set the security protocol to SSL.
    "property.security.protocol" = "ssl",
    -- Path to the CA certificate.
    "property.ssl.ca.location" = "FILE:ca-cert",
    -- If the Kafka server requires client authentication, also set the following three parameters.
    -- Path to the client's public key.
    "property.ssl.certificate.location" = "FILE:client.pem",
    -- Path to the client's private key.
    "property.ssl.key.location" = "FILE:client.key",
    -- Password of the client's private key.
    "property.ssl.key.password" = "******"

    For details about creating files, see CREATE FILE.

    Note

    When using CREATE FILE, use the HTTP endpoint of OSS as the url. For usage details, see Access OSS over IPv6.
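The COLUMNS, WHERE, and max_error_number parameters described above interact as follows. This Python sketch is a simplified model, not StarRocks code: it applies the mapping COLUMNS (col2, col1, temp, col3, col4 = col1 + col2) to tab-separated rows, counts rows rejected by the WHERE condition as unselected rather than as errors, counts rows that cannot be parsed as errors, and flags the window when error rows exceed max_error_number. Treating the whole input as one error detection window of 10 × max_batch_rows rows and treating every column as an integer are assumptions for illustration; map_row, process_window, and error_window_rows are hypothetical names.

```python
from dataclasses import dataclass, field

# Source field order declared by COLUMNS (col2, col1, temp, col3, col4 = col1 + col2).
SOURCE_COLUMNS = ["col2", "col1", "temp", "col3"]
PLACEHOLDER = "temp"  # read from the source but never loaded


def error_window_rows(max_batch_rows=200000):
    """Size of the error detection window (version 1.15 and later)."""
    return 10 * max_batch_rows


def map_row(line, separator="\t"):
    """Map one source line to destination columns; raise ValueError if malformed."""
    values = line.split(separator)
    if len(values) != len(SOURCE_COLUMNS):
        raise ValueError(f"expected {len(SOURCE_COLUMNS)} fields, got {len(values)}")
    row = dict(zip(SOURCE_COLUMNS, values))
    del row[PLACEHOLDER]
    # Assumption: every destination column is an integer.
    row = {name: int(value) for name, value in row.items()}
    row["col4"] = row["col1"] + row["col2"]  # derived column
    return row


@dataclass
class WindowResult:
    loaded: list = field(default_factory=list)
    unselected_rows: int = 0
    error_rows: int = 0


def process_window(lines, where, max_error_number=0, separator="\t"):
    """Return the window result and whether the error limit was exceeded."""
    result = WindowResult()
    for line in lines:
        try:
            row = map_row(line, separator)
        except ValueError:
            result.error_rows += 1  # unparsable row counts as an error row
            continue
        if not where(row):
            result.unselected_rows += 1  # filtered by WHERE: not an error row
            continue
        result.loaded.append(row)
    return result, result.error_rows > max_error_number
```

For example, with the WHERE condition col1 > 100 and max_error_number set to 1, the three lines "2\t1\tx\t3", "20\t200\tx\t30", and "bad" yield one loaded row, one unselected row, and one error row, which does not exceed the limit. The sketch only returns a flag; it does not model what the job does when the limit is exceeded.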
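In the JSON example above, each entry of jsonpaths is bound, in order, to the column at the same position in the COLUMNS clause, and pay_dt is then computed from pay_time. The Python sketch below models that behavior under stated assumptions: only simple top-level paths of the form $.key are supported, a missing field becomes None (NULL), and from_unixtime is approximated with datetime.fromtimestamp in the job's time zone, which shows why the timezone property can change the resulting date. The function extract_row and the sample message are hypothetical.

```python
import json
from datetime import datetime, timedelta, timezone

# From the example: jsonpaths and the non-derived columns, in the same order.
JSONPATHS = ["$.commodity_id", "$.customer_name", "$.country", "$.pay_time", "$.price"]
COLUMNS = ["commodity_id", "customer_name", "country", "pay_time", "price"]


def extract_row(message, tz):
    """Bind jsonpaths to columns by position and derive pay_dt (simplified)."""
    doc = json.loads(message)
    row = {}
    for column, path in zip(COLUMNS, JSONPATHS):
        if not path.startswith("$."):
            raise ValueError(f"unsupported path: {path}")
        row[column] = doc.get(path[2:])  # missing field -> None (NULL)
    # pay_dt = from_unixtime(pay_time, '%Y%m%d'), evaluated in the job's time zone.
    if row["pay_time"] is None:
        row["pay_dt"] = None
    else:
        row["pay_dt"] = datetime.fromtimestamp(row["pay_time"], tz).strftime("%Y%m%d")
    return row


# Hypothetical message; 1589410800 is 2020-05-13 23:00:00 UTC.
MESSAGE = ('{"commodity_id": "1", "customer_name": "Mark Twain", '
           '"country": "US", "pay_time": 1589410800, "price": 875}')

if __name__ == "__main__":
    print(extract_row(MESSAGE, timezone.utc)["pay_dt"])
    print(extract_row(MESSAGE, timezone(timedelta(hours=8)))["pay_dt"])
```

The same message yields pay_dt 20200513 in UTC but 20200514 in UTC+8, because the timestamp falls late in the UTC day.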

View task status

  • Show all routine import tasks (including stopped or canceled ones) in the load_test database. Returns one or more rows.

    USE load_test;
    SHOW ALL ROUTINE LOAD;
  • Show the currently running routine import task named example_tbl2_ordertest in the load_test database.

    SHOW ROUTINE LOAD FOR load_test.example_tbl2_ordertest;
  • In the EMR StarRocks Manager console, click Metadata Management, click the target database name, then click Tasks. View the task execution status on the Kafka Import tab.

Important

Without the ALL keyword, SHOW ROUTINE LOAD returns only jobs that have not ended, such as RUNNING or PAUSED jobs. Stopped or canceled jobs are returned only by SHOW ALL ROUTINE LOAD.

Run SHOW ALL ROUTINE LOAD to view all Routine Load jobs. Sample output:

*************************** 1. row ***************************
                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: N/A
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,"taskExecuteTimeMs":12043}
            Progress: {"0":"13634667"}
ReasonOfStateChanged:
        ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_53/error_log_insert_stmt_47e8a1d107ed4932-8f1ddf7b01ad2fee_47e8a1d107ed4932_8f1ddf7b01ad2fee, http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8
            OtherMsg:
1 row in set (0.00 sec)

The sample output shows an import job named routine_load_wikipedia. The key fields are described as follows:

  • State: Task status. RUNNING means the import task is actively running.
  • Statistic: Progress metrics tracking import activity since task creation.
    • receivedBytes: Amount of data received, in bytes.
    • errorRows: Number of rows with import errors.
    • committedTaskNum: Number of Tasks committed by the FE.
    • loadedRows: Number of rows successfully imported.
    • loadRowsRate: Data import rate, in rows per second (row/s).
    • abortedTaskNum: Number of Tasks that failed on BEs.
    • totalRows: Total number of rows received.
    • unselectedRows: Number of rows filtered out by the WHERE clause.
    • receivedBytesRate: Data reception rate, in bytes per second (Bytes/s).
    • taskExecuteTimeMs: Import duration, in milliseconds (ms).
  • ErrorLogUrls: URLs of the error logs that record import errors.
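Because the Statistic field is a JSON string, it can be checked programmatically. The Python sketch below parses the Statistic value from the sample output above and derives an error ratio and an approximate load rate. The helper name summarize_statistic is hypothetical, and the identity totalRows = loadedRows + errorRows + unselectedRows is an assumption that holds for this sample rather than a documented guarantee.

```python
import json

# Statistic value copied from the sample SHOW ALL ROUTINE LOAD output.
STATISTIC = (
    '{"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,'
    '"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,'
    '"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,'
    '"taskExecuteTimeMs":12043}'
)


def summarize_statistic(raw):
    """Derive health indicators from a Statistic JSON string (hypothetical helper)."""
    stat = json.loads(raw)
    total = stat["totalRows"]
    seconds = stat["taskExecuteTimeMs"] / 1000
    return {
        # Share of received rows that failed to import.
        "error_ratio": stat["errorRows"] / total if total else 0.0,
        # Assumption: every received row is loaded, an error, or filtered by WHERE.
        "rows_accounted_for": stat["loadedRows"] + stat["errorRows"] + stat["unselectedRows"] == total,
        # Average rows per second over the total task execution time.
        "approx_rows_per_s": stat["loadedRows"] / seconds if seconds else 0.0,
    }


if __name__ == "__main__":
    print(summarize_statistic(STATISTIC))
```

For the sample, all 2,400,000 received rows are accounted for, the error ratio is about 0.005%, and the computed average of slightly more than 199,000 rows/s is consistent with the reported loadRowsRate of 199000.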

Pause an import task

Run the PAUSE statement to move the import task to the PAUSED state. Data import stops, but the task is retained and can be resumed with the RESUME statement.

PAUSE ROUTINE LOAD FOR <job_name>;

After pausing, the task’s State changes to PAUSED. The Statistic and Progress fields stop updating. The paused task remains visible via SHOW ROUTINE LOAD.

Resume an import task

After you run the RESUME statement, the task briefly enters the NEED_SCHEDULE state while it is rescheduled, and then returns to the RUNNING state to continue importing data.

RESUME ROUTINE LOAD FOR <job_name>;

Stop an import task

Run the STOP statement to move the import task to the STOPPED state. Data import stops permanently, and the task cannot be resumed.

STOP ROUTINE LOAD FOR <job_name>;

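After stopping, the task’s State becomes STOPPED. The Statistic and Progress fields no longer update. Stopped tasks are not returned by SHOW ROUTINE LOAD; use SHOW ALL ROUTINE LOAD to list them. For example, querying a stopped task without ALL returns an error such as the following: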

MySQL [load_test]> SHOW ROUTINE LOAD FOR example_tbl2_ordertest2 \G;
ERROR 1064 (HY000): There is no running job named example_tbl2_ordertest2 in db load_test. Include history? false, you can try `show all routine load job for job_name` if you want to list stopped and cancelled jobs
ERROR: No query specified
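The lifecycle described in the preceding sections can be summarized as a small state machine. The Python sketch below is a hypothetical model, not StarRocks code. It encodes only the transitions stated in this topic (PAUSE moves a job to PAUSED, RESUME moves a PAUSED job through NEED_SCHEDULE back to RUNNING, and STOP is permanent) plus the rule from the best-practices section that only PAUSED jobs can be modified. Allowing PAUSE from any state other than STOPPED is an assumption, and other states, such as CANCELLED, are not modeled.

```python
from enum import Enum


class State(Enum):
    NEED_SCHEDULE = "NEED_SCHEDULE"
    RUNNING = "RUNNING"
    PAUSED = "PAUSED"
    STOPPED = "STOPPED"


class RoutineLoadJobModel:
    """Hypothetical model of the job lifecycle described in this topic."""

    def __init__(self, name):
        self.name = name
        self.state = State.NEED_SCHEDULE
        self.properties = {"desired_concurrent_number": "3"}

    def schedule(self):
        # The scheduler picks up a job that is waiting to be scheduled.
        if self.state is State.NEED_SCHEDULE:
            self.state = State.RUNNING

    def pause(self):  # PAUSE ROUTINE LOAD
        if self.state is State.STOPPED:
            raise RuntimeError(f"{self.name} is stopped and cannot be paused")
        self.state = State.PAUSED

    def resume(self):  # RESUME ROUTINE LOAD
        if self.state is not State.PAUSED:
            raise RuntimeError(f"{self.name} is {self.state.value}, not PAUSED")
        self.state = State.NEED_SCHEDULE  # until the scheduler picks it up again

    def alter(self, **properties):  # ALTER ROUTINE LOAD
        if self.state is not State.PAUSED:
            raise RuntimeError("only a PAUSED job can be modified")
        self.properties.update({key: str(value) for key, value in properties.items()})

    def stop(self):  # STOP ROUTINE LOAD; STOPPED is terminal
        self.state = State.STOPPED


if __name__ == "__main__":
    job = RoutineLoadJobModel("routine_load_tbl_ordertest_csv")
    job.schedule()
    job.pause()
    job.alter(desired_concurrent_number=6)
    job.resume()
    job.schedule()
    job.stop()
    print(job.state, job.properties)
```

The usage block mirrors the best-practices sequence: pause, modify, resume, and finally stop.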

Best practices

This example creates a Routine Load task that continuously consumes CSV-formatted data from a Kafka cluster and imports it into StarRocks.

  1. On the Kafka cluster:

    1. Create a test topic.

      kafka-topics.sh --create --topic order_sr_topic --replication-factor 3 --partitions 10 --bootstrap-server "core-1-1:9092,core-1-2:9092,core-1-3:9092"
    2. Run the following command to start a console producer for the topic.

      kafka-console-producer.sh --broker-list core-1-1:9092 --topic order_sr_topic
    3. Enter the following test data at the producer prompt, one message per line.

      2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895
      2020050802,2020-05-08,Julien Sorel,France,male,893
      2020050803,2020-05-08,Dorian Grey,UK,male,1262
      2020051001,2020-05-10,Tess Durbeyfield,US,female,986
      2020051101,2020-05-11,Edogawa Conan,japan,male,8924
  2. Perform the following operations in the StarRocks cluster.

    1. Create the destination database and table.

      Based on the CSV data (importing all columns except the fifth gender column), create the table routine_load_tbl_csv in the load_test database.

      CREATE TABLE load_test.routine_load_tbl_csv (
          `order_id` bigint NOT NULL COMMENT "Order ID",
          `pay_dt` date NOT NULL COMMENT "Payment date",
          `customer_name` varchar(26) NULL COMMENT "Customer name",
          `nationality` varchar(26) NULL COMMENT "Nationality",
          `price` double NULL COMMENT "Payment amount"
      )
      ENGINE=OLAP
      PRIMARY KEY (order_id,pay_dt)
      DISTRIBUTED BY HASH(`order_id`) BUCKETS 5;
    2. Create the import task.

      CREATE ROUTINE LOAD load_test.routine_load_tbl_ordertest_csv ON routine_load_tbl_csv
      COLUMNS TERMINATED BY ",",
      COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
      PROPERTIES
      (
          "desired_concurrent_number" = "5"
      )
      FROM KAFKA
      (
          "kafka_broker_list" ="192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092",
          "kafka_topic" = "order_sr_topic",
          "kafka_partitions" ="0,1,2,3,4,5,6,7,8,9",
          "property.kafka_default_offsets" = "OFFSET_BEGINNING"
      );
    3. Run the following command to view information about the import task named routine_load_tbl_ordertest_csv.

      SHOW ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;

      If the state is RUNNING, the job is operating normally.

    4. Query the destination table to confirm data synchronization.

      SELECT * FROM routine_load_tbl_csv;

      You can also perform these operations:

      • Pause the import task

        PAUSE ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;
      • Resume the import task

        RESUME ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;
      • Modify the import task

        Note

        You can only modify tasks in PAUSED state.

        Example: Change desired_concurrent_number to 6.

        ALTER ROUTINE LOAD FOR routine_load_tbl_ordertest_csv
        PROPERTIES
        (
            "desired_concurrent_number" = "6"
        );
      • Stop the import task

        STOP ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;
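To check what the best-practice job is expected to load, the following Python sketch converts the five test messages using the field order of the COLUMNS clause, drops the temp_gender placeholder, and checks the varchar(26) limits. It is a simplified model under stated assumptions: fields are split on the comma separator without quote handling, and the helper name to_row is hypothetical. Note that the first two messages share the primary key (2020050802, 2020-05-08). Because a Primary Key table keeps one row per key, the query in step 4 is expected to return four rows rather than five once all messages are consumed; which of the two duplicates survives depends on load order, and the sketch simply keeps the last one.

```python
from datetime import date

# Field order in each Kafka message, as declared by the COLUMNS clause.
SOURCE_COLUMNS = ["order_id", "pay_dt", "customer_name", "nationality", "temp_gender", "price"]
VARCHAR_LIMITS = {"customer_name": 26, "nationality": 26}  # varchar(26), in bytes

MESSAGES = [
    "2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895",
    "2020050802,2020-05-08,Julien Sorel,France,male,893",
    "2020050803,2020-05-08,Dorian Grey,UK,male,1262",
    "2020051001,2020-05-10,Tess Durbeyfield,US,female,986",
    "2020051101,2020-05-11,Edogawa Conan,japan,male,8924",
]


def to_row(message, separator=","):
    """Convert one CSV message to a routine_load_tbl_csv row (simplified)."""
    values = message.split(separator)
    if len(values) != len(SOURCE_COLUMNS):
        raise ValueError(f"expected {len(SOURCE_COLUMNS)} fields, got {len(values)}")
    fields = dict(zip(SOURCE_COLUMNS, values))
    del fields["temp_gender"]  # placeholder column: not loaded
    for name, limit in VARCHAR_LIMITS.items():
        if len(fields[name].encode("utf-8")) > limit:
            raise ValueError(f"{name} exceeds varchar({limit})")
    return {
        "order_id": int(fields["order_id"]),
        "pay_dt": date.fromisoformat(fields["pay_dt"]),
        "customer_name": fields["customer_name"],
        "nationality": fields["nationality"],
        "price": float(fields["price"]),
    }


if __name__ == "__main__":
    rows = [to_row(m) for m in MESSAGES]
    # A Primary Key table keeps one row per (order_id, pay_dt).
    final = {(r["order_id"], r["pay_dt"]): r for r in rows}
    print(len(rows), len(final))
```

The sketch prints 5 parsed rows and 4 distinct primary keys.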