Airflow调度Spark
Airflow是比较流行的开源调度工具,可以实现各类工作负载的DAG编排与调度。您可以通过Spark Airflow Operator、Spark-Submit命令行工具来调度Spark任务。本文介绍如何通过Airflow调度AnalyticDB for MySQL Spark作业。
前提条件
- AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版。 
- AnalyticDB for MySQL集群中已创建Job型资源组或Spark引擎的Interactive型资源组。 
- 已安装Python环境,且Python版本为3.7及以上版本。 
- 已安装Airflow,且Airflow的版本为2.9.0及以上版本。 
- 已将运行Airflow的服务器IP地址添加至AnalyticDB for MySQL集群的白名单中。 
调度Spark SQL作业
AnalyticDB for MySQL支持使用批处理和交互式两种方法执行Spark SQL。选择的执行方式不同,调度的操作步骤也有所不同。详细步骤如下:
批处理
Spark Airflow Operator命令行工具
- 安装Airflow Spark插件。执行如下命令: - pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
- 创建Connection,示例如下: - { "auth_type": "AK", "access_key_id": "<your_access_key_ID>", "access_key_secret": "<your_access_key_secret>", "region": "<your_region>" }- 参数说明: - 参数 - 说明 - auth_type - 认证方式,固定填写为AK,表示使用AK认证。 - access_key_id - 阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey ID。 - 如何获取AccessKey ID和AccessKey Secret,请参见账号与权限。 - access_key_secret - 阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey Secret。 - 如何获取AccessKey ID和AccessKey Secret,请参见账号与权限。 - region - AnalyticDB for MySQL集群的地域ID。 
- 创建DAG声明Spark工作流,本文的DAG声明文件为 - spark_dags.py。- from datetime import datetime from airflow.models.dag import DAG from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator with DAG( dag_id="my_dag_name", default_args={"cluster_id": "<your_cluster_ID>", "rg_name": "<your_resource_group>", "region": "<your_region>"}, ) as dag: spark_sql = AnalyticDBSparkSQLOperator( task_id="task2", sql="SHOW DATABASES;" ) spark_sql- 参数说明如下: - DAG配置参数- 参数 - 是否必填 - 说明 - dag_id - 是 - DAG的名称,您可以自定义。 - default_args - 是 - cluster_id:AnalyticDB for MySQL集群ID。 
- rg_name:AnalyticDB for MySQL集群Job型资源组名称。 
- region:AnalyticDB for MySQL集群的地域ID。 
 - 更多选填参数及说明,请参见DAG参数说明。 - AnalyticDBSparkSQLOperator配置参数- 参数 - 是否必填 - 说明 - task_id - 是 - 任务ID。 - SQL - 是 - Spark SQL语句。 - 更多选填参数及说明,请参见Airflow参数说明。 
- 将 - spark_dags.py文件存放至Airflow Configuration声明dags_folder所在的文件夹中。
- 执行DAG。具体操作请参见Airflow社区文档。 
Spark-Submit命令行工具
对于AnalyticDB for MySQL特有的配置项,例如clusterId、regionId、keyId和secretId,您可以在AnalyticDB for MySQL Spark工具包的配置文件conf/spark-defaults.conf中进行配置,也可以通过Airflow参数来配置。详情请参见Spark应用配置参数。
- 安装Airflow Spark插件。执行如下命令: - pip3 install apache-airflow-providers-apache-spark重要- 您需要使用Python3来安装Airflow Spark插件。 
- 安装apache-airflow-providers-apache-spark会默认安装社区版Pyspark,需要执行如下命令将pyspark卸载。 - pip3 uninstall pyspark
 
- 配置PATH路径。执行以下命令,将Spark-Submit命令行工具的地址加入Airflow执行地址。 - export PATH=$PATH:</your/adb/spark/path/bin>重要- 在启动Airflow之前需要将Spark-Submit加入到PATH中,否则调度任务可能会找不到Spark-Submit命令。 
- 准备DAG声明文件。本文以创建Airflow DAG的demo.py文件为例。 - from airflow.models import DAG from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator from airflow.utils.dates import days_ago args = { 'owner': 'Aliyun ADB Spark', } with DAG( dag_id='example_spark_operator', default_args=args, schedule_interval=None, start_date=days_ago(2), tags=['example'], ) as dag: adb_spark_conf = { "spark.driver.resourceSpec": "medium", "spark.executor.resourceSpec": "medium" } # [START howto_operator_spark_submit] submit_job = SparkSubmitOperator( conf=adb_spark_conf, application="oss://<bucket_name>/jar/pi.py", task_id="submit_job", verbose=True ) # [END howto_operator_spark_submit] # [START howto_operator_spark_sql] sql_job = SparkSqlOperator( conn_id="spark_default", sql="SELECT * FROM yourdb.yourtable", conf=",".join([k + "=" + v for k, v in adb_spark_conf.items()]), task_id="sql_job", verbose=True ) # [END howto_operator_spark_sql] submit_job >> sql_job
- 将编辑完成的demo.py文件放至Airflow安装目录的dags目录下。 
- 执行DAG。具体操作请参见Airflow社区文档。 
交互式
- 获取Spark Interactive型资源组的连接地址。 - 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,然后单击目标集群ID。 
- 在左侧导航栏,单击,单击资源组管理页签。 
- 单击对应资源组操作列的详情,查看内网连接地址和公网连接地址。您可单击端口号括号内的  按钮,复制连接地址。 按钮,复制连接地址。- 以下两种情况,您需要单击公网地址后的申请网络,手动申请公网连接地址。 - 提交Spark SQL作业的客户端工具部署在本地或外部服务器。 
- 提交Spark SQL作业的客户端工具部署在ECS上,且ECS与AnalyticDB for MySQL不属于同一VPC。 
 
 
- 安装apache-airflow-providers-apache-hive和apache-airflow-providers-common-sql依赖。 
- 访问Airflow Web界面,在顶部导航栏单击。 
- 单击  按钮,在Add Connections页面配置如下参数: 按钮,在Add Connections页面配置如下参数:- 参数 - 说明 - Connection Id - 连接名称。本文示例为 - adb_spark_cluster。- Connection Type - 选择Hive Server 2 Thrift。 - Host - 请填写步骤1中获取的连接地址。连接地址中的 - default需替换为实际的数据库名,并且需要删除连接地址中的- resource_group=<资源组名称>后缀。- 例如: - jdbc:hive2://amv-t4naxpqk****sparkwho.ads.aliyuncs.com:10000/adb_demo。- Schema - 连接的数据库。本文示例为 - adb_demo。- Login - AnalyticDB for MySQL的数据库账号及Interactive型资源组名称。格式为 - 资源组名称/数据库账号名称。- 例如:本文示例资源组名称为spark_interactive_prod,数据库账号名称为spark_user,此处填写为 - spark_interactive_prod/spark_user。- Password - AnalyticDB for MySQL数据库账号的密码。 - Port - Spark Interactive型资源组的端口号,固定为10000。 - Extra - 认证方式,固定填写以下内容,表示使用用户名和密码认证。 - { "auth_mechanism": "CUSTOM" }
- 编写DAG文件。 - from airflow import DAG from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator from datetime import datetime default_args = { 'owner': 'airflow', 'start_date': datetime(2025, 2, 10), 'retries': 1, } dag = DAG( 'adb_spark_sql_test', default_args=default_args, schedule_interval='@daily', ) jdbc_query = SQLExecuteQueryOperator( task_id='execute_spark_sql_query', conn_id='adb_spark_cluster', sql='show databases', dag=dag ) jdbc_query- 参数说明: - 参数 - 是否必填 - 说明 - task_id - 是 - 任务ID。您可以自定义。 - conn_id - 是 - 连接名称。此处填写步骤4创建的Connection ID。 - sql - 是 - Spark SQL语句。 - 更多选填参数及说明,请参见Airflow参数说明。 
- 在Airflow Web界面,单击对应DAG右侧  按钮。 按钮。
调度Spark Jar作业
Spark Airflow Operator命令行工具
- 安装Airflow Spark插件。执行如下命令: - pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
- 创建Connection,示例如下: - { "auth_type": "AK", "access_key_id": "<your_access_key_ID>", "access_key_secret": "<your_access_key_secret>", "region": "<your_region>" }- 参数说明: - 参数 - 说明 - auth_type - 认证方式,固定填写为AK,表示使用AK认证。 - access_key_id - 阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey ID。 - 如何获取AccessKey ID和AccessKey Secret,请参见账号与权限。 - access_key_secret - 阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey Secret。 - 如何获取AccessKey ID和AccessKey Secret,请参见账号与权限。 - region - AnalyticDB for MySQL集群的地域ID。 
- 创建DAG声明Spark工作流,本文的DAG声明文件为 - spark_dags.py。- from datetime import datetime from airflow.models.dag import DAG from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator with DAG( dag_id=DAG_ID, default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"}, ) as dag: spark_pi = AnalyticDBSparkBatchOperator( task_id="task1", file="local:///tmp/spark-examples.jar", class_name="org.apache.spark.examples.SparkPi", ) spark_lr = AnalyticDBSparkBatchOperator( task_id="task2", file="local:///tmp/spark-examples.jar", class_name="org.apache.spark.examples.SparkLR", ) spark_pi >> spark_lr from tests_common.test_utils.watcher import watcher # This test needs watcher in order to properly mark success/failure # when "tearDown" task with trigger rule is part of the DAG list(dag.tasks) >> watcher()- DAG配置参数- 参数 - 是否必填 - 说明 - dag_id - 是 - DAG的名称,您可以自定义。 - default_args - 是 - cluster_id:AnalyticDB for MySQL集群ID。 
- rg_name:AnalyticDB for MySQL集群Job型资源组名称。 
- region:AnalyticDB for MySQL集群的地域ID。 
 - 更多选填参数及说明,请参见DAG参数说明。 - AnalyticDBSparkBatchOperator配置参数- 参数 - 是否必填 - 说明 - task_id - 是 - 任务ID。 - file - 是 - Spark应用主文件的存储路径,文件路径需为绝对路径。主文件是入口类所在的JAR包或者Python的入口执行文件。 重要- Spark应用主文件目前只支持存储在OSS中。 - OSS Bucket与AnalyticDB for MySQL集群需要在同一地域。 - class_name - 条件必填 - Java或Scala程序入口类名称,必填参数。 
- Python不需要指定入口类,非必填参数。 
 - 更多选填参数及说明,请参见AnalyticDBSparkBatchOperator参数说明。 
- 将 - spark_dags.py文件存放至Airflow Configuration声明dags_folder所在的文件夹中。
- 执行DAG。具体操作请参见Airflow社区文档。 
Spark-Submit命令行工具
对于AnalyticDB for MySQL特有的配置项,例如clusterId、regionId、keyId、secretId、ossUploadPath,您可以在AnalyticDB for MySQL Spark工具包的配置文件conf/spark-defaults.conf中进行配置,也可以通过Airflow参数来配置。详情请参见Spark应用配置参数。
- 安装Airflow Spark插件。执行如下命令: - pip3 install apache-airflow-providers-apache-spark重要- 您需要使用Python3来安装Airflow Spark插件。 
- 安装apache-airflow-providers-apache-spark会默认安装社区版Pyspark,需要执行如下命令将pyspark卸载。 - pip3 uninstall pyspark
 
- 配置PATH路径。执行以下命令,将Spark-Submit命令行工具的地址加入Airflow执行地址。 - export PATH=$PATH:</your/adb/spark/path/bin>重要- 在启动Airflow之前需要将Spark-Submit加入到PATH中,否则调度任务可能会找不到Spark-Submit命令。 
- 准备DAG声明文件。本文以创建Airflow DAG的demo.py文件为例。 - from datetime import datetime from airflow.models.dag import DAG from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator with DAG( dag_id=DAG_ID, start_date=datetime(2021, 1, 1), schedule=None, default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"}, max_active_runs=1, catchup=False, ) as dag: spark_pi = AnalyticDBSparkBatchOperator( task_id="task1", file="local:///tmp/spark-examples.jar", class_name="org.apache.spark.examples.SparkPi", ) spark_lr = AnalyticDBSparkBatchOperator( task_id="task2", file="local:///tmp/spark-examples.jar", class_name="org.apache.spark.examples.SparkLR", ) spark_pi >> spark_lr from tests_common.test_utils.watcher import watcher # This test needs watcher in order to properly mark success/failure # when "tearDown" task with trigger rule is part of the DAG list(dag.tasks) >> watcher()- 参数说明: - DAG配置参数- 参数 - 是否必填 - 说明 - dag_id - 是 - DAG的名称,您可以自定义。 - default_args - 是 - cluster_id:AnalyticDB for MySQL集群ID。 
- rg_name:AnalyticDB for MySQL集群Job型资源组名称。 
- region:AnalyticDB for MySQL集群的地域ID。 
 - 更多选填参数及说明,请参见DAG参数说明。 - AnalyticDBSparkBatchOperator配置参数- 参数 - 是否必填 - 说明 - task_id - 是 - 任务ID。 - file - 是 - Spark应用主文件的存储路径,文件路径需为绝对路径。主文件是入口类所在的JAR包或者Python的入口执行文件。 重要- Spark应用主文件目前只支持存储在OSS中。 - OSS Bucket与AnalyticDB for MySQL集群需要在同一地域。 - class_name - 条件必填 - Java或Scala程序入口类名称,必填参数。 
- Python不需要指定入口类,非必填参数。 
 - 更多选填参数及说明,请参见AnalyticDBSparkBatchOperator参数说明。 
- 将编辑完成的demo.py文件放至Airflow安装目录的dags目录下。 
- 执行DAG。具体操作请参见Airflow社区文档。