本文通过E-MapReduce(简称EMR)自带的教程,为您介绍如何快速使用Data Development集群。

前提条件

  • 已创建Data Development集群。

    创建集群详情,请参见创建集群

  • 安全组规则已开启8080、8081和8082端口。

    添加安全组规则详情,请参见添加安全组规则

使用限制

Data Development集群仅支持绑定到同一个VPC内的EMR集群,不支持跨VPC。

操作流程

  1. 步骤一:创建并添加自定义策略
  2. 步骤二:关联集群
  3. 步骤三:Notebook交互式开发
  4. 步骤四:编写Airflow Python脚本
  5. 步骤五:调度Notebook作业

步骤一:创建并添加自定义策略

创建并添加自定义策略,以便于关联集群。
注意 请使用阿里云账号执行以下操作。
  1. 使用阿里云账号登录RAM控制台
  2. 在左侧导航栏中,选择权限管理 > 权限策略管理
  3. 单击创建权限策略
  4. 新建自定义权限策略,进行如下配置。
    1. 输入策略名称DDCRolePolicy
    2. 单击脚本配置模式。
    3. 策略内容区域,拷贝如下内容。
      {
          "Version": "1",
          "Statement": [
              {
                  "Action": [
                      "emr:ListClusterHost",
                      "emr:DescribeCluster",
                      "emr:DescribeClusterV2",
                      "emr:ListClusters",
                      "emr:DescribeFlowAgentToken"
                  ],
                  "Resource": "*",
                  "Effect": "Allow"
              }
          ]
      }
      DDCRolePolicy
    4. 单击确定
  5. 添加自定义策略。
    1. 在左侧导航栏中,单击RAM角色管理
    2. RAM角色管理页面,搜索角色AliyunECSInstanceForEMRRole
      search_role
    3. 单击操作列的添加权限
      add_role
    4. 添加权限页面,单击自定义策略,搜索并选择DDCRolePolicy
      add-DDC
    5. 单击确定
  6. 单击完成

步骤二:关联集群

关联集群后,数据开发集群可以访问对应EMR集群上的资源并提交Job。

  1. 进入详情页面。
    1. 登录阿里云E-MapReduce控制台
    2. 在顶部菜单栏处,根据实际情况选择地域和资源组
    3. 单击上方的集群管理页签。
    4. 集群管理页面,单击相应集群所在行的详情
  2. 在左侧导航栏中,选择集群服务 > Data Development Center
  3. 进入关联集群页面。
    1. 单击快捷链接后的select图标。
    2. 单击链接Data Development Center
      即可进入关联集群页面。data_development_page
  4. 关联集群页面,单击可关联集群
    说明 仅显示同一个VPC下的EMR集群。
  5. 选择待关联集群的集群类型。
  6. 单击待关联集群操作列的关联集群
  7. 关联集群页面,单击绑定
    已关联集群页面,显示关联的集群信息时,表示关联成功。
    说明 绑定集群过程大约需要1~2分钟,请您耐心等待。

步骤三:Notebook交互式开发

关联集群后,您可以在Zeppelin Notebook中进行交互式开发。EMR数据开发集群自带教程,本文以Airflow调度教程1为例介绍。

  1. 在左侧导航栏中,单击Zepplin
  2. Zepplin页面,选择阿里云EMR数据开发教程 > Airflow教程 > Airflow调度教程1
    页面展示如下图所示。Airflow1
    区域 描述
    Markdown语言(以%md开头),主要是为了添加一些说明文字。
    Shell脚本语言(以%sh开头),您可以运行Shell脚本,生成数据。
    Hive(以%hive开头),您可以执行任意的Hive SQL语句。
    单击drop-down图标,可以切换关联的集群。
    单击run图标,运行当前段落。
    说明 部分解释器(Interpreter)在第一次使用的时候会去下载依赖,如果提示信息类似Interpreter Setting 'hive' is not ready,说明下载过程还没结束,请稍后重试。

步骤四:编写Airflow Python脚本

Airflow的调度需要手动编写Python脚本来构建DAG,在Airflow调度教程1的最下面有关于这个Note的DAG脚本。Airflow的基本用法,请参见Apache AirflowAirflow dag

E-MapReduce已经在创建集群的时候自动上传DAG对应的Python脚本到OSS,因此,您只需要在左侧导航栏中,单击Airflow,即可进入DAGs页面。

EMR数据开发里引入了一个新的Operator(ZeppelinOperator) 。您可以使用ZeppelinOperator调用Zeppelin Notebook,主要包含Note和Paragraph两种级别调度方式。

本文以Paragraph级别调度为例,为每个Paragraph构建一个Task,然后串联起来构成一个链式DAG。代码示例如下:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.zeppelin_operator import ZeppelinOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}


with DAG('zeppelin_example_paragraph_dag',
         max_active_runs=5,
         schedule_interval='0 0 * * *',
         default_args=default_args) as dag:

    execution_date = "{{ ds }}"

    raw_data_task = ZeppelinOperator(
        task_id='raw_data_task',
        conn_id='zeppelin_default',
        note_id='2FXADMCY9',
        paragraph_id='paragraph_1613703810025_1489985581',
        params= {'dt' : execution_date}
    )

    csv_data_task = ZeppelinOperator(
        task_id='csv_data_task',
        conn_id='zeppelin_default',
        note_id='2FXADMCY9',
        paragraph_id='paragraph_1613705325984_828524031',
        params= {'dt' : execution_date}
    )

    parquet_data_task = ZeppelinOperator(
        task_id='parquet_data_task',
        conn_id='zeppelin_default',
        note_id='2FXADMCY9',
        paragraph_id='paragraph_1613709950691_1420422787',
        params= {'dt' : execution_date}
    )

    query_task = ZeppelinOperator(
        task_id='query_task',
        conn_id='zeppelin_default',
        note_id='2FXADMCY9',
        paragraph_id='paragraph_1613703819960_454383326',
        params= {'dt' : execution_date}
    )

    raw_data_task >> csv_data_task >> parquet_data_task >> query_task
 

步骤五:调度Notebook作业

  • 启用DAG调度
    DAGs页面,打开off开关,即可启用DAG调度。DAG
  • 查看调度任务的详细信息

    DAGs页面,单击待查看调度任务的DAG,进入Tree View页面。

    Tree View页面,您可以查看对应DAG运行调度任务的详细信息。DAG_info
  • Task的运行情况

    Tree View页面,单击task图标,即可查看对应Task的运行情况。

    详细信息如下图所示。task_log