Airflow调度Spark

更新时间:

Airflow是比较流行的开源调度工具,可以实现各类工作负载的DAG编排与调度。您可以通过AnalyticDB for MySQL Spark Airflow Operator、Spark-Submit命令行工具来实现Airflow调度Spark任务。本文介绍如何通过Airflow调度AnalyticDB for MySQL Spark作业。

注意事项

  • AnalyticDB for MySQL Spark支持的配置参数,请参见Spark应用配置参数说明

  • 如果您使用的是Apache Livy的调度方式,AnalyticDB for MySQL Spark Livy Proxy相关工具会在近期发布,可与维护团队联系申请邀测使用。

Spark Airflow Operator命令行工具

准备工作

  1. 安装Airflow服务并启动。具体操作,请参见Airflow社区文档

  2. 安装Airflow Spark插件。执行如下命令:

    pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl

操作步骤

  1. 准备Connection,示例如下。具体操作,请参见创建Connection

    {
      "auth_type": "AK",
      "access_key_id": "<your_access_key_ID>",
      "access_key_secret": "<your_access_key_secret>",
      "region": "<your_region>"
    }
  2. 创建DAG声明Spark工作流,本文的DAG声明文件为 spark_dags.py

    from datetime import datetime
    
    from airflow.models.dag import DAG
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator
    
    with DAG(
        dag_id="my_dag_name",
        start_date=datetime(2021, 1, 1),
        default_args={"cluster_id": "<your_cluster_ID>", "rg_name": "<your_resource_group>", "region": "<your_region>"},
        max_active_runs=1,
        catchup=False,
    ) as dag:
    
        spark_batch = AnalyticDBSparkBatchOperator(
            task_id="task1",
            file="oss://<bucket_name>/tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkPi"
        )
    
        spark_sql = AnalyticDBSparkSQLOperator(
            task_id="task2",
            sql="SHOW DATABASES;"
        )
    
        spark_batch >> spark_sql
    

    参数说明如下。

    AnalyticDBSparkBatchOperator支持配置的参数。

    参数

    是否必填

    说明

    file

    Spark应用主文件的存储路径,文件路径需为绝对路径。主文件是入口类所在的JAR包或者Python的入口执行文件。

    重要

    Spark应用主文件目前只支持存储在OSS中。

    OSS Bucket与AnalyticDB for MySQL集群需要在同一地域。

    class_name

    • Java或Scala程序入口类名称,必填参数。

    • Python不需要指定入口类,非必填参数。

    args

    Spark应用参数。

    conf

    与开源Spark中的配置项基本一致,参数格式为key: value形式。与开源Spark用法不一致的配置参数及AnalyticDB for MySQL特有的配置参数,请参见Spark应用配置参数说明

    jars

    Spark应用依赖的JAR包。需填写JAR包文件的绝对路径。JAR包在运行时会被加入到Driver和Executor JVM的ClassPath里面。

    重要

    Spark应用所依赖的所有JAR包必须存储在OSS中。

    OSS Bucket与AnalyticDB for MySQL集群需要在同一地域。

    py_files

    PySpark依赖的Python文件,后缀可以是ZIP、PY和EGG。如果依赖多个Python文件,建议使用ZIP或者EGG压缩包。您可以在Python代码中以module方式引用Python文件。

    重要

    Spark应用所依赖的所有Python文件须存储在OSS中。

    files

    Spark应用依赖的文件资源,文件会被下载到Driver和Executor进程的当前执行目录下。

    支持配置文件别名,例如oss://<testBucketName>/test/test1.txt#test1,test1为文件别名,您可以使用./test1或者./test1.txt访问文件。

    说明

    files中包含名为log4j.properties的文件时,Spark会使用该log4j.properties文件作为日志配置。

    Spark应用所依赖的所有文件须存储在OSS中。

    driver_resource_spec

    Spark driver的资源规格。默认值为medium。

    不同型号的取值对应不同的规格,详情请参见Spark资源规格列表的型号列。

    说明

    spark.driver.resourceSpecspark.executor.resourceSpec参数取值相同。

    仅提交Spark离线应用时,可使用开源Spark参数,且取值需为Spark资源规格列表中的核数和内存。

    executor_resource_spec

    Spark executor的资源规格。默认值为medium。

    不同型号的取值对应不同的规格,详情请参见Spark资源规格列表的型号列。

    num_executors

    Spark Executor个数。默认值为3。

    archives

    Spark应用依赖的压缩包资源,目前支持.TAR.GZ后缀。压缩包会被解压到当前Spark进程的当前目录下。

    支持配置文件别名,例如oss://testBucketName/test/test1.tar.gz#test1,test1为文件别名。假设test2.txt是test1.tar.gz压缩包中的文件,您可以使用./test1/test2.txt或者./test1.tar.gz/test2.txt访问解压后的文件。

    说明

    Spark应用所依赖的所有压缩包须存储在OSS中。压缩包解压缩失败,任务会失败。

    name

    Spark应用名称。

    cluster_id

    AnalyticDB for MySQL企业版、基础版及湖仓版集群ID。

    rg_name

    AnalyticDB for MySQL企业版、基础版及湖仓版集群的Job型资源组名称。

    adb_spark_conn_id

    AnalyticDB for MySQL Spark Airflow Connection ID。默认值为adb_spark_default

    region

    AnalyticDB for MySQL企业版、基础版及湖仓版集群所属地域ID。

    polling_interval

    扫描Spark应用状态周期。

    AnalyticDBSparkSQLOperator支持配置的参数。

    参数

    是否必填

    说明

    SQL

    Spark SQL语句。

    conf

    与开源Spark中的配置项基本一致,参数格式为key: value形式。与开源Spark用法不一致的配置参数及AnalyticDB for MySQL特有的配置参数,请参见Spark应用配置参数说明

    driver_resource_spec

    Spark driver的资源规格。默认值为medium。

    不同型号的取值对应不同的规格,详情请参见Spark资源规格列表的型号列。

    说明

    spark.driver.resourceSpecspark.executor.resourceSpec参数取值相同。

    仅提交Spark离线应用时,可使用开源Spark参数,且取值需为Spark资源规格列表中的核数和内存。

    executor_resource_spec

    Spark executor的资源规格。默认值为medium。

    不同型号的取值对应不同的规格,详情请参见Spark资源规格列表的型号列。

    num_executors

    Spark Executor个数。默认值为3。

    name

    Spark应用名称。

    cluster_id

    AnalyticDB for MySQL企业版、基础版及湖仓版集群ID。

    rg_name

    AnalyticDB for MySQL企业版、基础版及湖仓版集群的Job型资源组名称。

    adb_spark_conn_id

    AnalyticDB for MySQL Spark Airflow Connection ID。默认值为adb_spark_default

    region

    AnalyticDB for MySQL企业版、基础版及湖仓版集群所属地域ID。

    polling_interval

    扫描Spark应用状态周期。

  3. spark_dags.py文件存放至Airflow Configuration声明dags_folder所在的文件夹中。

  4. 执行DAG。具体操作请参见Airflow社区文档

Spark-Submit命令行工具

说明

对于AnalyticDB for MySQL特有的配置项,例如clusterId、regionId、keyId、secretId、ossUploadPath,您可以在AnalyticDB for MySQL Spark工具包的配置文件conf/spark-defaults.conf中进行配置,也可以通过Airflow参数来配置。详情请参见Spark应用配置参数

准备工作

准备工作一:安装Airflow服务

  1. 安装Airflow服务并启动。具体操作请参见Airflow社区文档

  2. 安装Airflow Spark插件。执行如下命令:

    pip3 install apache-airflow-providers-apache-spark
    重要
    • 您需要使用Python3来安装Airflow Spark插件。

    • 安装apache-airflow-providers-apache-spark会默认安装社区版Pyspark,需要执行如下命令将pyspark卸载。

      pip3 uninstall pyspark

准备工作二:下载并配置Spark-Submit命令行工具

  1. 下载Spark-Submit命令行工具包并进行配置。具体操作请参见通过Spark-Submit命令行工具开发Spark应用

  2. 配置PATH路径。执行以下命令,将Spark-Submit命令行工具的地址加入Airflow执行地址。

    export PATH=$PATH:</your/adb/spark/path/bin>
    重要

    在启动Airflow之前需要将Spark-Submit加入到PATH中,否则调度任务可能会找不到Spark-Submit命令。

操作步骤

  1. 准备DAG声明文件。本文以创建Airflow DAG的demo.py文件为例。

    from airflow.models import DAG
    from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
    from airflow.utils.dates import days_ago
    args = {
        'owner': 'Aliyun ADB Spark',
    }
    with DAG(
        dag_id='example_spark_operator',
        default_args=args,
        schedule_interval=None,
        start_date=days_ago(2),
        tags=['example'],
    ) as dag:
        adb_spark_conf = {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.resourceSpec": "medium"
        }
        # [START howto_operator_spark_submit]
        submit_job = SparkSubmitOperator(
            conf=adb_spark_conf,
            application="oss://<bucket_name>/jar/pi.py",
            task_id="submit_job",
            verbose=True
        )
        # [END howto_operator_spark_submit]
        # [START howto_operator_spark_sql]
        sql_job = SparkSqlOperator(
            conn_id="spark_default",
            sql="SELECT * FROM yourdb.yourtable",
            conf=",".join([k + "=" + v for k, v in adb_spark_conf.items()]),
            task_id="sql_job",
            verbose=True
        )
        # [END howto_operator_spark_sql]
        submit_job >> sql_job
    
  2. 将编辑完成的demo.py文件放至Airflow安装目录的dags目录下。

  3. 执行DAG。具体操作请参见Airflow社区文档