当您在Jupyter里完成作业开发后,您就可以通过Airflow定期调度作业。本文为您介绍如何将Jupyter中编写的Notebook,生成调度任务并定时执行。
前提条件
- 已创建EMR Studio集群。
创建集群详情,请参见创建集群。
- 安全组规则已开启8000、8081和8443端口。
添加安全组规则,详情请参见添加安全组规则。
- 已绑定计算集群。
注意
- 绑定集群页签下,仅显示同一个VPC下的EMR集群。
- 仅支持绑定Hadoop集群。
- 已添加用户,详情请参见添加用户。
- 已完成Notebook的编辑。您可以直接下载Notebook示例文件airflow_nb_example.ipynb至本地,然后上传至您的Jupyter中查看。
操作流程
步骤一:编写Airflow DAG脚本
示例代码如下。
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.zeppelin_operator import ZeppelinOperator
from jupyter_operator import JupyterNotebookEmrOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 8, 1), # 第一次开始执行的时间。
'email_on_failure': False,
'email_on_retry': False,
'retries': 0, # 失败重试次数。
'retry_delay': timedelta(minutes=5), # 失败重试间隔。
}
with DAG('jupyter_airflow_example',
max_active_runs=1,
schedule_interval='0 0 * * *', # 执行周期,依次是分、时、周、月和年,此处表示每天0:00执行。
default_args=default_args) as dag:
execution_date = "{{ ds }}"
op = JupyterNotebookEmrOperator(
task_id='<yourTaskId>',
vcores=<yourVcore>,
memory='<yourMemory>',
job_name='<yourJobname>',
cluster_id='<yourClusterIP>',
notebook_path='<yourNotebookpath>',
username='<yourName>',
venv='<yourVENV>',
env={'VENV': '<yourENV>', 'SPARK_HOME': '/usr/lib/spark-current', 'HIVE_CONF_DIR': '/etc/ecm/hive-conf', 'PYSPARK_PYTHON': './env/bin/python'},
params={'<yourparams>': execution_date},
)
其中,涉及的参数信息如下。
参数 | 描述 |
---|---|
<yourTaskId> | Operator通用Task ID,在DAG文件内唯一,字符串类型。例如,jupyter_task。 |
<yourVcore> | 指定运行容器的虚拟核数,整数类型。例如,1。 |
<yourMemory> | 指定运行容器的内存,字符串类型。例如,4 G。 |
<yourJobname> | YARN作业名,字符串类型。例如,papermill。 |
<yourClusterIP> | 您关联的Hadoop集群的ID。 |
<yourName> | 登录JupyterHub的用户名。 |
<yourNotebookpath> | Notebook在OSS中的存储路径,只需填写相对路径即可。本文示例为spark_magic_example.ipynb。 |
<yourVENV> | 运行Jupyter时选择的虚拟环境,只需要填写虚拟环境名称即可,不带.tar.gz文件名后缀,字符串类型。例如,emr-jupyterenv。 |
<yourENV> | 指定容器环境变量,dict类型。 |
<yourparams> | Notebook参数,需要在Notebook的单元格上添加parameters标签。
标签详情,请参见Parameterize。 |
步骤二:上传Airflow DAG脚本
本文通过OSS控制台上传Airflow DAG脚本。上传Airflow DAG脚本的路径为您创建集群时指定的OSS路径。
步骤三:启用DAG脚本
在DAGs页面,打开待启用DAG脚本所在行的off开关,即启用了调度任务。
说明 上传Airflow DAG脚本至OSS后,在DAGs页面并不能立刻看到调度任务,大约需要1~2分钟,请您耐心等待。