迁移、同步或订阅对象说明

DTS支持调用API接口,配置或查询DTS任务的迁移、同步或订阅对象。本文将为您介绍相关API接口,并提供对象的定义和配置案例。

相关接口及参数

API

说明

在请求参数Dblist中配置DTS任务的迁移、同步或订阅的对象。

在返回参数DbObject中查询DTS任务的迁移、同步或订阅的对象。

迁移、同步或订阅对象定义说明

迁移、同步或订阅对象的数据类型为JSON,详细定义如下:

  • 若迁移、同步或订阅对象包含多个库,则您可参考如下定义:

    重要

    订阅实例不支持映射功能,即订阅实例中name的参数值必须与待订阅的数据库或表的名称一致。

    {
        "待迁移、同步或订阅的库1的名称": {
            "name": "迁移、同步或订阅的库1在目标实例中的名称",
            "all": true(表示迁移、同步或订阅对象为整库)
        },
        "待迁移、同步或订阅的库2的名称": {
            "name": "迁移、同步或订阅的库2在目标实例中的名称",
            "all": false(表示迁移、同步或订阅对象为非整库),
            "Table": {
                "待迁移、同步或订阅的表A的名称": {
                    "name": "迁移、同步或订阅的表A在目标实例中的名称",
                    "all": true(表示迁移、同步或订阅对象为整表),
                    "dml_op": "需增量迁移或同步的DML操作",
                    "ddl_op": "需增量迁移或同步的DDL操作"
                }
            }
        },
        "待迁移、同步或订阅的库3的名称": {
            "name": "迁移、同步或订阅的库3在目标实例中的名称",
            "all": true(表示迁移、同步或订阅对象为整库),
            "dml_op": "需增量迁移或同步的DML操作",
            "ddl_op": "需增量迁移或同步的DDL操作"
        }
    }
  • 若迁移或同步对象的粒度为列,或者包含过滤条件,则您可参考如下定义:

    {
        "待迁移、同步或订阅的库名": {
            "name": "迁移、同步或订阅的库在目标实例中的库名",
            "all": false(表示迁移、同步或订阅对象为非整库),
            "Table": {
                "待迁移、同步或订阅的表名A": {
                    "name": "迁移、同步或订阅的表在目标实例中的表名A",
                    "all": false(表示迁移、同步或订阅对象为非整表),
                    "filter": "id>10"
                    "column": {
                        "id": {
                            "key": "PRI",
                            "name": "id",
                            "type": "int(11)",
                            "sharedKey": false,
                            "state": "checked"
                        }
                    },
                    "shard": 12
                }
            }
        }
    }
  • 若迁移或同步对象所属的目标库实例为云原生数据仓库AnalyticDB MySQL版AnalyticDB PostgreSQL版时,则您可参考如下定义:

    {
        "待迁移、同步的库的名称": {
            "name": "迁移、同步的库在目标实例中的名称",
            "all": false(固定为false。无论是对象为整库还是表级,如目标实例为AnalyticDB MySQL或AnalyticDB PostgreSQL,固定传入false,且还需传入表对应的分区键等信息),
            "Table": {
                "待迁移、同步的表A的名称": {
                    "all": true(表示迁移、同步对象为整表),
                    "name": "迁移、同步的表A在目标实例中的名称",
                    "primary_key": "id(指定主键)",
                    "type": "dimension(表的类型)",
                }
                "待迁移、同步的表B的名称": {
                    "all": true(表示迁移、同步对象为整表),
                    "name": "迁移、同步的表B在目标实例中的名称",
                    "part_key": "id(指定分区键)",
                    "primary_key": "id(指定主键)",
                    "type": "partition(表的类型)",
                    "tagColumnValue": "标签列的值"
                }
            }
        }
    }
  • 若需要给同步对象设置独立冲突修复策略,则您可参考如下定义:

    说明
    • 当前仅MySQL间或PolarDB MySQL版集群间的双向同步实例支持。

    • 支持设置库级别或表级别的独立冲突修复策略。

    • 设置了独立冲突修复策略的列,全局冲突修复策略对其不生效。

    表级别

    {
        "待同步的库的名称1": {
          "name": "待同步的库1在目标实例中的名称",
          "all": true(表示同步对象为整库),
          "conflict": "任务级别的冲突修复策略"
        },
        "待同步的库的名称2": {
          "name": "待同步的库2在目标实例中的名称",
          "all": false(表示同步对象为非整库),
          "conflict": "overwrite",
          "Table": {
            "待同步的表A的名称": {
              "name": "同步的表A在目标实例中的名称",
              "all": true(表示同步对象为整表),
              "cdr_cmp_col": "冲突检测列",
              "cdr_rslv_col": "冲突检测列",
              "resolve_method": "表级别的冲突修复策略"
            }
          }
        }
    }

    库级别

    • 同步对象为整库:

      "待同步的库的名称1": {
        "name": "待同步的库1在目标实例中的名称",
        "all": true(表示同步对象为整库),
        "conflict": "任务级别的冲突修复策略",
        "cdr_cmp_col": "冲突检测列",
        "cdr_rslv_col": "冲突检测列",
        "resolve_method": "库级别的冲突修复策略"
        }
      }
    • 同步对象非整库:

      "待同步的库的名称2": {
        "name": "待同步的库2在目标实例中的名称",
        "all": false(表示同步对象为非整库),
        "conflict": "任务级别的冲突修复策略",
        "cdr_cmp_col": "冲突检测列",
        "cdr_rslv_col": "冲突检测列",
        "resolve_method": "库级别的冲突修复策略",
        "Table": {
          "待同步的表A的名称": {
            "name": "同步的表A在目标实例中的名称",
            "all": true(表示同步对象为整表)
          }
        }
      }

参数

说明

name

源库、表、列名映射到目标库、表、列的名称。如源库名为dtssource,目标库名为dtstarget,则您需在name中传入dtstarget,表示把源库名映射成dtstarget后写入目标库中。

all

是否选择全部表、列,取值:

  • true:是。

    说明

    如选择全部表或列,则无需在配置具体的表或列信息。

  • false:否。

Table

待迁移、同步或订阅的表信息。

filter

设置过滤条件,过滤待迁移、同步或订阅的数据。目前仅支持在表级别中体现。

例如您可传入id>10,选择只迁移、同步该表中ID列的值大于10的数据。更多过滤条件的格式,请参见设置过滤条件

说明

订阅任务暂不支持设置过滤条件。

column

待迁移、同步或订阅的列信息。

key

是否为主键,取值:

  • PRI:是。

  • 空值:否。

sharedKey

是否为分片键,取值:

  • true:是。

  • false:否。

说明

当迁移、同步对象所属数据库的类型为Kafka时,才需配置本参数。

type

该字段的数据类型。

state

如为checked,则表示该列被选中。

shard

待迁移、同步的表的分片数量。

说明

当迁移、同步数据所属数据库的类型为Kafka时,才需配置本参数。

dml_op

选择增量迁移或同步的DML操作,取值及意思如下:

  • i:INSERT。

  • u:UPDATE。

  • d:DELETE。

  • 如该值为空,则表示增量迁移或同步该任务所有支持的DML操作。

  • none:不增量迁移或同步DML操作。

说明

如需查询不同迁移或同步任务支持的DML操作,请参见迁移方案概览同步方案概览中具体任务的配置文档。

ddl_op

选择增量迁移或同步的DDL操作。取值及意思如下:

  • ct:CREATE TABLE。

  • at:ALTER TABLE。

  • dt:DROP TABLE。

  • rt:RENAME TABLE。

  • tt:TRUNCATE TABLE。

  • 如该值为空,则表示增量迁移或同步该任务所有支持的DDL操作。

  • none:不增量迁移或同步DDL操作。

说明

如需查询不同迁移或同步任务支持的DDL操作,请参见迁移方案概览同步方案概览中具体任务的配置文档。

primary_key

表示主键。当目标实例为云原生数据仓库AnalyticDB MySQL版AnalyticDB PostgreSQL版时,本参数才可用且必须传入。

part_key

表示分区键。当目标实例为云原生数据仓库AnalyticDB MySQL版AnalyticDB PostgreSQL版时,本参数才可用且必须传入。

type

重要

此处的type参数与代表字段数据类型的type参数不同。

当目标实例为云原生数据仓库AnalyticDB MySQL版AnalyticDB PostgreSQL版时,您需要指明迁移、同步对象的表类型:

  • dimension:维度表。

  • partition:分区表。

tagColumnValue

自定义__dts_data_source标签列的值。当目标实例为云原生数据仓库AnalyticDB MySQL版时,本参数才可用且必须传入。

conflict

任务级别的全局冲突修复策略,待同步的每个库中都需传入该参数,且取值必须一致。取值:

  • overwrite:当数据同步遇到冲突时,直接覆盖目标库中的冲突记录。

  • interrupt:当数据同步遇到冲突时,同步任务直接报错并退出,同步任务进入失败状态,需要您介入修复任务。

    说明

    控制台显示为TaskFailed

  • ignore:当数据同步遇到冲突时,直接跳过当前同步语句,继续往下执行,选择使用目标库中的冲突记录。

resolve_method

表级别的独立冲突修复策略(仅增量同步支持),取值:

  • overwrite:当数据同步遇到冲突时,直接覆盖目标库中的冲突记录。

  • interrupt:当数据同步遇到冲突时,同步任务直接报错并退出,同步任务进入失败状态,需要您介入修复任务。

    说明

    控制台显示为TaskFailed

  • ignore:当数据同步遇到冲突时,直接跳过当前同步语句,继续往下执行,选择使用目标库中的冲突记录。

  • use_max:当数据同步遇到冲突时,对当前冲突列的两条记录进行对比,将较大值的记录写入到目标库中。若目标记录不存在或字段类型不满足要求,则系统的处理方法等效于overwrite

  • use_min:当数据同步遇到冲突时,对当前冲突列的两条记录进行对比,将较小值的记录写入到目标库中。若目标记录不存在或字段类型不满足要求,则系统的处理方法等效于ignore

cdr_cmp_col

  • 表级别:表示除主键和唯一键外,需要设置独立冲突修复策略的冲突检测列,且取值必须一致。

    重要
    • 冲突策略为use_maxuse_min时,参数cdr_cmp_colcdr_rslv_col必须传入。

    • 取值默认已包含主键和唯一键,无需手动传入对应的列。

  • 库级别:表示需要设置独立冲突修复策略的冲突检测列,且取值必须一致。

    重要

    参数cdr_cmp_colcdr_rslv_col必须传入。

cdr_rslv_col

迁移、同步或订阅对象配置示例

  • 示例一:迁移、同步或订阅dtstestdata库中所有的表。

    {"dtstestdata": {   "name": "dtstestdata",   "all": true }}
  • 示例二:迁移或同步dtstestdata库,并修改库名为dtstestdata_new。

    {"dtstestdata": {   "name": "dtstestdata_new",   "all": true }}
  • 示例三:迁移、同步或订阅dtstestdata库中部分表(如customer)。

    {"dtstestdata": {
       "name": "dtstestdata",
       "all": false,
       "Table": {
         "customer": {
           "name": "customer",
           "all": true, 
           "column": { 
             "id": {
               "key": "PRI",
               "name": "id",
               "type": "int(11)",
               "sharedKey": false,
               "state": "checked"  
             },
             "gmt_create": {
               "key": "",
               "name": "gmt_create",
               "type": "datetime",
               "sharedKey": false,
               "state": "checked"
             },
             "gmt_modify": {
               "key": "",
               "name": "gmt_modify",
               "type": "datetime",
               "sharedKey": false,
               "state": "checked"
             },
             "valid_time": {
               "key": "",
               "name": "valid_time",
               "type": "datetime",
               "sharedKey": false,
               "state": "checked"
             },
             "creator": {
               "key": "",
               "name": "creator",
               "type": "varchar(200)",
               "sharedKey": false,
               "state": "checked"
             }
           },
           "shard": 12
         }
       }
     }
    }
  • 示例四:迁移或同步dtstestdata库中的表(如customer和order)的部分列。

    {"dtstestdata": {
       "name": "dtstestdata",
       "all": false,
       "Table": {
         "customer": {
           "name": "customer",
           "all": false, 
           "column": { 
             "id": {
               "key": "PRI",
               "name": "id",
               "type": "int(11)",
               "sharedKey": false,
               "state": "checked"  
             },
             "level": {
               "key": "",
               "name": "level",
               "type": "varchar(5000)",
               "sharedKey": false,
               "state": "checked"
             },
             "name": {
               "key": "",
               "name": "name",
               "type": "varchar(500)",
               "sharedKey": false,
               "state": "checked"
             },
           },
           "shard": 12
         },
         "order": {
           "name": "order",
           "all": false,
          "column": {
             "id": {
               "key": "PRI",
               "name": "id",
               "type": "int(11)",
               "sharedKey": false,
               "state": "checked"
             }
           },
           "shard": 12
         }
       }
     }
    }
  • 示例五:迁移或同步dtstestdata库中的表(如customer、order、commodity)至目标实例云原生数据仓库AnalyticDB MySQL版AnalyticDB PostgreSQL版中。

    {
        "dtstestdata": {
            "name": "dtstestdatanew",
            "all": false,
            "Table": {
                "order": {
                    "name": "ordernew",
                    "all": true,
                    "part_key": "id",
                    "primary_key": "id",
                    "type": "partition"
                },
                "customer": {
                    "name": "customernew",
                    "all": true,
                    "primary_key": "id",
                    "type": "dimension"
                },
                "commodity": {
                    "name": "commoditynew",
                    "all": false,
                    "filter": "id>10",
                    "column": {
                        "id": {
                            "key": "PRI",
                            "name": "id",
                            "type": "int(11)"
                        }
                    },
                    "part_key": "id",
                    "primary_key": "id",
                    "type": "partition"
                }
            }
        }
    }
  • 示例六:

    表级别

    给同步任务的对象设置全局冲突修复策略interrupt;给同步的dtstestdata2库中的customer表的主键列、唯一键列和name列设置独立冲突修复策略overwrite

    {
        "dtstestdata1": {
          "name": "dtstestdata1",
          "all": true,
          "conflict": "interrupt"
        },
        "dtstestdata2": {
          "name": "dtstestdata2",
          "all": false,
          "conflict": "interrupt",
          "Table": {
            "customer": {
              "name": "customer",
              "all": true,
              "cdr_cmp_col": "name",
              "cdr_rslv_col": "name",
              "resolve_method": "overwrite"
            }
          }
        }
      }

    库级别

    • 同步对象为整库:给同步的dtstestdata1库中所有待同步表的nameaddr列,设置独立冲突修复策略use_max

      "dtstestdata1": {
        "name": "dtstestdata1",
        "all": true,
        "conflict": "overwrite",
        "cdr_cmp_col": "name,addr",
        "cdr_rslv_col": "name,addr",
        "resolve_method": "use_max"
        }
      }
    • 同步对象非整库:给同步的dtstestdata2库中所有待同步表的nameaddr列,设置独立冲突修复策略use_max

      "dtstestdata2": {
        "name": "dtstestdata2",
        "all": false,
        "conflict": "overwrite",
        "cdr_cmp_col": "name,addr",
        "cdr_rslv_col": "name,addr",
        "resolve_method": "use_max",
        "Table": {
          "person": {
            "name": "person",
            "all": true
          },
          "class": {
            "name": "class",
            "all": true
          }
        }
      }

  • 示例七:源数据库类型为Tair/Redis的同步实例,对名称为0和1的DB,仅同步Key前缀为HProp的数据(即需要同步的前缀HProp);对名称为2的DB仅同步Key前缀为dts且不包含dtstest的数据(即需要同步的前缀dts,且需要过滤的前缀dtstest)。

    {
        "0": {
            "name": "0", 
            "all": true,
             "filter": "[{"condition":"HProp","filterType":"white","filterPattern":"prefix"}]"
        }, 
        "1": {
            "name": "1", 
            "all": true,
             "filter": "[{"condition":"HProp","filterType":"white","filterPattern":"prefix"}]"
        }, 
        "2": {
            "name": "2", 
            "all": true,
             "filter": "[{"condition":"dts","filterType":"white","filterPattern":"prefix"},{"condition":"dtstest","filterType":"black","filterPattern":"prefix"}]"
        }
    }