使用Airflow调度Notebook

本文为您介绍如何通过Airflow调度Notebook文件,查看Notebook运行进度。

前提条件

使用Airflow调度Notebook

  1. 登录数据管理DMS 5.0
  2. 进入工作空间页面。

    DMS提供了两种进入工作空间的路径,您可根据需求选择。

    路径一

    单击控制台左上角的2023-01-28_15-57-17.png图标,选择全部功能 > Data+AI > Dify

    说明

    若您使用的是非极简模式的控制台,请在顶部菜单栏中,选择Data+AI > Dify

    image

    路径二

    单击页面左侧的数智工厂image图标,再单击工作空间

    说明

    若您使用的是非极简模式的控制台,请在顶部菜单栏中,选择数智工厂 > 工作空间

    image

  3. image(资源管理器)页面的WORKSPACE区域,单击image,选择新建Notebook文件

    image

  4. Notebook文件中配置任意代码,例如print(1)

  5. REPOS(代码仓库)区域编写Python代码,配置调度Notebook所需的参数。代码示例如下:

    from doctest import debug
    from airflow import DAG
    from airflow.decorators import task
    from airflow.models.param import Param
    from airflow.operators.bash import BashOperator
    from airflow.operators.empty import EmptyOperator
    from airflow.providers.alibaba_dms.cloud.operators.dms_notebook import DMSNotebookOperator
    
    with DAG(
        "dms_notebook_sy_hz_name",
        params={
        },
    ) as dag:
    
        notebook_operator = DMSNotebookOperator(
            task_id='dms_notebook_sy_hz_name',
            profile_name='test',
            profile={},
            cluster_type='spark',
            cluster_name='spark_cluster_855298',
            spec='4C16G',
            runtime_name='Spark3.5_Scala2.12_Python3.9_General:1.0.9',
            file_path='/Workspace/code/default/test.ipynb',
            run_params={'a':10},    
            polling_interval=5,
            debug=True,
            dag=dag
        )
    
        run_this_last = EmptyOperator(
            task_id="run_this_last22",
            dag=dag,
        )
    
        notebook_operator >> run_this_last
    
    if __name__ == "__main__":
        dag.test(
            run_conf={}
        )

    代码中的部分参数说明如下,未说明的参数保持默认值即可。

    参数

    类型

    是否必填

    说明

    task_id

    string

    您自行定义的任务唯一标识。

    profile_name

    string

    profile名称。

    您可以单击右侧边框的image(配置管理),配置新的profile。

    cluster_type

    string

    Notebook会话实例中配置的集群类型。

    目前支持两种集群类型:CPU集群和Spark集群。CPU集群是DMS默认创建,而Spark集群需要您手动创建。具体操作,请参见创建Spark集群

    cluster_name

    string

    集群名称。

    spec

    string

    集群规格。

    当前仅支持填写默认规格4C16G。

    runtime_name

    string

    运行环境。

    当前Spark的运行环境仅支持选择Spark3.5_Scala2.12_Python3.9_General:1.0.9Spark3.3_Scala2.12_Python3.9_General:1.0.9

    file_path

    string

    文件路径。

    查看文件路径。路径格式为/Workspace/code/default。示例:/Workspace/code/default/test.ipynb

    run_params

    dict

    运行参数,可以替换Notebook文件中的变量。

    timeout

    int

    Notebook Cell的最大执行时长。单位为秒。

    当某个Cell的执行时长超过timeout时,整个文件会停止调度。

    polling_interval

    int

    刷新执行结果的间隔时间。单位为秒,默认值为10。

  6. REPO绑定发布环境

    单击代码仓库名称,在配置环境页面将环境与工作流实例进行绑定,便于后面发布。一个工作流实例只能与一个类型的环境绑定。

  7. 将鼠标悬浮至目标仓库名称,单击image,再选择发布本次发布环境(仓库绑定的环境),单击确定即可。

    说明

    发布操作存在10秒延迟。

    image

  8. 执行Notebook。

    1. 单击工作空间左侧的image工作流图标,再单击目标Airflow实例名称,即可进入Airflow空间。

    2. Code页面下查看发布的代码是否已同步到当前页面。

    3. 确认同步至当前页面后,单击右上角的image运行按钮。

    4. 单击Graph页签,找到并单击对应的任务。

    5. 单击Logs页签,即可查看任务的运行日志。

      任务运行时,您可随时查看Notebook的运行进度

查看Notebook运行进度

  • DAG页面查看运行进度

    Logs页签下查看当前任务的运行进度,例如progress2/15,表示总共15Cell,正在执行第二个Cell。

    image

    说明

    当出现notebook run success时,表示任务已执行完成。

  • Notebook文件页面查看运行进度

    Logs页面,单击日志中的Notebook页面url后的链接,进入Notebook文件中查看执行状态。支持单击右上角的刷新按钮,实时刷新执行进展。

    当出现执行结果时,表示Notebook任务已执行完成。

    image