本文为您介绍DTSLakeInjectionOperator操作的配置信息。
功能说明
借助DTS的能力,将DMS管理的数据库中的数据同步到对象存储OSS上。
参数说明
说明
参数bucket_name
、db_list
、reserve
可以使用Jinja模板。
参数 | 类型 | 是否必填 | 说明 |
source_instance | string | 是 | 源DBLink的名称,详情请参见逻辑数仓。 |
source_database | string | 是 | 源库名称。 |
target_instance | string | 是 | 目标DBLink的名称。 |
bucket_name | string | 是 | OSS的Bucket名称。 |
db_list | dict | 是 | 需要同步的对象,详情请参见同步对象说明。 |
reserve | dict | 否 说明 部分数据库必填。 | 任务的预留参数,详情请参见Reserve参数说明。 |
polling_interval | int | 否 | 刷新执行结果的间隔时间。单位为秒,默认值为10。 |
示例
说明
task_id
和dag
是Airflow的特定参数,详情请参见Airflow官方文档。
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from airflow.providers.alibaba_dms.cloud.operators.dms_dts import DTSLakeInjectionOperator
with DAG(
"dms_dts_dblink",
params={
},
) as dag:
dts_operator = DTSLakeInjectionOperator(
task_id="dts_test_dblink",
source_instance='dblink_90',
source_database='student_db',
target_instance='dbl_oss_63',
bucket_name='hansheng-bj',
db_list=json.loads("""
{\"student_db\":{\"name\":\"hansheng_student_db\",\"all\":false,\"Table\":{\"student_info\":{\"name\":\"student_info\",\"all\":true}}}}
"""),
reserve=json.loads("""
{\"fusionOssFileFormat\":\"DELTA\",\"fusionOssFilePath\":\"/hansheng-bj/student_info.txt\",\"a2aFlag\":\"2.0\",\"autoStartModulesAfterConfig\":\"none\",\"fusionCreatetableAfterCompleted\": \"false\"}
"""),
polling_interval=5,
dag=dag
)
run_this_last = EmptyOperator(
task_id="run_this_last",
dag=dag,
)
dts_operator >> run_this_last
if __name__ == "__main__":
dag.test(
run_conf={}
)
该文章对您有帮助吗?