本文为您介绍如何通过集群模板创建待关联的Hadoop集群,该集群在调度完成后会自动释放。

前提条件

  • 已创建Data Development集群。

    创建集群详情,请参见创建集群

  • 安全组规则已开启8080、8081和8082端口。

    添加安全组规则详情,请参见添加安全组规则

  • 已修改策略内容,详情请参见修改策略内容

操作步骤

  1. 创建Hadoop集群模板,详情请参见创建集群模板
  2. 编写Airflow DAG脚本。
    配置 zeppelin_etl_note_dag脚本,示例代码如下:
    from airflow import DAG
    from datetime import datetime, timedelta
    from airflow.contrib.operators.zeppelin_operator import ZeppelinOperator
    from airflow.utils.dates import days_ago
    from airflow.contrib.operators.aliyun_emr_operator import TearDownClusterOperator
    from airflow.contrib.operators.aliyun_emr_operator import CreateClusterFromTemplateOperator
    
    default_args = {
            'owner': 'airflow',
            'depends_on_past': False,
            'start_date': datetime(2019, 4, 9),
            'email_on_failure': False,
            'email_on_retry': False,
            'retries': 0,
            'retry_delay': timedelta(minutes=500),
    }
    
    
    with DAG('zeppelin_etl_note_dag_new',
                     max_active_runs=5,
                     schedule_interval='0 0 * * *',
                     default_args=default_args) as dag:
    
        execution_date = "{{ ds }}"
    
        head = CreateClusterFromTemplateOperator(
            task_id='create_cluster',
            template_id ='CT-37377AD06E3B****',
        )
    
       raw_data_task = ZeppelinOperator(
            task_id='raw_data_task',
            conn_id='zeppelin_default',
            note_id='2FZWJTTPS',
            create_cluster_task_id='create_cluster',
            params= {'dt' : execution_date}
            )
    
        spark_etl_task = ZeppelinOperator(
            task_id='spark_etl_task',
            conn_id='zeppelin_default',
            note_id='2FX3GJW67',
            create_cluster_task_id='create_cluster',
            params= {'dt' : execution_date}
        )
    
        spark_query_task = ZeppelinOperator(
            task_id='spark_query_task',
            conn_id='zeppelin_default',
            note_id='2FZ8H4JPV',
            create_cluster_task_id='create_cluster',
            params= {'dt' : execution_date}
        )
    
    
        tail = TearDownClusterOperator(
            task_id='tear_down_cluster',
            create_cluster_task_id='create_cluster',
        )
    
    
        head >> raw_data_task >> spark_etl_task >> spark_query_task >> tail
    说明 template_id的值为您 步骤1中创建模板集群的ID,并且 ZeppelinOperatorTearDownClusterOperator中的 create_cluster_task_id需要和 CreateClusterFromTemplateOperator中的 task_id保持一致,参数的详情信息请参见 背景信息
  3. 上传Airflow DAG脚本。
    1. 登录OSS管理控制台
    2. 上传Airflow DAG脚本,详情请参见上传文件
  4. 启用DAG脚本。
    DAGs页面,打开待启用DAG脚本所在行的 off开关,即可启用DAG调度。 DAG
    说明 上传Airflow DAG脚本至OSS后,在DAGs页面并不能立刻看到上传的DAG,大约需要1~2分钟,请您耐心等待。
  5. 查看DAG运行状态。
    1. DAGs页面,单击待查看调度任务的DAG。
      进入 Tree View页面。
    2. Tree View页面,您可以查看对应DAG运行调度任务的详细信息。
    3. Tree View页面,单击task图标。
    4. 单击Task对话框中的View log
      即可查看Task详情的运行情况。 create_cluster
      说明 通过Log信息您可以看到创建已关联的集群信息,集群调度完成后会被释放掉。