通过Dolphin管理作业

Dolphin是可视化DAG工作流任务调度系统。本文介绍如何通过Dolphin调度Lindorm计算引擎的批作业和SQL作业。

前提条件

已开通Lindorm计算引擎。如何开通,请参见开通与变配

使用说明

Dolphin提供的调度能力和使用方法兼容开源,详细信息请参考开源文档:Dolphin开源文档

步骤一:开通Dolphin服务

  1. 登录Lindorm管理控制台

  2. 在页面左上角,选择实例所属的地域。

  3. 实例列表页,单击目标实例ID或者目标实例所在行操作列的管理

  4. 在左侧导航栏,单击数据库连接,然后单击计算引擎页签。

  5. 单击开通Dolphin地址

    说明

    如需使用公网,可单击界面右上角的开通公网地址进行开通。

步骤二:登录Dolphin

  1. 计算引擎页签,获取Dolphin地址,并单击该地址打开Dolphin服务页面。

  2. 使用Lindorm宽表引擎账号和密码登录Dolphin服务,获取方法请参见用户管理

步骤三:创建数据源

  1. 在Dolphin控制台,选择源中心页签,单击创建源

    image

  2. 选择源类型KYUUBI,并配置以下参数:

    参数

    说明

    数据源

    选择KYUUBI

    源名称

    自定义数据源名称。本文以Lindorm-test为例。

    IP主机名

    JDBC连接地址。形如:ld-bp171py46qn73****-proxy-ldps.lindorm.aliyuncs.com,获取方式请参考JDBC连接地址

    端口

    填写10009。

    用户名

    宽表引擎的默认用户名或其他新建的用户名。获取方式请参考访问实例

    密码

    宽表引擎的默认密码或其他新建的密码。获取方式参考访问实例

    数据库名

    填写default或其他已存在且有访问权限的数据库。

    数据仓库

    填写default。

    JDBC连接参数

    JDBC连接参数。形如:{"token":"4175f22f-****-416a-943e-57b998da10e1","kyuubi.engine.share.level":"CONNECTION"},token获取方式参考JDBC连接地址

    每个CONNECTION对应独立的计算资源。

  3. 单击测试连接,验证连通性。

步骤四:创建工作流

  1. 在Dolphin控制台,选择项目管理页签,单击创建项目,配置项目名称

  2. 单击新建的项目名称,进入项目管理界面。

  3. 在左侧导航栏,单击工作流定义,然后单击创建工作流

  4. 拖拽左侧通用组件(SHELL/SQL)到右侧画布,对目标组件进行编辑。

    image

  5. 创建节点。

    • 创建SQL节点。

      1. 输入节点名称,以sql-test为例,数据源类型选择KYUUBI数据源实例选择已创建的数据源Lindorm-test,输入SQL语句示例如下:

        SQL语句示例如下:

        SELECT webUI();
      2. 添加前置SQL语句,并使用配置任务参数进行设置。

        重要

        前置SQL语句后不加分号。

        image

      3. 单击确认,保存并退出。

    • 创建Shell节点。

      1. 输入节点名称,以shell-test为例,并提交批作业脚本,参数说明请参见创建作业参数说明。脚本示例如下:

        curl --location --request POST http://${endpoint}:10099/api/v1/lindorm/jobs/${token} --header "Content-Type:application/json" --data '{
        "owner":"root",
        "name":"test",
        "mainResourceKind":"jar",
        "mainClass":"com.aliyun.lindorm.WordCount",
        "mainResource":"oss://java_job/lindorm-spark-examples-1.0-SNAPSHOT.jar",
        "mainArgs":[],
        "username":"root",
        "password":"test"
        "conf":{
                "spark.dynamicAllocation.enabled":"false"}
        }'
      2. 单击确认,保存并退出。

  6. 单击界面右上角的保存,输入工作流名称执行策略选择串行等待,并保存工作流。

    image

  7. 在目标工作流操作栏图标,单击image图标,上线工作流。然后单击image图标,运行工作流。

    image

  8. 在左侧导航栏,单击任务实例,查看当前任务。

    image

  9. 在左侧导航栏,单击工作流定义,在目标工作流操作栏图标,单击定时图标,可设置定时调度工作流。

    image

    说明

    建议设置定时调度间隔为30分钟,且每次调度的节点数不超过10个。

步骤五:作业运维

查看SQL作业详情

  1. 在左侧导航栏,单击任务实例,单击右侧任务栏image图标,进入日志界面。

  2. 在日志中搜索SQL Engine UI,获取WebUI的链接。

  3. 通过获取的链接,访问WebUI界面,查看作业详情。

查看Shell作业详情

  1. 在左侧导航栏,单击任务实例,单击右侧任务栏image图标,进入日志界面。

  2. 在日志中搜索JobId,获取Shell作业的ID。

  3. 登录Lindorm管理控制台

  4. 在页面左上角,选择实例所属的地域。

  5. 实例列表页,单击目标实例ID或者目标实例所在行操作列的管理

  6. 在左侧导航栏,单击计算引擎

  7. 作业列表JobId中输入已获取的Shell作业ID,查找目标作业。

  8. 在目标作业搜索结果中获取WebUI地址

  9. 通过获取的链接,访问WebUI界面,查看作业详情。

最佳实践

在工作流的Shell节点中同步调度JAR作业。参数说明请参见通过REST API管理作业

#!/bin/bash

# 定义提交作业的 API 地址
SUBMIT_URL="http://${JDBC_URL}:10099/api/v1/lindorm/jobs/${token}"

# 定义作业提交的数据
PAYLOAD='{
    "owner":"root",
    "name":"test-shell",
    "username":"root",
    "password":"test",
    "mainResourceKind":"sql",
    "mainResource":"select webUI()",
    "mainArgs":[],
    "conf":{
        "spark.kubernetes.namespace":"default"
    }
}'

# 提交作业
submit_response=$(curl --silent --location --request POST "$SUBMIT_URL" --header "Content-Type:application/json" --data "$PAYLOAD")

# 解析提交响应以获取 jobId
job_id=$(echo $submit_response | grep -o '"jobId":"[^"]*' | cut -d'"' -f4)
echo $job_id

# 检查提交作业是否成功
if [[ -z "$job_id" ]]; then
    echo "Failed to submit the job."
    exit 1
fi

STATUS_URL="$SUBMIT_URL/$job_id"
echo $STATUS_URL

# 检查作业状态
while true; do
    # 获取作业状态
    status_response=$(curl --silent --request GET "$STATUS_URL")

    # 解析状态响应以获取 state
    state=$(echo $status_response | grep -o '"state":"[^"]*' | cut -d'"' -f4)

    # 打印当前状态
    echo "Current job state: $state"

    # 判断是否完成
    if [[ "$state" == "success" ]]; then
        echo "Job completed successfully."
        exit 0
    elif [[ "$state" == "failed" ]]; then
        echo "Job failed."
        exit 1
    fi

    # 暂停 60 秒后再次检查状态
    sleep 60
done