Stream Load

当您需要将本地文件或数据流导入到StarRocks时,可以使用Stream Load进行数据导入。本文介绍如何通过Stream Load导入数据至StarRocks。

背景信息

Stream Load是一种同步的导入方式,您可以通过发送HTTP请求将本地文件或数据流导入到StarRocks中。该过程同步执行并返回导入结果,您可以通过请求的返回值直接判断导入是否成功。Stream Load支持CSVJSON文件格式,且单次导入的数据量限制在10 GB以下。

创建导入任务

Stream Load通过HTTP协议提交和传输数据。本文通过curl命令展示如何提交导入任务。您也可以通过其他HTTP Client进行操作。

语法

curl --location-trusted -u <username>:<password> -XPUT <url>
(
    data_desc
)
[opt_properties]        
说明
  • 建议在HTTP请求的请求头字段Expect中指定100-continue,即"Expect:100-continue"。这样在服务器拒绝导入任务请求的情况下,可以避免不必要的数据传输,从而减少不必要的资源开销。

  • StarRocks中,部分文字属于SQL语言的保留关键字,因而无法直接用于SQL语句。如果希望在SQL语句中引用这些保留关键字,必须使用反引号 (`) 将其包裹起来。更多关键字信息,请参见关键字

参数说明

  • <username>:<password>:指定StarRocks集群的用户名和密码。必选参数。如果账号没有设置密码,这里只需要传入<username>:

  • XPUT:用于指定HTTP请求方法。必选参数。Stream Load当前只支持PUT方法。

  • <url>:用于指定StarRocks表的URL地址。必选参数。填写格式为:http://<fe_host>:<fe_http_port>/api/<database_name>/<table_name>/_stream_load

    涉及参数如下表所示。

    参数

    是否必须

    说明

    <fe_host>

    指定StarRocks集群中FEIP地址。

    <fe_http_port>

    指定StarRocks集群中FEHTTP端口号。默认为18030。

    您可以在StarRocks集群服务页面的配置页签,通过搜索http_port参数,查看端口号。此外,您也可以通过SHOW FRONTENDS命令查看FE节点的IP地址和HTTP端口号。

    <database_name>

    指定目标StarRocks表所在的数据库的名称。

    <table_name>

    指定目标StarRocks表的名称。

  • desc:用于描述源数据文件的各项属性,包括文件名称、格式、列分隔符、行分隔符、目标分区,以及与StarRocks表之间的列对应关系等。填写格式为如下所示。

    -T <file_path>
    -H "format: CSV | JSON"
    -H "column_separator: <column_separator>"
    -H "row_delimiter: <row_delimiter>"
    -H "columns: <column1_name>[, <column2_name>, ... ]"
    -H "partitions: <partition1_name>[, <partition2_name>, ...]"
    -H "temporary_partitions: <temporary_partition1_name>[, <temporary_partition2_name>, ...]"
    -H "jsonpaths: [ \"<json_path1>\"[, \"<json_path2>\", ...] ]"
    -H "strip_outer_array: true | false"
    -H "json_root: <json_path>"
    -H "ignore_json_size: true | false"
    -H "compression: <compression_algorithm> | Content-Encoding: <compression_algorithm>"

    data_desc中的参数可以分为三类:公共参数、CSV适用的参数、以及JSON适用的参数。

    • 公共参数

      参数

      是否必选

      说明

      <file_path>

      指定源数据文件的保存路径。文件名里可选包含或者不包含扩展名。

      format

      指定待导入数据的格式。取值包括CSVJSON。默认值:CSV

      partitions

      指定需将数据导入的具体分区。如果未指定该参数,则默认将数据导入StarRocks表所在的所有分区。

      temporary_partitions

      指定要导入数据的临时分区。

      columns

      指定源数据文件与StarRocks表之间的列对应关系。

      • 如果源数据文件中的列与StarRocks表中的列按顺序一一对应,则不需要指定该参数。

      • 如果源数据文件与表Schema不对应,则需要该参数来配置数据转换规则。列有两种形式,一种是直接对应于导入文件中的字段,可以直接使用字段名表示,一种需要通过计算得出。

        • 示例1:表中有3c1, c2, c3,源文件中的3列依次对应的是c3,c2,c1,则需要指定-H "columns: c3, c2, c1"

        • 示例2:表中有3c1, c2, c3,源文件中前3列与表中的列一一对应,但是还有多余1列,则需要指定-H "columns: c1, c2, c3, temp",最后1列随意指定名称用于占位即可。

        • 示例3:表中有3year, month, day,源文件中只有一个时间列,为2018-06-01 01:02:03格式,则可以指定 -H "columns: col, year = year(col), month=month(col), day=day(col)"完成导入。

    • CSV适用参数

      参数

      是否必选

      说明

      column_separator

      用于指定源数据文件中的列分隔符,默认为\t

      如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。例如,Hive文件的分隔符\x01,需要指定为-H "column_separator:\x01"

      row_delimiter

      用于指定源数据文件中的行分隔符,默认为\n

      重要

      curl命令无法传递\n,换行符手动指定为\n时,shell会先传递反斜线(\),然后传递n而不是直接传递换行符\n。

      Bash支持另一种转义字符串语法,传递\n\t时,使用美元符号和全角单引号($')启动字符串并以半角单引号(')结束字符串。例如,-H $'row_delimiter:\n'

      skip_header

      用于指定跳过CSV文件开头的若干行数据。取值类型为整数(INTEGER),默认值为0。

      在某些CSV文件中,最开头的几行数据用于定义列名、列类型等元数据信息。通过设置该参数,可以使StarRocks在导入数据时忽略CSV文件的前几行。例如,如果将该参数设置为1,则StarRocks在导入数据时将忽略CSV文件的第一行。

      这里的行所使用的分隔符须与您在导入命令中所设定的行分隔符一致。

      where

      用于抽取部分数据。用户如需将不需要的数据过滤掉,那么可以通过设定这个选项来达到。

      例如,只导入k1列等于20180601的数据,则可以在导入时指定-H "where: k1 = 20180601"

      max_filter_ratio

      最大容忍可过滤(例如,因为数据不规范等原因而过滤)的数据比例。默认零容忍。

      说明

      此处数据不规范的数据不包括通过WHERE条件过滤的数据。

      partitions

      用于指定该导入所涉及的Partition。

      如果您能够确定数据对应的Partition,则推荐指定该项。不满足指定分区的数据将被过滤掉。例如,指定导入到p1p2分区,可以指定-H "partitions: p1, p2"

      timeout

      指定导入的超时时间。默认是600秒。

      设置范围为1~259200,单位为秒。

      strict_mode

      指定此次导入是否开启严格模式。取值如下:

      • false(默认值):不开启。

      • true:开启。开启后,会对导入过程中的列类型转换进行严格过滤。

      timezone

      指定本次导入所使用的时区。默认为东八区。

      该参数会影响所有导入涉及和时区有关的函数结果。

      exec_mem_limit

      导入内存限制。默认值为2 GB。

    • JSON适用参数

      参数

      是否必选

      说明

      jsonpaths

      用于指定待导入的字段的名称。仅在使用匹配模式导入 JSON 数据时需要指定该参数。参数取值为 JSON 格式。

      strip_outer_array

      用于指定是否裁剪最外层的数组结构。取值如下:

      • false(默认值):表示会保留JSON数据的原始结构,不剥离外层数组,效果是将整个JSON数组作为单一值导入。

        例如,示例数据[{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}],在设置strip_outer_arrayfalse后,会解析为一个数组数据导入表中。

      • true:当导入数据格式为JSON数组时,需要设置strip_outer_array为 true。

        例如,示例数据[{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}],在设置strip_outer_arraytrue后,会解析为两条数据导入表中。

      json_root

      用于指定待导入 JSON 数据的根元素。仅在使用匹配模式导入JSON数据时需要指定该参数。参数取值为合法的JsonPath字符串。默认值为空,表示会导入整个 JSON 数据文件的数据。

      ignore_json_size

      用于指定是否检查 HTTP 请求中 JSON Body 的大小。

      说明

      HTTP请求中JSON Body的大小默认不能超过100 MB。如果JSON Body的大小超过100 MB,会提示 "The size of this batch exceed the max size [104857600] of json type data data [8617627793]. Set ignore_json_size to skip check, although it may lead huge memory consuming." 错误。为避免该报错,可以在HTTP请求头中添加 "ignore_json_size:true" 设置,忽略对JSON Body大小的检查。

      compression, Content-Encoding

      指定在STREAM LOAD数据传输过程中使用哪种压缩算法,支持GZIP、BZIP2、LZ4_FRAME、ZSTD算法。

      例如,curl --location-trusted -u root: -v 'http://127.0.0.1:18030/api/db0/tbl_simple/_stream_load' \-X PUT -H "expect:100-continue" \-H 'format: json' -H 'compression: lz4_frame' -T ./b.json.lz4

  • opt_properties:用于指定一些导入相关的可选参数。指定的参数设置作用于整个导入任务。

    填写格式为如下所示。

    -H "label: <label_name>"
    -H "where: <condition1>[, <condition2>, ...]"
    -H "max_filter_ratio: <num>"
    -H "timeout: <num>"
    -H "strict_mode: true | false"
    -H "timezone: <string>"
    -H "load_mem_limit: <num>"
    -H "partial_update: true | false"
    -H "partial_update_mode: row | column"
    -H "merge_condition: <column_name>"

    参数说明如下表所示。

    参数

    是否必选

    说明

    label

    导入任务的标签,相同标签的数据无法多次导入。

    您可以通过指定Label的方式来避免一份数据重复导入的问题。当前StarRocks系统会保留最近30分钟内成功完成的任务的Label。

    where

    用于指定过滤条件。如果指定了该参数,StarRocks将根据所设定的过滤条件对转换后的数据进行筛选。只有满足where子句中所定义的过滤条件的数据才会被导入。

    例如,只导入k1列等于20180601的数据,则可以在导入时指定-H "where: k1 = 20180601"

    max_filter_ratio

    最大容忍可过滤(例如,因为数据不规范等原因而过滤)的数据比例。默认零容忍。

    说明

    此处数据不规范的数据不包括通过where条件过滤的数据。

    log_rejected_record_num

    指定最多允许记录因数据质量不合格而被过滤的数据行数。该参数自3.1版本起支持。取值范围:0-1、大于 0 的正整数。默认值:0

    • 取值为0表示不记录过滤掉的数据行。

    • 取值为-1表示记录所有过滤掉的数据行。

    • 取值为大于0的正整数(比如 n)表示每个 BE(或 CN)节点上最多可以记录 n 条过滤掉的数据行。

    timeout

    指定导入的超时时间。默认是600秒。

    设置范围为1~259200,单位为秒。

    strict_mode

    指定此次导入是否开启严格模式。

    • false(默认值):不开启。

    • true:开启。

    timezone

    指定本次导入所使用的时区。默认为东八区。

    该参数会影响所有导入涉及和时区有关的函数结果。

    load_mem_limit

    导入任务的内存限制。默认值为2 GB。

    partial_update

    是否使用部分列更新。取值包括TRUEFALSE。默认值:FALSE

    partial_update_mode

    指定部分更新的模式,取值包括rowcolumn

    • row(默认值),指定使用行模式执行部分更新,比较适用于较多列且小批量的实时更新场景。

    • column,指定使用列模式执行部分更新,比较适用于少数列并且大量行的批处理更新场景。在该场景,开启列模式,更新速度更快。

      例如,在一个包含100列的表中,每次更新10列(占比10%)并更新所有行,则开启列模式,更新性能将提高10倍。

    merge_condition

    用于指定作为更新生效条件的列名。这样只有当导入的数据中该列的值大于等于当前值的时候,更新才会生效。

    说明

    指定的列必须为非主键列,且仅主键表支持条件更新。

示例

CSV格式的文件data.csv导入至StarRocks集群的load_test库的example_table表中,完整示例请参见导入数据的完整示例

curl --location-trusted -u "root:" \
     -H "Expect:100-continue" \
     -H "label:label2" \
     -H "column_separator: ," \
     -T data.csv -XPUT \
     http://172.17.**.**:18030/api/load_test/example_table/_stream_load

返回值

导入完成后,将以JSON格式返回导入任务的结果信息。返回结果示例如下。

{
    "TxnId": 9,
    "Label": "label2",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 4,
    "NumberLoadedRows": 4,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 45,
    "LoadTimeMs": 235,
    "BeginTxnTimeMs": 101,
    "StreamLoadPlanTimeMs": 102,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 11,
    "CommitAndPublishTimeMs": 19
}

返回结果中的参数说明如下表所述。

参数

描述

TxnId

导入的事务ID。

Label

导入的标签。

Status

导入状态,取值如下:

  • Success:表示数据导入成功,数据已经可见。

  • Publish Timeout:表示导入已经成功提交,只是数据可能会延迟可见,无需重试导入。

  • Label Already Exists:表示Label重复,需更换Label

  • Fail:表示数据导入失败。

ExistingJobStatus

已存在Label对应的导入任务的状态。该字段只有当StatusLabel Already Exists时才会显示。您可以通过该状态,知晓已存在Label对应的导入任务的状态。

  • RUNNING:表示任务在执行中。

  • FINISHED:表示任务成功。

Message

导入任务的状态详情。导入失败时会返回具体的失败原因。

NumberTotalRows

从数据流中读取到的总行数。

NumberLoadedRows

导入任务的数据行数,仅在导入状态为Success时有效。

NumberFilteredRows

导入任务过滤掉的行数,即数据质量不合格的行。

NumberUnselectedRows

通过Where条件被过滤掉的行数。

LoadBytes

导入任务的源文件数据量大小。

LoadTimeMs

导入任务所用的时间,单位为ms。

BeginTxnTimeMs

导入任务开启事务的时长。

StreamLoadPlanTimeMs

导入任务生成执行计划的时长。

ReadDataTimeMs

导入任务读取数据的时长。

WriteDataTimeMs

导入任务写入数据的时长。

ErrorURL

如果任务导入失败,会返回该参数。

通过ErrorURL可以查看导入过程中因数据质量不合格而过滤掉的错误数据行的具体信息。您可以在提交导入任务时,通过可选参数log_rejected_record_num来指定最多可以记录多少条错误数据行的信息。

您可以通过 curl "url" 命令直接查看错误数据行的信息,也可以通过执行 wget "url" 命令导出错误数据行的信息。

例如,导出错误数据行的信息。

wget "http://172.17.**.**:18040/api/_load_error_log?file=error_log_b74dccdcf0ceb4de_e82b2709c6c013ad"

导出的错误数据行信息会保存到一个名为_load_error_log?file=error_log_b74dccdcf0ceb4de_e82b2709c6c013ad的本地文件中。您可以通过cat _load_error_log?file=error_log_b74dccdcf0ceb4de_e82b2709c6c013ad命令查看该文件的内容。

您可以根据错误信息调整导入任务,然后重新提交导入任务。

取消导入任务

Stream Load无法手动取消,Stream Load在超时或者导入错误后会被系统自动取消。您可根据返回结果中的ErrorURL下载报错信息,进行错误排查。

导入数据的完整示例

本文示例通过curl命令导入任务。

  1. 创建待导入数据的表。

    1. 使用SSH方式登录StarRocks集群的Master节点,详情请参见登录集群

    2. 执行以下命令,通过MySQL客户端连接StarRocks集群。

      mysql -h127.0.0.1  -P 9030 -uroot
    3. 执行以下命令,创建对应的库表。

      CREATE DATABASE IF NOT EXISTS load_test;
      USE load_test;
      
      CREATE TABLE IF NOT EXISTS example_table (
          id INT,
          name VARCHAR(50),
          age INT
      )
      DUPLICATE KEY(id)
      DISTRIBUTED BY HASH(id) BUCKETS 3
      PROPERTIES (
          "replication_num" = "1"  -- 副本数设为 1
      );
      

      执行完后按Ctrl+D退出MySQL客户端。

  2. 准备测试数据。

    准备CSV数据

    例如,创建待导入文件data.csv,内容如下所示。

    id,name,age
    1,Alice,25
    2,Bob,30
    3,Charlie,35

    准备JSON数据

    例如,创建待导入文件json.data,内容如下所示。

    {"id":1,"name":"Emily","age":25}
    {"id":2,"name":"Benjamin","age":35}
    {"id":3,"name":"Olivia","age":28}
    {"id":4,"name":"Alexander","age":60}
    {"id":5,"name":"Ava","age":17}
  3. 执行以下命令,创建导入任务。

    导入CSV数据

    curl --location-trusted -u "root:" \
         -H "Expect:100-continue" \
         -H "label:label1" \
         -H "column_separator: ," \
         -T data.csv -XPUT \
         http://172.17.**.**:18030/api/load_test/example_table/_stream_load

    导入JSON数据

    curl --location-trusted -u "root:" \
         -H "Expect:100-continue" \
         -H "label:label2" \
         -H "format:json" \
         -T json.data -XPUT \
         http://172.17.**.**:18030/api/load_test/example_table/_stream_load

代码集成示例