DTSLakeInjectionOperator

本文为您介绍DTSLakeInjectionOperator操作的配置信息。

功能说明

借助DTS的能力,将DMS管理的数据库中的数据同步到对象存储OSS上。

参数说明

说明

参数bucket_namedb_listreserve可以使用Jinja模板

参数

类型

是否必填

说明

source_instance

string

DBLink的名称,详情请参见逻辑数仓

source_database

string

源库名称。

target_instance

string

目标DBLink的名称。

bucket_name

string

OSSBucket名称。

db_list

dict

需要同步的对象,详情请参见同步对象说明

reserve

dict

说明

部分数据库必填。

任务的预留参数,详情请参见Reserve参数说明

polling_interval

int

刷新执行结果的间隔时间。单位为秒,默认值为10。

示例

说明

task_iddagAirflow的特定参数,详情请参见Airflow官方文档

from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator

from airflow.providers.alibaba_dms.cloud.operators.dms_dts import DTSLakeInjectionOperator

with DAG(
    "dms_dts_dblink",
    params={
    },
) as dag:

    dts_operator = DTSLakeInjectionOperator(
        task_id="dts_test_dblink",
        source_instance='dblink_90',
        source_database='student_db',
        target_instance='dbl_oss_63',
        bucket_name='hansheng-bj',
        db_list=json.loads("""
        {\"student_db\":{\"name\":\"hansheng_student_db\",\"all\":false,\"Table\":{\"student_info\":{\"name\":\"student_info\",\"all\":true}}}}
        """),
        reserve=json.loads("""
        {\"fusionOssFileFormat\":\"DELTA\",\"fusionOssFilePath\":\"/hansheng-bj/student_info.txt\",\"a2aFlag\":\"2.0\",\"autoStartModulesAfterConfig\":\"none\",\"fusionCreatetableAfterCompleted\": \"false\"}
        """),
        polling_interval=5,
        dag=dag
    )

    run_this_last = EmptyOperator(
        task_id="run_this_last",
        dag=dag,
    )

    dts_operator >> run_this_last

if __name__ == "__main__":
    dag.test(
        run_conf={}
    )