本文为您介绍如何通过EMR集群的模板功能为EMR Studio动态拉起计算集群来运行工作流,该计算集群会在调度完成后自动释放。

前提条件

  • 已创建EMR Studio集群。

    创建集群详情,请参见创建集群

  • 安全组规则已开启8000、8081和8443端口。

    添加安全组规则,详情请参见添加安全组规则

操作步骤

  1. 创建EMR集群模板,详情请参见创建集群模板
  2. 编写Airflow DAG脚本。
    配置zeppelin_etl_note_dag脚本,示例代码如下。
    from airflow import DAG
    from datetime import datetime, timedelta
    from airflow.providers.apache.zeppelin.operators.zeppelin_operator import ZeppelinOperator
    from airflow.utils.dates import days_ago
    from airflow.contrib.operators.aliyun_emr_operator import TearDownClusterOperator
    from airflow.contrib.operators.aliyun_emr_operator import CreateClusterFromTemplateOperator
    
    default_args = {
            'owner': 'airflow',
            'depends_on_past': False,
            'start_date': datetime(2019, 4, 9),
            'email_on_failure': False,
            'email_on_retry': False,
            'retries': 0,
            'retry_delay': timedelta(minutes=500),
    }
    
    
    with DAG('zeppelin_etl_note_dag_new',
                     max_active_runs=5,
                     schedule_interval='0 0 * * *',
                     default_args=default_args) as dag:
    
        execution_date = "{{ ds }}"
    
        head = CreateClusterFromTemplateOperator(
            task_id='create_cluster',
            template_id ='CT-37377AD06E3B****',
        )
    
       raw_data_task = ZeppelinOperator(
            task_id='raw_data_task',
            conn_id='zeppelin_default',
            note_id='2FZWJTTPS',
            create_cluster_task_id='create_cluster',
            params= {'dt' : execution_date}
            )
    
        spark_etl_task = ZeppelinOperator(
            task_id='spark_etl_task',
            conn_id='zeppelin_default',
            note_id='2FX3GJW67',
            create_cluster_task_id='create_cluster',
            params= {'dt' : execution_date}
        )
    
        spark_query_task = ZeppelinOperator(
            task_id='spark_query_task',
            conn_id='zeppelin_default',
            note_id='2FZ8H4JPV',
            create_cluster_task_id='create_cluster',
            params= {'dt' : execution_date}
        )
    
    
        tail = TearDownClusterOperator(
            task_id='tear_down_cluster',
            create_cluster_task_id='create_cluster',
        )
    
    
        head >> raw_data_task >> spark_etl_task >> spark_query_task >> tail
    说明 template_id的值为前一个步骤中创建模板集群的ID,并且ZeppelinOperatorTearDownClusterOperator中的create_cluster_task_id需要和CreateClusterFromTemplateOperator中的task_id保持一致。
  3. 上传Airflow DAG脚本。
    1. 登录 OSS管理控制台
    2. 上传Airflow DAG脚本,详情请参见上传文件
  4. 启用DAG脚本。
    DAGs页面,打开待启用DAG脚本所在行的off开关,即可启用DAG调度。DAG
    说明 上传Airflow DAG脚本至OSS后,在DAGs页面并不能立刻看到上传的DAG,大约需要1~2分钟,请您耐心等待。
  5. 查看DAG运行状态。
    1. DAGs页面,单击待查看调度任务的DAG。
    2. Tree View页面,您可以查看对应DAG运行调度任务的详细信息。
    3. Tree View页面,单击task图标。
    4. 单击Task对话框中的View log
      即可查看Task详情的运行情况。create_cluster
      说明 通过Log信息您可以看到已关联的集群信息,调度完成后集群会被释放掉。