文档

Stream Load

更新时间:

Stream Load是一种同步的导入方式,您可以通过HTTP协议发送请求将本地文件或数据流导入到Doris中。Stream Load同步执行导入并返回导入结果。您可以直接通过请求的返回体判断本次导入是否成功。本文为您介绍Stream Load导入的基本原理、基本操作、系统配置以及最佳实践。

适用场景

Stream Load主要适用于导入本地文件或通过程序导入数据流中的数据。

基本原理

下面为您展示了Stream Load的主要流程,省略了部分导入细节。

^      +
                         |      |
                         |      | 1A. User submit load to FE
                         |      |
                         |   +--v-----------+
                         |   | FE           |
5. Return result to user |   +--+-----------+
                         |      |
                         |      | 2. Redirect to BE
                         |      |
                         |   +--v-----------+
                         +---+Coordinator BE| 1B. User submit load to BE
                             +-+-----+----+-+
                               |     |    |
                         +-----+     |    +-----+
                         |           |          | 3. Distrbute data
                         |           |          |
                       +-v-+       +-v-+      +-v-+
                       |BE |       |BE |      |BE |
                       +---+       +---+      +---+

Stream Load中,Doris会选定一个节点作为Coordinator节点,该节点负责接收数据并分发数据到其他数据节点。您可以通过HTTP协议提交导入命令。如果提交到FE,则FE会通过HTTP redirect指令将请求转发给某一个BE。您也可以直接提交导入命令给某一指定BE。导入的最终结果由Coordinator BE返回给您。

支持的数据格式

Stream Load支持CSV(文本)和JSON两个数据格式。

基本操作

创建导入任务

Stream Load通过HTTP协议提交和传输数据。本示例通过curl命令展示如何提交导入任务。您也可以通过其他HTTP Client进行操作。

  • curl命令

    curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load
    
    # Header中支持属性见下表。
    # 格式为: -H "key1:value1"

    创建导入任务的详细语法可以通过HELP STREAM LOAD命令查看。Stream Load中所有与导入任务相关的参数均设置在Header中。相关参数描述如下表所示。

    参数

    说明

    签名参数

    user:passwd

    Stream Load创建导入任务使用的是HTTP协议,已通过Basic access authentication进行签名。Doris会根据签名来验证用户身份和导入权限。

    导入任务参数

    label

    导入任务的标识。

    每个导入任务,都有一个在单database内部唯一的Label。Label是您在导入命令中自定义的名称。通过该Label,您可以查看对应导入任务的执行情况。Label的另一个作用是防止您重复导入相同的数据。强烈推荐您同一批次数据使用相同Label,这样同一批次数据的重复请求只会被接受一次,保证了At-Most-Once。当Label对应的导入作业状态为CANCELLED时,该Label可以再次被使用。

    column_separator

    用于指定导入文件中的列分隔符,默认为\t。

    如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。例如,Hive文件的分隔符\x01,需要指定为-H "column_separator:\x01"。可以使用多个字符的组合作为列分隔符。

    line_delimiter

    用于指定导入文件中的换行符,默认为\n。可以使用多个字符的组合作为换行符。

    max_filter_ratio

    导入任务的最大容忍率,默认为0容忍,取值范围是0~1。

    当导入的错误率超过该值,则导入失败。如果您希望忽略错误的行,可以通过设置该参数大于0来保证导入成功。计算公式为(dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) ) > max_filter_ratio,其中dpp.abnorm.ALL表示数据质量不合格的行数,例如类型不匹配、列数不匹配、长度不匹配等;dpp.norm.ALL表示导入过程中正确数据的条数,可以通过SHOW LOAD命令查询导入任务的正确数据量。

    原始文件的行数 = dpp.abnorm.ALL + dpp.norm.ALL

    where

    导入任务指定的过滤条件。

    Stream Load支持对原始数据指定where语句进行过滤。被过滤的数据将不会被导入,也不会参与filter ratio的计算,但会被计入num_rows_unselected。

    Partitions

    待导入表的Partition信息,如果待导入数据不属于指定的Partition,则不会被导入。未被导入的数据将计入 dpp.abnorm.ALL。

    columns

    待导入数据的函数变换配置,目前Stream Load支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。

    exec_mem_limit

    导入内存限制。默认为2 GB,单位为字节。

    strict_mode

    指定此次导入是否开启strict mode模式,默认关闭。

    Stream Load导入可以开启strict mode模式,开启方式为在HEADER中声明strict_mode=true。strict mode模式的意思是对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下:

    • 对于列类型转换来说,如果strict mode为true,则错误数据将被filter。这里的错误数据是指原始数据并不为空值,在参与列类型转换后结果为空值的这一类数据。

    • 对于导入的某列由函数变换生成时,strict mode对其不产生影响。

    • 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict mode对其也不产生影响。例如,如果类型是decimal(1,0),原始数据为10,则属于可以通过类型转换但不在列声明的范围内,strict mode对其不产生影响。

    merge_type

    数据的合并类型,共支持APPEND、DELETE、MERGE三种类型。

    • APPEND(默认值):表示这批数据全部需要追加到现有数据中。

    • DELETE:表示删除与这批数据key相同的所有行。

    • MERGE:需要与DELETE条件联合使用,表示满足DELETE条件的数据按照DELETE语义处理,其余的按照APPEND语义处理。

    two_phase_commit

    Stream Load导入可以开启两阶段事务提交模式:在Stream load过程中,数据写入完成即会返回信息,此时数据不可见,事务状态为PRECOMMITTED,您手动触发commit操作之后,数据才可见。默认的两阶段批量事务提交为关闭。

    开启方式是在be.conf中配置disable_stream_load_2pc=false,并且在HEADER中声明two_phase_commit=true

    示例:

    1. 发起Stream Load预提交操作。

      说明

      列顺序变换例子:原始数据有三列src_c1、src_c2、rc_c3,目前Doris表也有三列dst_c1、dst_c2、dst_c3。

      • 如果原始表的src_c1列对应目标表dst_c1列,原始表的src_c2列对应目标表dst_c2列,原始表的src_c3列对应目标表dst_c3列,则写法为columns: dst_c1, dst_c2, dst_c3

      • 如果原始表的src_c1列对应目标表dst_c2列,原始表的src_c2列对应目标表dst_c3列,原始表的src_c3列对应目标表dst_c1列,则写法为columns: dst_c2, dst_c3, dst_c1

      • 表达式变换例子:原始文件有两列,目标表也有两列(c1,c2),但是原始文件的两列均需要经过函数变换才能对应目标表的两列,则写法如下为columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = month(tmp_c2),其中tmp_*是一个占位符,代表的是原始文件中的两个原始列。

      curl  --location-trusted -u user:passwd -H "two_phase_commit:true" -T test.txt http://fe_host:http_port/api/{db}/{table}/_stream_load
      {
          "TxnId": 18036,
          "Label": "55c8ffc9-1c40-4d51-b75e-f2265b36****",
          "TwoPhaseCommit": "true",
          "Status": "Success",
          "Message": "OK",
          "NumberTotalRows": 100,
          "NumberLoadedRows": 100,
          "NumberFilteredRows": 0,
          "NumberUnselectedRows": 0,
          "LoadBytes": 1031,
          "LoadTimeMs": 77,
          "BeginTxnTimeMs": 1,
          "StreamLoadPutTimeMs": 1,
          "ReadDataTimeMs": 0,
          "WriteDataTimeMs": 58,
          "CommitAndPublishTimeMs": 0
      }
    2. 对事务触发commit操作。

    3. 对事务触发abort操作。

  • 示例

    curl --location-trusted -u root -T date -H "label:123" http://abc.com:8030/api/test/date/_stream_load
  • 返回结果

    由于Stream Load是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回给用户。示例如下。

    {
        "TxnId": 1003,
        "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a014****",
        "Status": "Success",
        "ExistingJobStatus": "FINISHED", // optional
        "Message": "OK",
        "NumberTotalRows": 1000000,
        "NumberLoadedRows": 1000000,
        "NumberFilteredRows": 1,
        "NumberUnselectedRows": 0,
        "LoadBytes": 40888898,
        "LoadTimeMs": 2144,
        "BeginTxnTimeMs": 1,
        "StreamLoadPutTimeMs": 2,
        "ReadDataTimeMs": 325,
        "WriteDataTimeMs": 1933,
        "CommitAndPublishTimeMs": 106,
        "ErrorURL": "http://192.168.**.**:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bd****"
    }

    Stream load导入结果参数如下表。

    参数

    说明

    TxnId

    导入的事务ID。用户可不感知。

    Label

    导入的Label。由用户指定或系统自动生成。

    Status

    导入完成状态。

    • Success:表示导入成功。

    • Publish Timeout:表示导入已经完成,只是数据可能会延迟可见,无需重试。

    • Label Already Exists:Label重复,需更换Label。

    • Fail:导入失败。

    ExistingJobStatus

    已存在Label对应的导入作业的状态。该字段只有当Status为Label Already Exists时才会显示。您可以通过该状态,知晓已存在Label对应的导入作业的状态。

    • RUNNING:表示作业在执行中。

    • FINISHED:表示作业成功。

    Message

    导入错误信息。

    NumberTotalRows

    导入总处理的行数。

    NumberLoadedRows

    成功导入的行数。

    NumberFilteredRows

    数据质量不合格的行数。

    NumberUnselectedRows

    被where条件过滤的行数。

    LoadBytes

    导入的字节数。

    LoadTimeMs

    导入完成时间。单位毫秒。

    BeginTxnTimeMs

    向FE请求开始一个事务所花费的时间,单位毫秒。

    StreamLoadPutTimeMs

    向FE请求获取导入数据执行计划所花费的时间,单位毫秒。

    ReadDataTimeMs

    读取数据所花费的时间,单位毫秒。

    WriteDataTimeMs

    执行写入数据操作所花费的时间,单位毫秒。

    CommitAndPublishTimeMs

    向FE请求提交并且发布事务所花费的时间,单位毫秒。

    ErrorURL

    如果有数据质量问题,通过访问该URL查看具体错误行。

    重要

    由于Stream Load是同步的导入方式,所以并不会在Doris中记录导入信息,您无法异步的通过查看导入命令看到Stream Load。使用时需监听创建导入请求的返回值获取导入结果。

取消导入

您无法手动取消Stream Load,Stream Load在超时或者导入错误后会被系统自动取消。

查看Stream Load

您可以通过show stream load来查看已经完成的Stream Load任务。

默认BE是不记录Stream Load的记录,如果您要查看需要在BE上启用记录,配置参数enable_stream_load_record=true ,具体配置详情请参见BE参数配置

相关系统配置

FE配置

stream_load_default_timeout_second:导入任务的超时时间(以秒为单位),导入任务在设定的timeout时间内未完成则会被系统取消,变成CANCELLED。默认的timeout时间为600秒。如果导入的源文件无法在规定时间内完成导入,您可以在Stream Load 请求中设置单独的超时时间,或者调整FE的stream_load_default_timeout_second参数来设置全局的默认超时时间。

BE配置

streaming_load_max_mbStream:Stream Load的最大导入大小,默认为10 GB,单位是MB。如果您的原始文件超过该值,则需要调整BE的streaming_load_max_mb参数。

最佳实践

应用场景

使用Stream Load最合适的场景就是原始文件在内存中或者在磁盘中。其次,由于Stream Load是一种同步的导入方式,所以如果您希望用同步方式获取导入结果,也可以使用这种导入。

数据量

由于Stream Load的原理是由BE发起的导入并分发数据,建议的导入数据量在1 GB到10 GB之间。由于默认的最大Stream Load导入数据量为 10 GB,所以导入超过10 GB的文件就需要修改BE的配置streaming_load_max_mb

例如,如果待导入文件大小为15 GB,则需修改BE配置streaming_load_max_mb为16000即可。

Stream Load的默认超时为300秒,按照Doris目前最大的导入限速来看,约超过3 GB的文件就需要修改导入任务默认超时时间。

导入任务超时时间 = 导入数据量 / 10M/s (具体的平均导入速度需要您根据自己的集群情况计算)

例如,如果导入一个10 GB的文件,则timeout = 1000s ,为10G / 10M/s

完整示例

数据情况:数据在客户端本地磁盘路径/home/store-sales中,导入的数据量约为15 GB,希望导入到数据库bj-sales的表store-sales中。

集群情况:Stream Load的并发数不受集群大小影响。

示例如下:

  1. 因为导入文件大小超过默认的最大导入大小10 GB,所以需要修改BE的配置文件BE.conf

    streaming_load_max_mb = 16000
  2. 计算大概的导入时间是否超过默认timeout值,导入时间为15000 / 10 = 1500s,如果超过了默认的timeout时间,则需要修改FE的配置FE.conf,修改参数stream_load_default_timeout_second,将导入时间调整为1500。

  3. 创建导入任务。

    curl --location-trusted -u user:password -T /home/store_sales -H "label:abc" http://abc.com:8030/api/bj_sales/store_sales/_stream_load