本文介绍如何通过Dolphin(公测中)提交数据导入任务。
添加数据源
在使用Dolphin前,您需要添加计算引擎数据源(Spark数据源)、Lindorm宽表数据源和源端数据源(ODPS数据源或HDFS数据源)。
登录Dolphin
创建项目
在顶部菜单栏,单击项目管理。
在项目管理页面,单击创建项目,根据页面提示输入相关参数。

单击确定,然后单击项目名称,进入项目管理页。
创建工作流并提交任务
在左侧导航栏选择工作流定义。
单击创建工作流。

拖拽通用组件中的SHELL组件至右侧画布,编辑组件。
填写节点名称,并在脚本文本框中,输入提交任务的代码。
示例如下:#!/bin/bash #提交任务的LTS URL,请替换为LTS实例ID MASTER_URL="http://ld-bp1i13ko5e222****-master3-001.lindorm.rds.aliyuncs.com:12311/pro/proc/" # 定义提交作业的API地址 SUBMIT_URL="${MASTER_URL}bulkload/create" echo "Submit URL: $SUBMIT_URL" # 定义作业提交的数据 # reader config,这里以源端为Maxcompute为例 READERCONFIG='{ "partition":[ "pt=20250212" ], "numPartitions":20, "column":[ "rowkey", "stringcol1", "longcol1", "doublecol1" ], "table":"test_table_10c" }' # writer config WRITERCONFIG='{ "columns":[ "rowkey", "stringcol1", "longcol1", "doublecol1" ], "namespace":"default", "lindormTable":"test_table_10c_mig", "compression":"zstd" }' # 任务启动的计算引擎Executor数量 INSTANCE='5' # SRC为ODPS数据源的名称,请先在LTS页面中手动添加。DST请填写Lindorm宽表数据源名称。 SRC='odps' DST='ld-bp1i13ko5e222****' # 计算引擎Executor和Driver规格 DRIVERSPEC='large' EXECUTORSPEC='xlarge' # 源文件类型。如果是Maxcompute可以不使用这个参数 FILETTYPE='Parquet' create_payload() { local result="" local first=true for arg in "$@"; do if [[ "$first" == true ]]; then result="$arg" first=false else result="$result&$arg" fi done echo "$result" } PAYLOAD=$(create_payload "src=$SRC" "dst=$DST" "readerConfig=$READERCONFIG" "writerConfig=$WRITERCONFIG" "instance=$INSTANCE" "driverSpec=$DRIVERSPEC" "executorSpec=$EXECUTORSPEC" "fileType=$FILETTYPE") echo "PAYLOAD: $PAYLOAD" # 提交作业 submit_response=$(curl -d "$PAYLOAD" -H "Content-Type: application/x-www-form-urlencoded" -X POST "$SUBMIT_URL") echo $submit_response # 解析提交响应以获取success和message success=$(echo $submit_response | grep -o '"success":"[^"]*' | cut -d'"' -f4) message=$(echo $submit_response | grep -o '"message":"[^"]*' | cut -d'"' -f4) # 检查提交作业是否成功 if [[ "$success" != "true" ]]; then echo "Job submission failed: $message" exit 1 fi # 解析提交响应以获取jobID job_id=$message echo "JobId: $job_id" STATUS_URL="${MASTER_URL}${job_id}/info" echo $STATUS_URL # 检查作业状态 while true; do # 获取作业状态 status_response=$(curl --silent --request GET "$STATUS_URL") # 解析状态响应以获取state state=$(echo $status_response | sed 's/.*"state":"\([^"]*\)".*/\1/') # 打印当前状态 echo "Current job state: $state" # 判断是否完成 if [[ $state == "SUCCESS" ]]; then echo "Job completed successfully." exit 0 elif [[ $state == "FAILED" ]]; then echo "Job failed." exit 1 elif [[ $state == "KILLED" ]]; then echo "Job killed." exit 1 fi # 暂停60秒后再次检查状态 sleep 60 done
该文章对您有帮助吗?