Airflow是比较流行的开源调度工具,可以实现各类工作负载的DAG编排与调度。您可以通过Spark-Submit和Spark-SQL命令行来实现Airflow调度Spark任务。DLA Spark提供了命令行工具包,支持通过Spark-Submit和Spark-SQL方式来提交Spark作业。您可以直接将开源Spark命令行工具包替换成DLA Spark命令行工具包,并进行简单的配置即可使用Airflow调度DLA Spark作业。
云原生数据湖分析(DLA)产品已退市,云原生数据仓库 AnalyticDB MySQL 版湖仓版支持DLA已有功能,并提供更多的功能和更好的性能。AnalyticDB for MySQL相关使用文档,请参见Airflow调度Spark。
准备工作
安装Airflow服务。
安装Airflow服务并启动。具体操作请参见Airflow社区文档。
安装Airflow Spark插件。执行命令如下:
pip3 install apache-airflow-providers-apache-spark
说明您需要使用Python3来安装Airflow Spark插件。
安装apache-airflow-providers-apache-spark会默认安装社区版Pyspark,需要将其卸载,执行命令如下:
pip3 uninstall pyspark
下载DLA Spark命令行工具包并进行配置。
下载DLA Spark命令行工具包并进行配置。具体操作请参见Spark-Submit命令行工具。
配置PATH路径,执行命令如下:
export PATH=$PATH:/your/dla/spark/path/bin
说明在启动Airflow scheduler之前需要将Spark-Submit和Spark-SQL命令加入到PATH中,否则调度任务可能会找不到Spark-Submit和Spark-SQL命令。
操作步骤
编辑DLA Spark Airflow DAG的dla_spark_demo.py文件。如下所示:
from airflow.models import DAG from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator from airflow.utils.dates import days_ago args = { 'owner': 'Aliyun DLA', } with DAG( dag_id='example_dla_spark_operator', default_args=args, schedule_interval=None, start_date=days_ago(2), tags=['example'], ) as dag: dla_spark_conf = { "spark.driver.resourceSpec": "medium", "spark.executor.resourceSpec": "medium", "spark.sql.hive.metastore.version": "dla", "spark.dla.connectors": "oss", "spark.hadoop.job.oss.fileoutputcommitter.enable": "true" } # [START howto_operator_spark_submit] submit_job = SparkSubmitOperator( conf = dla_spark_conf, application="oss://your-bucket/jar/pi.py", task_id="submit_job", verbose=True ) # [END howto_operator_spark_submit] # [START howto_operator_spark_sql] sql_job = SparkSqlOperator( conn_id="spark_default", sql="SELECT * FROM yourdb.yourtable", conf=",".join([k+"="+v for k,v in dla_spark_conf.items()]), task_id="sql_job", verbose=True ) # [END howto_operator_spark_sql] submit_job >> sql_job
执行DLA Spark DAG。
将编辑完成的dla_spark_demo.py文件放到Airflow安装目录的dags目录下,然后执行DLA Spark DAG。具体操作请参见Airflow社区文档。
注意事项
DLA Spark的最小资源调度单元是容器,容器规格通过
resourceSpec
来定义。您可以通过配置spark.driver.resourceSpec
和spark.executor.resourceSpec
来指定driver和executor的容器规格。Hadoop社区通过指定driver和executor的CPU和Memory来申请资源。DLA Spark工具包兼容了Hadoop的资源配置能力,如果您指定了driver和executor的CPU和Memory,会被自动转换为大于所指定CPU和Memory的最小资源规格。例如,当executor_cores
=2、executor_memory
=5 G时,则会被转换为spark.executor.resourceSpec
=medium。对于DLA特有的一些参数,例如
vcName
、regionId
、keyId
、secretId
、ossUploadPath
,您可以在DLA Spark工具包的配置文件conf/spark-defaults.conf中进行配置,也可以通过Airflow参数来配置。由于DLA Spark访问DLA的元数据时,只支持外表,因此对于
SparkJDBCOperator
,cmd_type='jdbc_to_spark'
并且save_mode="overwrite"
的方式不支持。说明DLA Spark访问自建Hive集群的元数据时,不存在该问题。关于如何访问自建Hive元数据,请参见Hive。
如果您当前使用的是Airflow调度Livy的方式,目前还是需要改造成命令行的形式。DLA Spark团队正在开发Livy兼容版本,以降低迁移成本,具体请联系专家服务。