文档

Stream Load

更新时间:

本文介绍如何通过Stream Load导入数据至云数据库 SelectDB 版实例中。

背景信息

Stream Load是属于同步接口的导入方式,用户通过发送HTTP请求将本地文件或数据流导入到云数据库 SelectDB 版实例中。Stream load执行并返回导入结果,用户可直接通过请求的返回体判断本次导入是否成功。

Stream Load主要适用于导入本地文件或通过程序导入数据流中的数据,支持的数据格式包括:CSV(文本)、JSON、PARQUET和ORC。

创建导入

Stream Load通过HTTP协议提交和传输数据,这里通过curl命令展示如何提交导入。用户也可以通过其他HTTP Client进行操作。

语法

# Header中支持的属性,请参见下面的参数说明。
# 格式为: -H "key1:value1"。
curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://host:port/api/{db}/{table}/_stream_load 

参数说明

参数名称

参数说明

--location-trusted

当需要认证时,会将user和password传递给被重定向到的服务器。

-u

云数据库 SelectDB 版实例的用户名和密码。

-T

需要导入的数据文件。

-XPUT

HTTP请求的Method,采用PUT请求方法。其中host为云数据库 SelectDB 版实例的VPC地址或公网地址;port:云数据库 SelectDB 版实例的端口号。

说明
  • 申请公网的具体操作,请参见申请和释放公网地址

  • 您可以在的实例详情页面查看云数据库 SelectDB 版实例的连接地址和端口号。

由于Stream Load使用HTTP协议,所以导入任务有关的参数主要设置在Header中。常用的导入参数如下。

参数名称

参数说明

label

导入任务的唯一标识。label是用户在导入命令中自定义的名称。通过这个label,用户可以查看对应导入任务的执行情况。

label也可用于防止用户重复导入相同的数据,强烈推荐用户同一批次数据使用相同的label。这样同一批次数据的重复请求只会被接受一次,保证了At-Most-Once。当label对应的导入作业状态为CANCELLED时,该label可以再次被使用。

format

指定导入数据格式,支持CSV、JSON、PARQUET、ORC,默认值为CSV。

支持csv_with_names(CSV文件行首过滤)、csv_with_names_and_types(CSV文件前两行过滤)。

line_delimiter

用于指定导入文件中的换行符,默认为\n。

您可以使用做多个字符的组合作为换行符。

column_separator

用于指定导入文件中的列分隔符,默认为\t。

您可以使用多个字符的组合作为列分隔符。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。如Hive文件的分隔符\x01,需要指定为-H"column_separator:\x01"。

compress_type

指定文件的压缩格式。

目前只支持CSV文件的压缩,支持gz、lzo、bz2lz4、lzop、deflate压缩格式。

max_filter_ratio

导入任务的最大容忍率,默认为0容忍,取值范围是0~1。

当导入的错误率超过该值,则导入失败。如果用户希望忽略错误的行,可以通过设置这个参数大于0,来保证导入可以成功。

strict_mode

是否开启严格过滤模式,默认为false。

开启后,会对导入过程中的列类型转换进行严格过滤,错误的数据将被filter。

cloud_cluster

用于指定导入使用的集群。如果不指定,则使用用户的默认集群,如果用户没有设置默认集群,则自动为用户选择一个有权限的集群。

load_to_single_tablet

是否只导入数据到对应分区的一个tablet,默认值为false。该参数只允许在对带有random分区的Duplicate表导入数据的时候设置。

where

导入任务指定的过滤条件。

支持对原始数据指定where语句进行过滤,被过滤的数据将不会被导入,也不会参与filter ratio的计算,但会被计入num_rows_unselected。

partitions

待导入数据的Partition信息。如果待导入数据不属于指定的Partition则不会被导入。这些数据将计入dpp.abnorm.ALL。

columns

待导入数据的函数变换配置。

支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。

merge_type

数据合并类型,默认为APPEND,表示本次导入是普通的追加写操作。MERGE和DELETE类型仅适用于Unique Key表模型。其中MERGE类型需要配合delete参数使用,以标注Delete Flag列。而DELETE类型则表示本次导入的所有数据皆为删除数据。

delete

仅在MERGE下有意义,表示数据的删除条件function_column.sequence_col:只适用于UNIQUE_KEYS,相同key列下,保证value列按照source_sequence列进行REPLACE, source_sequence可以是数据源中的列,也可以是表结构中的一列。

exec_mem_limit

导入内存限制。默认为2 GB,单位为字节。

timeout

指定导入的超时时间,单位:秒,默认是600秒。可设置范围为1~259200秒。

timezone

指定本次导入所使用的时区,默认为东八区。

该参数会影响所有导入涉及的和时区有关的函数结果。

two_phase_commit

是否开启两阶段事务提交模式,默认为false。

开启两阶段事务提交模式后,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为PRECOMMITTED,用户手动触发commit操作之后,数据才可见。

由于Stream load是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回给用户。返回结果样例如下。

{
    "TxnId": 17,
    "Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 5,
    "NumberLoadedRows": 5,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 28,
    "LoadTimeMs": 27,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 3,
    "CommitAndPublishTimeMs": 18
}

返回结果参数说明如下。

参数名称

参数说明

TxnId

导入的事务ID,用户可不感知。

Label

导入Label,由用户指定或系统自动生成。

Status

导入状态,取值如下:

  • Success:导入成功。

  • Publish Timeout:导入已经完成,只是数据可能会延迟可见,无需重试。

  • Label Already Exists:Label重复,需更换Label。

  • Fail:导入失败。

ExistingJobStatus

已存在的Label对应的导入作业的状态。

这个字段只有在当Status为"Label Already Exists"时才会显示。用户可以通过这个状态,知晓已存在Label对应的导入作业的状态。"RUNNING"表示作业还在执行,"FINISHED"表示作业成功。

Message

错误信息提示。

NumberTotalRows

导入总处理的行数。

NumberLoadedRows

成功导入的行数。

NumberFilteredRows

数据质量不合格的行数。

NumberUnselectedRows

被where条件过滤的行数。

LoadBytes

导入的字节数。

LoadTimeMs

导入完成时间,单位毫秒。

BeginTxnTimeMs

向Fe请求开始一个事务所花费的时间,单位:毫秒。

StreamLoadPutTimeMs

向Fe请求获取导入数据执行计划所花费的时间,单位:毫秒。

ReadDataTimeMs

读取数据所花费的时间,单位:毫秒。

WriteDataTimeMs

执行写入数据操作所花费的时间,单位:毫秒。

CommitAndPublishTimeMs

向Fe请求提交并且发布事务所花费的时间,单位:毫秒。

ErrorURL

如果有数据质量问题,通过访问这个URL查看具体错误行。

基础的使用样例如下:

curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

取消导入

用户无法手动取消Stream Load,Stream Load在超时或者导入错误后会被系统自动取消。

查看Stream Load

您可以通过show stream load来查看已经完成的Stream load任务。默认BE(BackEnd)不保留Stream Load的启用记录,如果您要查看则需要在BE上启用记录,配置参数为:enable_stream_load_record=true,具体操作请参见BE配置项。​

使用示例

  • 将本地文件test.data中的数据导入到数据库test_db中的test_table表,使用Label用于去重,指定超时时间为100秒。

     curl --location-trusted -u root -H "label:123" -H "timeout:100" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 将本地文件test.data中的数据导入到数据库test_db中的test_table表,使用label用于去重,并且只导入k1等于20180601的数据。

    curl --location-trusted -u root -H "label:123" -H "where: k1=20180601" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 将本地文件test.data中的数据导入到数据库test_db中的test_table表,允许20%的错误率(用户是defalut_cluster中的)。

    curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 将本地文件test.data中的数据导入到数据库test_db中的test_table表,允许20%的错误率,并且指定文件的列名。

    curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -H "columns: k2, k1, v1" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 将本地文件test.data中的数据导入到数据库test_db中的test_table表中的p1、p2分区,允许20%的错误率。

    curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -H "partitions: p1, p2" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 使用流式方式(streaming)导入。

    seq 1 10 | awk '{OFS="\t"}{print $1, $1 * 10}' | curl --location-trusted -u root -T - http://host:port/api/test_db/test_table/_stream_load
  • 导入含有HLL列的表,可以是表中的列或者数据中的列用于生成HLL列,也可使用hll_empty补充数据中没有的列。

    curl --location-trusted -u root -H "columns: k1, k2, v1=hll_hash(k1), v2=hll_empty()" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 导入数据进行严格模式过滤,并设置时区为Africa/Abidjan。

    curl --location-trusted -u root -H "strict_mode: true" -H "timezone: Africa/Abidjan" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 删除与这批导入key相同的数据。

    curl --location-trusted -u root -H "merge_type: DELETE" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 将这批数据中与flag列为1的数据相匹配的列删除,其他行正常追加。

    curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv, flag" -H "merge_type: MERGE" -H "delete: flag=1" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 导入数据到含有sequence列的UNIQUE_KEYS表中。

    curl --location-trusted -u root -H "columns: k1,k2,source_sequence,v1,v2" -H "function_column.sequence_col: source_sequence" -T test.data http://host:port/api/test_db/test_table/_stream_load

相关系统配置

FE配置

stream_load_default_timeout_second:导入任务的超时时间,单位:秒。默认的timeout时间为600秒,导入任务在设定的timeout时间内未完成则会被系统取消,变成CANCELLED。如果导入的源文件无法在规定时间内完成导入,您可以在Stream load请求中设置单独的超时时间或者调整FE的参数stream_load_default_timeout_second来设置全局的默认超时时间。

BE配置

streaming_load_max_mb:Stream load的最大导入大小,单位:MB,默认值为10240 MB。如果您的原始文件超过该值,则需要调整BE参数streaming_load_max_mb

  • 本页导读 (1)
文档反馈