DataWorks调度Spark

DataWorks是基于MaxCompute作为计算和存储引擎的海量数据离线加工分析平台,用于工作流可视化开发和托管调度运维,支持按照时间和依赖关系的任务全面托管调度。您可以在DataWorks中,通过Shell节点或自定义节点调度和管理云原生数据仓库 AnalyticDB MySQL 版的Spark作业。

前提条件

  • AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版

  • 已在AnalyticDB for MySQL集群中创建Job型资源组。具体操作,请参见新建资源组

  • AnalyticDB for MySQL集群与OSS存储空间位于相同地域。

  • 已创建AnalyticDB for MySQL集群的数据库账号。

  • 已开通DataWorks服务,并在AnalyticDB for MySQL同一地域创建工作空间。具体操作,请参见开通DataWorks服务创建工作空间

    • 通过自定义节点在DataWorks中调度Spark作业时,DataWorks需为企业版。

    • 通过Shell节点在DataWorks中调度Spark作业时,需购买并为工作空间绑定独享调度资源组。具体操作,请参见新增和使用独享调度资源组

准备工作

通过DataWorks调度Spark作业依赖DataXADBSparkPlugin-1.0.0.jar包,您需下载DataXADBSparkPlugin-1.0.0.jar包并将其上传至OSS

通过Shell节点调度Spark作业(推荐)

通过Shell节点调度Spark作业时,您可以选择如下两种方法:

  • 全局配置:通过运维助手全局配置clusterIdresourceGroupregionId等参数和DataXADBSparkPlugin-1.0.0.jar包所在的OSS路径,再提交Spark作业。

  • 单个Spark作业配置:提交某个Spark作业时在该业务代码中配置clusterIdresourceGroupregionId等参数和DataXADBSparkPlugin-1.0.0.jar包所在的OSS路径。

全局配置

  1. 通过运维助手安装第三方包时,需手动输入以下命令。具体操作,请参见运维助手

    rm -rf /home/admin/usertools/tools/.adb.config
    touch /home/admin/usertools/tools/.adb.config
    echo '[Credentials]' > /home/admin/usertools/tools/.adb.config
    echo 'endpoint = oss-cn-hangzhou.aliyuncs.com' >> /home/admin/usertools/tools/.adb.config
    echo 'accessKeyID = LTAI5U58FDtHnS3NzBXy****' >> /home/admin/usertools/tools/.adb.config
    echo 'accessKeySecret = 1Bvozs3DJDNSIRY57VVNPFXUt****' >> /home/admin/usertools/tools/.adb.config
    echo 'clusterId = amv-uf64134gxtjt****' >> /home/admin/usertools/tools/.adb.config
    echo 'resourceGroup = job' >> /home/admin/usertools/tools/.adb.config
    echo 'regionId = cn-hangzhou' >> /home/admin/usertools/tools/.adb.config
    
    cd /home/admin/usertools/tools/
    rm -rf DataXADBSparkPlugin-1.0.0.jar
    wget --no-check-certificate https://testBucketname.oss-cn-hangzhou.aliyuncs.com/DataWorks/DataXADBSparkPlugin-1.0.0.jar

    参数说明:

    参数

    说明

    endpoint

    OSS的Endpoint(地域节点)。

    如何查看OSS的Endpoint,请参见访问域名

    accessKeyID

    阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey ID。

    如何获取AccessKey ID和AccessKey Secret,请参见账号与权限

    accessKeySecret

    阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey Secret。

    如何获取AccessKey ID和AccessKey Secret,请参见账号与权限

    clusterId

    AnalyticDB for MySQL集群ID。

    resourceGroup

    AnalyticDB for MySQL的Job型资源组名称。

    regionId

    AnalyticDB for MySQL集群的地域ID。

    wget

    下载DataXADBSparkPlugin-1.0.0.jar包。

    • 如果DataXADBSparkPlugin-1.0.0.jar包的读写权限为公共读,则使用wget --no-check-certificate https://testBucketname.oss-cn-hangzhou.aliyuncs.com/DataWorks/DataXADBSparkPlugin-1.0.0.jar命令下载。

    • 如果DataXADBSparkPlugin-1.0.0.jar包的读写权限为非公共读,则使用wget -O DataXADBSparkPlugin-1.0.0.jar "https://testBucketname.oss-cn-hangzhou.aliyuncs.com/DataWorks/DataXADBSparkPlugin-1.0.0.jar?Expires=1722****"命令下载。

    前缀为https的连接串是DataXADBSparkPlugin-1.0.0.jar包的URL,如何获取DataXADBSparkPlugin-1.0.0.jar包的URL,请参见获取单个文件的URL

  2. 创建业务流程。具体操作,请参见创建业务流程

  3. 新建节点。

    1. 右击业务流程,选择新建节点>Shell

    2. 在弹出的新建节点对话框中配置对应参数,单击确定

  4. 节点创建完成后,在右侧代码框中输入执行语句,单击image按钮执行。

    Spark SQL作业

    #!/bin/bash
    classpath="/home/admin/usertools/tools/DataXADBSparkPlugin-1.0.0.jar"
    if [ -f "$classpath" ]; then
        echo "Jar exists"
    else
        echo "Jar not exists"
        exit 1
    fi
    mainclass="com.aliyun.adb.spark.datax.ADBSparkShellMain"
    conf='{
        "AppType": "SQL",
        "SQL": "SHOW DATABASE; SHOW TABLES"
    }'
    java -cp "$classpath" "$mainclass" "$conf"

    参数说明:

    参数

    说明

    AppType

    Spark SQL作业请填写SQL。

    SQL

    业务具体的SQL语句,多个SQL语句用英文分号(;)隔开。

    Spark Jar作业

    #!/bin/bash
    classpath="/home/admin/usertools/tools/DataXADBSparkPlugin-1.0.0.jar"
    if [ -f "$classpath" ]; then
        echo "Jar exists"
    else
        echo "Jar not exists"
        exit 1
    fi
    mainclass="com.aliyun.adb.spark.datax.ADBSparkShellMain"
    conf='{
        "args": [
            "1000"
        ],
        "file": "oss://testBucketname/spark-examples.jar",
        "name": "SparkPi",
        "className": "org.apache.spark.examples.SparkPi",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "medium"
        }   
    }'
    java -cp "$classpath" "$mainclass" "$conf"

    参数说明:

    参数

    说明

    args

    请根据业务需求,填写使用JAR包时需要使用的参数。多个参数之间以英文逗号(,)分隔。

    file

    Spark作业依赖Jar包所在的路径。

    name

    Spark应用名称。

    className

    Java程序入口类名称。

    conf其他参数

    与开源Spark中的配置项基本一致,参数格式为key: value形式,多个参数之间以英文逗号(,)分隔。与开源Spark用法不一致的配置参数及AnalyticDB for MySQL特有的配置参数,请参见Spark应用配置参数说明

  5. 运行成功后,在下方运行日志中查看Spark作业的执行结果和日志信息。

单个Spark作业配置

  1. 创建业务流程。具体操作,请参见创建业务流程

  2. 新建节点。

    1. 右击业务流程,选择新建节点>Shell

    2. 在弹出的新建节点对话框中配置对应参数,单击确定

  3. 节点创建完成后,在右侧代码框中输入执行语句,单击image按钮执行。

    Spark SQL作业

    #!/bin/bash
    /home/admin/usertools/tools/ossutil64 -e oss-cn-hangzhou-internal.aliyuncs.com -i LTAI5U58FDtHnS3NzBXy**** -k 1Bvozs3DJDNSIRY57VVNPFXUt**** cp oss://testBucketname/DataXADBSparkPlugin-1.0.0.jar DataXADBSparkPlugin-1.0.0.jar
    if [[ $? == 0 ]];then
        echo "get oss file success"
    else
        echo "get oss file failed"
        exit 1
    fi
    if [ -n "$JAVA_HOME" ]; then 
      echo "JAVA_HOME configured."
    else
      echo "JAVA_HOME not configured."
      exit 1
    fi
    current_dir=$(pwd)
    classpath="$current_dir/DataXADBSparkPlugin-1.0.0.jar"
    if [ -f "$classpath" ]; then
        echo "Jar exists"
    else
        echo "Jar not exists"
        exit 1
    fi
    mainclass="com.aliyun.adb.spark.datax.ADBSparkShellMain"
    conf='{
        "ClusterId": "amv-uf64134gxtjt****",
        "ResourceGroup": "job",
        "RegionId": "cn-hangzhou",
        "AccessKeyId": "LTAI5U58FDtHnS3NzBXy****",
        "AccessKeySec": "1Bvozs3DJDNSIRY57VVNPFXUt****",
        "AppType": "SQL",
        "SQL": "SHOW DATABASE; SHOW TABLES"
    }'
    java -cp "$classpath" "$mainclass" "$conf"

    Spark Jar作业

    #!/bin/bash
    /home/admin/usertools/tools/ossutil64 -e oss-cn-hangzhou-internal.aliyuncs.com -i LTAI5U58FDtHnS3NzBXy**** -k 1Bvozs3DJDNSIRY57VVNPFXUt**** cp oss://testBucketname/DataXADBSparkPlugin-1.0.0.jar DataXADBSparkPlugin-1.0.0.jar
    if [[ $? == 0 ]];then
        echo "get oss file success"
    else
        echo "get oss file failed"
        exit 1
    fi
    if [ -n "$JAVA_HOME" ]; then 
      echo "JAVA_HOME configured."
    else
      echo "JAVA_HOME not configured."
      exit 1
    fi
    current_dir=$(pwd)
    classpath="$current_dir/DataXADBSparkPlugin-1.0.0.jar"
    if [ -f "$classpath" ]; then
        echo "Jar exists"
    else
        echo "Jar not exists"
        exit 1
    fi
    mainclass="com.aliyun.adb.spark.datax.ADBSparkShellMain"
    conf='{
        "ClusterId": "amv-uf64134gxtjt****",
        "ResourceGroup": "job",
        "RegionId": "cn-hangzhou",
        "AccessKeyId": "LTAI5U58FDtHnS3NzBXy****",
        "AccessKeySec": "1Bvozs3DJDNSIRY57VVNPFXUt****",
        "args": [
            "1000"
        ],
        "file": "oss://testBucketname/spark-examples.jar",
        "name": "SparkPi",
        "className": "org.apache.spark.examples.SparkPi",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "medium"
        }   
    }'
    java -cp "$classpath" "$mainclass" "$conf"

    参数说明:

    参数类型

    参数

    说明

    公共参数

    ClusterId

    AnalyticDB for MySQL集群ID。

    ResourceGroup

    AnalyticDB for MySQL的Job型资源组名称。

    RegionId

    AnalyticDB for MySQL集群的地域ID。

    AccessKeyId

    阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey ID。

    如何获取AccessKey ID和AccessKey Secret,请参见账号与权限

    AccessKeySec

    阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey Secret。

    如何获取AccessKey ID和AccessKey Secret,请参见账号与权限

    Spark SQL作业参数

    AppType

    Spark作业的类型。此处为SQL。

    SQL

    业务具体的SQL语句,多个SQL语句用英文分号(;)隔开。

    Spark Jar作业参数

    args

    请根据业务需求,填写使用JAR包时需要使用的参数。多个参数之间以英文逗号(,)分隔。

    file

    Spark作业依赖Jar包所在的OSS路径。

    name

    Spark应用名称。

    className

    Java程序入口类名称。

    conf其他参数

    与开源Spark中的配置项基本一致,参数格式为key: value形式,多个参数之间以英文逗号(,)分隔。与开源Spark用法不一致的配置参数及AnalyticDB for MySQL特有的配置参数,请参见Spark应用配置参数说明

  4. 运行成功后,在下方运行日志中查看Spark作业的执行结果和日志信息。

通过自定义节点调度Spark作业

  1. 创建自定义节点插件。具体操作,请参见新增节点插件

    • 插件类型选择引擎型

    • 资源文件需上传准备工作中下载的DataXADBSparkPlugin-1.0.0.jar包。

    • 类名需填写为com.aliyun.adb.spark.datax.ADBSparkWrapper

    • 参数模板填写为{}即可。

  2. 创建自定义节点。具体操作,请参见新增自定义节点

    • 插件类型选择为引擎型

    • 选择插件下拉列表中选择步骤1创建的自定义插件。

  3. 创建业务流程。具体操作,请参见创建业务流程

  4. 新建节点。

    1. 右击业务流程,选择新建节点

    2. 选择步骤2创建的自定义节点。

    3. 在弹出的新建节点对话框中配置对应参数,单击确定

  5. 节点创建完成后,在右侧代码框中输入执行语句,单击image按钮执行。

    Spark SQL作业

    {
        "ClusterId": "amv-uf64134gxtjt****",
        "ResourceGroup": "job",
        "RegionId": "cn-hangzhou",
        "AccessKeyId": "LTAI5U58FDtHnS3NzBXy****",
        "AccessKeySec": "1Bvozs3DJDNSIRY57VVNPFXUt****",
        "AppType": "SQL",
        "SQL": "SHOW DATABASE; SHOW TABLES"
    }

    Spark Jar作业

    {
        "ClusterId": "amv-uf64134gxtjt****",
        "ResourceGroup": "job",
        "RegionId": "cn-hangzhou",
        "AccessKeyId": "LTAI5U58FDtHnS3NzBXy****",
        "AccessKeySec": "1Bvozs3DJDNSIRY57VVNPFXUt****",
        "args": [
            "1000"
        ],
        "file": "oss://testBucketname/spark-examples.jar",
        "name": "SparkPi",
        "className": "org.apache.spark.examples.SparkPi",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "medium"
        }   
    }

    参数说明:

    参数类型

    参数

    说明

    公共参数

    ClusterId

    AnalyticDB for MySQL集群ID。

    ResourceGroup

    AnalyticDB for MySQL的Job型资源组名称。

    RegionId

    AnalyticDB for MySQL集群的地域ID。

    AccessKeyId

    阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey ID。

    如何获取AccessKey ID和AccessKey Secret,请参见账号与权限

    AccessKeySec

    阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey Secret。

    如何获取AccessKey ID和AccessKey Secret,请参见账号与权限

    Spark SQL作业参数

    AppType

    Spark作业的类型。此处为SQL。

    SQL

    业务具体的SQL语句,多个SQL语句用英文分号(;)隔开。

    Spark Jar作业参数

    args

    请根据业务需求,填写使用JAR包时需要使用的参数。多个参数之间以英文逗号(,)分隔。

    file

    Spark作业依赖Jar包所在的OSS路径。

    name

    Spark应用名称。

    className

    Java程序入口类名称。

    conf其他参数

    与开源Spark中的配置项基本一致,参数格式为key: value形式,多个参数之间以英文逗号(,)分隔。与开源Spark用法不一致的配置参数及AnalyticDB for MySQL特有的配置参数,请参见Spark应用配置参数说明

  6. 运行成功后,在下方运行日志中查看Spark作业的执行结果和日志信息。