通过Apache Airflow使用Livy Operator提交任务

Apache Airflow是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过Apache Airflow的Livy Operator实现自动化地向EMR Serverless Spark提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。

背景信息

Apache Livy通过REST接口与Spark进行交互,极大简化了Spark和应用程序服务器之间的通信复杂度。关于Livy API,请参见REST API

前提条件

操作步骤

步骤一:创建Gateway及访问Token

  1. 创建Gateway。

    1. 进入Gateway页面。

      1. 登录E-MapReduce控制台

      2. 在左侧导航栏,选择EMR Serverless > Spark

      3. Spark页面,单击目标工作空间名称。

      4. EMR Serverless Spark页面,单击左侧导航栏中的运维中心 > Gateway

    2. Livy Gateway页面,单击创建Livy Gateway

    3. 在创建Gateway页面,输入名称(例如,Livy-gateway),单击创建

      其余参数请根据具体情况进行调整,更多参数信息请参见管理Gateway

  2. 创建Token。

    1. Gateway页面,单击Livy-gateway操作列的Token管理

    2. 单击创建Token

    3. 创建Token对话框中,输入名称(例如,Livy-token),单击确定

    4. 复制Token信息。

      重要

      Token创建完成后,请务必立即复制新Token的信息,后续不支持查看。如果您的Token过期或遗失,请选择新建Token或重置Token。

步骤二:配置Apache Airflow

  1. 执行以下命令,在Apache Airflow环境中安装Apache Livy。

    pip install apache-airflow-providers-apache-livy
  2. 添加Connection。

    UI方式

    在Airflow中找到默认为livy_default的Connection,并对其信息进行修改;或者您也可以在Airflow Web页面手动添加Connection,详情请参见创建Connection

    涉及以下信息:

    • Host:填写为Gateway中的Endpoint信息。

    • Schema:填写为https

    • Extra:填写JSON字符串,x-acs-spark-livy-token为您前一个步骤中复制的Token信息。

      {
        "x-acs-spark-livy-token": "6ac**********kfu"
      }

    CLI方式

    通过Airflow CLI执行相应命令来建立Connection,详情请参见创建Connection

    airflow connections add 'livy_default' \
        --conn-json '{
            "conn_type": "livy",
            "host": "pre-emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx",   # Gateway中的Endpoint信息。
            "schema": "https",
            "extra": {
                "x-acs-spark-livy-token": "6ac**********kfu"  # 为您前一个步骤中复制的Token信息。
            }
        }'

步骤三: 使用Livy Operator提交Spark任务

Airflow的DAG(Directed Acyclic Graph)定义允许您声明任务执行的方式,以下是通过Airflow使用Livy Operator执行Spark任务的示例。

从阿里云OSS获取并执行Python脚本文件。

from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator

default_args = {
    'owner': 'aliyun',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Initiate DAG
livy_operator_sparkpi_dag = DAG(
    dag_id="livy_operator_sparkpi_dag",
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2024, 5, 20),
    tags=['example', 'spark', 'livy'],
    catchup=False
)

# define livy task with LivyOperator
# 请根据实际情况替换file内容。
livy_sparkpi_submit_task = LivyOperator(
    file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
    class_name="org.apache.spark.examples.SparkPi",
    args=['1000'],
    driver_memory="1g",
    driver_cores=1,
    executor_memory="1g",
    executor_cores=2,
    num_executors=1,
    name="LivyOperator SparkPi",
    task_id="livy_sparkpi_submit_task",
    dag=livy_operator_sparkpi_dag,
)

livy_sparkpi_submit_task
说明

file为您的Spark任务对应的文件路径,本文示例为上传至阿里云OSS上的JAR包spark-examples_2.12-3.3.1.jar的路径,请您根据实际情况替换。上传操作可参见简单上传

步骤四:查看提交至EMR的任务

  1. EMR Serverless Spark页面,单击左侧导航栏中的任务历史

  2. 任务历史开发任务页签,您可以查看提交的任务。

    image

相关文档

在Apache Airflow中,您也可以选择使用EMR提供的EmrServerlessSparkStartJobRunOperator接口来提交EMR Serverless Spark任务,提供了一种除了Livy之外的便捷途径。更多详情,请参见通过Apache Airflow向EMR Serverless Spark提交任务