通过Apache Airflow提交任务

更新时间:2025-04-16 09:49:11

Apache Airflow是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过Apache Airflow实现自动化地向EMR Serverless Spark提交任务,以实现作业调度和执行的自动化,帮助您更有效地管理数据处理任务。

背景信息

Apache Livy通过REST接口与Spark进行交互,极大简化了Spark和应用程序服务器之间的通信复杂度。关于Livy API,请参见REST API

在使用Apache Airflow提交任务时,您可以通过使用Livy OperatorEmrServerlessSparkStartJobRunOperator两种方式与Serverless Spark进行交互。请根据实际情况选择最适合的方案。

方式

适用场景

方式

适用场景

方式一:使用Livy Operator提交任务

如果您的环境已经部署了Livy服务,并且需要交互式查询能力,建议选择方式一。

方式二:使用EmrServerlessSparkStartJobRunOperator提交任务

EmrServerlessSparkStartJobRunOperator是由阿里云EMR Serverless Spark提供的一个组件,专门用于通过Airflow提交EMR Serverless Spark任务,与AirflowDAG机制深度集成,简化任务编排和调度。

如果您是新建项目或完全基于EMR Serverless Spark,建议选择方式二,以实现更高的性能和更低的运维复杂度。

前提条件

注意事项

当前EmrServerlessSparkStartJobRunOperator未输出实际作业的日志。如果您需要查看详细的作业日志,请登录EMR Serverless Spark控制台,通过任务运行ID找到对应的任务实例,然后,您可以在日志探查页签或者Spark UI中进一步检查和分析任务日志。

方式一:使用Livy Operator提交任务

步骤一:创建Livy Gateway及访问Token

  1. 创建并启动Gateway。

    1. 进入Gateway页面。

      1. 登录E-MapReduce控制台

      2. 在左侧导航栏,选择EMR Serverless > Spark

      3. Spark页面,单击目标工作空间名称。

      4. EMR Serverless Spark页面,单击左侧导航栏中的运维中心 > Gateway

    2. 单击Livy Gateway页签。

    3. Livy Gateway页面,单击创建Livy Gateway

    4. 在创建Gateway页面,输入名称(例如,Livy-gateway),单击创建

      其余参数请根据具体情况进行调整,更多参数信息请参见管理Gateway

    5. Livy Gateway页面,单击已创建Gateway操作列的启动

  2. 创建Token。

    1. Gateway页面,单击Livy-gateway操作列的Token管理

    2. 单击创建Token

    3. 创建Token对话框中,输入名称(例如,Livy-token),单击确定

    4. 复制Token信息。

      重要

      Token创建完成后,请务必立即复制新Token的信息,后续不支持查看。如果您的Token过期或遗失,请选择新建Token或重置Token。

步骤二:配置Apache Airflow

  1. 执行以下命令,在Apache Airflow环境中安装Apache Livy。

    pip install apache-airflow-providers-apache-livy
  2. 添加Connection。

    UI方式
    CLI方式

    Airflow中找到默认为livy_defaultConnection,并对其信息进行修改;或者您也可以在Airflow Web页面手动添加Connection,详情请参见创建Connection

    涉及以下信息:

    • Host:填写为Gateway中的Endpoint信息。

    • Schema:填写为https

    • Extra:填写JSON字符串,x-acs-spark-livy-token为您前一个步骤中复制的Token信息。

      {
        "x-acs-spark-livy-token": "6ac**********kfu"
      }

    通过Airflow CLI执行相应命令来建立Connection,详情请参见创建Connection

    airflow connections add 'livy_default' \
        --conn-json '{
            "conn_type": "livy",
            "host": "pre-emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx",   # Gateway中的Endpoint信息。
            "schema": "https",
            "extra": {
                "x-acs-spark-livy-token": "6ac**********kfu"  # 为您前一个步骤中复制的Token信息。
            }
        }'

步骤三: 编写DAG并提交任务

AirflowDAG(Directed Acyclic Graph)定义允许您声明任务执行的方式,以下是通过Airflow使用Livy Operator执行Spark任务的示例。

从阿里云OSS获取并执行Python脚本文件。

from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator

default_args = {
    'owner': 'aliyun',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Initiate DAG
livy_operator_sparkpi_dag = DAG(
    dag_id="livy_operator_sparkpi_dag",  # DAG的唯一标识符。
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2024, 5, 20),
    tags=['example', 'spark', 'livy'],
    catchup=False
)

# define livy task with LivyOperator
# 请根据实际情况替换file内容。
livy_sparkpi_submit_task = LivyOperator(
    file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
    class_name="org.apache.spark.examples.SparkPi",
    args=['1000'],
    driver_memory="1g",
    driver_cores=1,
    executor_memory="1g",
    executor_cores=2,
    num_executors=1,
    name="LivyOperator SparkPi",
    task_id="livy_sparkpi_submit_task", 
    dag=livy_operator_sparkpi_dag,
)

livy_sparkpi_submit_task

涉及参数如下表所示。

参数

描述

参数

描述

dag_id

DAG的唯一标识符,用于区分不同的DAG。

schedule_interval

定义DAG的调度间隔。None表示需要手动触发任务。

start_date

定义DAG的起始日期,表示从哪一天开始调度。

tags

DAG添加标签,便于分类和搜索。

catchup

控制是否补跑历史任务。如果为False,即使起始日期较早,也不会补跑未执行的任务。

file

为您的Spark任务对应的文件路径,本文示例为上传至阿里云OSS上的JAR包的路径,请您根据实际情况替换。上传操作可参见简单上传

class_name

指定JAR包中的主类名。

args

传递给Spark任务的命令行参数。

driver_memorydriver_cores

分别指定Driver的内存大小和核心数。

executor_memoryexecutor_cores

分别指定每个Executor的内存大小和核心数。

num_executors

指定Executor的数量。

name

Spark任务的名称。

task_id

Airflow任务的唯一标识符。

dag

将任务与DAG关联起来。

方式二:使用EmrServerlessSparkStartJobRunOperator提交任务

步骤一:配置Apache Airflow

  1. 下载airflow_alibaba_provider-0.0.3-py3-none-any.whl

  2. Airflow的每个节点上安装airflow-alibaba-provider插件。

    airflow-alibaba-provider插件是由EMR Serverless Spark团队提供,包含了一个专门用于提交EMR Serverless Spark任务的EmrServerlessSparkStartJobRunOperator组件。

    pip install airflow_alibaba_provider-0.0.3-py3-none-any.whl
  3. 添加Connection。

    CLI方式
    UI方式

    通过Airflow CLI执行相应命令来建立Connection,详情请参见创建Connection

    airflow connections add 'emr-serverless-spark-id' \
        --conn-json '{
            "conn_type": "emr_serverless_spark",
            "extra": {
                "auth_type": "AK",  #指定使用阿里云的AccessKey(AK)方式认证。
                "access_key_id": "<yourAccesskeyId>",  # 阿里云账号的AccessKey ID。
                "access_key_secret": "<yourAccesskeyKey>",  # 阿里云账号的AccessKey Secret。
                "region": "<yourRegion>"
            }
        }'

    通过在Airflow Web页面手动添加Connection,详情请参见创建Connection

    Add Connection页面,配置以下信息。

    image

    涉及参数如下表所示。

    参数

    说明

    Connection Id

    本文示例为emr-serverless-spark-id。

    Connection Type

    选择Generic。如果没有该类型,您也可以选择Email

    Extra

    填写内容如下。

    {
                "auth_type": "AK",  #指定使用阿里云的AccessKey(AK)方式认证。
                "access_key_id": "<yourAccesskeyId>",  # 阿里云账号的AccessKey ID。
                "access_key_secret": "<yourAccesskeyKey>",  # 阿里云账号的AccessKey Secret。
                "region": "<yourRegion>"
            }

步骤二:编写DAG并提交任务

AirflowDAG(Directed Acyclic Graph)定义允许您声明任务执行的方式,以下是通过Airflow使用EmrServerlessSparkStartJobRunOperator执行不同类型的Spark作业的示例。

提交JAR包
提交SQL文件
从OSS提交SQL文件
从OSS提交Python脚本

此场景涉及使用Airflow任务提交一个预编译的Spark JAR作业到阿里云EMR Serverless Spark。

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

DAG_ID = "emr_spark_jar"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_jar = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_jar",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-7e2f1750c6b3****",
        resource_queue_id="root_queue",
        code_type="JAR",
        name="airflow-emr-spark-jar",
        entry_point="oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar",
        entry_point_args=["1"],
        spark_submit_parameters="--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )

    emr_spark_jar

Airflow DAG中直接执行SQL命令。

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "emr_spark_sql"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_sql = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_sql",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-7e2f1750c6b3****",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-emr-spark-sql",
        entry_point=None,
        entry_point_args=["-e","show tables;show tables;"],
        spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None,
    )

    emr_spark_sql

从阿里云OSS获取并执行SQL脚本文件。

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

DAG_ID = "emr_spark_sql_2"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_sql_2 = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_sql_2",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-ae42e9c92927****",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-emr-spark-sql-2",
        entry_point="",
        entry_point_args=["-f", "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql"],
        spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )

    emr_spark_sql_2

从阿里云OSS获取并执行Python脚本文件。

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

DAG_ID = "emr_spark_python"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_python = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_python",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-ae42e9c92927****",
        resource_queue_id="root_queue",
        code_type="PYTHON",
        name="airflow-emr-spark-python",
        entry_point="oss://<YourBucket>/spark-resource/examples/src/main/python/pi.py",
        entry_point_args=["1"],
        spark_submit_parameters="--conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )

    emr_spark_python

涉及参数如下表所示。

参数

参数类型

描述

参数

参数类型

描述

task_id

str

指定Airflow任务的唯一标识符。

emr_serverless_spark_conn_id

str

指定Airflow用于连接EMR Serverless SparkConnection ID。

region

str

指定EMR Spark所处的区域。

polling_interval

int

设置Airflow轮询任务状态的时间间隔,单位为秒。

workspace_id

str

EMR Spark工作区的唯一标识符。

resource_queue_id

str

用于指定EMR Spark任务所使用资源队列的ID。

code_type

str

任务类型,可以是SQL、PythonJAR,根据任务类型,entry_point参数将有不同的含义。

name

str

EMR Spark任务的名称。

entry_point

str

指定启动任务的文件位置,例如JAR、SQLPython文件。根据code_type的不同,此参数代表的含义不同。

entry_point_args

List

传递给Spark程序的参数列表。

spark_submit_parameters

str

包含用于spark-submit命令的额外参数。

is_prod

bool

指定任务运行的环境。当设置为True时,则表明任务将在生产环境中执行,resource_queue_id应指定对应生产环境的资源队列ID,例如root_queue。

engine_release_version

str

设定EMR Spark引擎的版本。默认值是"esr-2.1-native",对应Spark版本为3.3.1Scala版本为2.12,使用原生运行时。

  • 本页导读 (1)
  • 背景信息
  • 前提条件
  • 注意事项
  • 方式一:使用Livy Operator提交任务
  • 步骤一:创建Livy Gateway及访问Token
  • 步骤二:配置Apache Airflow
  • 步骤三: 编写DAG并提交任务
  • 方式二:使用EmrServerlessSparkStartJobRunOperator提交任务
  • 步骤一:配置Apache Airflow
  • 步骤二:编写DAG并提交任务
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等