当您在Jupyter里完成作业开发后,您就可以通过Airflow定期调度作业。本文为您介绍如何将Jupyter中编写的Notebook,生成调度任务并定时执行。

前提条件

  • 已创建EMR Studio集群。

    创建集群详情,请参见创建集群

  • 安全组规则已开启8000、8081和8443端口。

    添加安全组规则,详情请参见添加安全组规则

  • 已绑定计算集群。
    注意
    • 绑定集群页签下,仅显示同一个VPC下的EMR集群。
    • 仅支持绑定Hadoop集群。
  • 已添加用户,详情请参见添加用户
  • 已完成Notebook的编辑。您可以直接下载Notebook示例文件airflow_nb_example.ipynb至本地,然后上传至您的Jupyter中查看。

操作流程

  1. 步骤一:编写Airflow DAG脚本
  2. 步骤二:上传Airflow DAG脚本
  3. 步骤三:启用DAG脚本

步骤一:编写Airflow DAG脚本

示例代码如下。
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.zeppelin_operator import ZeppelinOperator
from jupyter_operator import JupyterNotebookEmrOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 8, 1), # 第一次开始执行的时间。
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0, # 失败重试次数。
    'retry_delay': timedelta(minutes=5), # 失败重试间隔。
}


with DAG('jupyter_airflow_example',
         max_active_runs=1,
         schedule_interval='0 0 * * *', # 执行周期,依次是分、时、周、月和年,此处表示每天0:00执行。
         default_args=default_args) as dag:

    execution_date = "{{ ds }}"

    op = JupyterNotebookEmrOperator(
        task_id='<yourTaskId>',
        vcores=<yourVcore>,
        memory='<yourMemory>',
        job_name='<yourJobname>',
        cluster_id='<yourClusterIP>',
        notebook_path='<yourNotebookpath>',
        username='<yourName>',
        venv='<yourVENV>',
        env={'VENV': '<yourENV>', 'SPARK_HOME': '/usr/lib/spark-current', 'HIVE_CONF_DIR': '/etc/ecm/hive-conf', 'PYSPARK_PYTHON': './env/bin/python'},
        params={'<yourparams>': execution_date},
    )
其中,涉及的参数信息如下。
参数 描述
<yourTaskId> Operator通用Task ID,在DAG文件内唯一,字符串类型。例如,jupyter_task。
<yourVcore> 指定运行容器的虚拟核数,整数类型。例如,1。
<yourMemory> 指定运行容器的内存,字符串类型。例如,4 G。
<yourJobname> YARN作业名,字符串类型。例如,papermill。
<yourClusterIP> 您关联的Hadoop集群的ID。
<yourName> 登录JupyterHub的用户名。
<yourNotebookpath> Notebook在OSS中的存储路径,只需填写相对路径即可。本文示例为spark_magic_example.ipynb
<yourVENV> 运行Jupyter时选择的虚拟环境,只需要填写虚拟环境名称即可,不带.tar.gz文件名后缀,字符串类型。例如,emr-jupyterenv
<yourENV> 指定容器环境变量,dict类型。
<yourparams> Notebook参数,需要在Notebook的单元格上添加parameters标签。

标签详情,请参见Parameterize

步骤二:上传Airflow DAG脚本

本文通过OSS控制台上传Airflow DAG脚本。上传Airflow DAG脚本的路径为您创建集群时指定的OSS路径。

  1. 登录 OSS管理控制台
  2. 上传Airflow DAG脚本,详情请参见上传文件
    上传您在步骤一:编写Airflow DAG脚本中编写的Airflow DAG脚本。

    上传Airflow DAG脚本的路径与您创建集群时设置的路径有关。例如,如果您创建集群时设置的路径为oss://bucket_1/your_folder,则您的Airflow DAG脚本存放的路径就是oss://bucket_1/your_folder/airflow/dags

步骤三:启用DAG脚本

DAGs页面,打开待启用DAG脚本所在行的off开关,即启用了调度任务。Jupyter_DAG
说明 上传Airflow DAG脚本至OSS后,在DAGs页面并不能立刻看到调度任务,大约需要1~2分钟,请您耐心等待。