DMSNotebookOperator

本文为您介绍DMSNotebookOperator操作的配置信息。

功能说明

执行一个DMS管理的Notebook文件(.pynb)。

前提条件

参数说明

说明

参数file_pathrun_paramsprofilesession_idprofile_idcluster_idsession_nameprofile_namecluster_name可以使用Jinja模板

参数

类型

是否必填

说明

file_path

string

Notebook文件(.pynb)的路径。

profile

dict

Notebook会话的配置信息。

  • autoStopTime:资源释放时间。

  • mountPoints:数据存储位置。

    格式为[{"mntPath" : "/mnt/data***","dataPath" : "oss://test/***"},......]mntPath表示挂载路径,dataPath表示OSS路径。

  • dependencies:Pypi包管理。

  • environments:环境变量。

profile_id

string

说明

不复用会话时必填。

  • profile_id:配置的ID。

  • profile_name:配置的名字。

    说明

    二选一,profile_id的优先级高。

profile_name

string

cluster_type

string

AnalyticDB MySQL集群的类型,取值:

  • cpu

  • spark

cluster_id

string

  • cluster_id:AnalyticDB MySQL集群的ID。

  • cluster_name:AnalyticDB MySQL集群的名称。

说明

二选一,cluster_id的优先级高。

cluster_name

string

spec

string

Driver的资源规格,取值:

  • 1C4G:14 GB

  • 2C8G:28 GB

  • 4C16G:416 GB

  • 8C32G:832 GB

  • 16C64G:1664 GB

runtime_name

string

镜像名称。

session_id

string

说明

复用会话时必填。

复用的会话信息。

  • session_id:会话的ID

  • session_name:会话的名称。

说明

二选一,session_id的优先级高。

session_name

string

run_params

dict

运行参数,可以替换Notebook文件中的变量。

timeout

int

Notebook文件的执行时长(超时时间),单位为秒。

polling_interval

int

刷新执行结果的间隔时间。单位为秒,默认值为10。

示例

说明

task_iddagAirflow的特定参数,详情请参见Airflow官方文档

from doctest import debug
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

import json
from airflow.providers.alibaba_dms.cloud.operators.dms_notebook import DMSNotebookOperator


with DAG(
    "dms_notebook_test",
    params={
        "x":3
    },
) as dag:

    notebook_operator = DMSNotebookOperator(
        task_id='notebook_test_hz_name',
        profile_name='hansheng_profile.48',
        profile={},
        cluster_type='spark',
        cluster_name='spark_general2.218',
        spec='4C32G',
        runtime_name='Spark3.5_Scala2.12_Python3.9_General:1.0.9',
        file_path='/Workspace/code/default/test.ipynb',
        run_params={
            'a':"{{ params.x }}"
        },
        polling_interval=5,
        debug=True,
        dag=dag
    )

    run_this_last = EmptyOperator(
        task_id="run_this_last",
        dag=dag,
    )

    notebook_operator >> run_this_last

if __name__ == "__main__":
    dag.test(
        run_conf={}
    )