DMSAnalyticDBSparkSqlOperator

本文为您介绍DMSAnalyticDBSparkSqlOperator操作的配置信息。

功能说明

Spark SQL提交到湖仓版AnalyticDB MySQL管理的特定资源组上(任务类型为Interactive、引擎为Spark),详情请参见应用场景

参数说明

说明

参数sqlconf可以使用Jinja模板

参数

类型

是否必填

说明

cluster_id

string

  • cluster_id:AnalyticDB MySQL集群的ID。

  • instance:DMS管理的数据库实例的连接(DBLink),详情请参见逻辑数仓

说明

二选一,建议使用instance

instance

string

resource_group

string

AnalyticDB MySQL集群的资源组名称。

sql

string

SparkSQL操作,详情请参见Spark文档

conf

dict

Spark的特殊配置,详情请参见ExecuteSparkWarehouseBatchSQLGetSparkWarehouseBatchSQLCancelSparkWarehouseBatchSQL

说明

conf参数即为ExecuteSparkWarehouseBatchSQL中的RuntimeConfig

schema

string

数据库,默认为default。

polling_interval

int

刷新执行结果的间隔时间。单位为秒,默认值为10。

execute_time_limit_in_seconds

int

超时时间,单位为秒,默认值为36000(10小时)。

callback

function

用于处理SQL操作结果的回调函数,入参为SparkBatchSQL

说明

polling_interval的值大于0时,本参数才生效。

示例

说明

task_iddagAirflow的特定参数,详情请参见Airflow官方文档

from typing import Any
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from datetime import datetime

from airflow.providers.alibaba_dms.cloud.operators.dms_analyticdb_spark import DMSAnalyticDBSparkSqlOperator

# alibabacloud_adb20211201.models.SparkBatchSQL
def print_result(result):
    print(f"{result}")

with DAG(
    "dms_adb_spark_dblink",
    params={
        "sql": "show databases;show databases;"
    }
) as dag:

    warehouse_operator: Any = DMSAnalyticDBSparkSqlOperator(
        task_id="warehouse_sql",
        instance="dbl_adbmysql_89",
        resource_group="hansheng_spark_test",
        sql="{{ params.sql }}",
        polling_interval=5,
        conf={
            'spark.adb.sqlOutputFormat':'CSV',
            'spark.adb.sqlOutputPartitions':1,
            'spark.adb.sqlOutputLocation':'oss://hansheng-bj/airflow/adb_spark/test',
            'sep':'|'
        },
        callback=print_result,
        dag=dag
    )

    run_this_last = EmptyOperator(
        task_id="run_this_last",
        dag=dag
    )

    warehouse_operator >> run_this_last

if __name__ == "__main__":
    dag.test(
        run_conf={}
    )