当您需要将本地文件或数据流导入到云数据库 SelectDB 版实例时,可以使用Stream Load进行数据导入。本文介绍如何通过Stream Load导入数据至云数据库 SelectDB 版。
背景信息
Stream Load属于同步接口的导入方式,您可以通过发送HTTP请求将本地文件或数据流导入到云数据库 SelectDB 版。Stream Load执行后会立即返回导入结果,您可以通过请求的返回结果判断此次导入是否成功。其支持的数据格式有CSV(文本)、JSON、PARQUET和ORC。
由于Stream load具有高吞吐、低延迟以及灵活可靠的特性,强烈建议您将Stream Load作为主要的数据导入方式。
准备工作
- 确保发起Stream Load请求的终端与SelectDB网络互通: - 为云数据库 SelectDB 版实例申请公网地址。具体操作,请参见申请和释放公网地址。 - 如果您发起Stream Load请求的终端与云数据库 SelectDB 版实例位于同一VPC下,跳过此步骤。 
- 将发起Stream Load请求终端的相关IP添加至云数据库 SelectDB 版的白名单。具体操作,请参见设置白名单。 
- 若发起Stream Load请求终端存在白名单机制,已将SelectDB实例所在网段IP添加至源集群的白名单中。 - 获取SelectDB实例VPC地址的IP,请参见如何查看云数据库 SelectDB 版实例所属VPC的IP网段? 
- 获取SelectDB实例公网的IP地址,通过 - ping命令访问SelectDB实例的公网地址,获取其对应的 IP 地址。
 
 
- (可选)修改计算集群(BackEnd)配置,开启Stream Load操作记录。 - 默认情况下,计算集群不保留Stream Load操作记录。 - 如果您需要跟踪Stream Load的操作情况,需在创建导入任务之前配置enable_stream_load_record=true并重启集群,以启用Stream Load操作记录。如需开启此功能,请提交工单以获得技术支持。 
- (可选)修改计算集群配置,调整Stream load的最大导入限制。 - 默认情况下,Stream Load导入文件的最大限制为10240MB。 - 如果您的原始文件超过此值,则需调整后端参数streaming_load_max_mb。修改参数,请参见参数配置。 
- (可选)修改FE配置,调整导入超时时间。 - 默认情况下,Stream Load的导入任务超时时间为600秒。如果导入任务在设定的超时时间内未完成,系统将取消该任务,并将其状态更改为CANCELLED。 - 如果导入的源文件无法在规定时间内完成,您可以在Stream Load请求中设置单独的超时时间,或者调整FE参数stream_load_default_timeout_second并重启实例以设定全局的默认超时时间。如需调整,请提交工单以获得技术支持。 
注意事项
单次Stream Load可以写入几百MB至1GB的数据。在业务场景中,频繁的写入少量数据可能导致实例性能大幅下降,甚至数据库表死锁。强烈建议您降低写入频率,对数据进行攒批处理。
- 业务端攒批:您需自行收集业务数据,然后向SelectDB发起Stream Load请求。 
- 服务端攒批:SelectDB接收到Stream Load请求后,服务端将进行请求数据的批处理操作。如何操作,请参见Group Commit。 
创建导入任务
Stream Load通过HTTP协议提交和传输数据。以下为通过curl命令提交导入示例,该命令可在Linux或macOS系统的终端或Windows系统的命令提示符下执行。此外,Stream Load还支持通过其他HTTP客户端进行操作。
语法
curl --location-trusted -u <username>:<password> [-H ""] -H "expect:100-continue" -T <file_path> -XPUT http://<host>:<port>/api/<db
_name>/<table_name>/_stream_load参数说明
| 参数名称 | 是否必选 | 参数说明 | 
| 
 | 是 | 需要认证时,会将 | 
| 
 | 是 | 指定SelectDB实例的用户名和密码。 
 | 
| 
 | 否 | 指定本次Stream Load导入请求的请求头(Header)内容。格式如下: 
 常见参数如下: 
 更多请求头参数,请参见请求头参数说明。 | 
| 
 | 是 | 指定需要导入数据的文件路径。 file_path:目标文件路径。 | 
| 
 | 是 | HTTP请求的Method,采用PUT请求方法,指定SelectDB的数据导入地址,具体参数如下: 
 | 
请求头参数说明
Stream Load使用HTTP协议,因此导入任务有关的参数主要设置在请求头(Header)中。常用的导入参数如下。
| 参数名称 | 参数说明 | 
| 
 | 导入任务的唯一标识。 
 
 重要  推荐同一批次数据使用相同的 | 
| 
 | 指定导入数据格式。 
 各类文件的格式要求以及一些相关参数使用,请参见文件格式。 | 
| 
 | 指定导入文件中的换行符。 您也可以使用多个字符的组合作为换行符。例如,在Windows系统中,使用\r\n作为换行符。 
 | 
| 
 | 指定导入文件中的列分隔符。 您也可使用多个字符的组合作为列分隔符。例如,可以使用双竖线 如果是不可见字符,则需要加 
 | 
| 
 | 指定文件的压缩格式。仅支持 支持的压缩格式: | 
| 
 | 指定导入任务的最大容忍率。 当导入的错误率超过该阈值时,导入将失败。如需忽略错误行,必须将该参数设置为大于0,以确保导入成功。 
 | 
| 
 | 指定是否开启严格过滤模式。 
 | 
| 
 | 指定导入使用的集群。 默认为该实例的默认集群。如果该实例没有设置默认集群,则会自动选择一个有权限的集群。 | 
| 
 | 指定是否仅将数据导入到对应分区的一个tablet。该参数仅允许在对具有random分桶的Duplicate表进行数据导入时设置。 
 | 
| 
 | 指定导入任务的过滤条件。 支持对原始数据指定 
 | 
| 
 | 指定待导入数据的分区(Partition)信息。 如果待导入数据不属于指定的分区(Partition)则不会被导入。这些数据将计入dpp.abnorm.ALL。 
 | 
| 
 | 指定待导入数据的函数变换配置。 支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。 | 
| 
 | 指定数据合并类型。 
 重要  
 | 
| 
 | 仅在指定 | 
| 
 | 仅适用于Unique模型,相同Key列下,保证Value列按照source_sequence列进行REPLACE,source_sequence可以是数据源中的列,也可以是表结构中的一列。 | 
| 
 | 指定导入内存限制。 
 | 
| 
 | 指定导入的超时时间。 
 | 
| 
 | 指定本次导入所使用的时区。该参数会影响所有导入涉及的和时区有关的函数结果。有关时区,您可通过IANA时区数据库查看。 默认值: | 
| 
 | 指定是否开启两阶段事务提交模式。 
 | 
| jsonpaths | 导入JSON数据格式有两种方式: 
 | 
| json_root | 
 默认值:"",代表选择整个JSON作为导入解析的根节点。 | 
| read_json_by_line | Stream Load 中处理 JSON 格式数据的一个重要参数,它控制着如何解析包含多行JSON数据的输入文件。 
 | 
| strip_outer_array | Stream Load 中处理JSON格式数据的一个重要参数,它控制着如何解析包含外层数组的JSON数据。 
 重要  当需要导入JSON格式的数据时,非数组格式的性能大幅高于数组格式。 | 
示例
将CSV格式的文件data.csv导入至VPC地址为selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com实例的test_db库的test_table表中。此处仅提供创建导入的curl指令示例,完整示例请参见导入数据的完整示例。
curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" -H "expect:100-continue" http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load返回结果说明
Stream load是一种同步导入方式,导入结果将通过创建导入的返回值直接返回。返回结果示例如下。
{
    "TxnId": 17,
    "Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 5,
    "NumberLoadedRows": 5,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 28,
    "LoadTimeMs": 27,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 3,
    "CommitAndPublishTimeMs": 18
}返回结果参数说明如下。
| 参数名称 | 参数说明 | 
| 
 | 导入的事务ID。 | 
| 
 | 导入标识。 您可自定义标识,也可通过系统自动生成。 | 
| 
 | 导入状态,取值如下: 
 | 
| 
 | 已存在的 该字段仅在Status为 您通过此状态,判断已存在Label对应的导入任务的状态。 
 | 
| 
 | 错误信息提示。 | 
| 
 | 导入总处理的行数。 | 
| 
 | 成功导入的行数。 | 
| 
 | 数据质量不合格的行数。 | 
| 
 | 被 | 
| 
 | 导入的字节数。 | 
| 
 | 导入耗时。 单位:毫秒。 | 
| 
 | 向FE请求开始一个事务所花费的时间。 单位:毫秒。 | 
| 
 | 向FE请求获取导入数据执行计划所花费的时间。 单位:毫秒。 | 
| 
 | 读取数据所花费的时间。 单位:毫秒。 | 
| 
 | 执行写入数据操作所花费的时间。 单位:毫秒。 | 
| 
 | 向FE请求提交并且发布事务所花费的时间。 单位:毫秒。 | 
| 
 | 如果有数据质量问题,可通过访问这个URL查看具体错误行。 | 
取消导入任务
Stream Load导入任务创建后,无法手动取消。任务仅在出现超时或导入错误的情况下,由系统自动取消。您可根据返回结果中的errorUrl下载报错信息,进行错误排查。
查看Stream Load任务
如果您已开启Stream Load操作记录,您可以通过MySQL客户端连接云数据库SelectDB版实例,执行show stream load语句查看已经完成的Stream Load任务。
导入数据的完整示例
准备工作
开始导入操作前,您需完成准备工作。
导入CSV数据
通过脚本导入示例
- 创建待导入数据的表。 - 连接SelectDB实例。具体操作,请参见通过DMS连接云数据库SelectDB版实例。 
- 执行建库语句。 - CREATE DATABASE test_db;
- 执行建表语句。 - CREATE TABLE test_table ( id int, name varchar(50), age int, address varchar(50), url varchar(500) ) UNIQUE KEY(`id`, `name`) DISTRIBUTED BY HASH(id) BUCKETS 16 PROPERTIES("replication_num" = "1");
 
- 在发起Stream Load的终端所在设备,创建待导入文件 - test.csv。- 1,yang,32,shanghai,http://example.com 2,wang,22,beijing,http://example.com 3,xiao,23,shenzhen,http://example.com 4,jess,45,hangzhou,http://example.com 5,jack,14,shanghai,http://example.com 6,tomy,25,hangzhou,http://example.com 7,lucy,45,shanghai,http://example.com 8,tengyin,26,shanghai,http://example.com 9,wangli,27,shenzhen,http://example.com 10,xiaohua,37,shanghai,http://example.com
- 导入数据。 - 打开目标设备的终端,通过curl命令发起Stream Load任务,导入数据。 - 创建导入任务的具体语法以及参数说明,请参见创建导入任务,以下为常见导入场景的示例。 - 使用 - Label去重,指定超时时间。- 将文件 - test.csv中的数据导入到数据库- test_db中的- test_table表,使用Label避免导入重复批次的数据,并指定超时时间为100秒。- curl --location-trusted -u admin:admin_123 -H "label:123" -H "timeout:100" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
- 使用 - Label去重,并使用列筛选文件中要导入的数据。- 将文件 - test.csv中的数据导入到数据库- test_db中的- test_table表,使用Label避免导入重复批次的数据,指定文件的列名,并且只导入address等于hangzhou的数据。- curl --location-trusted -u admin:admin_123 -H "label:123" -H "columns: id,name,age,address,url" -H "where: address='hangzhou'" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
- 允许20%容错率。 - 将文件 - test.csv中的数据导入到数据库- test_db中的- test_table表,允许20%的错误率。- curl --location-trusted -u admin:admin_123 -H "label:123" -H "max_filter_ratio:0.2" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
- 使用严格模式并设置时区。 - 导入数据进行严格模式过滤,并设置时区为 - Africa/Abidjan。- curl --location-trusted -u admin:admin_123 -H "strict_mode: true" -H "timezone: Africa/Abidjan" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
- 删除SelectDB中的数据。 - 删除SelectDB中与test.csv文件中相同的数据。 - curl --location-trusted -u admin:admin_123 -H "merge_type: DELETE" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
- 根据条件删除文件中不需要导入的数据,剩余数据正常导入SelectDB。 - 将test.csv文件中address列为hangzhou的数据的行删除,其他行正常导入至SelectDB。 - curl --location-trusted -u admin:admin_123 -H "expect:100-continue" -H "columns: id,name,age,address,url" -H "merge_type: MERGE" -H "delete: address='hangzhou'" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/testDb/testTbl/_stream_load
 
通过Java代码导入示例
package com.selectdb.x2doris.connector.doris.writer;
import com.alibaba.fastjson2.JSON;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.RequestContent;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;
public class DorisLoadCase {
    public static void main(String[] args) throws Exception {
        // 1. 参数配置
        String loadUrl = "http://<Host:Port>/api/<DB>/<TABLE>/_stream_load?";
        String userName = "admin";
        String password = "****";
        // 2. 构建httpclient,特别注意需要开启重定向(isRedirectable)
        HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
            // 开启重定向
            @Override
            protected boolean isRedirectable(String method) {
                return true;
            }
        });
        httpClientBuilder.addInterceptorLast(new RequestContent(true));
        HttpClient httpClient = httpClientBuilder.build();
        // 3. 构建httpPut请求对象
        HttpPut httpPut = new HttpPut(loadUrl);
        // 设置httpHeader...
        String basicAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", userName, password).getBytes(StandardCharsets.UTF_8));
        httpPut.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + basicAuth);
        httpPut.addHeader(HttpHeaders.EXPECT, "100-continue");
        httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");
        RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(30000).build();
        httpPut.setConfig(reqConfig);
        // 4. 设置要发送的数据,这里写入csv
        // 假设有一张表,字段如下:
        // field1,field2,field3,field4
        // 这里模拟了三条csv记录,doris 中csv的行分隔符默认是\n,列分隔符默认是\t
        // String data =
        //        "1\t2\t3\t4\n" +
        //        "11\t22\t33\t44\n" +
        //        "111\t222\t333\t444";
        // 读取所有行
         List<String> lines = Files.readAllLines(Paths.get("your_file.csv"));
        // 用\n连接所有行
        String data = String.join("\n", lines);
        
        httpPut.setEntity(new StringEntity(data));
        // 5. 发送请求,处理结果
        HttpResponse httpResponse = httpClient.execute(httpPut);
        int httpStatus = httpResponse.getStatusLine().getStatusCode();
        String respContent = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
        String respMsg = httpResponse.getStatusLine().getReasonPhrase();
        if (httpStatus == HttpStatus.SC_OK) {
            // 选择适合的JSON序列化组件,对返回结果进行序列化
            Map<String, String> respAsMap = JSON.parseObject(respContent, Map.class);
            // 获取SelectDB返回的状态码...
            String dorisStatus = respAsMap.get("Status");
            // SelectDB返回以下状态,都表示数据写入成功
            List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout", "200");
            if (!DORIS_SUCCESS_STATUS.contains(dorisStatus) || !respMsg.equals("OK")) {
                throw new RuntimeException("StreamLoad failed, status: " + dorisStatus + ", Response: " + respMsg);
            } else {
                System.out.println("successful....");
            }
        } else {
            throw new IOException("StreamLoad Response HTTP Status Error, httpStatus: "+ httpStatus +",  url: " + loadUrl + ", error: " + respMsg);
        }
    }
}导入JSON数据
- 创建待导入数据的表。 - 连接SelectDB实例。具体操作,请参见通过DMS连接云数据库SelectDB版实例。 
- 执行建库语句。 - CREATE DATABASE test_db;
- 执行建表语句。 - CREATE TABLE test_table ( id int, name varchar(50), age int ) UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 16 PROPERTIES("replication_num" = "1");
 
- 导入数据。 重要- 当需要导入JSON格式的数据时,非数组格式的性能大幅高于数组格式。 - 导入非数组格式数据- 在发起Stream Load的终端,创建 - json.data,文件包含多行,一行一个JSON记录。内容如下:- {"id":1,"name":"Emily","age":25} {"id":2,"name":"Benjamin","age":35} {"id":3,"name":"Olivia","age":28} {"id":4,"name":"Alexander","age":60} {"id":5,"name":"Ava","age":17}
- 导入数据。 - 打开设备终端,通过curl命令发起Stream Load任务,将文件 - json.data中的数据导入到数据库- test_db的- test_table表中。- curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "read_json_by_line:true" -T json.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
 - 导入数组格式数据- 在发起Stream Load的终端,创建JSON数组格式的数据文件 - json_array.data。- [ {"userid":1,"username":"Emily","userage":25}, {"userid":2,"username":"Benjamin","userage":35}, {"userid":3,"username":"Olivia","userage":28}, {"userid":4,"username":"Alexander","userage":60}, {"userid":5,"username":"Ava","userage":17} ]
- 导入数据。 - 打开设备的终端,通过curl命令发起Stream Load任务,将本地文件 - json_array.data中的数据导入到数据库- test_db的- test_table表中。- curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "jsonpaths:[\"$.userid\", \"$.userage\", \"$.username\"]" -H "columns:id,age,name" -H "strip_outer_array:true" -T json_array.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
 
Http Stream模式
在Stream Load中,依托Table Value Function(TVF)功能,可以通过使用SQL表达式来表达导入的参数。这个Stream Load依托TVF功能后名为http_stream。更多Table Value Function(TVF)的使用方式,详情请参见TVF。
使用http_stream进行Stream Load导入时的Rest API URL不同于Stream Load普通导入的 URL。
- 普通Stream Load的URL为: - http://host:http_port/api/{db}/{table}/_stream_load。
- 使用TVF http_stream的URL 为: - http://host:http_port/api/_http_stream。
语法
Stream Load的Http Stream模式。
curl --location-trusted -u <username>:<password> [-H "sql: ${load_sql}"...] -T <file_name> -XPUT http://host:http_port/api/_http_streamHttp Stream参数说明请参见参数说明。
使用示例
在Http Header中添加一个SQL的参数load_sql,去替代之前参数中的column_separator、line_delimiter、where、columns等参数,SQL参数load_sql示例如下。
INSERT INTO db.table (col, ...) SELECT stream_col, ... FROM http_stream("property1"="value1");完整示例:
curl  --location-trusted -u admin:admin_123 -T test.csv  -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\" ) where age >= 30"  http://host:http_port/api/_http_stream常见问题
Q1:导入过程中,报get table cloud commit lock timeout怎么办?
由于您写入数据频率太快,导致表死锁。强烈建议您降低写入频率,对数据进行攒批处理。单次Stream Load可以写入几百MB至1GB的数据。
Q2:导入CSV文件格式时,数据中存在列分隔符和行分隔符应该如何处理?
您需重新指定列分隔符和行分隔符,并修改导入数据文本,确保数据与分割符不冲突,使得数据能被正常解析。示例如下:
存在行分隔符
如果导入数据中包含已指定的换行符,例如默认的换行符\n,则需重新指定换行符。
例如,您的数据文件为:
张三\n,25,陕西
李四\n,30,北京此场景中,文件中的\n为数据而非换行符,但该文件的默认换行符也是\n,如需文件能被正常解析,您需通过line_delimiter指定换行符,数据文本每行数据行末也需显示写换行符。示例如下:
- 设置导入行换行符。 - 例如,您将默认的换行符 - \n替换为- \r\n,则导入数据时,您需设置- -H "line_delimiter:\r\n"。
- 为导入数据行末添加指定的换行符。上述示例文本则需修改为: - 张三\n,25,陕西\r\n 李四\n,30,北京\r\n
存在列分隔符
如果导入数据中包含已指定的列分隔符,例如默认的列分割符\t,则需重新指定列分割符。
例如,您的数据文件为:
张三\t  25  陕西
李四\t  30  北京此场景中,文件中的\t为数据而非列分隔符,但该文件默认使用的列分隔符也是\t(制表符),如需文件能被正常解析,您需通过column_separator重新指定列分隔符,数据文本每行数据也需显示增加列分隔符。示例如下:
- 设置导入列分割符。 - 例如,您将默认的列分割符 - \t替换为逗号- ,,则导入数据时,您需设置- -H "column_separator:,"。
- 为导入数据列添加指定的列分隔符。上述示例文本则为: - 张三\t,25,陕西 李四\t,30,北京