DataWorks调度Spark

更新时间:

DataWorks提供全链路大数据开发治理能力,支持包括AnalyticDB在内的多种计算引擎。DataWorks数据开发(DataStudio)模块支持工作流可视化开发和托管调度运维,能够按照时间和依赖关系轻松实现任务的全面托管调度。您可以在DataWorks中通过ADB Spark SQLADB Spark节点开发和调度Spark作业(Spark SQL作业和Spark应用作业)。

前提条件

  • AnalyticDB for MySQL集群需满足以下条件:

  • DataWorks需满足以下条件:

    • AnalyticDB for MySQL集群与DataWorks位于相同地域。

    • 创建DataWorks工作空间,且已打开参加数据开发(DataStudio)(新版)公测的开关。

      说明

      您可在新建DataWorks工作空间时打开参加数据开发(DataStudio)(新版)公测的开关,或提交工单联系技术支持为已有工作空间打开参加数据开发(DataStudio)(新版)公测的开关。

    • 创建并绑定资源组

      说明

      创建资源组时,资源组所属VPCAnalyticDB for MySQL集群的VPC保持一致。

    • 已将DataWorks工作空间绑定的资源组的交换机IPv4网段添加到AnalyticDB for MySQL集群白名单中,详情请参见设置白名单

    • 已为DataWorks工作空间创建了AnalyticDB for Spark计算资源。详情请参见绑定AnalyticDB for Spark计算资源

DataWorks调度Spark SQL作业

AnalyticDB for MySQL具有外表作业开发和内表作业开发的能力,本文以外表作业开发场景为例,介绍通过DataWorks开发和调度Spark SQL作业的步骤。

步骤一:创建ADB Spark SQL节点

  1. 进入DataWorks工作空间列表页,在顶部切换至目标地域,找到已创建的工作空间,单击操作列的快速进入 > Data Studio,进入Data Studio。

  2. 单击项目目录右侧的image,选择新建节点... > AnalyticDB > ADB Spark SQL

  3. 在弹出的节点对话框中,输入节点名称,按Enter键确认创建。

步骤二:开发ADB Spark SQL节点

  1. 以在ADB Spark SQL节点中创建外部数据库为例。如您需要创建内表请参考 Spark SQL创建内表

    CREATE DATABASE IF NOT EXISTS `adb_spark_db` LOCATION 'oss://testBuckename/db_dome';
  2. ADB Spark SQL节点中创建外表adb_spark_db.tb_order

    CREATE TABLE IF NOT EXISTS adb_spark_db.tb_order(id int, name string, age int) 
    USING parquet 
    LOCATION 'oss://testBuckename/db_dome/tb1' 
    TBLPROPERTIES ('parquet.compress'='SNAPPY');
  3. 查询数据。

    外表创建成功后,您可以在AnalyticDB for MySQL中通过SELECT语句查询Parquet数据。

    SELECT * FROM adb_spark_db.tb_order limit 100;
  4. ADB Spark SQL节点中创建Delta Lakeadb_spark_db.raw_order

    CREATE TABLE IF NOT EXISTS adb_spark_db.raw_order(id int, name string, age int) 
    USING delta;
  5. adb_spark_db.tb_order数据导入至adb_spark_db.raw_order

    INSERT INTO adb_spark_db.raw_order SELECT * FROM adb_spark_db.tb_order;
  6. AnalyticDB for MySQL中创建数据库。如果有已创建的数据库,可以忽略本步骤。示例如下:

    CREATE DATABASE adb_demo; 
  7. AnalyticDB for MySQL中创建内表用于存储从Delta Lake表中导入的数据。示例如下:

    CREATE TABLE adb_demo.order_xuanwu_format (
        `id` int, 
        `name` string, 
        `age` int) 
    using adb 
        TBLPROPERTIES (
        'distributeType'='HASH',
        'distributeColumns' = 'id',
        'storagePolicy' = 'hot'
    );
  8. Delta Lakeadb_spark_db.raw_order的数据导入order_xuanwu_format

    INSERT OVERWRITE adb_demo.order_xuanwu_format SELECT * FROM adb_spark_db.adb_spark_db.raw_order;

步骤三:配置并执行ADB Spark SQL节点

  1. 在页面右侧单击调试配置,配置ADB Spark SQL节点运行参数。

    参数类型

    参数名称

    描述

    计算资源

    计算资源

    选择您所绑定的AnalyticDB for Spark计算资源。

    ADB计算资源组

    选择您在AnalyticDB for MySQL集群中创建的Spark引擎Interactive型资源组。

    DataWorks配置

    资源组

    选择您绑定AnalyticDB for Spark计算资源时已通过测试连通性的DataWorks资源组。

    计算CU

    当前节点使用默认CU值,无需修改CU。

    脚本参数

    参数名

    您在ADB Spark SQL节点中配置的参数名称。例如,您可以在脚本中配置参数$[yyyymmdd],实现对每日新增数据的批量同步处理。支持配置的参数及其格式,请参见配置调度参数

    说明

    系统会自动识别节点中配置的参数名称。

    参数值

    配置参数值,任务运行时会将它动态替换为真实的取值。

  2. (可选)如需定期执行节点任务,请在节点右侧调度配置调度策略中配置计算资源ADB计算资源组调度资源组信息,并在调度参数中配置参数信息。

  3. 完成调试配置后,单击image保存已配置好的SQL节点,然后单击image测试运行SQL脚本,查看SQL脚本是否符合预期。

  4. 完成调度配置后,即可对已完成的数据库节点提交发布至生产环境。

  5. 发布完成的任务,将按照您配置的参数周期性运行,可在运维中心 > 任务运维 > 周期任务运维 > 周期任务中查看并管理已发布的周期任务,详情请参见:运维中心入门

DataWorks调度Spark JAR作业

步骤一:创建ADB Spark节点

  1. 进入DataWorks工作空间列表页,在顶部切换至目标地域,找到已创建的工作空间,单击操作列的快速进入 > Data Studio,进入Data Studio。

  2. 单击项目目录右侧的image,选择新建节点... > AnalyticDB > ADB Spark

  3. 在弹出的节点对话框中,输入节点名称,按Enter键确认创建。

步骤二:开发ADB Spark节点

ADB Spark节点支持Java/Scala、Python语言开发。

Java/Scala语言开发说明

  1. 准备示例Jar包。

    您可直接下载该示例Jarspark-examples_2.12-3.2.0.jar,用于后续开发、调度ADB Spark节点。

  2. 将示例代码spark-examples_2.12-3.2.0.jar上传到与AnalyticDB for MySQL处于同一地域的OSS Bucket。具体操作,请参见控制台上传文件

  3. 配置ADB Spark节点。

    语言类型

    参数名称

    参数描述

    Java/Scala

    Jar资源

    Jar包在OSS上的存储路径。示例值如:oss://testBucketname/db_dome/spark-examples_2.12-3.2.0.jar

    Main Class

    您所需执行的主类名称,例如上述代码中的主类名称com.work.SparkWork

    参数

    填写您所需传入代码的参数信息。

    配置项

    配置Spark程序运行参数,详情请参见Spark应用配置参数说明

    示例如下:

    spark.driver.resourceSpec: medium

Python语言开发说明

  1. 准备测试数据。

    新建一个需要通过Spark读取的TXT文件data.txt,在文件中添加以下内容。

    Hello,Dataworks
    Hello,OSS
  2. 编写示例代码。

    您需新建一个spark_oss.py文件,在spark_oss.py文件中添加以下内容。

    import sys
    
    from pyspark.sql import SparkSession
    
    # 初始Spark
    spark = SparkSession.builder.appName('OSS Example').getOrCreate()
    # 读取指定的文件,文件路径由args传入的参数值来指定
    textFile = spark.sparkContext.textFile(sys.argv[1])
    # 计算文件行数并打印
    print("File total lines: " + str(textFile.count()))
    # 打印文件的第一行内容
    print("First line is: " + textFile.first())
    
  3. 上传测试数据data.txt和示例代码spark_oss.py到与AnalyticDB for MySQL处于同一地域的OSS Bucket。具体操作,请参见控制台上传文件

  4. 配置ADB Spark节点内容。

    语言类型

    参数名称

    参数描述

    Python

    主程序包

    本例步骤3spark_oss.pyOSS路径,示例值如oss://testBucketname/db_dome/spark_oss.py

    参数

    本例步骤3data.txtOSS路径,示例值如oss://testBucketname/db_dome/data.txt

    配置项

    配置Spark程序运行参数,详情请参见Spark应用配置参数说明

    示例如下:

    spark.driver.resourceSpec: medium

步骤三:配置并执行ADB Spark节点

  1. 在页面右侧单击调试配置,配置ADB Spark SQL节点运行参数。

    参数类型

    参数名称

    描述

    计算资源

    计算资源

    选择您所绑定的AnalyticDB for Spark计算资源。

    ADB计算资源组

    选择您在AnalyticDB for MySQL集群中新建的Job型资源组。

    DataWorks配置

    资源组

    选择您绑定AnalyticDB for Spark计算资源时已通过测试连通性的DataWorks资源组。

    计算CU

    当前节点使用默认CU值,无需修改CU。

    脚本参数

    参数名

    您在ADB Spark JAR作业中配置的参数名称。

    说明

    系统会自动识别节点中配置的参数名称。

    参数值

    配置参数值,任务运行时会将它动态替换为真实的取值。

  2. (可选)如需定期执行节点任务,请在节点右侧调度配置调度策略中配置计算资源ADB计算资源组调度资源组信息,并在调度参数中配置参数信息。

  3. 完成调试配置后,单击image保存已配置好的SQL节点,然后单击image测试运行SQL脚本,查看SQL脚本是否符合预期。

  4. 完成调度配置后,即可对已完成的数据库节点提交发布至生产环境。

  5. 发布完成的任务,将按照您配置的参数周期性运行,可在运维中心 > 任务运维 > 周期任务运维 > 周期任务中查看并管理已发布的周期任务,详情请参见运维中心入门