Airflow调度DLA Spark作业

Airflow是比较流行的开源调度工具,可以实现各类工作负载的DAG编排与调度。您可以通过Spark-Submit和Spark-SQL命令行来实现Airflow调度Spark任务。DLA Spark提供了命令行工具包,支持通过Spark-Submit和Spark-SQL方式来提交Spark作业。您可以直接将开源Spark命令行工具包替换成DLA Spark命令行工具包,并进行简单的配置即可使用Airflow调度DLA Spark作业。

重要

云原生数据湖分析(DLA)产品已退市,云原生数据仓库 AnalyticDB MySQL 版湖仓版支持DLA已有功能,并提供更多的功能和更好的性能。AnalyticDB for MySQL相关使用文档,请参见Airflow调度Spark

准备工作

  • 安装Airflow服务。

    1. 安装Airflow服务并启动。具体操作请参见Airflow社区文档

    2. 安装Airflow Spark插件。执行命令如下:

      pip3 install apache-airflow-providers-apache-spark
      说明
      • 您需要使用Python3来安装Airflow Spark插件。

      • 安装apache-airflow-providers-apache-spark会默认安装社区版Pyspark,需要将其卸载,执行命令如下:

        pip3 uninstall pyspark
  • 下载DLA Spark命令行工具包并进行配置。

    1. 下载DLA Spark命令行工具包并进行配置。具体操作请参见Spark-Submit命令行工具

    2. 配置PATH路径,执行命令如下:

      export PATH=$PATH:/your/dla/spark/path/bin
      说明

      在启动Airflow scheduler之前需要将Spark-Submit和Spark-SQL命令加入到PATH中,否则调度任务可能会找不到Spark-Submit和Spark-SQL命令。

操作步骤

  1. 编辑DLA Spark Airflow DAG的dla_spark_demo.py文件。如下所示:

    from airflow.models import DAG
    from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator
    from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
    from airflow.utils.dates import days_ago
    
    args = {
        'owner': 'Aliyun DLA',
    }
    
    with DAG(
        dag_id='example_dla_spark_operator',
        default_args=args,
        schedule_interval=None,
        start_date=days_ago(2),
        tags=['example'],
    ) as dag:
        dla_spark_conf = {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.resourceSpec": "medium",
            "spark.sql.hive.metastore.version": "dla",
            "spark.dla.connectors": "oss",
            "spark.hadoop.job.oss.fileoutputcommitter.enable": "true"
        }
        # [START howto_operator_spark_submit]
        submit_job = SparkSubmitOperator(
            conf = dla_spark_conf,
            application="oss://your-bucket/jar/pi.py",
            task_id="submit_job",
            verbose=True
        )
        # [END howto_operator_spark_submit]
    
        # [START howto_operator_spark_sql]
        sql_job = SparkSqlOperator(
            conn_id="spark_default",
            sql="SELECT * FROM yourdb.yourtable",
            conf=",".join([k+"="+v for k,v in dla_spark_conf.items()]),
            task_id="sql_job",
            verbose=True
        )
        # [END howto_operator_spark_sql]
    
        submit_job >> sql_job
  2. 执行DLA Spark DAG。

    将编辑完成的dla_spark_demo.py文件放到Airflow安装目录的dags目录下,然后执行DLA Spark DAG。具体操作请参见Airflow社区文档

注意事项

  • DLA Spark的最小资源调度单元是容器,容器规格通过resourceSpec来定义。您可以通过配置spark.driver.resourceSpecspark.executor.resourceSpec来指定driver和executor的容器规格。Hadoop社区通过指定driver和executor的CPU和Memory来申请资源。DLA Spark工具包兼容了Hadoop的资源配置能力,如果您指定了driver和executor的CPU和Memory,会被自动转换为大于所指定CPU和Memory的最小资源规格。例如,当executor_cores=2、executor_memory=5 G时,则会被转换为spark.executor.resourceSpec=medium。

  • 对于DLA特有的一些参数,例如vcNameregionIdkeyIdsecretIdossUploadPath,您可以在DLA Spark工具包的配置文件conf/spark-defaults.conf中进行配置,也可以通过Airflow参数来配置。

  • 由于DLA Spark访问DLA的元数据时,只支持外表,因此对于SparkJDBCOperatorcmd_type='jdbc_to_spark'并且save_mode="overwrite"的方式不支持。

    说明

    DLA Spark访问自建Hive集群的元数据时,不存在该问题。关于如何访问自建Hive元数据,请参见Hive

  • 如果您当前使用的是Airflow调度Livy的方式,目前还是需要改造成命令行的形式。DLA Spark团队正在开发Livy兼容版本,以降低迁移成本,具体请联系专家服务