This topic uses examples to describe how to use the OSS Sensor, WebHDFS Sensor, Spark Operator, Hive Operator, and Bash Operator, and how to configure alerts for a DAG.
Background information
Prerequisites
- An EMR Studio cluster has been created.
For details about how to create a cluster, see Create a cluster.
- A compute cluster has been bound.
Important
- On the Bind Cluster tab, only EMR clusters in the same VPC are displayed.
- Only Hadoop clusters can be bound.
Use the OSS Sensor
- Usage scenario: Task B needs to read the file data.txt from OSS, but data.txt is uploaded by another program and the exact upload time is unknown. Before task B runs, you must check whether data.txt already exists in OSS. In this case, you can use an OSS Sensor and make task B depend on Sensor A. If Sensor A detects that data.txt exists in OSS, task B is executed.
- Example: In this example, sensor1 first checks whether the file zeppelin_etl_note.py exists in the airflow/dags/Aliyun_Example_Dags/ directory in OSS. If the file exists, the next task, sensor2, runs and checks whether the airflow/dags/Aliyun_Example_Dags/ directory in OSS contains a file whose name matches zeppelin_**_not?.py. If such a file exists, the last task runs. If a sensor detects that the target file does not exist in OSS, it checks again at the configured interval until the file appears in OSS.
from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.contrib.sensors.aliyun_oss_sensor import AliyunOssKeySensor

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

dag = DAG(
    'example_oss_sensor',
    default_args=default_args,
    description='a dag used to test oss sensor',
    schedule_interval=timedelta(days=1),
)

sensor1 = AliyunOssKeySensor(
    task_id='detect_target_key_1',
    # bucket_name is the name of the OSS bucket that contains the target file.
    bucket_name='chufeng-oss-ddc-hz',
    # bucket_key is the full path of the target file in OSS.
    bucket_key='airflow/dags/Aliyun_Example_Dags/zeppelin_etl_note.py',
    dag=dag,
)

# Set wildcard_match to True to enable wildcard matching, which performs a fuzzy search
# for bucket_key in the OSS bucket.
# An asterisk (*) matches one or more characters; a question mark (?) matches a single character.
sensor2 = AliyunOssKeySensor(
    task_id='detect_target_key_2',
    bucket_name='chufeng-oss-ddc-hz',
    # An asterisk (*) matches one or more characters; a question mark (?) matches a single character.
    bucket_key='airflow/dags/Aliyun_Example_Dags/zeppelin_**_not?.py',
    # The default value is False. Set it to True to enable wildcard matching.
    wildcard_match=True,
    dag=dag,
)

task = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

sensor1 >> sensor2 >> task
Use the WebHDFS Sensor
- Usage scenario: Task B needs to read the file a.txt from HDFS, but a.txt is uploaded by another program and the exact upload time is unknown. Before task B runs, you must check whether a.txt already exists in HDFS. In this case, you can use a WebHDFS Sensor and make task B depend on Sensor A. If Sensor A detects that a.txt exists in HDFS, task B is executed.
- Example: In this example, sensor1 first checks whether the file a.txt exists in HDFS. If the file exists, the next task runs; if it does not, the sensor checks again at the configured interval until the file appears in HDFS.
from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.sensors.hdfs_sensor import HdfsSensor
from airflow.sensors.web_hdfs_sensor import WebHdfsSensor
from airflow.utils.dates import days_ago

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
# A Hadoop cluster must be bound; otherwise, the task may fail.
# The cluster_id specified in the sensor takes precedence. If it is not specified in the sensor,
# the cluster_id specified in the default_args of the DAG is used.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'cluster_id': "C-8A9CAA9E4440****",
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

dag = DAG(
    'example_web_hdfs_sensor',
    default_args=default_args,
    description='a dag used to test webhdfs sensor',
    schedule_interval=timedelta(days=1),
)

sensor1 = WebHdfsSensor(
    # filepath is the path of the target file in HDFS.
    filepath="/a.txt",
    # cluster_id="C-8A9CAA9E4440****",
    task_id='detect_target_key_1',
    dag=dag,
)

task = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

sensor1 >> task
Note: Every Airflow sensor has two default parameters, poke_interval and timeout, both in seconds. The default value of poke_interval is 60 (1 minute), and the default value of timeout is 60*60*24*7 (1 week). If you change poke_interval, we recommend a value of no less than 60 to avoid the efficiency loss caused by overly frequent requests.
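As an illustration, the following is a minimal sketch of setting both parameters explicitly on the WebHdfsSensor from the example above. The 120-second interval and one-day timeout are illustrative values chosen here, not recommendations from this topic.

from datetime import timedelta

from airflow.sensors.web_hdfs_sensor import WebHdfsSensor

sensor1 = WebHdfsSensor(
    task_id='detect_target_key_1',
    filepath="/a.txt",
    poke_interval=120,     # seconds between checks; keep this at 60 or above
    timeout=60 * 60 * 24,  # stop poking after 1 day instead of the default 1 week
    dag=dag,               # the DAG object defined in the example above
)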
Use the Spark Operator
- A Spark operator must specify cluster_id.
The cluster_id specified in the operator takes precedence; if it is not specified in the operator, the cluster_id specified in the default_args parameter of the DAG is used (see the short sketch after the example code below).
- We recommend that you upload the files required by a Spark operator to the mounted OSS path and reference them in your code through the oss_mount_folder variable.
An operator may run on any worker node of the EMR Studio cluster. Therefore, we recommend that you upload the JAR packages or other files to be submitted to the OSS path that was configured as the data development storage when the cluster was created, which avoids extra data synchronization. The bucket of this path is mounted to /mnt/jfs/ on all hosts of the EMR Studio cluster. After the upload is complete, you can reference the files in the operator through oss_mount_folder, as shown in the sample code. oss_mount_folder is a variable provided by default in EMR Studio Airflow; its value is the local host path to which the data development storage OSS path is mapped.
- Usage scenario: Two Spark-related operators are available. SparkSubmitOperator submits JAR packages or Python files, and SparkSqlOperator submits Spark SQL.
- Example: This example first runs the SparkPi sample through spark-submit, then creates a table and inserts data through Spark SQL, and finally uses a user-defined function (UDF).
from datetime import timedelta

from airflow.contrib.operators.spark_sql_operator import SparkSqlOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import DAG
from airflow.models import Variable
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'cluster_id': "C-8A9CAA9E4440****",
}

dag = DAG(
    dag_id="example_spark_operator",
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
    tags=['example'],
)

oss_mount_folder = Variable.get('oss_mount_folder', "")

run_first = SparkSubmitOperator(
    task_id='run_first',
    dag=dag,
    application=oss_mount_folder + '/example/spark-examples_2.11-2.4.7.jar',
    java_class='org.apache.spark.examples.SparkPi',
    name='airflow-spark-submit',
    conf={'spark.memory.storageFraction': 0.6},
    executor_cores=1,
    executor_memory='1g',
    driver_memory='1g',
    # cluster_id="C-6394A97DE6D7****",
    # conn_id='spark_default',
    # files=None,
    # py_files=None,
    # archives=None,
    # driver_class_path=None,
    # jars=None,
    # packages=None,
    # exclude_packages=None,
    # repositories=None,
    # total_executor_cores=None,
    # keytab=None,
    # principal=None,
    # proxy_user=None,
    # num_executors=None,
    # status_poll_interval=1,
    # application_args=None,
    # env_vars=None,
    # verbose=False,
    # spark_binary=None,
    # cluster_id=None,
)

run_second = SparkSqlOperator(
    task_id='run_second',
    dag=dag,
    sql='''
    create database if not exists airflow;
    use airflow;
    drop table if exists test_sparksql;
    create table test_sparksql(name string);
    insert into test_sparksql values('studio');
    ''',
    name='airflow-spark-sql-1',
    conf='spark.sql.shuffle.partitions=200',
    # conn_id='spark_default',
    # total_executor_cores=None,
    # executor_cores=None,
    # executor_memory=None,
    # keytab=None,
    # principal=None,
    # master='yarn',
    # num_executors=None,
    # verbose=True,
    # yarn_queue='default',
    # cluster_id=None,
)

run_third = SparkSqlOperator(
    task_id='run_third',
    dag=dag,
    sql='''
    use airflow;
    add jar oss://emr-studio-example/hive-udf-1.0-SNAPSHOT.jar;
    create temporary function simpleudf AS 'com.aliyun.emr.hive.udf.SimpleUDFExample';
    show functions like '*udf';
    select simpleudf(name) from test_sparksql;
    ''',
    name='airflow-spark-sql-2',
    conf='spark.sql.shuffle.partitions=200',
)

run_first >> run_second >> run_third
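If a single task must run on a compute cluster other than the one set in default_args, cluster_id can be set on that operator directly and takes precedence over the DAG-level value. The following is a minimal sketch reusing the dag object from the example above; the task ID and SQL statement are illustrative, and the cluster ID is the masked placeholder shown in the commented-out line of the example.

# A minimal sketch: override cluster_id on a single operator.
# The operator-level cluster_id takes precedence over the one in the DAG's default_args.
run_on_other_cluster = SparkSqlOperator(
    task_id='run_on_other_cluster',
    dag=dag,
    sql='show databases;',
    name='airflow-spark-sql-other-cluster',
    # Placeholder cluster ID; replace it with the ID of the target cluster.
    cluster_id="C-6394A97DE6D7****",
)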
Use the Hive Operator
- A Hive operator must specify cluster_id.
The cluster_id specified in the operator takes precedence; if it is not specified in the operator, the cluster_id specified in the default_args parameter of the DAG is used.
- We recommend that you upload the files required by the operator to the mounted OSS path.
- Usage scenario: The Hive operator can be used to submit Hive SQL.
- Example: This example first creates a table and inserts data, and then uses a user-defined function (UDF).
The example_hive_operator.py file is as follows.
from datetime import timedelta

from airflow.models import DAG
from airflow.operators.hive_operator import HiveOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'cluster_id': "C-8A9CAA9E4440****",
}

dag = DAG(
    dag_id="example_hive_operator",
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
    tags=['example'],
)

run_first = HiveOperator(
    task_id='run_first',
    dag=dag,
    hql='''
    create database if not exists airflow;
    use airflow;
    drop table if exists test_hive;
    create table test_hive(name string);
    insert into test_hive values('studio');
    ''',
    hiveconfs={'hive.execution.engine': 'tez'},
    mapred_job_name='airflow-hive-sql-1',
    # hive_cli_conn_id="hiveserver2_default",
    # cluster_id="C-8A9CAA9E4440****",  # A different cluster can be specified here; if not specified, the DAG's cluster is used by default.
    # mapred_queue=None,
    # mapred_queue_priority=None,
    # hiveconf_jinja_translate=False,
    # script_begin_tag=None,
    # run_as_owner=False,
)

run_second = HiveOperator(
    task_id='run_second',
    dag=dag,
    hql='''
    use airflow;
    add jar oss://emr-studio-example/hive-udf-1.0-SNAPSHOT.jar;
    create temporary function simpleudf AS 'com.aliyun.emr.hive.udf.SimpleUDFExample';
    show functions like '*udf';
    select simpleudf(name) from test_hive;
    ''',
    hiveconfs={'hive.execution.engine': 'tez'},
    mapred_job_name='airflow-hive-sql-2',
)

run_first >> run_second
Use the Bash Operator
- A Bash operator must specify cluster_id if it runs commands such as Hadoop or HDFS commands.
The cluster_id specified in the operator takes precedence; if it is not specified in the operator, the cluster_id specified in the default_args parameter of the DAG is used.
- We recommend that you upload the files required by the operator to the mounted OSS path and reference them in your code through the oss_mount_folder variable.
An operator may run on any worker node of the EMR Studio cluster. Therefore, we recommend that you upload the JAR packages or other files to be submitted to the OSS path that was configured as the data development storage when the cluster was created, which avoids extra data synchronization. The bucket of this path is mounted to /mnt/jfs/ on all hosts of the EMR Studio cluster. After the upload is complete, you can reference the files in the operator through oss_mount_folder, as shown in the sample code. oss_mount_folder is a variable provided by default in EMR Studio Airflow; its value is the local host path to which the data development storage OSS path is mapped.
- Usage scenario: The Bash operator can be used to submit bash commands.
- Example: This example first constructs a dataset and uploads it to HDFS, then submits a MapReduce job, and finally views the result in HDFS.
The example_bash_operator.py code is as follows.
from builtins import range
from datetime import timedelta

from airflow.models import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'cluster_id': "C-8A9CAA9E4440****",
}

dag = DAG(
    dag_id='example_bash_operator',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
    tags=['example'],
)

oss_mount_folder = Variable.get('oss_mount_folder')

run_first = BashOperator(
    task_id='run_first',
    bash_command=r'''
    hdfs dfs -mkdir -p /tmp/wordcount/input &&
    hdfs dfs -rm -r -f /tmp/wordcount/output &&
    echo -e "1201\tGopal\t45000\tTechnical manager
    1202\tManisha\t45000\tProof reader
    1203\tMasthanvali\t40000\tTechnical writer
    1204\tKiran\t40000\tHr Admin
    1205\tKranthi\t30000\tOp Admin" > /tmp/sample.txt &&
    hdfs dfs -put -f /tmp/sample.txt /tmp/wordcount/input
    ''',
    dag=dag,
)

run_second = BashOperator(
    bash_command='hadoop jar {0}/example/hadoop-mapreduce-examples-3.2.1.jar wordcount /tmp/wordcount/input /tmp/wordcount/output'.format(oss_mount_folder),
    task_id='run_second',
    dag=dag,
)

run_third = BashOperator(
    bash_command='hdfs dfs -cat /tmp/wordcount/output/part-r-00000',
    task_id='run_third',
    dag=dag,
)

run_first >> run_second >> run_third
Configure alerts for a DAG
- Example of a DAG failure alert (dag_fail_apm)
Depending on the severity of the DAG failure, import the corresponding alert callback function from airflow.contrib.operators.aliyun_apm_operator and pass it to on_failure_callback of the DAG. For critical failure events, use apm_alert_callback_dagrun_fail_critical. For non-critical, warning-level failure events, use apm_alert_callback_dagrun_fail_warning (see the sketch after the example code below).
from airflow import DAG
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
# Import the alert callback function.
from airflow.contrib.operators.aliyun_apm_operator import apm_alert_callback_dagrun_fail_critical

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    # Pass the alert callback function to the DAG.
    'on_failure_callback': apm_alert_callback_dagrun_fail_critical,
}

with DAG('dag_fail_apm',
         max_active_runs=5,
         schedule_interval='0 0 * * *',
         default_args=default_args) as dag:

    intentionally_failed_task = BashOperator(
        task_id='intentionally_failed_task',
        bash_command='echooooooooo make a mistake intentionally'
    )

    intentionally_failed_task
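For a warning-level failure alert, the only change is the imported callback function. The following minimal sketch shows only the parts that differ from the example above:

# A minimal sketch: use the warning-level callback instead of the critical one.
from airflow.contrib.operators.aliyun_apm_operator import apm_alert_callback_dagrun_fail_warning

default_args = {
    'owner': 'airflow',
    'retries': 0,
    # Warning-level alert for non-critical failure events.
    'on_failure_callback': apm_alert_callback_dagrun_fail_warning,
}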
- Example of a DAG SLA alert (dag_sla_breach_apm)
Depending on the severity of the SLA breach, import the corresponding alert callback function from airflow.contrib.operators.aliyun_apm_operator and pass it to sla_miss_callback of the DAG. For critical SLA breach events, use apm_alert_callback_miss_sla_critical; for non-critical, warning-level SLA breach events, use apm_alert_callback_miss_sla_warning (see the sketch after the example code below).
from datetime import timedelta

# Import the SLA breach alert callback function.
from airflow.contrib.operators.aliyun_apm_operator import apm_alert_callback_miss_sla_warning
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(3),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retry_delay': timedelta(seconds=20),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': 'all_success'
}

dag = DAG(
    'dag_sla_breach_apm',
    default_args=default_args,
    description='breach sla intentionally',
    schedule_interval=timedelta(days=1),
    sla_miss_callback=apm_alert_callback_miss_sla_warning
)

t2 = BashOperator(
    task_id='oversleep',
    depends_on_past=False,
    bash_command='sleep 10',
    # The SLA is set to 1 second, so a command that sleeps for 10 seconds always breaches the SLA.
    sla=timedelta(seconds=1),
    dag=dag
)

t2
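For a critical SLA breach alert, swap in the critical callback when constructing the DAG. The following is a minimal sketch that assumes the same default_args as above; the DAG ID used here is illustrative and not part of the original example.

# A minimal sketch: use the critical-level SLA callback instead of the warning-level one.
from airflow.contrib.operators.aliyun_apm_operator import apm_alert_callback_miss_sla_critical

dag = DAG(
    'dag_sla_breach_apm_critical',   # illustrative DAG ID
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    sla_miss_callback=apm_alert_callback_miss_sla_critical,
)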