Stream Load是一种同步的导入方式,您可以通过HTTP协议发送请求将本地文件或数据流导入到Doris中。Stream Load同步执行导入并返回导入结果。您可以直接通过请求的返回体判断本次导入是否成功。本文为您介绍Stream Load导入的基本原理、基本操作、系统配置以及最佳实践。
适用场景
Stream Load主要适用于导入本地文件或通过程序导入数据流中的数据。
基本原理
下面为您展示了Stream Load的主要流程,省略了部分导入细节。
^ +
| |
| | 1A. User submit load to FE
| |
| +--v-----------+
| | FE |
5. Return result to user | +--+-----------+
| |
| | 2. Redirect to BE
| |
| +--v-----------+
+---+Coordinator BE| 1B. User submit load to BE
+-+-----+----+-+
| | |
+-----+ | +-----+
| | | 3. Distrbute data
| | |
+-v-+ +-v-+ +-v-+
|BE | |BE | |BE |
+---+ +---+ +---+
Stream Load中,Doris会选定一个节点作为Coordinator节点,该节点负责接收数据并分发数据到其他数据节点。您可以通过HTTP协议提交导入命令。如果提交到FE,则FE会通过HTTP redirect指令将请求转发给某一个BE。您也可以直接提交导入命令给某一指定BE。导入的最终结果由Coordinator BE返回给您。
支持的数据格式
Stream Load支持CSV(文本)和JSON两个数据格式。
基本操作
创建导入任务
Stream Load通过HTTP协议提交和传输数据。本示例通过curl
命令展示如何提交导入任务。您也可以通过其他HTTP Client进行操作。
curl
命令curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load # Header中支持属性见下表。 # 格式为: -H "key1:value1"
创建导入任务的详细语法可以通过
HELP STREAM LOAD
命令查看。Stream Load中所有与导入任务相关的参数均设置在Header中。相关参数描述如下表所示。参数
说明
签名参数
user:passwd
Stream Load创建导入任务使用的是HTTP协议,已通过Basic access authentication进行签名。Doris会根据签名来验证用户身份和导入权限。
导入任务参数
label
导入任务的标识。
每个导入任务,都有一个在单database内部唯一的Label。Label是您在导入命令中自定义的名称。通过该Label,您可以查看对应导入任务的执行情况。Label的另一个作用是防止您重复导入相同的数据。强烈推荐您同一批次数据使用相同Label,这样同一批次数据的重复请求只会被接受一次,保证了At-Most-Once。当Label对应的导入作业状态为CANCELLED时,该Label可以再次被使用。
column_separator
用于指定导入文件中的列分隔符,默认为\t。
如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。例如,Hive文件的分隔符\x01,需要指定为
-H "column_separator:\x01"
。可以使用多个字符的组合作为列分隔符。line_delimiter
用于指定导入文件中的换行符,默认为\n。可以使用多个字符的组合作为换行符。
max_filter_ratio
导入任务的最大容忍率,默认为0容忍,取值范围是0~1。
当导入的错误率超过该值,则导入失败。如果您希望忽略错误的行,可以通过设置该参数大于0来保证导入成功。计算公式为
(dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) ) > max_filter_ratio
,其中dpp.abnorm.ALL
表示数据质量不合格的行数,例如类型不匹配、列数不匹配、长度不匹配等;dpp.norm.ALL
表示导入过程中正确数据的条数,可以通过SHOW LOAD
命令查询导入任务的正确数据量。原始文件的行数 = dpp.abnorm.ALL + dpp.norm.ALL
where
导入任务指定的过滤条件。
Stream Load支持对原始数据指定where语句进行过滤。被过滤的数据将不会被导入,也不会参与filter ratio的计算,但会被计入num_rows_unselected。
Partitions
待导入表的Partition信息,如果待导入数据不属于指定的Partition,则不会被导入。未被导入的数据将计入 dpp.abnorm.ALL。
columns
待导入数据的函数变换配置,目前Stream Load支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。
exec_mem_limit
导入内存限制。默认为2 GB,单位为字节。
strict_mode
指定此次导入是否开启strict mode模式,默认关闭。
Stream Load导入可以开启strict mode模式,开启方式为在HEADER中声明
strict_mode=true
。strict mode模式的意思是对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下:对于列类型转换来说,如果strict mode为true,则错误数据将被filter。这里的错误数据是指原始数据并不为空值,在参与列类型转换后结果为空值的这一类数据。
对于导入的某列由函数变换生成时,strict mode对其不产生影响。
对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict mode对其也不产生影响。例如,如果类型是decimal(1,0),原始数据为10,则属于可以通过类型转换但不在列声明的范围内,strict mode对其不产生影响。
merge_type
数据的合并类型,共支持APPEND、DELETE、MERGE三种类型。
APPEND(默认值):表示这批数据全部需要追加到现有数据中。
DELETE:表示删除与这批数据key相同的所有行。
MERGE:需要与DELETE条件联合使用,表示满足DELETE条件的数据按照DELETE语义处理,其余的按照APPEND语义处理。
two_phase_commit
Stream Load导入可以开启两阶段事务提交模式:在Stream load过程中,数据写入完成即会返回信息,此时数据不可见,事务状态为PRECOMMITTED,您手动触发commit操作之后,数据才可见。默认的两阶段批量事务提交为关闭。
开启方式是在be.conf中配置
disable_stream_load_2pc=false
,并且在HEADER中声明two_phase_commit=true
。示例:
发起Stream Load预提交操作。
说明列顺序变换例子:原始数据有三列src_c1、src_c2、rc_c3,目前Doris表也有三列dst_c1、dst_c2、dst_c3。
如果原始表的src_c1列对应目标表dst_c1列,原始表的src_c2列对应目标表dst_c2列,原始表的src_c3列对应目标表dst_c3列,则写法为
columns: dst_c1, dst_c2, dst_c3
。如果原始表的src_c1列对应目标表dst_c2列,原始表的src_c2列对应目标表dst_c3列,原始表的src_c3列对应目标表dst_c1列,则写法为
columns: dst_c2, dst_c3, dst_c1
。表达式变换例子:原始文件有两列,目标表也有两列(c1,c2),但是原始文件的两列均需要经过函数变换才能对应目标表的两列,则写法如下为
columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = month(tmp_c2)
,其中tmp_*
是一个占位符,代表的是原始文件中的两个原始列。
curl --location-trusted -u user:passwd -H "two_phase_commit:true" -T test.txt http://fe_host:http_port/api/{db}/{table}/_stream_load { "TxnId": 18036, "Label": "55c8ffc9-1c40-4d51-b75e-f2265b36****", "TwoPhaseCommit": "true", "Status": "Success", "Message": "OK", "NumberTotalRows": 100, "NumberLoadedRows": 100, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 1031, "LoadTimeMs": 77, "BeginTxnTimeMs": 1, "StreamLoadPutTimeMs": 1, "ReadDataTimeMs": 0, "WriteDataTimeMs": 58, "CommitAndPublishTimeMs": 0 }
对事务触发commit操作。
对事务触发abort操作。
示例
curl --location-trusted -u root -T date -H "label:123" http://abc.com:8030/api/test/date/_stream_load
返回结果
由于Stream Load是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回给用户。示例如下。
{ "TxnId": 1003, "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a014****", "Status": "Success", "ExistingJobStatus": "FINISHED", // optional "Message": "OK", "NumberTotalRows": 1000000, "NumberLoadedRows": 1000000, "NumberFilteredRows": 1, "NumberUnselectedRows": 0, "LoadBytes": 40888898, "LoadTimeMs": 2144, "BeginTxnTimeMs": 1, "StreamLoadPutTimeMs": 2, "ReadDataTimeMs": 325, "WriteDataTimeMs": 1933, "CommitAndPublishTimeMs": 106, "ErrorURL": "http://192.168.**.**:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bd****" }
Stream load导入结果参数如下表。
参数
说明
TxnId
导入的事务ID。用户可不感知。
Label
导入的Label。由用户指定或系统自动生成。
Status
导入完成状态。
Success:表示导入成功。
Publish Timeout:表示导入已经完成,只是数据可能会延迟可见,无需重试。
Label Already Exists:Label重复,需更换Label。
Fail:导入失败。
ExistingJobStatus
已存在Label对应的导入作业的状态。该字段只有当Status为Label Already Exists时才会显示。您可以通过该状态,知晓已存在Label对应的导入作业的状态。
RUNNING:表示作业在执行中。
FINISHED:表示作业成功。
Message
导入错误信息。
NumberTotalRows
导入总处理的行数。
NumberLoadedRows
成功导入的行数。
NumberFilteredRows
数据质量不合格的行数。
NumberUnselectedRows
被where条件过滤的行数。
LoadBytes
导入的字节数。
LoadTimeMs
导入完成时间。单位毫秒。
BeginTxnTimeMs
向FE请求开始一个事务所花费的时间,单位毫秒。
StreamLoadPutTimeMs
向FE请求获取导入数据执行计划所花费的时间,单位毫秒。
ReadDataTimeMs
读取数据所花费的时间,单位毫秒。
WriteDataTimeMs
执行写入数据操作所花费的时间,单位毫秒。
CommitAndPublishTimeMs
向FE请求提交并且发布事务所花费的时间,单位毫秒。
ErrorURL
如果有数据质量问题,通过访问该URL查看具体错误行。
重要由于Stream Load是同步的导入方式,所以并不会在Doris中记录导入信息,您无法异步的通过查看导入命令看到Stream Load。使用时需监听创建导入请求的返回值获取导入结果。
取消导入
您无法手动取消Stream Load,Stream Load在超时或者导入错误后会被系统自动取消。
查看Stream Load
您可以通过show stream load
来查看已经完成的Stream Load任务。
默认BE是不记录Stream Load的记录,如果您要查看需要在BE上启用记录,配置参数enable_stream_load_record=true
,具体配置详情请参见BE参数配置。
相关系统配置
FE配置
stream_load_default_timeout_second:导入任务的超时时间(以秒为单位),导入任务在设定的timeout时间内未完成则会被系统取消,变成CANCELLED。默认的timeout时间为600秒。如果导入的源文件无法在规定时间内完成导入,您可以在Stream Load 请求中设置单独的超时时间,或者调整FE的stream_load_default_timeout_second参数来设置全局的默认超时时间。
BE配置
streaming_load_max_mbStream:Stream Load的最大导入大小,默认为10 GB,单位是MB。如果您的原始文件超过该值,则需要调整BE的streaming_load_max_mb参数。
最佳实践
应用场景
使用Stream Load最合适的场景就是原始文件在内存中或者在磁盘中。其次,由于Stream Load是一种同步的导入方式,所以如果您希望用同步方式获取导入结果,也可以使用这种导入。
数据量
由于Stream Load的原理是由BE发起的导入并分发数据,建议的导入数据量在1 GB到10 GB之间。由于默认的最大Stream Load导入数据量为 10 GB,所以导入超过10 GB的文件就需要修改BE的配置streaming_load_max_mb。
例如,如果待导入文件大小为15 GB,则需修改BE配置streaming_load_max_mb为16000即可。
Stream Load的默认超时为300秒,按照Doris目前最大的导入限速来看,约超过3 GB的文件就需要修改导入任务默认超时时间。
导入任务超时时间 = 导入数据量 / 10M/s (具体的平均导入速度需要您根据自己的集群情况计算)
例如,如果导入一个10 GB的文件,则timeout = 1000s ,为10G / 10M/s
。
完整示例
数据情况:数据在客户端本地磁盘路径/home/store-sales中,导入的数据量约为15 GB,希望导入到数据库bj-sales的表store-sales中。
集群情况:Stream Load的并发数不受集群大小影响。
示例如下:
因为导入文件大小超过默认的最大导入大小10 GB,所以需要修改BE的配置文件BE.conf。
streaming_load_max_mb = 16000
计算大概的导入时间是否超过默认timeout值,导入时间为
15000 / 10 = 1500s
,如果超过了默认的timeout时间,则需要修改FE的配置FE.conf,修改参数stream_load_default_timeout_second,将导入时间调整为1500。创建导入任务。
curl --location-trusted -u user:password -T /home/store_sales -H "label:abc" http://abc.com:8030/api/bj_sales/store_sales/_stream_load