本文为您介绍DMSSqlOperator操作的配置信息。
功能说明
将SQL提交到DMS管理的数据库实例中执行,并获取结果。
参数说明
参数instance
、database
、sql
可以使用Jinja模板。
参数 | 类型 | 是否必填 | 说明 |
instance | string | 是 | DMS管理的数据库实例的连接(DBLink),详情请参见逻辑数仓。 |
database | string | 是 | 数据库名称。 |
sql | string | 是 | 需要执行的SQL语句。 说明 多条SQL语句间需以英文分号(;)分隔。 |
csv_null_replace_str | string | 否 | 用于替换 |
callback | function | 否 | 用于处理SQL操作结果的回调函数,入参为PollAsyncSQLExecuteResult。 说明 当 |
polling_interval | int | 否 | 刷新执行结果的间隔时间。单位为秒,默认值为10。 |
PollAsyncSQLExecuteResult
参数 | 类型 | 说明 |
Status | string | SQL操作的执行状态,取值:
|
SQLType | string | SQL操作的类型,取值:
|
ResultType | string | SQL操作执行结果的类型。 说明 当取值为空时,表示SQL操作尚未完成。
|
ResultContent | JSON | SQL操作执行结果的具体信息。 说明 在使用
|
示例
task_id
和dag
是Airflow的特定参数,详情请参见Airflow官方文档。
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from airflow.providers.alibaba_dms.cloud.operators.dms_sql import DMSSqlOperator
import json
import requests
def callback(result):
print(f"result: {result}")
if 'Data' in result and 'ResultContent' in result['Data']:
link = result['Data']['ResultContent']['ResultSetFileLink']
print(f"get link: {link}")
http_res = requests.get(link, headers={
"x-oss-range-behavior": "standard"
})
print(f"link res: {http_res.text}")
with DAG(
"dms_sql_dblink",
params={
},
) as dag:
sql_operator = DMSSqlOperator(
task_id="sql_test_dblink",
instance="dblink_90",
database="student_db",
sql="show databases;show tables;",
csv_null_replace_str="null",
callback=callback,
polling_interval=5,
dag=dag
)
run_this_last = EmptyOperator(
task_id="run_this_last",
dag=dag,
)
sql_operator >> run_this_last
if __name__ == "__main__":
dag.test(
run_conf={}
)