本文为您介绍DMSNotebookOperator操作的配置信息。
功能说明
执行一个DMS管理的Notebook文件(.pynb)。
前提条件
已创建工作空间。
已创建资源组。
若需要复用会话时,还需确保已创建Notebook会话。
若无需复用会话(创建新的Notebook会话)时,还需确保已创建模板(Notebook会话的配置信息)。
参数说明
参数file_path
、run_params
、profile
、session_id
、profile_id
、cluster_id
、session_name
、profile_name
、cluster_name
可以使用Jinja模板。
参数 | 类型 | 是否必填 | 说明 |
file_path | string | 是 | Notebook文件(.pynb)的路径。 |
profile | dict | 否 | Notebook会话的配置信息。
|
profile_id | string | 否 说明 不复用会话时必填。 |
|
profile_name | string | ||
cluster_type | string | DMS工作空间的计算集群的类型,取值:
| |
cluster_id | string |
说明 二选一, | |
cluster_name | string | ||
spec | string | Driver的资源规格,取值:
| |
runtime_name | string | 镜像名称。 | |
session_id | string | 否 说明 复用会话时必填。 | 复用的会话信息。
说明 二选一, |
session_name | string | ||
run_params | dict | 否 | 运行参数,可以替换Notebook文件中的变量。 |
timeout | int | 否 | Notebook文件的执行时长(超时时间),单位为秒。 |
polling_interval | int | 否 | 刷新执行结果的间隔时间。单位为秒,默认值为10。 |
示例
task_id
和dag
是Airflow的特定参数,详情请参见Airflow官方文档。
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
import json
from airflow.providers.alibaba_dms.cloud.operators.dms_notebook import DMSNotebookOperator
with DAG(
"dms_notebook_test",
params={
"x":3
},
) as dag:
notebook_operator = DMSNotebookOperator(
task_id='notebook_test_hz_name',
profile_name='hansheng_profile.48',
profile={},
cluster_type='spark',
cluster_name='spark_general2.218',
spec='4C32G',
runtime_name='Spark3.5_Scala2.12_Python3.9_General:1.0.9',
file_path='/Workspace/code/default/test.ipynb',
run_params={
'a':"{{ params.x }}"
},
polling_interval=5,
dag=dag
)
run_this_last = EmptyOperator(
task_id="run_this_last",
dag=dag,
)
notebook_operator >> run_this_last
if __name__ == "__main__":
dag.test(
run_conf={}
)