本文为您介绍如何通过阿里云E-MapReduce(简称EMR)控制台,快速创建EMR Studio集群并开展交互式开发和工作流调度工作。
背景信息
如果您想了解更多Zeppelin、Jupyter和Airflow的信息,请参见以下内容:
前提条件
- 已申请体验EMR Studio的资格。
- 已创建EMR Studio集群,详情请参见创建EMR Studio集群。
说明 如果您是第一次创建EMR Studio集群,在创建EMR Studio集群时,控制台会弹出系统角色授权窗口,请您使用主账号对系统角色AliyunECSInstanceForEMRStudioRole进行授权。角色授权详情请参见角色授权。
- 安全组规则已开启8000、8081和8443端口。
添加安全组规则,详情请参见添加安全组规则。
- 已添加用户,详情请参见添加用户。
使用限制
EMR Studio仅支持与同一VPC下的EMR计算集群进行关联。
操作流程
步骤一:进入数据开发工作台
步骤二:关联计算集群
重要 EMR Studio仅支持与同一VPC下的EMR计算集群进行关联。
关联集群后,EMR Studio集群可以访问对应EMR集群上的资源并提交Job。
步骤三:使用Zeppelin交互式开发作业
关联集群后,您可以在Zeppelin Notebook中进行交互式开发。EMR数据开发集群自带教程,本文以Airflow调度教程1为例介绍。
步骤四:编写Airflow Python脚本
Airflow的调度需要手动编写Python脚本来构建DAG,EMR Studio自动将指定OSS路径内的Python脚本同步至Airflow DAGs,因此,您可以在编辑和上传完DAG脚本之后,进入数据开发工作台,在左侧导航栏中,单击Airflow,即可进入DAGs页面查看您创建的DAG。
EMR Studio自带调度教程,您可以在Zeppelin页面,选择Apache Airflow。
查看。Airflow的基本用法,请参见说明 EMR Studio自带用于调度Zeppelin Notebook的Operator(ZeppelinOperator),并提供Note和Paragraph两种级别调度方式。更多信息,请参见定期调度Zeppelin中的作业。
本文以Paragraph级别调度为例,为每个Paragraph构建一个Task,然后串联起来构成一个链式DAG。代码示例如下。
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.apache.zeppelin.operators.zeppelin_operator import ZeppelinOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
with DAG('zeppelin_example_paragraph_dag',
max_active_runs=5,
schedule_interval='0 0 * * *',
default_args=default_args) as dag:
execution_date = "{{ ds }}"
raw_data_task = ZeppelinOperator(
task_id='raw_data_task',
conn_id='zeppelin_default',
note_id='2FXADMCY9',
paragraph_id='paragraph_1613703810025_1489985581',
params= {'dt' : execution_date}
)
csv_data_task = ZeppelinOperator(
task_id='csv_data_task',
conn_id='zeppelin_default',
note_id='2FXADMCY9',
paragraph_id='paragraph_1613705325984_828524031',
params= {'dt' : execution_date}
)
parquet_data_task = ZeppelinOperator(
task_id='parquet_data_task',
conn_id='zeppelin_default',
note_id='2FXADMCY9',
paragraph_id='paragraph_1613709950691_1420422787',
params= {'dt' : execution_date}
)
query_task = ZeppelinOperator(
task_id='query_task',
conn_id='zeppelin_default',
note_id='2FXADMCY9',
paragraph_id='paragraph_1613703819960_454383326',
params= {'dt' : execution_date}
)
raw_data_task >> csv_data_task >> parquet_data_task >> query_task
步骤五:使用Airflow调度Notebook作业
- 启用DAG调度
在DAGs页面,打开off开关,即可启用DAG调度。
- 查看调度任务的详细信息
在DAGs页面,单击待查看调度任务的DAG,进入Tree View页面。
在Tree View页面,您可以查看对应DAG运行调度任务的详细信息。 - Task的运行情况
在Tree View页面,单击图标,即可查看对应Task的运行情况。
详细信息如下图所示。