Stream Load

当您需要将本地文件或数据流导入到云数据库 SelectDB 版实例时,可以使用Stream Load进行数据导入。本文介绍如何通过Stream Load导入数据至云数据库 SelectDB 版

背景信息

Stream Load属于同步接口的导入方式,您可以通过发送HTTP请求将本地文件或数据流导入到云数据库 SelectDB 版Stream Load执行后会立即返回导入结果,您可以通过请求的返回结果判断此次导入是否成功。其支持的数据格式有CSV(文本)、JSONPARQUETORC

重要

由于Stream load具有高吞吐、低延迟以及灵活可靠的特性,强烈建议您将Stream Load作为主要的数据导入方式。

准备工作

  1. 确保发起Stream Load请求的终端与SelectDB网络互通:

    1. 云数据库 SelectDB 版实例申请公网地址。具体操作,请参见申请和释放公网地址

      如果您发起Stream Load请求的终端与云数据库 SelectDB 版实例位于同一VPC下,跳过此步骤。

    2. 将发起Stream Load请求终端的相关IP添加至云数据库 SelectDB 版的白名单。具体操作,请参见设置白名单

    3. 若发起Stream Load请求终端存在白名单机制,已将SelectDB实例所在网段IP添加至源集群的白名单中。

  2. (可选)修改BE(BackEnd)配置,开启Stream Load操作记录。

    默认情况下,BE不保留Stream Load操作记录。

    如果您需要跟踪Stream Load的操作情况,需在创建导入任务之前配置enable_stream_load_record=true,以启用Stream Load操作记录。如需开启此功能,请提交工单以获得技术支持。

  3. (可选)修改BE配置,调整Stream load的最大导入限制。

    默认情况下,Stream Load导入文件的最大限制为10240MB。

    如果您的原始文件超过此值,则需调整后端参数streaming_load_max_mb。修改参数,请参见参数配置

  4. (可选)修改FE配置,调整导入超时时间。

    默认情况下,Stream Load的导入任务超时时间为600秒。如果导入任务在设定的超时时间内未完成,系统将取消该任务,并将其状态更改为CANCELLED。

    如果导入的源文件无法在规定时间内完成,您可以在Stream Load请求中设置单独的超时时间,或者调整FE参数stream_load_default_timeout_second以设定全局的默认超时时间。如需调整,请提交工单以获得技术支持。

注意事项

单次Stream Load可以写入几百MB1GB的数据。在业务场景中,频繁的写入少量数据可能导致实例性能大幅下降,甚至数据库表死锁。强烈建议您降低写入频率,对数据进行攒批处理。

  • 业务端攒批:您需自行收集业务数据,然后向SelectDB发起Stream Load请求。

  • 服务端攒批SelectDB接收到Stream Load请求后,服务端将进行请求数据的批处理操作。如何操作,请参见Group Commit

创建导入任务

Stream Load通过HTTP协议提交和传输数据。以下为通过curl命令提交导入示例,该命令可在LinuxmacOS系统的终端或Windows系统的命令提示符下执行。此外,Stream Load还支持通过其他HTTP客户端进行操作。

语法

curl --location-trusted -u <username>:<password> [-H ""] -H "expect:100-continue" -T <file_path> -XPUT http://<host>:<port>/api/<db
_name>/<table_name>/_stream_load

参数说明

参数名称

是否必选

参数说明

--location-trusted

需要认证时,会将usernamepassword传递给被重定向到的服务器。

-u

指定SelectDB实例的用户名和密码。

  • username:用户名。

  • password:密码。

-H

指定本次Stream Load导入请求的请求头(Header)内容。格式如下:

-H "key1:value1"

常见参数如下:

  • label:导入任务的唯一标识。

  • column_separator:指定导入文件中的列分隔符,默认为\t

    您也可以使用多个字符的组合作为列分隔符。

    如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。

更多请求头参数,请参见请求头参数说明

-T

指定需要导入数据的文件路径。

file_path:目标文件路径。

-XPUT

HTTP请求的Method,采用PUT请求方法,指定SelectDB的数据导入地址,具体参数如下:

  • hostSelectDB实例的VPC地址或公网地址。

    • 非同一VPC使用公网地址:如果您执行命令的终端所在设备与目标SelectDB实例不在同一VPC下,您需使用公网地址。如何申请公网,请参见申请和释放公网地址

    • 同一VPC使用VPC地址:如果您执行命令的终端所在设备为阿里云产品,且与目标SelectDB实例处于同一VPC下,建议您使用VPC地址。

  • portSelectDB实例的HTTP端口号,默认为8080。

    您可以在SelectDB的实例详情页面查看实例的连接地址和端口号。

  • db_name:数据库名。

  • table_name:数据表名。

请求头参数说明

Stream Load使用HTTP协议,因此导入任务有关的参数主要设置在请求头(Header)中。常用的导入参数如下。

参数名称

参数说明

label

导入任务的唯一标识。

Label作用:

  • 导入命令中自定义的名称。

  • 可用于查看对应导入任务的执行情况。

  • 可用于防止重复导入相同的数据。

  • 对应的导入作业状态为CANCELLED时,可以再次被使用。

重要

推荐同一批次数据使用相同的Label。这样同一批次数据的重复请求只会被接受一次,保证了At-Most-Once

format

指定导入数据格式。

  • 支持的格式:CSVJSONPARQUETORCcsv_with_names(CSV文件行首过滤)和csv_with_names_and_typesCSV文件前两行过滤)。

  • 默认格式:CSV

line_delimiter

指定导入文件中的换行符。

默认换行符:\n

您也可以使用多个字符的组合作为换行符。例如,在Windows系统中,使用\r\n作为换行符。

column_separator

指定导入文件中的列分隔符。

默认分隔符:\t

您也可使用多个字符的组合作为列分隔符。例如,可以使用双竖线||作为列分隔符。

如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。例如:Hive文件的分隔符\x01,需要指定为-H"column_separator:\x01"

compress_type

指定文件的压缩格式。仅支持CSV文件的压缩。

支持的压缩格式:gzlzobz2lz4lzopdeflate

max_filter_ratio

指定导入任务的最大容忍率。

当导入的错误率超过该阈值时,导入将失败。如需忽略错误行,必须将该参数设置为大于0,以确保导入成功。

  • 默认值:0,即零容忍。

  • 取值范围:[0,1]。

strict_mode

指定是否开启严格过滤模式。

  • false(默认值):不开启。

  • true:开启。开启后,会对导入过程中的列类型转换进行严格过滤。

    • 错误的数据将被过滤。

    • 非空原始数据的列类型变换如果结果为NULL,也会被过滤。

cloud_cluster

指定导入使用的集群。

默认为该实例的默认集群。如果该实例没有设置默认集群,则会自动选择一个有权限的集群。

load_to_single_tablet

指定是否仅将数据导入到对应分区的一个tablet。该参数仅允许在对具有random分桶的Duplicate表进行数据导入时设置。

  • false(默认值):代表向random分桶的Duplicate模型表导入数据时,不会只向对应分区的一个分桶写入数据。

  • true:代表向random分桶的Duplicate模型表导入数据时,会只向对应分区的一个分桶写入数据,从而可以提高数据导入的并发度和吞吐量。

where

指定导入任务的过滤条件。

支持对原始数据指定where语句进行过滤,被过滤的数据将不会被导入,也不会参与filter ratio的计算,但会被计入到被where条件过滤的行数

num_rows_unselected中。

partitions

指定待导入数据的分区(Partition)信息。

如果待导入数据不属于指定的分区(Partition)则不会被导入。这些数据将计入dpp.abnorm.ALL。

dpp.abnorm.ALL 是SelectDB中的一个计数器指标。表示在数据预处理阶段被过滤掉的总行数。导入结果中的 NumberFilteredRows 就包含了 dpp.abnorm.ALL 统计的异常行数

columns

指定待导入数据的函数变换配置。

支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。

merge_type

指定数据合并类型。

  • APPEND(默认值):表示本次导入是普通的追加写操作。

  • MERGE:需要配合delete参数使用,以标注Delete Flag列。

  • DELETE:表示本次导入的所有数据皆为删除数据。

重要

MERGEDELETE类型仅适用于Unique模型。

delete

仅在指定merge_type类型为MERGE时才具有意义,表示数据的删除条件。

function_column.sequence_col

仅适用于Unique模型,相同Key列下,保证Value列按照source_sequence列进行REPLACE,source_sequence可以是数据源中的列,也可以是表结构中的一列。

exec_mem_limit

指定导入内存限制。

  • 单位:字节。

  • 默认值:2147483648,即2 GiB

timeout

指定导入的超时时间。

  • 单位:秒。

  • 默认值为600

  • 范围:[1,259200]。

timezone

指定本次导入所使用的时区。该参数会影响所有导入涉及的和时区有关的函数结果。有关时区,您可通过IANA时区数据库查看

默认值:Asia/Shanghai,即东八区。

two_phase_commit

指定是否开启两阶段事务提交模式。

  • false(默认值):不开启。

  • true:开启。开启两阶段事务提交模式后,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为PRECOMMITTED,用户手动触发commit操作之后,数据才可见。

是否开启建议

开启后数据导入具有原子性。要么全部成功,要么全部失败,同时避免了导入过程中出现部分数据可见的情况。

以下三种场景下适合开启:

  • 金融交易数据:需要严格保证数据完整性和一致性

  • 计费系统数据:不允许出现部分数据导入的情况

  • 关键业务数据:对数据准确性要求极高的场景

以下三种场景适合关闭:

  • 日志分析:对一致性要求不高,追求导入速度

  • 大数据量批处理:资源有限,需要快速完成导入

  • 可重复导入的数据:如果导入失败可以重新导入的数据

jsonpaths

导入JSON数据格式有两种方式:

  • 简单模式:无需指定jsonpaths,要求JSON中的key列名与表中的列名是一一对应的,顺序可以不一样,如JSON数据{"k1":1, "k2":2, "k3":"hello"},其中k1、k2、k3分别对应表中的列名。

  • 匹配模式:JSON数据相对复杂时,通过参数jsonpaths匹配其中的key列到表对应的列,如jsonpaths:["$.status", "$.res.id", "$.res.count"]可抽取JSON数据中的嵌套字段写入对应表中,默认情况下jsonpaths抽取的字段会按顺序映射到表的相应字段。

json_root

json_root可用于指定JSON中的子对象,作为导入解析的根节点。

默认值:"",代表选择整个JSON作为导入解析的根节点。

strip_outer_array

Stream Load 中处理JSON格式数据的一个重要参数,它控制着如何解析包含外层数组的JSON数据。

  • false(默认值):表示会保留JSON数据的原始结构,不剥离外层数组,效果是将整个JSON数组作为单一值导入。

    例如示例数据[{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}],在设置strip_outer_arrayfalse后,会解析为一个数组数据导入表中。

  • true:当导入数据格式为JSON数组时,需要设置strip_outer_array为 true。

    例如示例数据[{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}],在设置strip_outer_arraytrue后,会解析为两条数据导入表中。

read_json_by_line

Stream Load 中处理 JSON 格式数据的一个重要参数,它控制着如何解析包含多行JSON数据的输入文件。

  • false(默认值):将整个输入文件视为单个 JSON 值或数组,内核会尝试解析整个文件内容作为一个 JSON 对象或数组。

    例如,如果文件中是以下内容:

    [
     {"id":1, "name":"Alice", "age":25},
     {"id":2, "name":"Bob", "age":30},
     {"id":3, "name":"Charlie", "age":35}
    ]

    则整个文件内容会被解析为一个JSON数组。

  • true:表示导入数据的每一行是一个JSON对象。

    例如,如果文件中是以下内容:

    {"id":1, "name":"Alice", "age":25}
    {"id":2, "name":"Bob", "age":30}
    {"id":3, "name":"Charlie", "age":35}

    文件中每行被解析为一个JSON对象。

示例

CSV格式的文件data.csv导入至VPC地址为selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com实例的test_db库的test_table表中。此处仅提供创建导入的curl指令示例,完整示例请参见导入数据的完整示例

curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" -H "expect:100-continue" http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

返回结果说明

Stream load是一种同步导入方式,导入结果将通过创建导入的返回值直接返回。返回结果示例如下。

{
    "TxnId": 17,
    "Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 5,
    "NumberLoadedRows": 5,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 28,
    "LoadTimeMs": 27,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 3,
    "CommitAndPublishTimeMs": 18
}

返回结果参数说明如下。

参数名称

参数说明

TxnId

导入的事务ID。

Label

导入标识

您可自定义标识,也可通过系统自动生成。

Status

导入状态,取值如下:

  • Success:导入成功。

  • Publish Timeout导入任务已完成,但数据可能会有所延迟,无需重复尝试。

  • Label Already ExistsLabel重复,需更换Label

  • Fail:导入失败。

ExistingJobStatus

已存在的Label对应的导入作业的状态。

该字段仅在StatusLabel Already Exists时才会显示。

您通过此状态,判断已存在Label对应的导入任务的状态。

  • RUNNING:任务还在执行。

  • FINISHED:任务成功。

Message

错误信息提示。

NumberTotalRows

导入总处理的行数。

NumberLoadedRows

成功导入的行数。

NumberFilteredRows

数据质量不合格的行数。

NumberUnselectedRows

where条件过滤的行数。

LoadBytes

导入的字节数。

LoadTimeMs

导入耗时。

单位:毫秒。

BeginTxnTimeMs

FE请求开始一个事务所花费的时间。

单位:毫秒。

StreamLoadPutTimeMs

FE请求获取导入数据执行计划所花费的时间。

单位:毫秒。

ReadDataTimeMs

读取数据所花费的时间。

单位:毫秒。

WriteDataTimeMs

执行写入数据操作所花费的时间。

单位:毫秒。

CommitAndPublishTimeMs

FE请求提交并且发布事务所花费的时间。

单位:毫秒。

ErrorURL

如果有数据质量问题,可通过访问这个URL查看具体错误行。

取消导入任务

Stream Load导入任务创建后,无法手动取消。任务仅在出现超时或导入错误的情况下,由系统自动取消。您可根据返回结果中的errorUrl下载报错信息,进行错误排查。

查看Stream Load任务

如果您已开启Stream Load操作记录,您可以通过通过MySQL客户端连接云数据库SelectDB版实例,执行show stream load语句查看已经完成的Stream Load任务。

导入数据的完整示例

准备工作

开始导入操作前,您需完成准备工作

导入CSV数据

通过脚本导入示例

  1. 创建待导入数据的表。

    1. 连接SelectDB实例。具体操作,请参见通过DMS连接云数据库SelectDB版实例

    2. 执行建库语句。

      CREATE DATABASE test_db;
    3. 执行建表语句。

      CREATE TABLE test_table
      (
          id int,
          name varchar(50),
          age int,
          address varchar(50),
          url varchar(500)
      )
      UNIQUE KEY(`id`, `name`)
      DISTRIBUTED BY HASH(id) BUCKETS 16
      PROPERTIES("replication_num" = "1");
  2. 在发起Stream Load的终端所在设备,创建待导入文件test.csv

    1,yang,32,shanghai,http://example.com
    2,wang,22,beijing,http://example.com
    3,xiao,23,shenzhen,http://example.com
    4,jess,45,hangzhou,http://example.com
    5,jack,14,shanghai,http://example.com
    6,tomy,25,hangzhou,http://example.com
    7,lucy,45,shanghai,http://example.com
    8,tengyin,26,shanghai,http://example.com
    9,wangli,27,shenzhen,http://example.com
    10,xiaohua,37,shanghai,http://example.com
  3. 导入数据。

    打开目标设备的终端,通过curl命令发起Stream Load任务,导入数据。

    创建导入任务的具体语法以及参数说明,请参见创建导入任务,以下为常见导入场景的示例。

    • 使用Label去重,指定超时时间。

      将文件test.csv中的数据导入到数据库test_db中的test_table表,使用Label避免导入重复批次的数据,并指定超时时间为100秒。

       curl --location-trusted -u admin:admin_123 -H "label:123" -H "timeout:100" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
    • 使用Label去重,并使用列筛选文件中要导入的数据。

      将文件test.csv中的数据导入到数据库test_db中的test_table表,使用Label避免导入重复批次的数据,指定文件的列名,并且只导入address等于hangzhou的数据。

      curl --location-trusted -u admin:admin_123 -H "label:123" -H "columns: id,name,age,address,url" -H "where: address='hangzhou'" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
    • 允许20%容错率。

      将文件test.csv中的数据导入到数据库test_db中的test_table表,允许20%的错误率。

      curl --location-trusted -u admin:admin_123 -H "label:123" -H "max_filter_ratio:0.2" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
    • 使用严格模式并设置时区。

      导入数据进行严格模式过滤,并设置时区为Africa/Abidjan

      curl --location-trusted -u admin:admin_123 -H "strict_mode: true" -H "timezone: Africa/Abidjan" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
    • 删除SelectDB中的数据

      删除SelectDBtest.csv文件中相同的数据。

      curl --location-trusted -u admin:admin_123 -H "merge_type: DELETE" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
    • 根据条件删除文件中不需要导入的数据,剩余数据正常导入SelectDB

      test.csv文件中address列为hangzhou的数据的行删除,其他行正常导入至SelectDB

      curl --location-trusted -u admin:admin_123 -H "expect:100-continue" -H "columns: id,name,age,address,url" -H "merge_type: MERGE" -H "delete: address='hangzhou'" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/testDb/testTbl/_stream_load

通过Java代码导入示例

package com.selectdb.x2doris.connector.doris.writer;

import com.alibaba.fastjson2.JSON;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.RequestContent;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;

public class DorisLoadCase {
    public static void main(String[] args) throws Exception {

        // 1. 参数配置
        String loadUrl = "http://<Host:Port>/api/<DB>/<TABLE>/_stream_load?";
        String userName = "admin";
        String password = "****";

        // 2. 构建httpclient,特别注意需要开启重定向(isRedirectable)
        HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
            // 开启重定向
            @Override
            protected boolean isRedirectable(String method) {
                return true;
            }
        });
        httpClientBuilder.addInterceptorLast(new RequestContent(true));
        HttpClient httpClient = httpClientBuilder.build();

        // 3. 构建httpPut请求对象
        HttpPut httpPut = new HttpPut(loadUrl);

        // 设置httpHeader...
        String basicAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", userName, password).getBytes(StandardCharsets.UTF_8));
        httpPut.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + basicAuth);
        httpPut.addHeader(HttpHeaders.EXPECT, "100-continue");
        httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");

        RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(30000).build();
        httpPut.setConfig(reqConfig);

        // 4. 设置要发送的数据,这里写入csv
        // 假设有一张表,字段如下:
        // field1,field2,field3,field4
        // 这里模拟了三条csv记录,doris 中csv的行分隔符默认是\n,列分隔符默认是\t
        // String data =
        //        "1\t2\t3\t4\n" +
        //        "11\t22\t33\t44\n" +
        //        "111\t222\t333\t444";
        // 读取所有行
         List<String> lines = Files.readAllLines(Paths.get("your_file.csv"));
        // 用\n连接所有行
        String data = String.join("\n", lines);
        
        httpPut.setEntity(new StringEntity(data));

        // 5. 发送请求,处理结果
        HttpResponse httpResponse = httpClient.execute(httpPut);
        int httpStatus = httpResponse.getStatusLine().getStatusCode();
        String respContent = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
        String respMsg = httpResponse.getStatusLine().getReasonPhrase();

        if (httpStatus == HttpStatus.SC_OK) {
            // 选择适合的JSON序列化组件,对返回结果进行序列化
            Map<String, String> respAsMap = JSON.parseObject(respContent, Map.class);
            // 获取SelectDB返回的状态码...
            String dorisStatus = respAsMap.get("Status");
            // SelectDB返回以下状态,都表示数据写入成功
            List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout", "200");
            if (!DORIS_SUCCESS_STATUS.contains(dorisStatus) || !respMsg.equals("OK")) {
                throw new RuntimeException("StreamLoad failed, status: " + dorisStatus + ", Response: " + respMsg);
            } else {
                System.out.println("successful....");
            }
        } else {
            throw new IOException("StreamLoad Response HTTP Status Error, httpStatus: "+ httpStatus +",  url: " + loadUrl + ", error: " + respMsg);
        }
    }
}

导入JSON数据

  1. 创建待导入数据的表。

    1. 连接SelectDB实例。具体操作,请参见通过DMS连接云数据库SelectDB版实例

    2. 执行建库语句。

      CREATE DATABASE test_db;
    3. 执行建表语句。

      CREATE TABLE test_table
      (
          id int,
          name varchar(50),
          age int
      )
      UNIQUE KEY(`id`)
      DISTRIBUTED BY HASH(`id`) BUCKETS 16
      PROPERTIES("replication_num" = "1");

  2. 导入数据。

    导入非数组格式数据

    1. 在发起Stream Load的终端,创建json.data,文件包含多行,一行一个JSON记录。内容如下:

      {"id":1,"name":"Emily","age":25}
      {"id":2,"name":"Benjamin","age":35}
      {"id":3,"name":"Olivia","age":28}
      {"id":4,"name":"Alexander","age":60}
      {"id":5,"name":"Ava","age":17}
    2. 导入数据。

      打开设备终端,通过curl命令发起Stream Load任务,将文件json.data中的数据导入到数据库test_dbtest_table表中。

      curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "read_json_by_line:true" -T json.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

    导入数组格式数据

    1. 在发起Stream Load的终端,创建JSON数组格式的数据文件json_array.data

      [
      {"userid":1,"username":"Emily","userage":25},
      {"userid":2,"username":"Benjamin","userage":35},
      {"userid":3,"username":"Olivia","userage":28},
      {"userid":4,"username":"Alexander","userage":60},
      {"userid":5,"username":"Ava","userage":17}
      ]
    2. 导入数据。

      打开设备的终端,通过curl命令发起Stream Load任务,将本地文件json_array.data中的数据导入到数据库test_dbtest_table表中。

      curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "jsonpaths:[\"$.userid\", \"$.userage\", \"$.username\"]" -H "columns:id,age,name" -H "strip_outer_array:true" -T json_array.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

Http Stream模式

Stream Load中,依托Table Value Function(TVF)功能,可以通过使用SQL表达式来表达导入的参数。这个Stream Load依托TVF功能后名为http_stream。更多Table Value Function(TVF)的使用方式,详情请参见TVF

使用http_stream进行Stream Load导入时的Rest API URL不同于Stream Load普通导入的 URL。

  • 普通Stream LoadURL为:http://host:http_port/api/{db}/{table}/_stream_load

  • 使用TVF http_streamURL 为:http://host:http_port/api/_http_stream

语法

Stream LoadHttp Stream模式。

curl --location-trusted -u <username>:<password> [-H "sql: ${load_sql}"...] -T <file_name> -XPUT http://host:http_port/api/_http_stream

Http Stream参数说明请参见参数说明

使用示例

Http Header中添加一个SQL的参数load_sql,去替代之前参数中的column_separatorline_delimiterwherecolumns等参数,SQL参数load_sql示例如下。

INSERT INTO db.table (col, ...) SELECT stream_col, ... FROM http_stream("property1"="value1");

完整示例:

curl  --location-trusted -u admin:admin_123 -T test.csv  -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\" ) where age >= 30"  http://host:http_port/api/_http_stream

常见问题

Q:导入过程中,报get table cloud commit lock timeout怎么办?

A:由于您写入数据频率太快,导致表死锁。强烈建议您降低写入频率,对数据进行攒批处理。单次Stream Load可以写入几百MB1GB的数据。