Broker Load 是云数据库 SelectDB 版用于异步批量导入数据的接口,支持从 HDFS、S3、OSS 等分布式存储系统高效导入大规模数据(单次支持百GB级)。本文介绍如何通过Broker Load导入数据至云数据库 SelectDB 版实例。
功能优势
Broker Load具有以下优势。
- 大容量支持:单次导入数据量可达百GB级别,适合大规模迁移离线数据。 
- 异步高并发:非阻塞异步写入,提高了集群资源的利用率。 
- 兼容性强:支持读取HDFS、S3等远端存储数据。 
- 操作便捷: - 可通过标准的MySQL协议创建导入任务,进行数据导入。 
- 可通过 - SHOW LOAD命令实时监控导入进度与结果。
 
适用场景
适用于从OSS、HDFS、S3等分布式存储系统高效导入大规模数据。
- 小批量导入(如100MB)的数据时效性为10秒级。 
- 大批量导入(如100GB)的时效性为10分钟级。 
创建导入
该方式用于通过Broker导入,读取远端存储(如HDFS、S3)上的数据导入到云数据库 SelectDB 版的表中。
语法
LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
)
WITH broker_type
[broker_properties]
[load_properties];参数说明
| 参数名称 | 参数说明 | 
| 
 | 导入任务的唯一标识。Label是在导入命令中自定义的名称。通过Label,可以查看对应导入任务的执行情况。Label也可用于防止重复导入相同的数据,当Label对应的导入作业状态为CANCELLED时,该Label可以再次被使用。 格式: 说明  推荐同一批次数据使用相同的Label。这样同一批次数据的重复请求只会被接受一次,保证了At-Most-Once。 | 
| 
 | 描述一组需要导入的文件。详细参数说明,请参见data_desc1参数说明。 | 
| 
 | 指定需要使用的Broker类型,支持HDFS、S3两种。S3类型的Broker Load也称为OSS Load,详情请参见OSS Load。 | 
| 
 | 指定Broker所需的参数让Broker能够访问远端存储系统。例如:BOS或HDFS。 语法如下:  | 
| 
 | 指定导入的相关参数。详细参数说明,请参见load_properties参数说明。 | 
data_desc1参数说明
[MERGE|APPEND|DELETE]
DATA INFILE
(
"file_path1"[, file_path2, ...]
)
[NEGATIVE]
INTO TABLE `table_name`
[PARTITION (p1, p2, ...)]
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(column_list)]
[COLUMNS FROM PATH AS (c1, c2, ...)]
[PRECEDING FILTER predicate]
[SET (column_mapping)]
[WHERE predicate]
[DELETE ON expr]
[ORDER BY source_sequence]
[PROPERTIES ("key1"="value1", ...)]| 参数名称 | 参数说明 | 
| 
 | 指定数据合并类型。默认为APPEND。默认值表示本次导入是普通的追加写操作。MERGE和DELETE类型仅适用于Unique Key模型表。其中MERGE类型需要配合[DELETE ON]语句使用,以标注Delete Flag列。而DELETE类型则表示本次导入的所有数据皆为删除数据。 | 
| 
 | 指定需要导入的文件路径。需要导入的文件路径可以是多个,可以使用通配符。路径最终必须匹配到文件,如果只匹配到目录则导入失败。 | 
| 
 | 表示本次导入为一批“负”导入。这种方式仅针对具有整型SUM聚合类型的聚合数据表。该方式会将导入数据中SUM聚合列对应的整型数值取反。用于冲抵之前导入错误的数据。 | 
| 
 | 指定仅导入表的某些分区,不在分区范围内的数据将被忽略。 | 
| 
 | 指定列分隔符,仅在CSV格式下有效,仅能指定单字节分隔符。 | 
| 
 | 指定文件类型,默认为CSV。支持CSV、PARQUET和ORC格式。 | 
| 
 | 指定原始文件中的列顺序。 | 
| 
 | 指定从导入的文件中抽取的列。 | 
| 
 | 指定前置过滤条件。数据首先根据 | 
| 
 | 指定列的转换函数。 | 
| 
 | 指定数据的过滤条件。 | 
| 
 | 需配合MERGE导入模式一起使用,仅针对Unique Key模型的表。用于指定导入数据中表示Delete Flag的列和计算关系。 | 
| 
 | 仅针对使用Unique Key模型的表。用于指定导入数据中表示Sequence Col的列。主要用于导入时保证数据顺序。 | 
| 
 | 指定导入的format的一些参数。如导入的文件是JSON格式,则可以在这里指定json_root、jsonpaths、fuzzy_parse等参数。 | 
load_properties参数说明
| 参数名称 | 参数说明 | 
| 
 | 导入超时时间,单位为秒,默认为14400,即4小时。 | 
| 
 | 最大容忍可过滤比率(数据不规范等原因)。默认0,即零容忍。取值范围为0~1。 | 
| 
 | 导入内存限制,单位为字节,默认为2147483648,即2 GB。 | 
| 
 | 设置导入任务是否开启严格模式,默认为false。 | 
| 
 | 指定某些受时区影响的函数的时区,默认为 | 
| 
 | 导入并发度,默认为1。调大导入并发度会启动多个执行计划同时执行导入任务,加快导入速度。 | 
| 
 | 用于设置发送批处理数据的并行度,如果并行度的值超过计算集群BE配置中的max_send_batch_parallelism_per_job,那么计算集群将使用max_send_batch_parallelism_per_job的值。 | 
| 
 | 是否只导入数据到对应分区的一个tablet,默认值为false。该参数只允许在对带有random分桶的Duplicate表导入数据的时候设置。 | 
使用示例
- 创建待导入的SelectDB数据表,示例如下。 - CREATE TABLE test_table ( id int, name varchar(50), age int, address varchar(50) ) UNIQUE KEY(`id`) DISTRIBUTED BY HASH(id) BUCKETS 4 PROPERTIES("replication_num" = "1"); CREATE TABLE test_table2 ( id int, name varchar(50), age int, address varchar(50) ) DISTRIBUTED BY HASH(id) BUCKETS 4 PROPERTIES("replication_num" = "1");
- 创建待导入的文件。 - 文件 - file1.txt,文件内容如下。- 1,tomori,32,shanghai 2,anon,22,beijing 3,taki,23,shenzhen 4,rana,45,hangzhou 5,soyo,14,shanghai 6,saki,25,hangzhou 7,mutsumi,45,shanghai 8,uika,26,shanghai 9,umiri,27,shenzhen 10,nyamu,37,shanghai
- 文件 - file2.csv,文件内容如下。- 1,saki,25,hangzhou 2,mutsumi,45,shanghai 3,uika,26,shanghai 4,umiri,27,shenzhen 5,nyamu,37,shanghai
 
- 将文件数据导入到表中。 - 从HDFS导入文本文件 - file1.txt,示例如下。- LOAD LABEL example_db.label1 ( DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt") INTO TABLE `my_table` COLUMNS TERMINATED BY "," ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" );- 导入文件file1.txt,按逗号分隔,导入到表test_table。当从HDFS导入数据时,broker_properties中需要指定 - fs.defaultFS属性,以确保可以正确的连接到HDFS集群并找到相应的数据文件。
- 从HDFS导入数据,同时导入两个文件到两个表中,示例如下。 - LOAD LABEL test_db.test_02 ( DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file2.csv") INTO TABLE `test_table` COLUMNS TERMINATED BY "," (id,name,temp_age,address) SET ( age = temp_age + 1 ), DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt") INTO TABLE `test_table2` COLUMNS TERMINATED BY "," ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" );- 导入两个文件 - file1.txt和- file2.csv,分别导入到- test_table和- test_table2两张表中,并且将- file2.csv中age的值加1后导入。
- 从HA模式部署的HDFS集群中,导入一批数据,示例如下。 - LOAD LABEL test_db.test_03 ( DATA INFILE("hdfs://hdfs_host:hdfs_port/example/*") INTO TABLE `test_table` COLUMNS TERMINATED BY "\\x01" ) WITH HDFS ( "hadoop.username" = "hive", "fs.defaultFS" = "hdfs://my_ha", "dfs.nameservices" = "my_ha", "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2", "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port", "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port", "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" );- 指定分隔符为Hive的默认分隔符 - \\x01,并使用通配符- *指定- data目录下所有目录的所有文件。
- 对导入数据file1.txt进行过滤处理,符合条件的数据才可导入,示例如下。 - LOAD LABEL test_db.test_04 ( DATA INFILE("hdfs://host:port/example/file1.txt") INTO TABLE `test_table2` COLUMNS TERMINATED BY "," (id,name,age,address) WHERE age < 20 ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" );- 只有原始数据中 - age < 20的行才会被导入。
- 从HDFS导入一批数据file1.txt,指定超时时间和过滤比例,并且将原有数据中与导入数据中 - age<20的列相匹配的行删除,其他行正常导入,示例如下。- LOAD LABEL test_db.test_05 ( MERGE DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt") INTO TABLE `test_table` COLUMNS TERMINATED BY "," (id,name,age,address) DELETE ON age < 20 ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" ) PROPERTIES ( "timeout" = "3600", "max_filter_ratio" = "0.1" );- 使用MERGE方式导入。 - test_table必须是一张Unique Key的表。当导入数据中的age列的值小于- 20时,该行会被认为是一个删除行。导入任务的超时时间是3600秒,并且允许错误率在10%以内。
 
取消导入
当Broker Load作业状态不为CANCELLED或FINISHED时,可以手动取消导入。取消时需要指定待取消导入任务的Label。导入任务取消后,已写入的数据也会回滚,不会生效。
语法
CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL like "label_pattern"];参数说明
| 参数名称 | 参数说明 | 
| 
 | 数据库名称。不指定的时使用当前默认数据库。 | 
| 
 | 导入任务的Label名称,精确匹配。如果使用LABEL LIKE,则会匹配导入任务的Label包含label_pattern的导入任务。 | 
使用示例
- 撤销数据库 - example_db上,Label为- example_db_test_load_label的导入作业。- CANCEL LOAD FROM example_db WHERE LABEL = "example_db_test_load_label";
- 撤销数据库 - example_db上,所有包含- example_的导入作业。- CANCEL LOAD FROM example_db WHERE LABEL like "example_";
查看导入
Broker Load是一个异步导入过程,语句执行成功仅代表导入任务提交成功,并不代表数据导入成功。导入状态需要通过SHOW LOAD命令查看。
语法
SHOW LOAD
[FROM db_name]
[
   WHERE
   [LABEL [ = "your_label" | LIKE "label_matcher"]]
   [STATE = ["PENDING"|"ETL"|"LOADING"|"FINISHED"|"CANCELLED"|]]
]
[ORDER BY ...]
[LIMIT limit][OFFSET offset];参数说明
| 参数名称 | 参数说明 | 
| 
 | 数据库名称。不指定的场合使用当前默认数据库。 | 
| 
 | 导入任务的Label名称,精确匹配。如果使用LABEL LIKE,则会匹配导入任务的Label包含label_matcher的导入任务。 | 
| 
 | 导入状态。只查看指定状态的导入任务。 | 
| 
 | 指定排序依据。 | 
| 
 | 显示Limit条匹配记录。不指定的场合全部显示。 | 
| 
 | 从偏移量offset开始显示查询结果。默认情况下偏移量为0。 | 
使用示例
- 展示数据库 - example_db的导入任务,Label中包含字符串- 2014_01_02,展示保存时间最久的10个。- SHOW LOAD FROM example_db WHERE LABEL LIKE "2014_01_02" LIMIT 10;
- 展示数据库 - example_db的导入任务,指定Label为- load_example_db_20140102并按- LoadStartTime降序排序。- SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" ORDER BY LoadStartTime DESC;
- 展示数据库 - example_db的导入任务,指定Label为- load_example_db_20140102,state为- loading。- SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" AND STATE = "loading";
- 展示数据库 - example_db的导入任务,按- LoadStartTime降序排序,并从偏移量5开始显示10条查询结果。- SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 5,10; SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 10 offset 5;
最佳实践
- 查看导入任务状态 - Broker Load是一个异步导入过程,语句执行成功仅代表导入任务提交成功,并不代表数据导入成功。导入状态需要通过 - SHOW LOAD命令查看。
- 取消导入任务 - 已提交切尚未结束的导入任务可以通过 - CANCEL LOAD命令取消。取消后,已写入的数据也会回滚,不会生效。
- Label、导入事务、多表原子性 - SelectDB中所有导入任务都是原子生效的。并且在同一个导入任务中对多张表的导入也能够保证原子性。同时,SelectDB还可以通过Label的机制来保证数据导入的不丢失和不重复。 
- 列映射、衍生列和过滤 - SelectDB可以在导入语句中支持非常丰富的列转换和过滤操作。支持绝大多数内置函数和UDF。详情请参见数据转换文档。 
- 错误数据过滤 - SelectDB的导入任务可以容忍一部分格式错误的数据。容忍了通过 - max_filter_ratio设置。默认为0,即表示当有一条错误数据时,整个导入任务将会失败。如果您希望忽略部分有问题的数据行,可以将次参数设置为0~1之间的数值,SelectDB会自动跳过哪些数据格式不正确的行。- 关于容忍率的一些计算方式,详情请参见数据转换文档。 
- 严格模式 - strict_mode属性用于设置导入任务是否运行在严格模式下。该格式会对列映射、转换和过滤的结果产生影响。
- 超时时间 - Broker Load的默认超时时间为4小时,从任务提交开始算起。如果超时未完成,则任务会失败。 
- 数据量和任务数限制 - 建议通过Broker Load单次导入100 GB以内的数据。虽然理论上在一个导入任务中导入的数据量没有上限,但是提交过大的导入会导致运行时间较长,并且失败后重试的代价也会较大。 - 同时受限于集群规模,我们限制了导入的最大数据量为节点数乘3 GB。以保证系统资源的合理利用。如果有大数据量需要导入,建议分成多个导入任务提交。 - SelectDB会限制集群内同时运行的导入任务数量,通常在3~10个之间,之后提交的导入作业会排队等待。队列最大长度为100,之后的提交会直接拒绝。 说明- 排队时间也会被计算到作业总时间中。如果超时,则作业会被取消。所以建议通过监控作业运行状态来合理控制作业提交频率。