本文为您介绍DMSNotebookOperator操作的配置信息。
功能说明
执行一个DMS管理的Notebook文件(.pynb)。
前提条件
已创建工作空间。
已创建资源组。
若需要复用会话时,还需确保已创建Notebook会话。
若无需复用会话(创建新的Notebook会话)时,还需确保已创建模板(Notebook会话的配置信息)。
参数说明
参数file_path
、run_params
、profile
、session_id
、profile_id
、cluster_id
、session_name
、profile_name
、cluster_name
可以使用Jinja模板。
参数 | 类型 | 是否必填 | 说明 |
file_path | string | 是 | Notebook文件(.pynb)的路径。 |
profile | dict | 否 | Notebook会话的配置信息。
|
profile_id | string | 否 说明 不复用会话时必填。 |
|
profile_name | string | ||
cluster_type | string | AnalyticDB MySQL集群的类型,取值:
| |
cluster_id | string |
说明 二选一, | |
cluster_name | string | ||
spec | string | Driver的资源规格,取值:
| |
runtime_name | string | 镜像名称。 | |
session_id | string | 否 说明 复用会话时必填。 | 复用的会话信息。
说明 二选一, |
session_name | string | ||
run_params | dict | 否 | 运行参数,可以替换Notebook文件中的变量。 |
timeout | int | 否 | Notebook文件的执行时长(超时时间),单位为秒。 |
polling_interval | int | 否 | 刷新执行结果的间隔时间。单位为秒,默认值为10。 |
示例
task_id
和dag
是Airflow的特定参数,详情请参见Airflow官方文档。
from doctest import debug
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
import json
from airflow.providers.alibaba_dms.cloud.operators.dms_notebook import DMSNotebookOperator
with DAG(
"dms_notebook_test",
params={
"x":3
},
) as dag:
notebook_operator = DMSNotebookOperator(
task_id='notebook_test_hz_name',
profile_name='hansheng_profile.48',
profile={},
cluster_type='spark',
cluster_name='spark_general2.218',
spec='4C32G',
runtime_name='Spark3.5_Scala2.12_Python3.9_General:1.0.9',
file_path='/Workspace/code/default/test.ipynb',
run_params={
'a':"{{ params.x }}"
},
polling_interval=5,
debug=True,
dag=dag
)
run_this_last = EmptyOperator(
task_id="run_this_last",
dag=dag,
)
notebook_operator >> run_this_last
if __name__ == "__main__":
dag.test(
run_conf={}
)