Routine Load

Routine Load支持提交一个常驻的导入作业,不断地从指定的数据源读取数据,将数据持续地导入至云数据库 SelectDB 版中。本文介绍如何通过Routine LoadKafka中的数据导入至云数据库 SelectDB 版实例。

前提条件

  • 支持的数据源:目前仅支持Kafka数据源,可通过无认证方式或PLAIN/SSL/Kerberos等认证方式连接Kafka。

  • 支持的消息格式:CSVJSON格式。CSV的格式,每一个Message为一行,且行尾不包含换行符。

注意事项

默认支持Kafka 0.10.0.0(含)以上版本。如果使用Kafka 0.10.0.0以下版本(0.9.0,0.8.2,0.8.1,0.8.0),需要Kafka兼容旧版本,具体操作有以下两种方式:

  • BE的配置kafka_broker_version_fallback的值设置为要兼容的旧版本。

  • 创建Routine Load的时候直接设置property.broker.version.fallback的值为要兼容的旧版本。

说明

使用兼容旧版本的代价在于,Routine Load的部分新特性可能无法使用,例如根据时间设置Kafka分区的offset。

创建导入作业

使用Routine Load功能时,首先需创建一个Routine Load作业。该作业将通过例行调度持续发送任务,每个任务会消耗一定数量的Kafka消息。

语法

CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source [data_source_properties]

参数说明

参数名称

参数说明

[db.]job_name

导入作业的名称。在同一个Database内,相同名称的job只能运行一个。

tbl_name

指定导入的表的名称。

merge_type

指定数据合并类型。默认为APPEND,表示导入的数据都是普通的追加写操作。MERGEDELETE类型仅适用于Unique Key模型表。其中MERGE类型需要配合[DELETE ON]语句使用,以标注Delete Flag列。而DELETE类型则表示导入的所有数据皆为删除数据。

load_properties

指定导入数据处理相关参数。详细参数说明,请参见load_properties

job_properties

指定导入作业相关参数。详细参数说明,请参见job_properties参数说明

data_source_properties

指定数据源的类型。详细参数说明,请参见data_source_properties参数说明

load_properties参数说明

[column_separator],
[columns_mapping],
[preceding_filter],
[where_predicates],
[partitions],
[DELETE ON],
[ORDER BY]

参数名称

示例值

参数说明

column_separator

COLUMNS TERMINATED BY ","

指定列分隔符,默认为\t

columns_mapping

(k1,k2,tmpk1,k3=tmpk1+1)

指定文件列和表中列的映射关系,以及各种列转换等。详细说明请参见数据转化

preceding_filter

指定过滤原始数据条件。详细说明请参见数据转化

where_predicates

WHERE k1>100 and k2=1000

指定条件对导入的数据进行过滤。详细说明请参见数据转化

partitions

PARTITION(p1,p2,p3)

指定导入目的表的哪些Partition中。如果不指定,则会自动导入到对应的Partition中。

DELETE ON

DELETE ON v3>100

用于指定导入数据中表示Delete Flag的列和计算关系。

说明

需配合MEREGE导入模式一起使用,仅适用于Unique Key模型的表。

ORDER BY

用于指定导入数据中表示Sequence Col的列。其功能为导入数据时保证数据顺序。

说明

仅适用于对Unique Key模型的表。

job_properties参数说明

PROPERTIES (
    "key1" = "val1",
    "key2" = "val2"
)
说明

这三个参数max_batch_interval、max_batch_rowsmax_batch_size用于控制子任务的执行时间和处理量。当任意一个参数达到设定阈值时,任务将终止。

参数名称

示例值

参数说明

desired_concurrent_number

"desired_concurrent_number" = "3"

指定期望并发度。大于0,默认为3。一个例行导入作业会被分成多个子任务执行。这个参数用于指定一个作业最多有多少任务可以同时执行。

说明
  1. 这个并发度并不是实际的并发度,实际的并发度,会通过集群的节点数、负载情况,以及数据源的情况综合判断。

  2. 适当提高并发可利用分布式集群加速,但过高会导致大量小文件写入,建议取值为集群核数 / 16

max_batch_interval

"max_batch_interval" = "20"

指定每个子任务最大执行时间,单位是秒,默认为10,取值范围为5~60秒。

max_batch_rows

"max_batch_rows" = "300000"

指定每个子任务最多读取的行数。默认是200000,取值范围大于等于200000。

max_batch_size

"max_batch_size" = "209715200"

指定每个子任务最多读取的字节数。单位是字节,默认为104857600,即100 MB。取值范围为100 MB~1 GB。

max_error_number

"max_error_number"="3"

指定采样窗口内,允许的最大错误行数。默认为0,即不允许有错误行。取值范围大于等于0。

采样窗口为max_batch_rows*10。即如果在采样窗口内,错误行数大于,则会导致例行作业被暂停,需要人工介入检查数据质量问题。

说明

where条件过滤掉的行不算错误行。

strict_mode

"strict_mode"="true"

指定是否开启严格模式,默认为false。开启后,非空原始数据的列类型变换如果结果为NULL,则会被过滤。指定方式为严格模式时,即对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下:

  • 对于列类型转换来说,如果strict modetrue,则错误的数据将被过滤。错误数据即原始数据并不为空值,在参与列类型转换后结果为空值的这一类数据。

  • 对于导入的某列由函数变换生成时,strict mode对其不产生影响。

  • 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict mode对其也不产生影响。例如:如果类型是decimal(1,0),原始数据为10,则属于可以通过类型转换但不在列声明的范围内。这种数据strict对其不产生影响。

timezone

"timezone" = "Africa/Abidjan"

指定导入作业所使用的时区。默认为使用Session的时区作为参数。

说明

该参数会影响所有导入涉及的和时区有关的函数结果。

format

"format" = "json"

指定导入数据格式,默认为CSV,支持JSON格式。

jsonpaths

-H "jsonpaths:[\"$.k2\",\"$.k1\"]"

当导入数据格式为JSON时,可通过jsonpaths指定抽取JSON数据中的字段。

strip_outer_array

-H "strip_outer_array:true"

当导入数据格式为JSON时,strip_outer_arraytrue表示JSON数据以数组的形式展现,数据中的每一个元素将被视为一行数据。默认为false

json_root

-H "json_root:$.RECORDS"

当导入数据格式为JSON时,可以通过json_root指定JSON数据的根节点。SelectDB将通过json_root抽取根节点的元素进行解析。默认为空。

send_batch_parallelism

指定设置发送批处理数据的并行度,如果并行度的值超过BE配置中的max_send_batch_parallelism_per_job,那么作为协调点的BE将使用max_send_batch_parallelism_per_job的值。

load_to_single_tablet

指定是否只导入数据到对应分区的一个tablet,默认值为false。该参数只允许向对带有random分区的Duplicate表导入数据的时设置。

严格模式(strict mode)与原始数据(source data)的导入关系

例如列类型为TinyInt。当表中的列允许导入空值时导入关系如下。

source data

source data example

string to int

strict_mode

result

空值

\N

N/A

true or false

NULL

not null

aaa or 2000

NULL

true

invalid data(filtered)

not null

aaa

NULL

false

NULL

not null

1

1

true or false

correct data

例如列类型为Decimal(1,0)。当表中的列允许导入空值时导入关系如下。

source data

source data example

string to int

strict_mode

result

空值

\N

N/A

true or false

NULL

not null

aaa

NULL

true

invalid data(filtered)

not null

aaa

NULL

false

NULL

not null

1 or 10

1

true or false

correct data

说明

10虽然是一个超过范围的值,但是因为其类型符合Decimal的要求,所以strict mode对其不产生影响。10最后会在其他ETL处理流程中被过滤。但不会被strict mode过滤。

data_source_properties参数说明

FROM KAFKA
(
    "key1" = "val1",
    "key2" = "val2"
)

参数名称

参数说明

kafka_broker_list

指定KafkaBroker连接信息。格式为ip:host。多个Broker之间以逗号分隔。

格式:"kafka_broker_list"="broker1:9092,broker2:9092"

kafka_topic

指定订阅的KafkaTopic。

格式:"kafka_topic"="my_topic"

kafka_partitions/kafka_offsets

指定订阅的Kafka Partition,以及对应的每个Partition的起始Offset。如果指定时间,则会从大于等于该时间的最近一个Offset处开始消费。

Offset可以指定从大于等于0的具体Offset,或者:

  • OFFSET_BEGINNING:从有数据的位置开始订阅。

  • OFFSET_END:从末尾开始订阅。

  • 时间格式,如:"2021-05-22 11:00:00"

如果没有指定,则默认从OFFSET_END开始订阅Topic下的所有Partition。

示例如下。

"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "2021-05-22 11:00:00,2021-05-22 11:00:00,2021-05-22 11:00:00"
重要

时间格式不能和OFFSET格式混用。

property

指定自定义Kafka参数。功能等同于Kafka shell中"--property"参数。

当参数的value为一个文件时,需要在value前加上关键词:"FILE:"。

Property参数说明

  • 使用SSL连接Kafka时,需要指定以下参数:

    "property.security.protocol" = "ssl",
    "property.ssl.ca.location" = "FILE:ca.pem",
    "property.ssl.certificate.location" = "FILE:client.pem",
    "property.ssl.key.location" = "FILE:client.key",
    "property.ssl.key.password" = "abcdefg"

    其中property.security.protocolproperty.ssl.ca.location为必选项,用于指明连接方式为SSL,以及CA证书的位置。

    如果Kafka Server端开启了Client认证,则还需设置以下参数。

    "property.ssl.certificate.location"
    "property.ssl.key.location"
    "property.ssl.key.password"

    分别用于指定Clientpublic key、private key、private key的密码。

  • 指定Kafka Partition的默认起始offset。

    没有指定kafka_partitions/kafka_offsets,默认消费所有分区。此时可以通过kafka_default_offsets指定起始offset。默认为OFFSET_END,即从末尾开始订阅。

    "property.kafka_default_offsets" = "OFFSET_BEGINNING"

更多支持的自定义参数,请参阅librdkafka的官方CONFIGURATION文档中,Client端的配置项。例如以下参数。

"property.client.id" = "12345",
"property.ssl.ca.location" = "FILE:ca.pem"

使用示例

创建Routine Load简单作业

  1. 创建待导入的SelectDB数据表,示例如下。

    CREATE TABLE test_table
    (
        id int,
        name varchar(50),
        age int,
        address varchar(50),
        url varchar(500)
    )
    UNIQUE KEY(`id`, `name`)
    DISTRIBUTED BY HASH(id) BUCKETS 4
    PROPERTIES("replication_num" = "1");
  2. 分别设置不同的参数导入数据,示例如下。

    • example_dbtest_table创建一个名为test1Kafka Routine Load任务。指定列分隔符的group.idclient.id,设置自动默认消费所有分区,且从有数据的位置(OFFSET_BEGINNING)开始订阅,示例如下。

      CREATE ROUTINE LOAD example_db.test1 ON test_table
      COLUMNS TERMINATED BY ",",
      COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "false"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "property.kafka_default_offsets" = "OFFSET_BEGINNING"
      );
    • example_dbtest_table创建一个名为test2Kafka Routine Load任务。导入任务为严格模式,示例如下。

      CREATE ROUTINE LOAD example_db.test2 ON test_table
      COLUMNS TERMINATED BY ",",
      COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "true"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "property.kafka_default_offsets" = "OFFSET_BEGINNING"
      );
    • 从指定的时间点开始消费,示例如下。

      CREATE ROUTINE LOAD example_db.test4 ON test_table
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "30",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200"
      ) FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092",
          "kafka_topic" = "my_topic",
          "property.kafka_default_offset" = "2024-01-21 10:00:00"
      );

导入JSON格式数据

Routine Load导入的JSON格式仅支持以下两种。

  • 只有一条记录,且为JSON对象。

    当使用单表导入(即通过ON TABLE_NAME指定表名)时,JSON数据格式如下。

    {"key1":"value1","key2":"value2","key3":"value3"}

    当使用动态或多表导入Routine Load(即不指定具体的表名)时,JSON数据格式如下。

    table_name|{"key1":"value1","key2":"value2","key3":"value3"}
  • JSON数组,数组中可含多条记录。

    当使用单表导入(即通过ON TABLE_NAME指定表名)时,JSON数据格式如下。

    [
        {   
            "key1":"value11",
            "key2":"value12",
            "key3":"value13",
            "key4":14
        },
        {
            "key1":"value21",
            "key2":"value22",
            "key3":"value23",
            "key4":24
        },
        {
            "key1":"value31",
            "key2":"value32",
            "key3":"value33",
            "key4":34
        }
    ]

    当使用动态或多表导入(即不指定具体的表名)时,JSON数据格式如下。

       table_name|[
        {   
            "key1":"value11",
            "key2":"value12",
            "key3":"value13",
            "key4":14
        },
        {
            "key1":"value21",
            "key2":"value22",
            "key3":"value23",
            "key4":24
        },
        {
            "key1":"value31",
            "key2":"value32",
            "key3":"value33",
            "key4":34
        }
    ]

导入JSON格式数据,示例如下。

  1. 创建待导入的SelectDB数据表,示例如下。

    CREATE TABLE `example_tbl` (
       `category` varchar(24) NULL COMMENT "",
       `author` varchar(24) NULL COMMENT "",
       `timestamp` bigint(20) NULL COMMENT "",
       `dt` int(11) NULL COMMENT "",
       `price` double REPLACE
    ) ENGINE=OLAP
    AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
    COMMENT "OLAP"
    PARTITION BY RANGE(`dt`)
    (
      PARTITION p0 VALUES [("-2147483648"), ("20230509")),
        PARTITION p20200509 VALUES [("20230509"), ("20231010")),
        PARTITION p20200510 VALUES [("20231010"), ("20231211")),
        PARTITION p20200511 VALUES [("20231211"), ("20240512"))
    )
    DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4;
  2. 分别使用两个类型的JSON数据记录到Topic里:

    {
        "category":"value1331",
        "author":"value1233",
        "timestamp":1700346050,
        "price":1413
    }
    [
        {
            "category":"value13z2",
            "author":"vaelue13",
            "timestamp":1705645251,
            "price":14330
        },
        {
            "category":"lvalue211",
            "author":"lvalue122",
            "timestamp":1684448450,
            "price":24440
        }
    ]
  3. 以不同模式导入JSON数据,示例如下。

    • 以简单模式导入JSON数据。

      CREATE ROUTINE LOAD example_db.test_json_label_1 ON example_tbl
      COLUMNS(category,price,author)
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "false",
          "format" = "json"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "kafka_partitions" = "0,1,2",
          "kafka_offsets" = "0,0,0"
       );
    • 精准导入JSON格式数据。

      CREATE ROUTINE LOAD example_db.test_json_label_3 ON example_tbl
      COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "false",
          "format" = "json",
          "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
          "strip_outer_array" = "true"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "kafka_partitions" = "0,1,2",
          "kafka_offsets" = "0,0,0"
      );
      说明

      表里的分区字段dt在示例数据里并没有,而是在Routine Load语句里通过dt=from_unixtime(timestamp,'%Y%m%d')转换出来的。

访问不同验证方式的Kafka集群

根据Kafka集群验证方式的不同,访问的方式示例如下。

  1. 访问SSL认证的Kafka集群。

    访问SSL认证的Kafka集群需要您提供用于认证Kafka Broker公钥的证书文件(ca.pem)。如果Kafka集群同时开启了客户端认证,则还需提供客户端的公钥(client.pem)、密钥文件(client.key),以及密钥密码。这里所需的文件需要先通过CREATE FILE命令上传到SelectDB中,并且Catalog名称为kafka。

    1. 上传文件,示例如下。

      CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
      CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");
      CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
    2. 创建Routine Load作业,示例如下。

      CREATE ROUTINE LOAD db1.job1 on tbl1
      PROPERTIES
      (
          "desired_concurrent_number"="1"
      )
      FROM KAFKA
      (
          "kafka_broker_list"= "broker1:9091,broker2:9091",
          "kafka_topic" = "my_topic",
          "property.security.protocol" = "ssl",
          "property.ssl.ca.location" = "FILE:ca.pem",
          "property.ssl.certificate.location" = "FILE:client.pem",
          "property.ssl.key.location" = "FILE:client.key",
          "property.ssl.key.password" = "abcdefg"
      );
      说明

      SelectDB通过KafkaC++ APIlibrdkafka来访问Kafka集群。librdkafka所支持的参数可以参阅Configuration properties

  2. 访问PLAIN认证的Kafka集群。

    访问开启PLAIN认证的Kafka集群,需要增加配置如下。

    1. property.security.protocol=SASL_PLAINTEXT:使用SASL plaintext。

    2. property.sasl.mechanism=PLAIN:设置SASL的认证方式为PLAIN。

    3. property.sasl.username=admin:设置SASL的用户名。

    4. property.sasl.password=admin:设置SASL的密码。

    创建Routine Load作业,示例如下。

    CREATE ROUTINE LOAD db1.job1 on tbl1
    PROPERTIES (
    "desired_concurrent_number"="1",
     )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092",
        "kafka_topic" = "my_topic",
        "property.security.protocol"="SASL_PLAINTEXT",
        "property.sasl.mechanism"="PLAIN",
        "property.sasl.username"="admin",
        "property.sasl.password"="admin"
    );
    
  3. 访问Kerberos认证的Kafka集群。

    访问开启kerberos认证的Kafka集群,需要增加配置如下。

    1. security.protocol=SASL_PLAINTEXT:使用SASL plaintext。

    2. sasl.kerberos.service.name=$SERVICENAME:设置broker servicename。

    3. sasl.kerberos.keytab=/etc/security/keytabs/${CLIENT_NAME}.keytab:设置Keytab本地文件路径。

    4. sasl.kerberos.principal=${CLIENT_NAME}/${CLIENT_HOST}:设置SelectDB连接Kafka时使用的Kerberos主体。

    创建Routine Load作业,示例如下。

    CREATE ROUTINE LOAD db1.job1 on tbl1
    PROPERTIES (
    "desired_concurrent_number"="1",
     )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092",
        "kafka_topic" = "my_topic",
        "property.security.protocol" = "SASL_PLAINTEXT",
        "property.sasl.kerberos.service.name" = "kafka",
        "property.sasl.kerberos.keytab" = "/etc/krb5.keytab",
        "property.sasl.kerberos.principal" = "id@your.com"
    );
    说明
    • 若要使SelectDB访问开启kerberos认证方式的Kafka集群,需要在SelectDB集群所有运行节点上部署Kerberos客户端kinit,并配置krb5.conf,填写KDC服务信息等。

    • 配置property.sasl.kerberos.keytab的值需要指定Keytab本地文件的绝对路径,并允许SelectDB进程访问该本地文件。

修改导入作业

修改已经创建的例行导入作业。只能修改处于PAUSED状态的作业。

语法

ALTER ROUTINE LOAD FOR <job_name>
[job_properties]
FROM <data_source>
[data_source_properties]

参数说明

参数名称

参数说明

[db.]job_name

指定要修改的作业名称。

tbl_name

指定需要导入的表的名称。

job_properties

指定需要修改的作业参数。目前仅支持修改的参数如下。

  • desired_concurrent_number

  • max_error_number

  • max_batch_interval

  • max_batch_rows

  • max_batch_size

  • jsonpaths

  • json_root

  • strip_outer_array

  • strict_mode

  • timezone

  • num_as_string

  • fuzzy_parse

data_source

指定数据源的类型。当前支持:KAFKA

data_source_properties

指定数据源的相关属性。目前仅支持的属性如下。

  1. kafka_partitions

  2. kafka_offsets

  3. kafka_broker_list

  4. kafka_topic

  5. 自定义property,如property.group.id

说明

kafka_partitionskafka_offsets用于修改待消费的kafka partition 的offset,仅能修改当前已经消费的partition。不能新增partition。

使用示例

  • desired_concurrent_number修改为1,示例如下。

    ALTER ROUTINE LOAD FOR db1.label1
    PROPERTIES
    (
        "desired_concurrent_number" = "1"
    );
  • desired_concurrent_number修改为10,修改partitionoffset,修改group id,示例如下。

    ALTER ROUTINE LOAD FOR db1.label1
    PROPERTIES
    (
        "desired_concurrent_number" = "10"
    )
    FROM kafka
    (
        "kafka_partitions" = "0, 1, 2",
        "kafka_offsets" = "100, 200, 100",
        "property.group.id" = "new_group"
    );

暂停导入作业

暂停一个Routine Load作业。被暂停的作业可以通过RESUME命令重新运行。

语法

PAUSE [ALL] ROUTINE LOAD FOR <job_name>;

参数说明

参数名称

参数说明

[db.]job_name

指定要暂停的作业名称。

使用示例

  • 暂停名称为test1Routine Load作业,示例如下。

    PAUSE ROUTINE LOAD FOR test1;
  • 暂停所有Routine Load作业,示例如下。

    PAUSE ALL ROUTINE LOAD;

恢复导入作业

恢复一个被暂停的Routine Load作业。恢复的作业将继续从之前已消费的offset继续消费。

语法

RESUME [ALL] ROUTINE LOAD FOR <job_name>

参数说明

参数名称

参数说明

[db.]job_name

指定要恢复的作业名称。

使用示例

  • 恢复名称为test1Routine Load作业,示例如下。

    RESUME ROUTINE LOAD FOR test1;
  • 恢复所有Routine Load作业,示例如下。

    RESUME ALL ROUTINE LOAD;

停止导入作业

停止一个Routine Load作业。被停止的作业无法再重新运行。停止导入后,已导入数据不会回滚。

语法

STOP ROUTINE LOAD FOR <job_name>;

参数说明

参数名称

参数说明

[db.]job_name

指定要停止的作业名称。

使用示例

停止名称为test1Routine Load作业,示例如下。

STOP ROUTINE LOAD FOR test1;

查看导入作业

Routine Load作业运行状态需要通过SHOW ROUTINE LOAD命令查看。

语法

SHOW [ALL] ROUTINE LOAD [FOR job_name];

参数说明

参数名称

参数说明

[db.]job_name

指定要查看的作业名称。

说明

如果导入数据格式不合法,详细的错误信息会记录在ErrorLogUrls中。注意其中包含多个链接,拷贝其中任意一个链接在浏览器中查看即可。

使用示例

  • 展示名称为test1的所有Routine Load作业(包括已停止或取消的作业)。结果为一行或多行。

    SHOW ALL ROUTINE LOAD FOR test1;
  • 展示名称为test1的当前正在运行的Routine Load作业。

    SHOW ROUTINE LOAD FOR test1;
  • 显示example_db下,所有的Routine Load作业(包括已停止或取消的作业)。结果为一行或多行。

    use example_db;
    SHOW ALL ROUTINE LOAD;
  • 显示example_db下,所有正在运行的Routine Load作业。

    use example_db;
    SHOW ROUTINE LOAD;
  • 显示example_db下,名称为test1的当前正在运行的Routine Load作业。

    SHOW ROUTINE LOAD FOR example_db.test1;
  • 显示example_db下,名称为test1的所有Routine Load作业(包括已停止或取消的作业)。结果为一行或多行。

    SHOW ALL ROUTINE LOAD FOR example_db.test1;

相关系统配置

相关系统配置参数会影响Routine Load的使用。

  • max_routine_load_task_concurrent_num

    FE配置项,默认为5,可以运行时修改。该参数限制了一个例行导入作业最大的子任务并发数。建议维持默认值。设置过大,可能导致同时并发的任务数过多,占用集群资源。

  • max_routine_load_task_num_per_be

    FE配置项,默认为5,可以运行时修改。该参数限制了每个BE节点最多并发执行的子任务个数。建议维持默认值。如果设置过大,可能导致并发任务数过多,占用集群资源。

  • max_routine_load_job_num

    FE配置项,默认为100,可以运行时修改。该参数限制Routine Load作业的总数,包括NEED_SCHEDULED,RUNNING,PAUSE这些状态。超过后,不能再提交新的作业。

  • max_consumer_num_per_group

    BE 配置项,默认为3。该参数表示一个子任务中最多生成几个Consumer进行数据消费。对于Kafka数据源,一个Consumer可能消费一个或多个Kafka Partition。假设一个任务需要消费6Kafka Partition,则会生成3Consumer,每个Consumer消费2Partition。如果只有2Partition,则只会生成2Consumer,每个Consumer消费1Partition。

  • max_tolerable_backend_down_num

    FE配置项,默认值是0。在满足某些条件下,SelectDB可令PAUSED的任务重新调度,变成RUNNING。该参数为0代表只有所有BE节点是alive状态才允许重新调度。

  • period_of_auto_resume_min

    FE配置项,默认是5分钟。该项意味着当SelectDB重新调度任务时,只会在5分钟这个周期内最多尝试3次。如果3次都失败则锁定当前任务,后续不再进行调度,但可通过人为干预,进行手动恢复。

其他说明

  • Routine Load作业和ALTER TABLE操作的关系。

    • Routine Load不会阻塞Schema变更和ROLLUP操作。但Schema变更完成后,列映射关系无法匹配,则会导致作业的错误数据激增,最终导致作业暂停。建议通过在Routine Load作业中显式指定列映射关系,并且通过增加Nullable列或带Default值的列来减少这些问题。

    • 删除表的Partition可能会导致导入数据无法找到对应的Partition,导致作业进入暂停状态。

  • Routine Load作业和其他导入作业的关系(LOAD,DELETE,INSERT)。

    • Routine Load和其他LOAD作业以及INSERT操作没有冲突。

    • 当执行DELETE操作时,对应表分区不能有任何正在执行的导入任务。因此在执行DELETE操作前,需要暂停Routine Load作业,并等待已下发的任务全部完成后,方可以执行DELETE。

  • Routine Load作业和DROP DATABASEDROP TABLE操作的关系。

    Routine Load对应的databasetable被删除后,作业会自动CANCEL。

  • Kafka类型的Routine Load作业和Kafka topic的关系。

    当创建例行导入声明的kafka_topicKafka集群中不存在时:

    • Kafka集群的Broker设置了auto.create.topics.enable=true,则topic会先被自动创建,自动创建的Partition个数是由您的Kafka集群中的Broker配置num.partitions决定的。例行作业会不断读取该topic的数据。

    • Kafka集群的Broker设置了auto.create.topics.enable=false,则topic不会被自动创建,例行作业会在没有读取任何数据之前就被暂停,状态置为PAUSED。

    所以,如果您希望当Kafka topic不存在的时候,被例行作业自动创建的话,只需要将Kafka集群中的Broker设置auto.create.topics.enable=true即可。

  • 当环境中存在网段和域名解析的隔离措施,因此需要注意以下问题。

    • 创建Routine Load任务中指定的Broker list必须能够被SelectDB服务访问。

    • Kafka中如果配置了advertised.listeners,advertised.listeners中的地址必须能够被SelectDB服务访问。

  • 指定消费的PartitionOffset。

    SelectDB支持指定Partition,Offset和时间点进行消费。这里说明下对应参数的配置关系。

    三个相关参数如下。

    • kafka_partitions:指定待消费的Partition列表,如:"0,1,2,3"。

    • kafka_offsets:指定每个分区的起始offset,必须和kafka_partitions列表个数对应。如:"1000,1000,2000,2000"

    • property.kafka_default_offset:指定分区默认的起始Offset。

    在创建导入作业时,这三个参数可以有以下五种组合方式。

    方式

    kafka_partitions

    kafka_offsets

    property.kafka_default_offset

    行为

    1

    No

    No

    No

    系统会自动查找Topic对应的所有分区并从OFFSET_END开始消费。

    2

    No

    No

    Yes

    系统会自动查找Topic对应的所有分区并从default offset指定的位置开始消费。

    3

    Yes

    No

    No

    系统会从指定分区的OFFSET_END开始消费。

    4

    Yes

    Yes

    No

    系统会从指定分区的指定Offset处开始消费。

    5

    Yes

    No

    Yes

    系统会从指定分区,default Offset指定的位置开始消费。

  • STOPPAUSE的区别。

    FE会自动定期清理STOP状态的Routine Load,而PAUSE状态的则可以再次被恢复启用。