Airflow调度Spark

更新时间:

Airflow是比较流行的开源调度工具,可以实现各类工作负载的DAG编排与调度。您可以通过Spark Airflow Operator、Spark-Submit命令行工具来调度Spark任务。本文介绍如何通过Airflow调度AnalyticDB for MySQL Spark作业。

前提条件

调度Spark SQL作业

AnalyticDB for MySQL支持使用批处理交互式两种方法执行Spark SQL。选择的执行方式不同,调度的操作步骤也有所不同。详细步骤如下:

批处理

Spark Airflow Operator命令行工具

  1. 安装Airflow Spark插件。执行如下命令:

    pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
  2. 创建Connection,示例如下:

    {
      "auth_type": "AK",
      "access_key_id": "<your_access_key_ID>",
      "access_key_secret": "<your_access_key_secret>",
      "region": "<your_region>"
    }

    参数说明:

    参数

    说明

    auth_type

    认证方式,固定填写为AK,表示使用AK认证。

    access_key_id

    阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey ID。

    如何获取AccessKey IDAccessKey Secret,请参见账号与权限

    access_key_secret

    阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey Secret。

    如何获取AccessKey IDAccessKey Secret,请参见账号与权限

    region

    AnalyticDB for MySQL集群的地域ID。

  3. 创建DAG声明Spark工作流,本文的DAG声明文件为spark_dags.py

    from datetime import datetime
    
    from airflow.models.dag import DAG
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator
    
    with DAG(
        dag_id="my_dag_name",
        default_args={"cluster_id": "<your_cluster_ID>", "rg_name": "<your_resource_group>", "region": "<your_region>"},
    ) as dag:
    
        spark_sql = AnalyticDBSparkSQLOperator(
            task_id="task2",
            sql="SHOW DATABASES;"
        )
    
        spark_sql
    

    参数说明如下:

    DAG配置参数

    参数

    是否必填

    说明

    dag_id

    DAG的名称,您可以自定义。

    default_args

    • cluster_id:AnalyticDB for MySQL集群ID。

    • rg_name:AnalyticDB for MySQL集群Job型资源组名称。

    • region:AnalyticDB for MySQL集群的地域ID。

    更多选填参数及说明,请参见DAG参数说明

    AnalyticDBSparkSQLOperator配置参数

    参数

    是否必填

    说明

    task_id

    任务ID。

    SQL

    Spark SQL语句。

    更多选填参数及说明,请参见Airflow参数说明

  4. spark_dags.py文件存放至Airflow Configuration声明dags_folder所在的文件夹中。

  5. 执行DAG。具体操作请参见Airflow社区文档

Spark-Submit命令行工具

说明

对于AnalyticDB for MySQL特有的配置项,例如clusterId、regionId、keyIdsecretId,您可以在AnalyticDB for MySQL Spark工具包的配置文件conf/spark-defaults.conf中进行配置,也可以通过Airflow参数来配置。详情请参见Spark应用配置参数

  1. 安装Airflow Spark插件。执行如下命令:

    pip3 install apache-airflow-providers-apache-spark
    重要
    • 您需要使用Python3来安装Airflow Spark插件。

    • 安装apache-airflow-providers-apache-spark会默认安装社区版Pyspark,需要执行如下命令将pyspark卸载。

      pip3 uninstall pyspark
  2. 下载Spark-Submit命令行工具包并进行配置

  3. 配置PATH路径。执行以下命令,将Spark-Submit命令行工具的地址加入Airflow执行地址。

    export PATH=$PATH:</your/adb/spark/path/bin>
    重要

    在启动Airflow之前需要将Spark-Submit加入到PATH中,否则调度任务可能会找不到Spark-Submit命令。

  4. 准备DAG声明文件。本文以创建Airflow DAGdemo.py文件为例。

    from airflow.models import DAG
    from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
    from airflow.utils.dates import days_ago
    args = {
        'owner': 'Aliyun ADB Spark',
    }
    with DAG(
        dag_id='example_spark_operator',
        default_args=args,
        schedule_interval=None,
        start_date=days_ago(2),
        tags=['example'],
    ) as dag:
        adb_spark_conf = {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.resourceSpec": "medium"
        }
        # [START howto_operator_spark_submit]
        submit_job = SparkSubmitOperator(
            conf=adb_spark_conf,
            application="oss://<bucket_name>/jar/pi.py",
            task_id="submit_job",
            verbose=True
        )
        # [END howto_operator_spark_submit]
        # [START howto_operator_spark_sql]
        sql_job = SparkSqlOperator(
            conn_id="spark_default",
            sql="SELECT * FROM yourdb.yourtable",
            conf=",".join([k + "=" + v for k, v in adb_spark_conf.items()]),
            task_id="sql_job",
            verbose=True
        )
        # [END howto_operator_spark_sql]
        submit_job >> sql_job
    
  5. 将编辑完成的demo.py文件放至Airflow安装目录的dags目录下。

  6. 执行DAG。具体操作请参见Airflow社区文档

交互式

  1. 获取Spark Interactive型资源组的连接地址。

    1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。

    2. 在左侧导航栏,单击集群管理 > 资源管理,单击资源组管理页签。

    3. 单击对应资源组操作列的详情,查看内网连接地址和公网连接地址。您可单击端口号括号内的image按钮,复制连接地址。

      以下两种情况,您需要单击公网地址后的申请网络,手动申请公网连接地址。

      • 提交Spark SQL作业的客户端工具部署在本地或外部服务器。

      • 提交Spark SQL作业的客户端工具部署在ECS上,且ECSAnalyticDB for MySQL不属于同一VPC。

  2. 安装apache-airflow-providers-apache-hiveapache-airflow-providers-common-sql依赖。

  3. 访问Airflow Web界面,在顶部导航栏单击Admin > Connections

  4. 单击image按钮,在Add Connections页面配置如下参数:

    参数

    说明

    Connection Id

    连接名称。本文示例为adb_spark_cluster

    Connection Type

    选择Hive Server 2 Thrift

    Host

    请填写步骤1中获取的连接地址。连接地址中的default需替换为实际的数据库名,并且需要删除连接地址中的resource_group=<资源组名称>后缀。

    例如:jdbc:hive2://amv-t4naxpqk****sparkwho.ads.aliyuncs.com:10000/adb_demo

    Schema

    连接的数据库。本文示例为adb_demo

    Login

    AnalyticDB for MySQL的数据库账号及Interactive型资源组名称。格式为资源组名称/数据库账号名称

    例如:本文示例资源组名称为spark_interactive_prod,数据库账号名称为spark_user,此处填写为spark_interactive_prod/spark_user

    Password

    AnalyticDB for MySQL数据库账号的密码。

    Port

    Spark Interactive型资源组的端口号,固定为10000

    Extra

    认证方式,固定填写以下内容,表示使用用户名和密码认证。

    {
      "auth_mechanism": "CUSTOM"
    }
  5. 编写DAG文件。

    from airflow import DAG
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
    from datetime import datetime
    
    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2025, 2, 10),
        'retries': 1,
    }
    
    dag = DAG(
        'adb_spark_sql_test',
        default_args=default_args,
        schedule_interval='@daily',
    )
    
    
    jdbc_query = SQLExecuteQueryOperator(
        task_id='execute_spark_sql_query', 
        conn_id='adb_spark_cluster',  
        sql='show databases',  
        dag=dag
    )
    
    jdbc_query

    参数说明:

    参数

    是否必填

    说明

    task_id

    任务ID。您可以自定义。

    conn_id

    连接名称。此处填写步骤4创建的Connection ID。

    sql

    Spark SQL语句。

    更多选填参数及说明,请参见Airflow参数说明

  6. Airflow Web界面,单击对应DAG右侧image按钮。

调度Spark Jar作业

Spark Airflow Operator命令行工具

  1. 安装Airflow Spark插件。执行如下命令:

    pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
  2. 创建Connection,示例如下:

    {
      "auth_type": "AK",
      "access_key_id": "<your_access_key_ID>",
      "access_key_secret": "<your_access_key_secret>",
      "region": "<your_region>"
    }

    参数说明:

    参数

    说明

    auth_type

    认证方式,固定填写为AK,表示使用AK认证。

    access_key_id

    阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey ID。

    如何获取AccessKey IDAccessKey Secret,请参见账号与权限

    access_key_secret

    阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey Secret。

    如何获取AccessKey IDAccessKey Secret,请参见账号与权限

    region

    AnalyticDB for MySQL集群的地域ID。

  3. 创建DAG声明Spark工作流,本文的DAG声明文件为 spark_dags.py

    from datetime import datetime
    
    from airflow.models.dag import DAG
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator
    
    with DAG(
        dag_id=DAG_ID,
        default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"},
    ) as dag:
        spark_pi = AnalyticDBSparkBatchOperator(
            task_id="task1",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkPi",
        )
    
        spark_lr = AnalyticDBSparkBatchOperator(
            task_id="task2",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkLR",
        )
    
        spark_pi >> spark_lr
    
        from tests_common.test_utils.watcher import watcher
    
        # This test needs watcher in order to properly mark success/failure
        # when "tearDown" task with trigger rule is part of the DAG
        list(dag.tasks) >> watcher()
        

    DAG配置参数

    参数

    是否必填

    说明

    dag_id

    DAG的名称,您可以自定义。

    default_args

    • cluster_id:AnalyticDB for MySQL集群ID。

    • rg_name:AnalyticDB for MySQL集群Job型资源组名称。

    • region:AnalyticDB for MySQL集群的地域ID。

    更多选填参数及说明,请参见DAG参数说明

    AnalyticDBSparkBatchOperator配置参数

    参数

    是否必填

    说明

    task_id

    任务ID。

    file

    Spark应用主文件的存储路径,文件路径需为绝对路径。主文件是入口类所在的JAR包或者Python的入口执行文件。

    重要

    Spark应用主文件目前只支持存储在OSS中。

    OSS BucketAnalyticDB for MySQL集群需要在同一地域。

    class_name

    条件必填

    • JavaScala程序入口类名称,必填参数。

    • Python不需要指定入口类,非必填参数。

    更多选填参数及说明,请参见AnalyticDBSparkBatchOperator参数说明

  4. spark_dags.py文件存放至Airflow Configuration声明dags_folder所在的文件夹中。

  5. 执行DAG。具体操作请参见Airflow社区文档

Spark-Submit命令行工具

说明

对于AnalyticDB for MySQL特有的配置项,例如clusterId、regionId、keyId、secretId、ossUploadPath,您可以在AnalyticDB for MySQL Spark工具包的配置文件conf/spark-defaults.conf中进行配置,也可以通过Airflow参数来配置。详情请参见Spark应用配置参数

  1. 安装Airflow Spark插件。执行如下命令:

    pip3 install apache-airflow-providers-apache-spark
    重要
    • 您需要使用Python3来安装Airflow Spark插件。

    • 安装apache-airflow-providers-apache-spark会默认安装社区版Pyspark,需要执行如下命令将pyspark卸载。

      pip3 uninstall pyspark
  2. 下载Spark-Submit命令行工具包并进行配置

  3. 配置PATH路径。执行以下命令,将Spark-Submit命令行工具的地址加入Airflow执行地址。

    export PATH=$PATH:</your/adb/spark/path/bin>
    重要

    在启动Airflow之前需要将Spark-Submit加入到PATH中,否则调度任务可能会找不到Spark-Submit命令。

  4. 准备DAG声明文件。本文以创建Airflow DAGdemo.py文件为例。

    from datetime import datetime
    
    from airflow.models.dag import DAG
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator
    
    with DAG(
        dag_id=DAG_ID,
        start_date=datetime(2021, 1, 1),
        schedule=None,
        default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"},
        max_active_runs=1,
        catchup=False,
    ) as dag:
        spark_pi = AnalyticDBSparkBatchOperator(
            task_id="task1",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkPi",
        )
    
        spark_lr = AnalyticDBSparkBatchOperator(
            task_id="task2",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkLR",
        )
    
        spark_pi >> spark_lr
    
        from tests_common.test_utils.watcher import watcher
    
        # This test needs watcher in order to properly mark success/failure
        # when "tearDown" task with trigger rule is part of the DAG
        list(dag.tasks) >> watcher()
        

    参数说明:

    DAG配置参数

    参数

    是否必填

    说明

    dag_id

    DAG的名称,您可以自定义。

    default_args

    • cluster_id:AnalyticDB for MySQL集群ID。

    • rg_name:AnalyticDB for MySQL集群Job型资源组名称。

    • region:AnalyticDB for MySQL集群的地域ID。

    更多选填参数及说明,请参见DAG参数说明

    AnalyticDBSparkBatchOperator配置参数

    参数

    是否必填

    说明

    task_id

    任务ID。

    file

    Spark应用主文件的存储路径,文件路径需为绝对路径。主文件是入口类所在的JAR包或者Python的入口执行文件。

    重要

    Spark应用主文件目前只支持存储在OSS中。

    OSS BucketAnalyticDB for MySQL集群需要在同一地域。

    class_name

    条件必填

    • JavaScala程序入口类名称,必填参数。

    • Python不需要指定入口类,非必填参数。

    更多选填参数及说明,请参见AnalyticDBSparkBatchOperator参数说明

  5. 将编辑完成的demo.py文件放至Airflow安装目录的dags目录下。

  6. 执行DAG。具体操作请参见Airflow社区文档