本文为您介绍DMSAnalyticDBSparkSqlOperator操作的配置信息。
功能说明
将Spark SQL提交到湖仓版AnalyticDB MySQL管理的特定资源组上(任务类型为Interactive、引擎为Spark),详情请参见应用场景。
参数说明
参数sql
、conf
可以使用Jinja模板。
参数 | 类型 | 是否必填 | 说明 |
cluster_id | string | 是 |
说明 二选一,建议使用 |
instance | string | ||
resource_group | string | 是 | AnalyticDB MySQL集群的资源组名称。 |
sql | string | 是 | Spark的SQL操作,详情请参见Spark文档。 |
conf | dict | 否 | Spark的特殊配置,详情请参见ExecuteSparkWarehouseBatchSQL、GetSparkWarehouseBatchSQL、CancelSparkWarehouseBatchSQL。 说明
|
schema | string | 否 | 数据库,默认为default。 |
polling_interval | int | 否 | 刷新执行结果的间隔时间。单位为秒,默认值为10。 |
execute_time_limit_in_seconds | int | 否 | 超时时间,单位为秒,默认值为36000(10小时)。 |
callback | function | 否 | 用于处理SQL操作结果的回调函数,入参为SparkBatchSQL。 说明 当 |
示例
task_id
和dag
是Airflow的特定参数,详情请参见Airflow官方文档。
from typing import Any
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from datetime import datetime
from airflow.providers.alibaba_dms.cloud.operators.dms_analyticdb_spark import DMSAnalyticDBSparkSqlOperator
# alibabacloud_adb20211201.models.SparkBatchSQL
def print_result(result):
print(f"{result}")
with DAG(
"dms_adb_spark_dblink",
params={
"sql": "show databases;show databases;"
}
) as dag:
warehouse_operator: Any = DMSAnalyticDBSparkSqlOperator(
task_id="warehouse_sql",
instance="dbl_adbmysql_89",
resource_group="hansheng_spark_test",
sql="{{ params.sql }}",
polling_interval=5,
conf={
'spark.adb.sqlOutputFormat':'CSV',
'spark.adb.sqlOutputPartitions':1,
'spark.adb.sqlOutputLocation':'oss://hansheng-bj/airflow/adb_spark/test',
'sep':'|'
},
callback=print_result,
dag=dag
)
run_this_last = EmptyOperator(
task_id="run_this_last",
dag=dag
)
warehouse_operator >> run_this_last
if __name__ == "__main__":
dag.test(
run_conf={}
)