当您需要将本地文件或数据流导入到StarRocks时,可以使用Stream Load进行数据导入。本文介绍如何通过Stream Load导入数据至StarRocks。
背景信息
Stream Load是一种同步的导入方式,您可以通过发送HTTP请求将本地文件或数据流导入到StarRocks中。该过程同步执行并返回导入结果,您可以通过请求的返回值直接判断导入是否成功。Stream Load支持CSV和JSON文件格式,且单次导入的数据量限制在10 GB以下。
创建导入任务
Stream Load通过HTTP协议提交和传输数据。本文通过curl
命令展示如何提交导入任务。您也可以通过其他HTTP Client进行操作。
语法
curl --location-trusted -u <username>:<password> -XPUT <url>
(
data_desc
)
[opt_properties]
建议在HTTP请求的请求头字段
Expect
中指定100-continue
,即"Expect:100-continue"
。这样在服务器拒绝导入任务请求的情况下,可以避免不必要的数据传输,从而减少不必要的资源开销。在StarRocks中,部分文字属于SQL语言的保留关键字,因而无法直接用于SQL语句。如果希望在SQL语句中引用这些保留关键字,必须使用反引号 (`) 将其包裹起来。更多关键字信息,请参见关键字。
参数说明
<username>:<password>
:指定StarRocks集群的用户名和密码。必选参数。如果账号没有设置密码,这里只需要传入<username>:
。XPUT
:用于指定HTTP请求方法。必选参数。Stream Load当前只支持PUT方法。<url>
:用于指定StarRocks表的URL地址。必选参数。填写格式为:http://<fe_host>:<fe_http_port>/api/<database_name>/<table_name>/_stream_load
。涉及参数如下表所示。
参数
是否必须
说明
<fe_host>
是
指定StarRocks集群中FE的IP地址。
<fe_http_port>
是
指定StarRocks集群中FE的HTTP端口号。默认为18030。
您可以在StarRocks的集群服务页面的配置页签,通过搜索http_port参数,查看端口号。此外,您也可以通过
SHOW FRONTENDS
命令查看FE节点的IP地址和HTTP端口号。<database_name>
是
指定目标StarRocks表所在的数据库的名称。
<table_name>
是
指定目标StarRocks表的名称。
desc
:用于描述源数据文件的各项属性,包括文件名称、格式、列分隔符、行分隔符、目标分区,以及与StarRocks表之间的列对应关系等。填写格式为如下所示。-T <file_path> -H "format: CSV | JSON" -H "column_separator: <column_separator>" -H "row_delimiter: <row_delimiter>" -H "columns: <column1_name>[, <column2_name>, ... ]" -H "partitions: <partition1_name>[, <partition2_name>, ...]" -H "temporary_partitions: <temporary_partition1_name>[, <temporary_partition2_name>, ...]" -H "jsonpaths: [ \"<json_path1>\"[, \"<json_path2>\", ...] ]" -H "strip_outer_array: true | false" -H "json_root: <json_path>" -H "ignore_json_size: true | false" -H "compression: <compression_algorithm> | Content-Encoding: <compression_algorithm>"
data_desc
中的参数可以分为三类:公共参数、CSV适用的参数、以及JSON适用的参数。公共参数
参数
是否必选
说明
<file_path>
是
指定源数据文件的保存路径。文件名里可选包含或者不包含扩展名。
format
否
指定待导入数据的格式。取值包括
CSV
和JSON
。默认值:CSV
。partitions
否
指定需将数据导入的具体分区。如果未指定该参数,则默认将数据导入StarRocks表所在的所有分区。
temporary_partitions
否
指定要导入数据的临时分区。
columns
否
指定源数据文件与StarRocks表之间的列对应关系。
如果源数据文件中的列与StarRocks表中的列按顺序一一对应,则不需要指定该参数。
如果源数据文件与表Schema不对应,则需要该参数来配置数据转换规则。列有两种形式,一种是直接对应于导入文件中的字段,可以直接使用字段名表示,一种需要通过计算得出。
示例1:表中有3列
c1, c2, c3
,源文件中的3列依次对应的是c3,c2,c1
,则需要指定-H "columns: c3, c2, c1"
。示例2:表中有3列
c1, c2, c3
,源文件中前3列与表中的列一一对应,但是还有多余1列,则需要指定-H "columns: c1, c2, c3, temp"
,最后1列随意指定名称用于占位即可。示例3:表中有3列
year, month, day
,源文件中只有一个时间列,为2018-06-01 01:02:03格式,则可以指定-H "columns: col, year = year(col), month=month(col), day=day(col)"
完成导入。
CSV适用参数
参数
是否必选
说明
column_separator
否
用于指定源数据文件中的列分隔符,默认为
\t
。如果是不可见字符,则需要加
\x
作为前缀,使用十六进制来表示分隔符。例如,Hive文件的分隔符\x01
,需要指定为-H "column_separator:\x01"
。row_delimiter
否
用于指定源数据文件中的行分隔符,默认为
\n
。重要curl命令无法传递\n,换行符手动指定为\n时,shell会先传递反斜线(\),然后传递n而不是直接传递换行符\n。
Bash支持另一种转义字符串语法,传递
\n
和\t
时,使用美元符号和全角单引号($')启动字符串并以半角单引号(')结束字符串。例如,-H $'row_delimiter:\n'
。skip_header
否
用于指定跳过CSV文件开头的若干行数据。取值类型为整数(INTEGER),默认值为0。
在某些CSV文件中,最开头的几行数据用于定义列名、列类型等元数据信息。通过设置该参数,可以使StarRocks在导入数据时忽略CSV文件的前几行。例如,如果将该参数设置为1,则StarRocks在导入数据时将忽略CSV文件的第一行。
这里的行所使用的分隔符须与您在导入命令中所设定的行分隔符一致。
where
否
用于抽取部分数据。用户如需将不需要的数据过滤掉,那么可以通过设定这个选项来达到。
例如,只导入k1列等于20180601的数据,则可以在导入时指定
-H "where: k1 = 20180601"
。max_filter_ratio
否
最大容忍可过滤(例如,因为数据不规范等原因而过滤)的数据比例。默认零容忍。
说明此处数据不规范的数据不包括通过WHERE条件过滤的数据。
partitions
否
用于指定该导入所涉及的Partition。
如果您能够确定数据对应的Partition,则推荐指定该项。不满足指定分区的数据将被过滤掉。例如,指定导入到p1和p2分区,可以指定
-H "partitions: p1, p2"
。timeout
否
指定导入的超时时间。默认是600秒。
设置范围为1~259200,单位为秒。
strict_mode
否
指定此次导入是否开启严格模式。取值如下:
false(默认值):不开启。
true:开启。开启后,会对导入过程中的列类型转换进行严格过滤。
timezone
否
指定本次导入所使用的时区。默认为东八区。
该参数会影响所有导入涉及和时区有关的函数结果。
exec_mem_limit
否
导入内存限制。默认值为2 GB。
JSON适用参数
参数
是否必选
说明
jsonpaths
否
用于指定待导入的字段的名称。仅在使用匹配模式导入 JSON 数据时需要指定该参数。参数取值为 JSON 格式。
strip_outer_array
否
用于指定是否裁剪最外层的数组结构。取值如下:
false(默认值):表示会保留JSON数据的原始结构,不剥离外层数组,效果是将整个JSON数组作为单一值导入。
例如,示例数据
[{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}]
,在设置strip_outer_array
为false后,会解析为一个数组数据导入表中。true:当导入数据格式为JSON数组时,需要设置
strip_outer_array
为 true。例如,示例数据
[{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}]
,在设置strip_outer_array
为true后,会解析为两条数据导入表中。
json_root
否
用于指定待导入 JSON 数据的根元素。仅在使用匹配模式导入JSON数据时需要指定该参数。参数取值为合法的JsonPath字符串。默认值为空,表示会导入整个 JSON 数据文件的数据。
ignore_json_size
否
用于指定是否检查 HTTP 请求中 JSON Body 的大小。
说明HTTP请求中JSON Body的大小默认不能超过100 MB。如果JSON Body的大小超过100 MB,会提示
"The size of this batch exceed the max size [104857600] of json type data data [8617627793
]. Set ignore_json_size to skip check, although it may lead huge memory consuming." 错误。为避免该报错,可以在HTTP请求头中添加"ignore_json_size:true"
设置,忽略对JSON Body大小的检查。compression, Content-Encoding
否
指定在STREAM LOAD数据传输过程中使用哪种压缩算法,支持GZIP、BZIP2、LZ4_FRAME、ZSTD算法。
例如,
curl --location-trusted -u root: -v 'http://127.0.0.1:18030/api/db0/tbl_simple/_stream_load' \-X PUT -H "expect:100-continue" \-H 'format: json' -H 'compression: lz4_frame' -T ./b.json.lz4
。
opt_properties
:用于指定一些导入相关的可选参数。指定的参数设置作用于整个导入任务。填写格式为如下所示。
-H "label: <label_name>" -H "where: <condition1>[, <condition2>, ...]" -H "max_filter_ratio: <num>" -H "timeout: <num>" -H "strict_mode: true | false" -H "timezone: <string>" -H "load_mem_limit: <num>" -H "partial_update: true | false" -H "partial_update_mode: row | column" -H "merge_condition: <column_name>"
参数说明如下表所示。
参数
是否必选
说明
label
否
导入任务的标签,相同标签的数据无法多次导入。
您可以通过指定Label的方式来避免一份数据重复导入的问题。当前StarRocks系统会保留最近30分钟内成功完成的任务的Label。
where
否
用于指定过滤条件。如果指定了该参数,StarRocks将根据所设定的过滤条件对转换后的数据进行筛选。只有满足
where
子句中所定义的过滤条件的数据才会被导入。例如,只导入k1列等于20180601的数据,则可以在导入时指定
-H "where: k1 = 20180601"
。max_filter_ratio
否
最大容忍可过滤(例如,因为数据不规范等原因而过滤)的数据比例。默认零容忍。
说明此处数据不规范的数据不包括通过
where
条件过滤的数据。log_rejected_record_num
否
指定最多允许记录因数据质量不合格而被过滤的数据行数。该参数自3.1版本起支持。取值范围:
0
、-1
、大于 0 的正整数。默认值:0
。取值为
0
表示不记录过滤掉的数据行。取值为
-1
表示记录所有过滤掉的数据行。取值为大于0的正整数(比如
n
)表示每个 BE(或 CN)节点上最多可以记录n
条过滤掉的数据行。
timeout
否
指定导入的超时时间。默认是600秒。
设置范围为1~259200,单位为秒。
strict_mode
否
指定此次导入是否开启严格模式。
false(默认值):不开启。
true:开启。
timezone
否
指定本次导入所使用的时区。默认为东八区。
该参数会影响所有导入涉及和时区有关的函数结果。
load_mem_limit
否
导入任务的内存限制。默认值为2 GB。
partial_update
否
是否使用部分列更新。取值包括
TRUE
和FALSE
。默认值:FALSE
。partial_update_mode
否
指定部分更新的模式,取值包括
row
和column
。row
(默认值),指定使用行模式执行部分更新,比较适用于较多列且小批量的实时更新场景。column
,指定使用列模式执行部分更新,比较适用于少数列并且大量行的批处理更新场景。在该场景,开启列模式,更新速度更快。例如,在一个包含100列的表中,每次更新10列(占比10%)并更新所有行,则开启列模式,更新性能将提高10倍。
merge_condition
否
用于指定作为更新生效条件的列名。这样只有当导入的数据中该列的值大于等于当前值的时候,更新才会生效。
说明指定的列必须为非主键列,且仅主键表支持条件更新。
示例
将CSV格式的文件data.csv
导入至StarRocks集群的load_test库的example_table表中,完整示例请参见导入数据的完整示例。
curl --location-trusted -u "root:" \
-H "Expect:100-continue" \
-H "label:label2" \
-H "column_separator: ," \
-T data.csv -XPUT \
http://172.17.**.**:18030/api/load_test/example_table/_stream_load
返回值
导入完成后,将以JSON格式返回导入任务的结果信息。返回结果示例如下。
{
"TxnId": 9,
"Label": "label2",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 4,
"NumberLoadedRows": 4,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 45,
"LoadTimeMs": 235,
"BeginTxnTimeMs": 101,
"StreamLoadPlanTimeMs": 102,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 11,
"CommitAndPublishTimeMs": 19
}
返回结果中的参数说明如下表所述。
参数 | 描述 |
| 导入的事务ID。 |
| 导入的标签。 |
| 导入状态,取值如下:
|
| 已存在Label对应的导入任务的状态。该字段只有当
|
| 导入任务的状态详情。导入失败时会返回具体的失败原因。 |
| 从数据流中读取到的总行数。 |
| 导入任务的数据行数,仅在导入状态为Success时有效。 |
| 导入任务过滤掉的行数,即数据质量不合格的行。 |
| 通过Where条件被过滤掉的行数。 |
| 导入任务的源文件数据量大小。 |
| 导入任务所用的时间,单位为ms。 |
| 导入任务开启事务的时长。 |
| 导入任务生成执行计划的时长。 |
| 导入任务读取数据的时长。 |
| 导入任务写入数据的时长。 |
| 如果任务导入失败,会返回该参数。 通过 您可以通过 例如,导出错误数据行的信息。
导出的错误数据行信息会保存到一个名为_loa 您可以根据错误信息调整导入任务,然后重新提交导入任务。 |
取消导入任务
Stream Load无法手动取消,Stream Load在超时或者导入错误后会被系统自动取消。您可根据返回结果中的ErrorURL
下载报错信息,进行错误排查。
导入数据的完整示例
本文示例通过curl
命令导入任务。
创建待导入数据的表。
使用SSH方式登录StarRocks集群的Master节点,详情请参见登录集群。
执行以下命令,通过MySQL客户端连接StarRocks集群。
mysql -h127.0.0.1 -P 9030 -uroot
执行以下命令,创建对应的库表。
CREATE DATABASE IF NOT EXISTS load_test; USE load_test; CREATE TABLE IF NOT EXISTS example_table ( id INT, name VARCHAR(50), age INT ) DUPLICATE KEY(id) DISTRIBUTED BY HASH(id) BUCKETS 3 PROPERTIES ( "replication_num" = "1" -- 副本数设为 1 );
执行完后按
Ctrl+D
退出MySQL客户端。
准备测试数据。
准备CSV数据
例如,创建待导入文件
data.csv
,内容如下所示。id,name,age 1,Alice,25 2,Bob,30 3,Charlie,35
准备JSON数据
例如,创建待导入文件
json.data
,内容如下所示。{"id":1,"name":"Emily","age":25} {"id":2,"name":"Benjamin","age":35} {"id":3,"name":"Olivia","age":28} {"id":4,"name":"Alexander","age":60} {"id":5,"name":"Ava","age":17}
执行以下命令,创建导入任务。
导入CSV数据
curl --location-trusted -u "root:" \ -H "Expect:100-continue" \ -H "label:label1" \ -H "column_separator: ," \ -T data.csv -XPUT \ http://172.17.**.**:18030/api/load_test/example_table/_stream_load
导入JSON数据
curl --location-trusted -u "root:" \ -H "Expect:100-continue" \ -H "label:label2" \ -H "format:json" \ -T json.data -XPUT \ http://172.17.**.**:18030/api/load_test/example_table/_stream_load
代码集成示例
Java开发Stream Load,详情请参见stream_load。
Spark集成Stream Load,详情请参见01_sparkStreaming2StarRocks。