本文介绍如何通过Stream Load导入数据至云数据库 SelectDB 版实例中。
背景信息
Stream Load是属于同步接口的导入方式,用户通过发送HTTP请求将本地文件或数据流导入到云数据库 SelectDB 版实例中。Stream load执行并返回导入结果,用户可直接通过请求的返回体判断本次导入是否成功。
Stream Load主要适用于导入本地文件或通过程序导入数据流中的数据,支持的数据格式包括:CSV(文本)、JSON、PARQUET和ORC。
创建导入
Stream Load通过HTTP协议提交和传输数据,这里通过curl
命令展示如何提交导入。用户也可以通过其他HTTP Client进行操作。
语法
# Header中支持的属性,请参见下面的参数说明。
# 格式为: -H "key1:value1"。
curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://host:port/api/{db}/{table}/_stream_load
参数说明
参数名称 | 参数说明 |
--location-trusted | 当需要认证时,会将user和password传递给被重定向到的服务器。 |
-u | 云数据库 SelectDB 版实例的用户名和密码。 |
-T | 需要导入的数据文件。 |
-XPUT | HTTP请求的Method,采用PUT请求方法。其中host为云数据库 SelectDB 版实例的VPC地址或公网地址;port:云数据库 SelectDB 版实例的端口号。 说明
|
由于Stream Load使用HTTP协议,所以导入任务有关的参数主要设置在Header中。常用的导入参数如下。
参数名称 | 参数说明 |
label | 导入任务的唯一标识。label是用户在导入命令中自定义的名称。通过这个label,用户可以查看对应导入任务的执行情况。 label也可用于防止用户重复导入相同的数据,强烈推荐用户同一批次数据使用相同的label。这样同一批次数据的重复请求只会被接受一次,保证了At-Most-Once。当label对应的导入作业状态为CANCELLED时,该label可以再次被使用。 |
format | 指定导入数据格式,支持CSV、JSON、PARQUET、ORC,默认值为CSV。 支持csv_with_names(CSV文件行首过滤)、csv_with_names_and_types(CSV文件前两行过滤)。 |
line_delimiter | 用于指定导入文件中的换行符,默认为\n。 您可以使用做多个字符的组合作为换行符。 |
column_separator | 用于指定导入文件中的列分隔符,默认为\t。 您可以使用多个字符的组合作为列分隔符。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。如Hive文件的分隔符\x01,需要指定为-H"column_separator:\x01"。 |
compress_type | 指定文件的压缩格式。 目前只支持CSV文件的压缩,支持gz、lzo、bz2、lz4、lzop、deflate压缩格式。 |
max_filter_ratio | 导入任务的最大容忍率,默认为0容忍,取值范围是0~1。 当导入的错误率超过该值,则导入失败。如果用户希望忽略错误的行,可以通过设置这个参数大于0,来保证导入可以成功。 |
strict_mode | 是否开启严格过滤模式,默认为false。 开启后,会对导入过程中的列类型转换进行严格过滤,错误的数据将被filter。 |
cloud_cluster | 用于指定导入使用的集群。如果不指定,则使用用户的默认集群,如果用户没有设置默认集群,则自动为用户选择一个有权限的集群。 |
load_to_single_tablet | 是否只导入数据到对应分区的一个tablet,默认值为false。该参数只允许在对带有random分区的Duplicate表导入数据的时候设置。 |
where | 导入任务指定的过滤条件。 支持对原始数据指定where语句进行过滤,被过滤的数据将不会被导入,也不会参与filter ratio的计算,但会被计入num_rows_unselected。 |
partitions | 待导入数据的Partition信息。如果待导入数据不属于指定的Partition则不会被导入。这些数据将计入dpp.abnorm.ALL。 |
columns | 待导入数据的函数变换配置。 支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。 |
merge_type | 数据合并类型,默认为APPEND,表示本次导入是普通的追加写操作。MERGE和DELETE类型仅适用于Unique Key表模型。其中MERGE类型需要配合delete参数使用,以标注Delete Flag列。而DELETE类型则表示本次导入的所有数据皆为删除数据。 |
delete | 仅在MERGE下有意义,表示数据的删除条件function_column.sequence_col:只适用于UNIQUE_KEYS,相同key列下,保证value列按照source_sequence列进行REPLACE, source_sequence可以是数据源中的列,也可以是表结构中的一列。 |
exec_mem_limit | 导入内存限制。默认为2 GB,单位为字节。 |
timeout | 指定导入的超时时间,单位:秒,默认是600秒。可设置范围为1~259200秒。 |
timezone | 指定本次导入所使用的时区,默认为东八区。 该参数会影响所有导入涉及的和时区有关的函数结果。 |
two_phase_commit | 是否开启两阶段事务提交模式,默认为false。 开启两阶段事务提交模式后,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为PRECOMMITTED,用户手动触发commit操作之后,数据才可见。 |
由于Stream load是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回给用户。返回结果样例如下。
{
"TxnId": 17,
"Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 5,
"NumberLoadedRows": 5,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 28,
"LoadTimeMs": 27,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 2,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 3,
"CommitAndPublishTimeMs": 18
}
返回结果参数说明如下。
参数名称 | 参数说明 |
TxnId | 导入的事务ID,用户可不感知。 |
Label | 导入Label,由用户指定或系统自动生成。 |
Status | 导入状态,取值如下:
|
ExistingJobStatus | 已存在的Label对应的导入作业的状态。 这个字段只有在当Status为"Label Already Exists"时才会显示。用户可以通过这个状态,知晓已存在Label对应的导入作业的状态。"RUNNING"表示作业还在执行,"FINISHED"表示作业成功。 |
Message | 错误信息提示。 |
NumberTotalRows | 导入总处理的行数。 |
NumberLoadedRows | 成功导入的行数。 |
NumberFilteredRows | 数据质量不合格的行数。 |
NumberUnselectedRows | 被where条件过滤的行数。 |
LoadBytes | 导入的字节数。 |
LoadTimeMs | 导入完成时间,单位毫秒。 |
BeginTxnTimeMs | 向Fe请求开始一个事务所花费的时间,单位:毫秒。 |
StreamLoadPutTimeMs | 向Fe请求获取导入数据执行计划所花费的时间,单位:毫秒。 |
ReadDataTimeMs | 读取数据所花费的时间,单位:毫秒。 |
WriteDataTimeMs | 执行写入数据操作所花费的时间,单位:毫秒。 |
CommitAndPublishTimeMs | 向Fe请求提交并且发布事务所花费的时间,单位:毫秒。 |
ErrorURL | 如果有数据质量问题,通过访问这个URL查看具体错误行。 |
基础的使用样例如下:
curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
取消导入
用户无法手动取消Stream Load,Stream Load在超时或者导入错误后会被系统自动取消。
查看Stream Load
您可以通过show stream load来查看已经完成的Stream load任务。默认BE(BackEnd)不保留Stream Load的启用记录,如果您要查看则需要在BE上启用记录,配置参数为:enable_stream_load_record=true,具体操作请参见BE配置项。
使用示例
将本地文件
test.data
中的数据导入到数据库test_db
中的test_table
表,使用Label用于去重,指定超时时间为100秒。curl --location-trusted -u root -H "label:123" -H "timeout:100" -T test.data http://host:port/api/test_db/test_table/_stream_load
将本地文件
test.data
中的数据导入到数据库test_db
中的test_table
表,使用label用于去重,并且只导入k1等于20180601的数据。curl --location-trusted -u root -H "label:123" -H "where: k1=20180601" -T test.data http://host:port/api/test_db/test_table/_stream_load
将本地文件
test.data
中的数据导入到数据库test_db
中的test_table
表,允许20%的错误率(用户是defalut_cluster中的)。curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -T test.data http://host:port/api/test_db/test_table/_stream_load
将本地文件
test.data
中的数据导入到数据库test_db
中的test_table
表,允许20%的错误率,并且指定文件的列名。curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -H "columns: k2, k1, v1" -T test.data http://host:port/api/test_db/test_table/_stream_load
将本地文件
test.data
中的数据导入到数据库test_db
中的test_table
表中的p1、p2分区,允许20%的错误率。curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -H "partitions: p1, p2" -T test.data http://host:port/api/test_db/test_table/_stream_load
使用流式方式(streaming)导入。
seq 1 10 | awk '{OFS="\t"}{print $1, $1 * 10}' | curl --location-trusted -u root -T - http://host:port/api/test_db/test_table/_stream_load
导入含有HLL列的表,可以是表中的列或者数据中的列用于生成HLL列,也可使用hll_empty补充数据中没有的列。
curl --location-trusted -u root -H "columns: k1, k2, v1=hll_hash(k1), v2=hll_empty()" -T test.data http://host:port/api/test_db/test_table/_stream_load
导入数据进行严格模式过滤,并设置时区为Africa/Abidjan。
curl --location-trusted -u root -H "strict_mode: true" -H "timezone: Africa/Abidjan" -T test.data http://host:port/api/test_db/test_table/_stream_load
删除与这批导入key相同的数据。
curl --location-trusted -u root -H "merge_type: DELETE" -T test.data http://host:port/api/test_db/test_table/_stream_load
将这批数据中与flag列为1的数据相匹配的列删除,其他行正常追加。
curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv, flag" -H "merge_type: MERGE" -H "delete: flag=1" -T test.data http://host:port/api/test_db/test_table/_stream_load
导入数据到含有sequence列的UNIQUE_KEYS表中。
curl --location-trusted -u root -H "columns: k1,k2,source_sequence,v1,v2" -H "function_column.sequence_col: source_sequence" -T test.data http://host:port/api/test_db/test_table/_stream_load
相关系统配置
FE配置
stream_load_default_timeout_second
:导入任务的超时时间,单位:秒。默认的timeout时间为600秒,导入任务在设定的timeout时间内未完成则会被系统取消,变成CANCELLED。如果导入的源文件无法在规定时间内完成导入,您可以在Stream load请求中设置单独的超时时间或者调整FE的参数stream_load_default_timeout_second
来设置全局的默认超时时间。
BE配置
streaming_load_max_mb
:Stream load的最大导入大小,单位:MB,默认值为10240 MB。如果您的原始文件超过该值,则需要调整BE参数streaming_load_max_mb
。
- 本页导读 (1)