本文为您介绍如何通过EMR集群的模板功能为EMR Studio动态拉起计算集群来运行工作流,该计算集群会在调度完成后自动释放。
前提条件
- 已创建EMR Studio集群。
创建集群详情,请参见创建集群。
- 安全组规则已开启8000、8081和8443端口。
添加安全组规则,详情请参见添加安全组规则。
操作步骤
- 创建EMR集群模板,详情请参见创建集群模板。
- 编写Airflow DAG脚本。
配置zeppelin_etl_note_dag脚本,示例代码如下。
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.apache.zeppelin.operators.zeppelin_operator import ZeppelinOperator
from airflow.utils.dates import days_ago
from airflow.contrib.operators.aliyun_emr_operator import TearDownClusterOperator
from airflow.contrib.operators.aliyun_emr_operator import CreateClusterFromTemplateOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2019, 4, 9),
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=500),
}
with DAG('zeppelin_etl_note_dag_new',
max_active_runs=5,
schedule_interval='0 0 * * *',
default_args=default_args) as dag:
execution_date = "{{ ds }}"
head = CreateClusterFromTemplateOperator(
task_id='create_cluster',
template_id ='CT-37377AD06E3B****',
)
raw_data_task = ZeppelinOperator(
task_id='raw_data_task',
conn_id='zeppelin_default',
note_id='2FZWJTTPS',
create_cluster_task_id='create_cluster',
params= {'dt' : execution_date}
)
spark_etl_task = ZeppelinOperator(
task_id='spark_etl_task',
conn_id='zeppelin_default',
note_id='2FX3GJW67',
create_cluster_task_id='create_cluster',
params= {'dt' : execution_date}
)
spark_query_task = ZeppelinOperator(
task_id='spark_query_task',
conn_id='zeppelin_default',
note_id='2FZ8H4JPV',
create_cluster_task_id='create_cluster',
params= {'dt' : execution_date}
)
tail = TearDownClusterOperator(
task_id='tear_down_cluster',
create_cluster_task_id='create_cluster',
)
head >> raw_data_task >> spark_etl_task >> spark_query_task >> tail
说明 template_id的值为前一个步骤中创建模板集群的ID,并且ZeppelinOperator和TearDownClusterOperator中的create_cluster_task_id需要和CreateClusterFromTemplateOperator中的task_id保持一致。
- 上传Airflow DAG脚本。
- 登录 OSS管理控制台。
- 上传Airflow DAG脚本,详情请参见上传文件。
- 启用DAG脚本。
在
DAGs页面,打开待启用DAG脚本所在行的
off开关,即可启用DAG调度。
说明 上传Airflow DAG脚本至OSS后,在DAGs页面并不能立刻看到上传的DAG,大约需要1~2分钟,请您耐心等待。
- 查看DAG运行状态。
- 在DAGs页面,单击待查看调度任务的DAG。
- 在Tree View页面,您可以查看对应DAG运行调度任务的详细信息。
- 在Tree View页面,单击图标。
- 单击Task对话框中的View log。
即可查看Task详情的运行情况。
说明 通过Log信息您可以看到已关联的集群信息,调度完成后集群会被释放掉。