DMSSqlOperator

本文为您介绍DMSSqlOperator操作的配置信息。

功能说明

SQL提交到DMS管理的数据库实例中执行,并获取结果。

参数说明

说明

参数instancedatabasesql可以使用Jinja模板

参数

类型

是否必填

说明

instance

string

DMS管理的数据库实例的连接(DBLink),详情请参见逻辑数仓

database

string

数据库名称。

sql

string

需要执行的SQL语句。

说明

多条SQL语句间需以英文分号(;)分隔。

csv_null_replace_str

string

用于替换resultset中的null值,默认为"null"字符串。

callback

function

用于处理SQL操作结果的回调函数,入参为PollAsyncSQLExecuteResult

说明

polling_interval的值大于0时,本参数才生效。

polling_interval

int

刷新执行结果的间隔时间。单位为秒,默认值为10。

PollAsyncSQLExecuteResult

参数

类型

说明

Status

string

SQL操作的执行状态,取值:

  • WAITING

  • RUNNING

  • SUCCESS

  • FAILURE

SQLType

string

SQL操作的类型,取值:

  • DDL

  • DML

  • DQL

  • UNKNOWN

    说明

    无法解析。

ResultType

string

SQL操作执行结果的类型。

说明

当取值为空时,表示SQL操作尚未完成。

  • PLAINTEXT:使用ResultContent的内容即可,一般是DDLDML信息。

  • FILE:需要在OSS下载,默认格式为CSV,一般是DQL信息。

ResultContent

JSON

SQL操作执行结果的具体信息。

说明

在使用resultSetFileLink的链接下载文件时,必须指定请求头"x-oss-range-behavior:standard",否则将导致签名校验失败。

{
  "resultSetFileLink" : "https://xxxx", // 当SQLTypeDQL时不为空
  "resultSetFileType" : "CSV", // 当SQLTypeDQL时为CSV,其余时间为空
  "resultSetOption" : {
    ""
  }, 
  "columnMetas": [
    {
      
    }
      
  ],
  "count" : 1,  // 当DQL时不为空,表示结果集条数
  "affectRows" : 1, // 当DML、DQL等时不为空,表示SQL影响行数,例如更新了几行等
  "errorMessage" : "xxxx" //当StatusFAIL的时候非空
}

示例

说明

task_iddagAirflow的特定参数,详情请参见Airflow官方文档

from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator

from airflow.providers.alibaba_dms.cloud.operators.dms_sql import DMSSqlOperator

import json
import requests

def callback(result):
    print(f"result: {result}")
    if 'Data' in result and 'ResultContent' in result['Data']:
        link = result['Data']['ResultContent']['ResultSetFileLink']
        print(f"get link: {link}")
        http_res = requests.get(link, headers={
            "x-oss-range-behavior": "standard"
        })
        print(f"link res: {http_res.text}")


with DAG(
    "dms_sql_dblink",
    params={
    },
) as dag:

    sql_operator = DMSSqlOperator(
        task_id="sql_test_dblink",
        instance="dblink_90",
        database="student_db",
        sql="show databases;show tables;",
        csv_null_replace_str="null",
        callback=callback,
        polling_interval=5,
        dag=dag
    )

    run_this_last = EmptyOperator(
        task_id="run_this_last",
        dag=dag,
    )

    sql_operator >> run_this_last

if __name__ == "__main__":
    dag.test(
        run_conf={}
    )