通用批量导入服务

更新时间:2025-04-07 01:45:43

云原生多模数据库 Lindorm支持Bulkload(批量快速导入数据)功能,可以更快更稳定的导入数据。本文介绍批量快速导入数据操作。

功能特性

批量快速导入数据功能支持数据文件旁路加载,不需要经过数据API写入链路并且不需要占用实例计算资源,,批量快速导入数据与通过API导入数据相比有以下优势:

  • 导入数据更快,速度可以提升10倍以上。

  • 在线服务更稳定,不占用在线服务资源。

  • 资源使用更灵活,在离线资源中分开使用更加灵活。

  • 支持多种数据源导入,包括CSV、ORC、Parquet、MaxCompute等。

  • 使用简单。您无需开发任何代码就可以实现数据的批量快速旁路加载。

  • 成本低。LTS Bulkload基于Serverless Spark提供的云原生弹性能力,根据您的需求提供弹性计算资源按量收费,您无需经常配置计算资源,可以降低使用成本。

前提条件

支持的数据源

源数据源

目标数据源

源数据源

目标数据源

MaxCompute Table

Lindorm宽表引擎

HDFS CSV或者OSS CSV

HDFS Parquet或者OSS Parquet

HDFS ORC或者OSS ORC

提交方式

快速导入数据任务支持以下方式提交。

通过LTS操作页面提交

  1. 登录LTS操作页面,具体操作参见开通并登录LTS

  2. 在左侧导航栏,选择数据源管理 > 添加数据源,添加以下数据源。

  3. 在左侧导航栏中,选择批量导入数据

  4. 单击创建任务,配置以下参数。

    配置项

    参数

    描述

    配置项

    参数

    描述

    选择数据源

    源数据源

    选择添加的ODPS数据源或者HDFS数据源。

    目标数据源

    选择添加的Lindorm宽表数据源。

    插件配置

    Reader配置

    • 如果源数据源为ODPS数据源,读插件配置说明如下:

      • table:ODPS的表名。

      • column:需要导入的ODPS列名。

      • partition:非分区表为空,分区表配置分区信息。

      • numPartitions:读取时的并发度。

    • 如果源数据源为HDFS数据源,CSV文件的读插件配置说明如下:

      • filePath:CSV文件的所在目录。

      • header:CSV文件是否包含header行。

      • delimite:CSV文件分隔符。

      • column:CSV文件中列名以及对应类型。

    • 如果源数据源为HDFS数据源,Parquet文件的读插件配置说明如下:

      • filePath:Parquet文件的所在目录。

      • column:Parquet文件中的列名。

    说明

    读插件配置示例请参见配置示例

    Writer配置

    • namespace:Lindorm宽表的namespace。

    • lindormTable:Lindorm宽表名称。

    • compression:压缩算法,目前仅支持zstd,不使用压缩算法配置为none。

    • columns:根据导入至目标表类型填写。

      • 如果导入至Lindorm宽表,columns需要配置Lindorm SQL宽表的列名,和读配置中的column对应。

      • 如果导入至Lindorm兼容HBase表,columns需要配置HBase表标准的列名,和读配置中的column对应。

    • timestamp:数据在Lindorm宽表中的时间戳,支持以下类型:

      • 时间戳列为Long类型,值为13位。

      • 时间戳列为String类型,格式为yyyy-MM-dd HH:mm:ss或者yyyy-MM-dd HH:mm:ss SSS。

    说明

    写插件配置示例请参见配置示例

    作业运行参数配置

    Spark Driver规格

    选择Spark Driver规格。

    Spark Executor规格

    选择Spark Executor规格。

    Executor数量

    输入Executor的数量。

    spark配置

    输入Spark配置,可不填写。

  5. 单击创建

  6. Bulkload页面单击任务名查看快速导入数据任务。

    • 单击任务名,可以查看Spark任务的UI界面。

    • 单击详情,可以查看Spark任务的执行日志。

    任务详情页

    说明

    源数据源迁移到Lindorm宽表,在Lindorm宽表中不同分区的数据分布均匀的情况下,数据容量为100 GB(压缩比为1:4)压缩后导入大概需要1个小时,根据具体情况不同可能会有区别。

配置示例

读插件配置示例
写插件配置示例
  • 源数据源为ODPS的读插件配置示例。

    {
      "table": "test",
      "column": [ 
        "id",
        "intcol",
        "doublecol",
        "stringcol",
        "string1col",
        "decimalcol"
      ],
      "partition": [
        "pt=1" 
      ],
      "numPartitions":10 
    }
  • 源数据源为HDFS,CSV文件的读插件配置示例。

    {
      "filePath":"csv/",
      "header": false,
      "delimiter": ",",
      "column": [
        "id|string",
        "intcol|int",
        "doublecol|double",
        "stringcol|string",
        "string1col|string",
        "decimalcol|decimal"
      ]
    }
  • 源数据源为HDFS,Parquet文件的读插件配置示例。

    {
      "filePath":"parquet/",
      "column": [   //parquet文件中的列名
        "id",
        "intcol",
        "doublecol",
        "stringcol",
        "string1col",
        "decimalcol"
      ]
    }
  • 导入到Lindorm SQL表格的写插件配置示例。

    {
      "namespace": "default",
      "lindormTable": "xxx",
      "compression":"zstd",
      "timestamp":"2022-07-01 10:00:00",
      "columns": [
           "id",
           "intcol",
           "doublecol",
           "stringcol",
            "string1col",
            "decimalcol"
      ]
    }
  • 导入到Lindorm兼容HBase表格的写插件配置示例。

    {
      "namespace": "default",
      "lindormTable": "xxx",
      "compression":"zstd",
      "timestamp":"2022-07-01 10:00:00",
      "columns": [
        "ROW||String",    //ROW代表rowkey,String表示类型
        "f:intcol||Int",  //格式为列簇:列名||列类型
        "f:doublecol||Double",
        "f:stringcol||String",
        "f:string1col||String",
        "f:decimalcol||Decimal"
      ]
    }

通过API接口提交

提交任务接口

  • 接口(POST):http://{BDSMaster}:12311/pro/proc/bulkload/create,BDSMaster需要修改为Lindorm实例的Master hostname,可以登录Lindorm实例的LTS,在集群信息页面的基本信息区域获取。获取页面

  • 参数说明:

    参数

    说明

    参数

    说明

    src

    源数据源名称。

    dst

    目标数据源名称。

    readerConfig

    读插件配置信息,文件类型为JSON,读插件配置示例请参见配置示例

    writerConfig

    写插件配置信息,文件类型为JSON,写插件配置示例请参见配置示例

    driverSpec

    Driver的规格,包括small、medium、largexlarge四种规格,推荐配置为large。

    instances

    Executor的数量。

    fileType

    如果源数据源为HDFS,需要填写文件类型为CSV或者Parquet。

    sparkAdditionalParams

    扩展参数可以不填。

  • 示例:

    curl -d "src=hdfs&dst=ld&readerConfig={\"filePath\":\"parquet/\",\"column\":[\"id\",\"intcol\",\"doublecol\",\"stringcol\",\"string1col\",\"decimalcol\"]}&writerConfig={\"columns\":[\"ROW||String\",\"f:intcol||Int\",\"f:doublecol||Double\",\"f:stringcol||String\",\"f:string1col||String\",\"f:decimalcol||Decimal\"],\"namespace\":\"default\",\"lindormTable\":\"bulkload_test\",\"compression\":\"zstd\"}&driverSpec=large&instances=5&fileType=Parquet" -H "Content-Type: application/x-www-form-urlencoded" -X POST http://{LTSMaster}:12311/pro/proc/bulkload/create

    返回内容如下,message为任务ID。

    {"success":"true","message":"proc-91-ff383c616e5242888b398e51359c****"}

获取任务信息

  • 接口(GET):http://{LTSMaster}:12311/pro/proc/{procId}/info,LTSMaster需要修改为Lindorm实例的Master hostname,可以登录Lindorm实例的LTS,在集群信息页面的基本信息区域获取。

  • 参数说明:procId表示任务ID。

  • 示例:

    curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/info

    返回内容如下:

    {
        "data":{
            "checkJobs":Array,
            "procId":"proc-91-ff383c616e5242888b398e51359c****",  //任务ID
            "incrJobs":Array,
            "procConfig":Object,
            "stage":"WAIT_FOR_SUCCESS",
            "fullJobs":Array,
            "mergeJobs":Array,
            "srcDS":"hdfs",    //源数据源
            "sinkDS":"ld-uf6el41jkba96****",  //目标数据源
            "state":"RUNNING",   //任务状态
            "schemaJob":Object,   
            "procType":"SPARK_BULKLOAD"   //任务类型
        },
        "success":"true"
    }

终止任务

  • 接口(GET):http://{LTSMaster}:12311/pro/proc/{procId}/abort,LTSMaster需要修改为Lindorm实例的Master hostname,可以登录Lindorm实例的LTS,在集群信息页面的基本信息区域获取。

  • 参数说明:procId表示任务ID。

  • 示例:

    curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/abort

    返回内容如下:

    {"success":"true","message":"ok"}

重试任务

  • 接口(GET):http://{LTSMaster}:12311/pro/proc/{procId}/retry,LTSMaster需要修改为Lindorm实例的Master hostname,可以登录Lindorm实例的LTS,在集群信息页面的基本信息区域获取。

  • 参数说明:procId表示任务ID。

  • 示例:

    curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/retry

    返回结果如下:

    {"success":"true","message":"ok"}

删除任务

  • 接口(GET):http://{LTSMaster}:12311/pro/proc/{procId}/delete,LTSMaster需要修改为Lindorm实例的Master hostname,可以登录Lindorm实例的LTS,在集群信息页面的基本信息区域获取。

  • 参数说明:procId表示任务ID。

  • 示例:

    curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/delete

    返回结果如下:

    {"success":"true","message":"ok"}

通过Dolphin提交(公测中)

添加数据源

在使用Dolphin前,您需要添加计算引擎数据源(Spark数据源)、Lindorm宽表数据源和源端数据源(ODPS数据源HDFS数据源)。

登录Dolphin

开通Dolphin服务登录Dolphin

创建项目

  1. 在顶部菜单栏,单击项目管理

  2. 在项目管理页面,单击创建项目,根据页面提示输入相关参数。image

  3. 单击确定,然后单击项目名称,进入项目管理页。

创建工作流并提交任务

  1. 在左侧导航栏选择工作流定义

  2. 单击创建工作流image

  3. 拖拽通用组件中的SHELL组件至右侧画布,编辑组件。

  4. 填写节点名称,并在脚本文本框中,输入提交任务的代码。image示例如下:

    #!/bin/bash
    
    
    #提交任务的LTS URL,请替换为LTS实例ID
    MASTER_URL="http://ld-bp1i13ko5e222****-master3-001.lindorm.rds.aliyuncs.com:12311/pro/proc/"
    
    # 定义提交作业的API地址
    SUBMIT_URL="${MASTER_URL}bulkload/create"
    echo "Submit URL: $SUBMIT_URL"
    
    # 定义作业提交的数据
    
    # reader config,这里以源端为Maxcompute为例
    READERCONFIG='{
    		"partition":[
    			"pt=20250212"
    		],
    		"numPartitions":20,
    		"column":[
    			"rowkey",
    			"stringcol1",
    			"longcol1",
    			"doublecol1"
    		],
    		"table":"test_table_10c"
        }'
    
    # writer config
    WRITERCONFIG='{
    		"columns":[
    			"rowkey",
    			"stringcol1",
    			"longcol1",
    			"doublecol1"
    		],
    		"namespace":"default",
    		"lindormTable":"test_table_10c_mig",
    		"compression":"zstd"
        }'
    
    # 任务启动的计算引擎Executor数量
    INSTANCE='5'
    
    # SRC为ODPS数据源的名称,请先在LTS页面中手动添加。DST请填写Lindorm宽表数据源名称。
    SRC='odps'
    DST='ld-bp1i13ko5e222****'
    
    # 计算引擎Executor和Driver规格
    DRIVERSPEC='large'
    EXECUTORSPEC='xlarge'
    
    # 源文件类型。如果是Maxcompute可以不使用这个参数
    FILETTYPE='Parquet'
    
    
    create_payload() {
        local result=""
        local first=true
    
        for arg in "$@"; do
            if [[ "$first" == true ]]; then
                result="$arg"
                first=false
            else
                result="$result&$arg"
            fi
        done
    
        echo "$result"
    }
    
    
    PAYLOAD=$(create_payload "src=$SRC" "dst=$DST" "readerConfig=$READERCONFIG" "writerConfig=$WRITERCONFIG" "instance=$INSTANCE" "driverSpec=$DRIVERSPEC" "executorSpec=$EXECUTORSPEC" "fileType=$FILETTYPE")
    
    echo "PAYLOAD: $PAYLOAD"
    
    # 提交作业
    submit_response=$(curl -d "$PAYLOAD" -H "Content-Type: application/x-www-form-urlencoded" -X POST "$SUBMIT_URL")
    echo $submit_response
    
    # 解析提交响应以获取success和message
    success=$(echo $submit_response | grep -o '"success":"[^"]*' | cut -d'"' -f4)
    message=$(echo $submit_response | grep -o '"message":"[^"]*' | cut -d'"' -f4)
    
    # 检查提交作业是否成功
    if [[ "$success" != "true" ]]; then
        echo "Job submission failed: $message"
        exit 1
    fi
    
    # 解析提交响应以获取jobID
    job_id=$message
    echo "JobId: $job_id"
    
    
    
    
    STATUS_URL="${MASTER_URL}${job_id}/info"
    echo $STATUS_URL
    
    # 检查作业状态
    while true; do
        # 获取作业状态
        status_response=$(curl --silent --request GET "$STATUS_URL")
    
        # 解析状态响应以获取state
        state=$(echo $status_response | sed 's/.*"state":"\([^"]*\)".*/\1/')
    
        # 打印当前状态
        echo "Current job state: $state"
    
        # 判断是否完成
        if [[ $state == "SUCCESS" ]]; then
            echo "Job completed successfully."
            exit 0
        elif [[ $state == "FAILED" ]]; then
            echo "Job failed."
            exit 1
        elif [[ $state == "KILLED" ]]; then
            echo "Job killed."
            exit 1
        fi
    
        # 暂停60秒后再次检查状态
        sleep 60
    done

  • 本页导读 (1)
  • 功能特性
  • 前提条件
  • 支持的数据源
  • 提交方式
  • 通过LTS操作页面提交
  • 通过API接口提交
  • 提交任务接口
  • 获取任务信息
  • 终止任务
  • 重试任务
  • 删除任务
  • 通过Dolphin提交(公测中)
  • 添加数据源
  • 登录Dolphin
  • 创建项目
  • 创建工作流并提交任务
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等