云原生多模数据库 Lindorm支持Bulkload(批量快速导入数据)功能,可以更快更稳定的导入数据。本文介绍批量快速导入数据操作。
功能特性
批量快速导入数据功能支持数据文件旁路加载,不需要经过数据API写入链路并且不需要占用实例计算资源,,批量快速导入数据与通过API导入数据相比有以下优势:
导入数据更快,速度可以提升10倍以上。
在线服务更稳定,不占用在线服务资源。
资源使用更灵活,在离线资源中分开使用更加灵活。
支持多种数据源导入,包括CSV、ORC、Parquet、MaxCompute等。
使用简单。您无需开发任何代码就可以实现数据的批量快速旁路加载。
成本低。LTS Bulkload基于Serverless Spark提供的云原生弹性能力,根据您的需求提供弹性计算资源按量收费,您无需经常配置计算资源,可以降低使用成本。
前提条件
已开通并登录LTS数据同步服务,具体操作请参见开通并登录LTS。
已开通Lindorm计算引擎,具体操作请参见开通与变配。
已添加Spark数据源,具体操作请参见添加Spark数据源。
支持的数据源
源数据源 | 目标数据源 |
源数据源 | 目标数据源 |
MaxCompute Table | Lindorm宽表引擎 |
HDFS CSV或者OSS CSV | |
HDFS Parquet或者OSS Parquet | |
HDFS ORC或者OSS ORC |
提交方式
快速导入数据任务支持以下方式提交。
通过LTS操作页面提交
登录LTS操作页面,具体操作参见开通并登录LTS。
在左侧导航栏,选择数据源管理 > 添加数据源,添加以下数据源。
添加ODPS数据源,具体操作请参见ODPS数据源。
添加Lindorm宽表数据源,具体操作请参见Lindorm宽表数据源。
添加HDFS数据源,具体操作请参见添加HDFS数据源。
在左侧导航栏中,选择。
单击创建任务,配置以下参数。
配置项
参数
描述
配置项
参数
描述
选择数据源
源数据源
选择添加的ODPS数据源或者HDFS数据源。
目标数据源
选择添加的Lindorm宽表数据源。
插件配置
Reader配置
如果源数据源为ODPS数据源,读插件配置说明如下:
table:ODPS的表名。
column:需要导入的ODPS列名。
partition:非分区表为空,分区表配置分区信息。
numPartitions:读取时的并发度。
如果源数据源为HDFS数据源,CSV文件的读插件配置说明如下:
filePath:CSV文件的所在目录。
header:CSV文件是否包含header行。
delimite:CSV文件分隔符。
column:CSV文件中列名以及对应类型。
如果源数据源为HDFS数据源,Parquet文件的读插件配置说明如下:
filePath:Parquet文件的所在目录。
column:Parquet文件中的列名。
读插件配置示例请参见配置示例。
Writer配置
namespace:Lindorm宽表的namespace。
lindormTable:Lindorm宽表名称。
compression:压缩算法,目前仅支持zstd,不使用压缩算法配置为none。
columns:根据导入至目标表类型填写。
如果导入至Lindorm宽表,columns需要配置Lindorm SQL宽表的列名,和读配置中的column对应。
如果导入至Lindorm兼容HBase表,columns需要配置HBase表标准的列名,和读配置中的column对应。
timestamp:数据在Lindorm宽表中的时间戳,支持以下类型:
时间戳列为Long类型,值为13位。
时间戳列为String类型,格式为yyyy-MM-dd HH:mm:ss或者yyyy-MM-dd HH:mm:ss SSS。
写插件配置示例请参见配置示例。
作业运行参数配置
Spark Driver规格
选择Spark Driver规格。
Spark Executor规格
选择Spark Executor规格。
Executor数量
输入Executor的数量。
spark配置
输入Spark配置,可不填写。
单击创建。
在Bulkload页面单击任务名查看快速导入数据任务。
单击任务名,可以查看Spark任务的UI界面。
单击详情,可以查看Spark任务的执行日志。
源数据源迁移到Lindorm宽表,在Lindorm宽表中不同分区的数据分布均匀的情况下,数据容量为100 GB(压缩比为1:4)压缩后导入大概需要1个小时,根据具体情况不同可能会有区别。
配置示例
源数据源为ODPS的读插件配置示例。
{ "table": "test", "column": [ "id", "intcol", "doublecol", "stringcol", "string1col", "decimalcol" ], "partition": [ "pt=1" ], "numPartitions":10 }
源数据源为HDFS,CSV文件的读插件配置示例。
{ "filePath":"csv/", "header": false, "delimiter": ",", "column": [ "id|string", "intcol|int", "doublecol|double", "stringcol|string", "string1col|string", "decimalcol|decimal" ] }
源数据源为HDFS,Parquet文件的读插件配置示例。
{ "filePath":"parquet/", "column": [ //parquet文件中的列名 "id", "intcol", "doublecol", "stringcol", "string1col", "decimalcol" ] }
导入到Lindorm SQL表格的写插件配置示例。
{ "namespace": "default", "lindormTable": "xxx", "compression":"zstd", "timestamp":"2022-07-01 10:00:00", "columns": [ "id", "intcol", "doublecol", "stringcol", "string1col", "decimalcol" ] }
导入到Lindorm兼容HBase表格的写插件配置示例。
{ "namespace": "default", "lindormTable": "xxx", "compression":"zstd", "timestamp":"2022-07-01 10:00:00", "columns": [ "ROW||String", //ROW代表rowkey,String表示类型 "f:intcol||Int", //格式为列簇:列名||列类型 "f:doublecol||Double", "f:stringcol||String", "f:string1col||String", "f:decimalcol||Decimal" ] }
通过API接口提交
提交任务接口
接口(POST):
http://{BDSMaster}:12311/pro/proc/bulkload/create
,BDSMaster需要修改为Lindorm实例的Master hostname,可以登录Lindorm实例的LTS,在集群信息页面的基本信息区域获取。参数说明:
示例:
curl -d "src=hdfs&dst=ld&readerConfig={\"filePath\":\"parquet/\",\"column\":[\"id\",\"intcol\",\"doublecol\",\"stringcol\",\"string1col\",\"decimalcol\"]}&writerConfig={\"columns\":[\"ROW||String\",\"f:intcol||Int\",\"f:doublecol||Double\",\"f:stringcol||String\",\"f:string1col||String\",\"f:decimalcol||Decimal\"],\"namespace\":\"default\",\"lindormTable\":\"bulkload_test\",\"compression\":\"zstd\"}&driverSpec=large&instances=5&fileType=Parquet" -H "Content-Type: application/x-www-form-urlencoded" -X POST http://{LTSMaster}:12311/pro/proc/bulkload/create
返回内容如下,message为任务ID。
{"success":"true","message":"proc-91-ff383c616e5242888b398e51359c****"}
获取任务信息
接口(GET):
http://{LTSMaster}:12311/pro/proc/{procId}/info
,LTSMaster需要修改为Lindorm实例的Master hostname,可以登录Lindorm实例的LTS,在集群信息页面的基本信息区域获取。参数说明:procId表示任务ID。
示例:
curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/info
返回内容如下:
{ "data":{ "checkJobs":Array, "procId":"proc-91-ff383c616e5242888b398e51359c****", //任务ID "incrJobs":Array, "procConfig":Object, "stage":"WAIT_FOR_SUCCESS", "fullJobs":Array, "mergeJobs":Array, "srcDS":"hdfs", //源数据源 "sinkDS":"ld-uf6el41jkba96****", //目标数据源 "state":"RUNNING", //任务状态 "schemaJob":Object, "procType":"SPARK_BULKLOAD" //任务类型 }, "success":"true" }
终止任务
接口(GET):
http://{LTSMaster}:12311/pro/proc/{procId}/abort
,LTSMaster需要修改为Lindorm实例的Master hostname,可以登录Lindorm实例的LTS,在集群信息页面的基本信息区域获取。参数说明:procId表示任务ID。
示例:
curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/abort
返回内容如下:
{"success":"true","message":"ok"}
重试任务
接口(GET):
http://{LTSMaster}:12311/pro/proc/{procId}/retry
,LTSMaster需要修改为Lindorm实例的Master hostname,可以登录Lindorm实例的LTS,在集群信息页面的基本信息区域获取。参数说明:procId表示任务ID。
示例:
curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/retry
返回结果如下:
{"success":"true","message":"ok"}
删除任务
接口(GET):
http://{LTSMaster}:12311/pro/proc/{procId}/delete
,LTSMaster需要修改为Lindorm实例的Master hostname,可以登录Lindorm实例的LTS,在集群信息页面的基本信息区域获取。参数说明:procId表示任务ID。
示例:
curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/delete
返回结果如下:
{"success":"true","message":"ok"}
通过Dolphin提交(公测中)
添加数据源
在使用Dolphin前,您需要添加计算引擎数据源(Spark数据源)、Lindorm宽表数据源和源端数据源(ODPS数据源或HDFS数据源)。
登录Dolphin
创建项目
在顶部菜单栏,单击项目管理。
在项目管理页面,单击创建项目,根据页面提示输入相关参数。
单击确定,然后单击项目名称,进入项目管理页。
创建工作流并提交任务
在左侧导航栏选择工作流定义。
单击创建工作流。
拖拽通用组件中的SHELL组件至右侧画布,编辑组件。
填写节点名称,并在脚本文本框中,输入提交任务的代码。
示例如下:
#!/bin/bash #提交任务的LTS URL,请替换为LTS实例ID MASTER_URL="http://ld-bp1i13ko5e222****-master3-001.lindorm.rds.aliyuncs.com:12311/pro/proc/" # 定义提交作业的API地址 SUBMIT_URL="${MASTER_URL}bulkload/create" echo "Submit URL: $SUBMIT_URL" # 定义作业提交的数据 # reader config,这里以源端为Maxcompute为例 READERCONFIG='{ "partition":[ "pt=20250212" ], "numPartitions":20, "column":[ "rowkey", "stringcol1", "longcol1", "doublecol1" ], "table":"test_table_10c" }' # writer config WRITERCONFIG='{ "columns":[ "rowkey", "stringcol1", "longcol1", "doublecol1" ], "namespace":"default", "lindormTable":"test_table_10c_mig", "compression":"zstd" }' # 任务启动的计算引擎Executor数量 INSTANCE='5' # SRC为ODPS数据源的名称,请先在LTS页面中手动添加。DST请填写Lindorm宽表数据源名称。 SRC='odps' DST='ld-bp1i13ko5e222****' # 计算引擎Executor和Driver规格 DRIVERSPEC='large' EXECUTORSPEC='xlarge' # 源文件类型。如果是Maxcompute可以不使用这个参数 FILETTYPE='Parquet' create_payload() { local result="" local first=true for arg in "$@"; do if [[ "$first" == true ]]; then result="$arg" first=false else result="$result&$arg" fi done echo "$result" } PAYLOAD=$(create_payload "src=$SRC" "dst=$DST" "readerConfig=$READERCONFIG" "writerConfig=$WRITERCONFIG" "instance=$INSTANCE" "driverSpec=$DRIVERSPEC" "executorSpec=$EXECUTORSPEC" "fileType=$FILETTYPE") echo "PAYLOAD: $PAYLOAD" # 提交作业 submit_response=$(curl -d "$PAYLOAD" -H "Content-Type: application/x-www-form-urlencoded" -X POST "$SUBMIT_URL") echo $submit_response # 解析提交响应以获取success和message success=$(echo $submit_response | grep -o '"success":"[^"]*' | cut -d'"' -f4) message=$(echo $submit_response | grep -o '"message":"[^"]*' | cut -d'"' -f4) # 检查提交作业是否成功 if [[ "$success" != "true" ]]; then echo "Job submission failed: $message" exit 1 fi # 解析提交响应以获取jobID job_id=$message echo "JobId: $job_id" STATUS_URL="${MASTER_URL}${job_id}/info" echo $STATUS_URL # 检查作业状态 while true; do # 获取作业状态 status_response=$(curl --silent --request GET "$STATUS_URL") # 解析状态响应以获取state state=$(echo $status_response | sed 's/.*"state":"\([^"]*\)".*/\1/') # 打印当前状态 echo "Current job state: $state" # 判断是否完成 if [[ $state == "SUCCESS" ]]; then echo "Job completed successfully." exit 0 elif [[ $state == "FAILED" ]]; then echo "Job failed." exit 1 elif [[ $state == "KILLED" ]]; then echo "Job killed." exit 1 fi # 暂停60秒后再次检查状态 sleep 60 done
- 本页导读 (1)
- 功能特性
- 前提条件
- 支持的数据源
- 提交方式
- 通过LTS操作页面提交
- 通过API接口提交
- 提交任务接口
- 获取任务信息
- 终止任务
- 重试任务
- 删除任务
- 通过Dolphin提交(公测中)
- 添加数据源
- 登录Dolphin
- 创建项目
- 创建工作流并提交任务