本文为您介绍如何通过阿里云E-MapReduce(简称EMR)控制台,快速创建EMR Studio集群并开展交互式开发和工作流调度工作。
背景信息
如果您想了解更多Zeppelin、Jupyter和Airflow的信息,请参见以下内容:
前提条件
- 已申请体验EMR Studio的资格。
- 已创建EMR Studio集群,详情请参见创建EMR Studio集群。说明 如果您是第一次创建EMR Studio集群,在创建EMR Studio集群时,控制台会弹出系统角色授权窗口,请您使用主账号对系统角色AliyunECSInstanceForEMRStudioRole进行授权。角色授权详情请参见角色授权。
- 安全组规则已开启8000、8081和8443端口。
添加安全组规则,详情请参见添加安全组规则。
- 已添加用户,详情请参见添加用户。
使用限制
EMR Studio仅支持与同一VPC下的EMR计算集群进行关联。
操作流程
步骤一:进入数据开发工作台
- 进入详情页面。
- 登录阿里云E-MapReduce控制台。
- 在顶部菜单栏处,根据实际情况选择地域和资源组。
- 单击上方的集群管理页签。
- 在集群管理页面,单击相应集群所在行的详情。
- 登录阿里云E-MapReduce控制台。
- 添加Linux用户。
您可以通过用户管理功能添加用户,详情请参见添加用户。
- 在左侧导航栏中,选择访问链接与端口。
- 单击Studio Workspace UI所在行的链接。
输入步骤2中添加的用户名和密码,即可正常访问Web UI页面。说明 请稍等几秒即可看到Studio Workspace UI的信息。
步骤二:关联计算集群
重要 EMR Studio仅支持与同一VPC下的EMR计算集群进行关联。
关联集群后,EMR Studio集群可以访问对应EMR集群上的资源并提交Job。
- 在关联集群页面,单击可关联集群页签。
说明 仅显示同一个VPC下的EMR集群。支持关联Hadoop集群、Dataflow集群和DataScience集群三种集群类型。
- 选择待关联集群的集群类型。
- 单击待关联集群操作列的关联集群。
- 在关联集群对话框中,单击绑定。
待已关联集群页签,显示关联的集群信息时,表示关联成功。说明 绑定集群过程大约需要1~2分钟,请您耐心等待。
步骤三:使用Zeppelin交互式开发作业
关联集群后,您可以在Zeppelin Notebook中进行交互式开发。EMR数据开发集群自带教程,本文以Airflow调度教程1为例介绍。
- 在左侧导航栏中,单击Zeppelin。
- 在Zeppelin页面,选择。
页面展示
步骤四:编写Airflow Python脚本
Airflow的调度需要手动编写Python脚本来构建DAG,EMR Studio自动将指定OSS路径内的Python脚本同步至Airflow DAGs,因此,您可以在编辑和上传完DAG脚本之后,进入数据开发工作台,在左侧导航栏中,单击Airflow,即可进入DAGs页面查看您创建的DAG。
EMR Studio自带调度教程,您可以在Zeppelin页面,选择查看。Airflow的基本用法,请参见Apache Airflow。
本文以Paragraph级别调度为例,为每个Paragraph构建一个Task,然后串联起来构成一个链式DAG。代码示例如下。
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.apache.zeppelin.operators.zeppelin_operator import ZeppelinOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
with DAG('zeppelin_example_paragraph_dag',
max_active_runs=5,
schedule_interval='0 0 * * *',
default_args=default_args) as dag:
execution_date = "{{ ds }}"
raw_data_task = ZeppelinOperator(
task_id='raw_data_task',
conn_id='zeppelin_default',
note_id='2FXADMCY9',
paragraph_id='paragraph_1613703810025_1489985581',
params= {'dt' : execution_date}
)
csv_data_task = ZeppelinOperator(
task_id='csv_data_task',
conn_id='zeppelin_default',
note_id='2FXADMCY9',
paragraph_id='paragraph_1613705325984_828524031',
params= {'dt' : execution_date}
)
parquet_data_task = ZeppelinOperator(
task_id='parquet_data_task',
conn_id='zeppelin_default',
note_id='2FXADMCY9',
paragraph_id='paragraph_1613709950691_1420422787',
params= {'dt' : execution_date}
)
query_task = ZeppelinOperator(
task_id='query_task',
conn_id='zeppelin_default',
note_id='2FXADMCY9',
paragraph_id='paragraph_1613703819960_454383326',
params= {'dt' : execution_date}
)
raw_data_task >> csv_data_task >> parquet_data_task >> query_task
步骤五:使用Airflow调度Notebook作业
- 启用DAG调度在DAGs页面,打开off开关,即可启用DAG调度。
- 查看调度任务的详细信息
在DAGs页面,单击待查看调度任务的DAG,进入Tree View页面。
在Tree View页面,您可以查看对应DAG运行调度任务的详细信息。 - Task的运行情况
在Tree View页面,单击
详细信息
该文章对您有帮助吗?