本文为您介绍如何通过阿里云E-MapReduce(简称EMR)控制台,快速创建EMR Studio集群并开展交互式开发和工作流调度工作。

背景信息

如果您想了解更多Zeppelin、Jupyter和Airflow的信息,请参见以下内容:

前提条件

  • 已申请体验EMR Studio的资格。
  • 已创建EMR Studio集群,详情请参见创建EMR Studio集群
    说明 如果您是第一次创建EMR Studio集群,在创建EMR Studio集群时,控制台会弹出系统角色授权窗口,请您使用主账号对系统角色AliyunECSInstanceForEMRStudioRole进行授权。角色授权详情请参见角色授权
  • 安全组规则已开启8000、80818443端口。

    添加安全组规则,详情请参见添加安全组规则

  • 已添加用户,详情请参见添加用户

使用限制

EMR Studio仅支持与同一VPC下的EMR计算集群进行关联。

操作流程

  1. 步骤一:进入数据开发工作台
  2. 步骤二:关联计算集群
  3. 步骤三:使用Zeppelin交互式开发作业
  4. 步骤四:编写Airflow Python脚本
  5. 步骤五:使用Airflow调度Notebook作业

步骤一:进入数据开发工作台

  1. 进入详情页面。
    1. 登录阿里云E-MapReduce控制台
    2. 在顶部菜单栏处,根据实际情况选择地域和资源组
    3. 单击上方的集群管理页签。
    4. 集群管理页面,单击相应集群所在行的详情
  2. 添加Linux用户。
    您可以通过用户管理功能添加用户,详情请参见添加用户
  3. 在左侧导航栏中,选择访问链接与端口
  4. 单击Studio Workspace UI所在行的链接。
    输入步骤2中添加的用户名和密码,即可正常访问Web UI页面。
    说明 请稍等几秒即可看到Studio Workspace UI的信息。

步骤二:关联计算集群

重要 EMR Studio仅支持与同一VPC下的EMR计算集群进行关联。

关联集群后,EMR Studio集群可以访问对应EMR集群上的资源并提交Job。

  1. 关联集群页面,单击可关联集群页签。
    说明 仅显示同一个VPC下的EMR集群。支持关联Hadoop集群、Dataflow集群和DataScience集群三种集群类型。
  2. 选择待关联集群的集群类型。
  3. 单击待关联集群操作列的关联集群
  4. 关联集群对话框中,单击绑定
    已关联集群页签,显示关联的集群信息时,表示关联成功。
    说明 绑定集群过程大约需要1~2分钟,请您耐心等待。

步骤三:使用Zeppelin交互式开发作业

关联集群后,您可以在Zeppelin Notebook中进行交互式开发。EMR数据开发集群自带教程,本文以Airflow调度教程1为例介绍。

  1. 在左侧导航栏中,单击Zeppelin
  2. Zeppelin页面,选择阿里云EMR数据开发教程 > Airflow教程 > Airflow调度教程1
    页面展示如下图所示。Airflow1
    区域 描述
    Markdown语言(以%md开头),主要是为了添加一些说明文字。
    Shell脚本语言(以%sh开头),您可以运行Shell脚本,生成数据。
    Hive(以%hive开头),您可以执行任意的Hive SQL语句。
    单击drop-down图标,可以切换关联的集群。
    单击run图标,运行当前段落。
    说明 部分解释器(Interpreter)在第一次使用的时候会去下载依赖,如果提示信息类似Interpreter Setting 'hive' is not ready,说明下载过程还没结束,请稍后重试。

步骤四:编写Airflow Python脚本

Airflow的调度需要手动编写Python脚本来构建DAG,EMR Studio自动将指定OSS路径内的Python脚本同步至Airflow DAGs,因此,您可以在编辑和上传完DAG脚本之后,进入数据开发工作台,在左侧导航栏中,单击Airflow,即可进入DAGs页面查看您创建的DAG。

EMR Studio自带调度教程,您可以在Zeppelin页面,选择阿里云EMR数据开发教程 > Airflow教程 > Airflow调度教程1查看。Airflow的基本用法,请参见Apache AirflowAirflow dag
说明 EMR Studio自带用于调度Zeppelin Notebook的Operator(ZeppelinOperator),并提供Note和Paragraph两种级别调度方式。更多信息,请参见定期调度Zeppelin中的作业
本文以Paragraph级别调度为例,为每个Paragraph构建一个Task,然后串联起来构成一个链式DAG。代码示例如下。
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.apache.zeppelin.operators.zeppelin_operator import ZeppelinOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}


with DAG('zeppelin_example_paragraph_dag',
         max_active_runs=5,
         schedule_interval='0 0 * * *',
         default_args=default_args) as dag:

    execution_date = "{{ ds }}"

    raw_data_task = ZeppelinOperator(
        task_id='raw_data_task',
        conn_id='zeppelin_default',
        note_id='2FXADMCY9',
        paragraph_id='paragraph_1613703810025_1489985581',
        params= {'dt' : execution_date}
    )

    csv_data_task = ZeppelinOperator(
        task_id='csv_data_task',
        conn_id='zeppelin_default',
        note_id='2FXADMCY9',
        paragraph_id='paragraph_1613705325984_828524031',
        params= {'dt' : execution_date}
    )

    parquet_data_task = ZeppelinOperator(
        task_id='parquet_data_task',
        conn_id='zeppelin_default',
        note_id='2FXADMCY9',
        paragraph_id='paragraph_1613709950691_1420422787',
        params= {'dt' : execution_date}
    )

    query_task = ZeppelinOperator(
        task_id='query_task',
        conn_id='zeppelin_default',
        note_id='2FXADMCY9',
        paragraph_id='paragraph_1613703819960_454383326',
        params= {'dt' : execution_date}
    )

    raw_data_task >> csv_data_task >> parquet_data_task >> query_task
 

步骤五:使用Airflow调度Notebook作业

  • 启用DAG调度
    DAGs页面,打开off开关,即可启用DAG调度。DAG
  • 查看调度任务的详细信息

    DAGs页面,单击待查看调度任务的DAG,进入Tree View页面。

    Tree View页面,您可以查看对应DAG运行调度任务的详细信息。DAG_info
  • Task的运行情况

    Tree View页面,单击task图标,即可查看对应Task的运行情况。

    详细信息如下图所示。task_log