StarRocks支持从本地直接导入数据,支持CSV文件格式,数据量在10 GB以下。本文为您介绍Stream Load导入的基本原理、使用示例和最佳实践。
背景信息
Stream Load是一种同步的导入方式,通过发送HTTP请求将本地文件或数据流导入到StarRocks中。Stream Load同步执行导入并返回导入结果。您可以直接通过请求的返回值判断导入是否成功。
基本概念
Coordinator:协调节点。负责接收数据并分发数据到其他数据节点,导入完成后返回结果。
基本原理
Stream Load通过HTTP协议提交导入命令。如果提交到FE节点,则FE节点会通过HTTP Redirect指令将请求转发给某一个BE节点,您也可以直接提交导入命令给某一指定BE节点。该BE节点作为Coordinator节点,将数据按表Schema划分并分发数据到相关的BE节点。导入的最终结果由Coordinator节点返回给用户。
Stream Load的主要流程如下图所示。
导入示例
创建导入任务
Stream Load通过HTTP协议提交和传输数据。本示例通过curl
命令展示如何提交导入任务。您也可以通过其他HTTP Client进行操作。
语法
curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT \
http://fe_host:http_port/api/{db}/{table}/_stream_load
当前支持HTTP chunked与非chunked两种上传方式,对于非chunked方式,必须要有Content-Length来标示上传的内容长度,保证数据的完整性。
建议设置Expect Header字段内容为100-continue,可以在某些出错场景下避免不必要的数据传输。
Header中支持的属性见下表的导入任务参数描述,格式为-H "key1:value1"。如果同时有多个任务参数,需要用多个-H来指示,类似于-H "key1:value1" -H "key2:value2"……。Stream Load中所有与导入任务相关的参数均设置在Header中。相关参数描述如下表所示。
参数 | 描述 | |
签名参数 | user:passwd | Stream Load创建导入任务使用的是HTTP协议,已通过Basic access authentication进行签名。StarRocks系统会根据签名来验证用户身份和导入权限。 |
导入任务参数 | label | 导入任务的标签,相同标签的数据无法多次导入。 您可以通过指定Label的方式来避免一份数据重复导入的问题。当前StarRocks系统会保留最近30分钟内成功完成的任务的Label。 |
column_separator | 用于指定导入文件中的列分隔符,默认为\t。 如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。例如,Hive文件的分隔符\x01,需要指定为 | |
row_delimiter | 指定导入文件中的行分隔符,默认为\n。 重要 curl命令无法传递\n,换行符手动指定为\n时,Shell会先传递反斜线(\),然后传递n而不是直接传递换行符\n。 Bash支持另一种转义字符串语法,传递\n和\t时,使用美元符号和全角单引号($')启动字符串并以半角单引号(')结束字符串。例如, | |
columns | 用于指定导入文件中的列和Table中列的对应关系。 如果源文件中的列正好对应表中的内容,则无需指定该参数。如果源文件与表Schema不对应,则需要该参数来配置数据转换规则。列有两种形式,一种是直接对应于导入文件中的字段,可以直接使用字段名表示,一种需要通过计算得出。
| |
where | 用于抽取部分数据。如需过滤掉不需要的数据,则可以通过设置该参数来实现。 例如,只导入k1列等于20180601的数据,则可以在导入时指定 | |
max_filter_ratio | 最大容忍可过滤(例如,因为数据不规范等原因而过滤)的数据比例。默认为0,取值范围是0~1。 说明 此处数据不规范的数据不包括通过WHERE条件过滤的数据。 | |
partitions | 用于指定该导入所涉及的Partition。 如果您能够确定数据对应的Partition,则推荐指定该项。不满足指定分区的数据将被过滤掉。例如,指定导入到p1和p2分区,可以指定 | |
timeout | 指定导入的超时时间。默认是600,单位为秒。 设置范围为1~259200。 | |
strict_mode | 指定此次导入是否开启严格模式,默认为开启。 关闭方式为 | |
timezone | 指定本次导入所使用的时区。默认为东八区。 该参数会影响所有导入涉及和时区有关的函数结果。 | |
exec_mem_limit | 导入内存限制。默认值为2 GB。 |
示例
curl --location-trusted -u root -T date -H "label:123" \
http://abc.com:8030/api/test/date/_stream_load
导入任务完成后,Stream Load会以JSON格式返回导入任务的相关内容,返回结果示例如下。
{
"TxnId": 11672,
"Label": "f6b62abf-4e16-4564-9009-b77823f3c024",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 199563535,
"NumberLoadedRows": 199563535,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 50706674331,
"LoadTimeMs": 801327,
"BeginTxnTimeMs": 103,
"StreamLoadPlanTimeMs": 0,
"ReadDataTimeMs": 760189,
"WriteDataTimeMs": 801023,
"CommitAndPublishTimeMs": 199"
}
参数 | 描述 |
TxnId | 导入的事务ID。用户可不感知。 |
Label | 导入的Label。由用户指定或系统自动生成。 |
Status | 导入完成状态。
|
ExistingJobStatus | 已存在Label对应的导入作业的状态。该字段只有当Status为Label Already Exists时才会显示。您可以通过该状态,知晓已存在Label对应的导入作业的状态。
|
Message | 导入状态的详细说明。导入失败时会返回具体的失败原因。 |
NumberTotalRows | 从数据流中读取到的总行数。 |
NumberLoadedRows | 导入任务的数据行数,仅在导入状态为Success时有效。 |
NumberFilteredRows | 导入任务过滤掉的行数,即数据质量不合格的行。 |
NumberUnselectedRows | 通过Where条件被过滤掉的行数。 |
LoadBytes | 导入任务的源文件数据量大小。 |
LoadTimeMs | 导入任务所用的时间,单位为ms。 |
ErrorURL | 被过滤数据的具体内容,仅保留前1000条数据。如果导入任务失败,可以直接用以下方式获取被过滤的数据并进行分析,以调整导入任务。
|
取消导入任务
Stream Load可以通过停止进程来取消任务,Stream Load在超时或者导入错误后会被系统自动取消。
ps -ef | grep stream_load
最佳实践
应用场景
Stream Load的最佳使用场景是原始文件在内存中或者存储在本地磁盘中。由于Stream Load是一种同步的导入方式,所以当您希望用同步方式获取导入结果时,也可以使用该导入方式。
数据量
由于Stream Load是由BE发起的导入并分发数据,建议的导入数据量在1 GB到10 GB之间。系统默认的最大Stream Load导入数据量为10 GB,所以导入超过10 GB的文件需要修改BE的配置项streaming_load_max_mb。例如,待导入文件大小约为15 GB(15360 MB),则可以修改BE的配置项streaming_load_max_mb大于15 GB即可。
curl --location-trusted -u 'admin:****' -XPOST http://be-c-****-internal.starrocks.aliyuncs.com:8040/api/update_config?streaming_load_max_mb=15360
Stream Load的默认超时为600秒,您可以登录EMR Serverless控制台,通过设置FE的参数来修改该参数值。
完整示例
数据情况:数据在客户端本地磁盘路径/mnt/disk1/customer.tbl中,希望导入到数据库stream_load的表customer中。
标准数据下载:customer.tbl
集群情况:Stream Load的并发数不受集群大小影响。
示例如下:
当导入文件大小超过默认的最大导入大小时,需要修改BE的配置文件BE.conf。例如,修改参数streaming_load_max_mb,调整最大导入为15360 MB。
curl --location-trusted -u 'admin:*****' -XPOST http://be-c-****-internal.starrocks.aliyuncs.com:8040/api/update_config?streaming_load_max_mb=15360
在EMR Serverless StarRocks实例配置页面,修改参数stream_load_default_timeout_second,本示例调整超时时间为3600。
创建目标表customer。
CREATE TABLE `customer` ( `c_custkey` bigint(20) NULL COMMENT "", `c_name` varchar(65533) NULL COMMENT "", `c_address` varchar(65533) NULL COMMENT "", `c_nationkey` bigint(20) NULL COMMENT "", `c_phone` varchar(65533) NULL COMMENT "", `c_acctbal` double NULL COMMENT "", `c_mktsegment` varchar(65533) NULL COMMENT "", `c_comment` varchar(65533) NULL COMMENT "" ) ENGINE=OLAP DUPLICATE KEY(`c_custkey`) COMMENT "OLAP" DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 24 PROPERTIES ( "replication_num" = "1", "in_memory" = "false", "storage_format" = "DEFAULT", "enable_persistent_index" = "false", "compression" = "LZ4" );
创建导入任务。由于数据集较大,可以在后台执行。
curl --location-trusted -u 'admin:*****' -T /mnt/disk1/customer.tbl -H "label:labelname" -H "column_separator:|" http://fe-c-****-internal.starrocks.aliyuncs.com:8030/api/load_test/customer/_stream_load
返回信息如下。
{ "TxnId": 575, "Label": "labelname", "Status": "Success", "Message": "OK", "NumberTotalRows": 150000, "NumberLoadedRows": 150000, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 24196144, "LoadTimeMs": 1081, "BeginTxnTimeMs": 104, "StreamLoadPlanTimeMs": 106, "ReadDataTimeMs": 85, "WriteDataTimeMs": 850, "CommitAndPublishTimeMs": 20 }
说明如果报错
"ErrorURL": "http://***:8040/api/_load_error_log?file=error_log_***"
,使用curl
命令查看详细信息即可。
代码集成示例
Java开发Stream Load,详情请参见stream_load。
Spark集成Stream Load,详情请参见01_sparkStreaming2StarRocks。