通过Dolphin(公测中)提交数据导入任务

本文介绍如何通过Dolphin(公测中)提交数据导入任务。

添加数据源

在使用Dolphin前,您需要添加计算引擎数据源(Spark数据源)、Lindorm宽表数据源和源端数据源(ODPS数据源HDFS数据源)。

登录Dolphin

开通Dolphin服务登录Dolphin

创建项目

  1. 在顶部菜单栏,单击项目管理

  2. 在项目管理页面,单击创建项目,根据页面提示输入相关参数。image

  3. 单击确定,然后单击项目名称,进入项目管理页。

创建工作流并提交任务

  1. 在左侧导航栏选择工作流定义

  2. 单击创建工作流image

  3. 拖拽通用组件中的SHELL组件至右侧画布,编辑组件。

  4. 填写节点名称,并在脚本文本框中,输入提交任务的代码。image示例如下:

    #!/bin/bash
    
    
    #提交任务的LTS URL,请替换为LTS实例ID
    MASTER_URL="http://ld-bp1i13ko5e222****-master3-001.lindorm.rds.aliyuncs.com:12311/pro/proc/"
    
    # 定义提交作业的API地址
    SUBMIT_URL="${MASTER_URL}bulkload/create"
    echo "Submit URL: $SUBMIT_URL"
    
    # 定义作业提交的数据
    
    # reader config,这里以源端为Maxcompute为例
    READERCONFIG='{
    		"partition":[
    			"pt=20250212"
    		],
    		"numPartitions":20,
    		"column":[
    			"rowkey",
    			"stringcol1",
    			"longcol1",
    			"doublecol1"
    		],
    		"table":"test_table_10c"
        }'
    
    # writer config
    WRITERCONFIG='{
    		"columns":[
    			"rowkey",
    			"stringcol1",
    			"longcol1",
    			"doublecol1"
    		],
    		"namespace":"default",
    		"lindormTable":"test_table_10c_mig",
    		"compression":"zstd"
        }'
    
    # 任务启动的计算引擎Executor数量
    INSTANCE='5'
    
    # SRCODPS数据源的名称,请先在LTS页面中手动添加。DST请填写Lindorm宽表数据源名称。
    SRC='odps'
    DST='ld-bp1i13ko5e222****'
    
    # 计算引擎ExecutorDriver规格
    DRIVERSPEC='large'
    EXECUTORSPEC='xlarge'
    
    # 源文件类型。如果是Maxcompute可以不使用这个参数
    FILETTYPE='Parquet'
    
    
    create_payload() {
        local result=""
        local first=true
    
        for arg in "$@"; do
            if [[ "$first" == true ]]; then
                result="$arg"
                first=false
            else
                result="$result&$arg"
            fi
        done
    
        echo "$result"
    }
    
    
    PAYLOAD=$(create_payload "src=$SRC" "dst=$DST" "readerConfig=$READERCONFIG" "writerConfig=$WRITERCONFIG" "instance=$INSTANCE" "driverSpec=$DRIVERSPEC" "executorSpec=$EXECUTORSPEC" "fileType=$FILETTYPE")
    
    echo "PAYLOAD: $PAYLOAD"
    
    # 提交作业
    submit_response=$(curl -d "$PAYLOAD" -H "Content-Type: application/x-www-form-urlencoded" -X POST "$SUBMIT_URL")
    echo $submit_response
    
    # 解析提交响应以获取successmessage
    success=$(echo $submit_response | grep -o '"success":"[^"]*' | cut -d'"' -f4)
    message=$(echo $submit_response | grep -o '"message":"[^"]*' | cut -d'"' -f4)
    
    # 检查提交作业是否成功
    if [[ "$success" != "true" ]]; then
        echo "Job submission failed: $message"
        exit 1
    fi
    
    # 解析提交响应以获取jobID
    job_id=$message
    echo "JobId: $job_id"
    
    
    
    
    STATUS_URL="${MASTER_URL}${job_id}/info"
    echo $STATUS_URL
    
    # 检查作业状态
    while true; do
        # 获取作业状态
        status_response=$(curl --silent --request GET "$STATUS_URL")
    
        # 解析状态响应以获取state
        state=$(echo $status_response | sed 's/.*"state":"\([^"]*\)".*/\1/')
    
        # 打印当前状态
        echo "Current job state: $state"
    
        # 判断是否完成
        if [[ $state == "SUCCESS" ]]; then
            echo "Job completed successfully."
            exit 0
        elif [[ $state == "FAILED" ]]; then
            echo "Job failed."
            exit 1
        elif [[ $state == "KILLED" ]]; then
            echo "Job killed."
            exit 1
        fi
    
        # 暂停60秒后再次检查状态
        sleep 60
    done