CreateDIJob - 创建数据集成同步任务

更新时间:
复制为 MD 格式

创建数据集成新版任务。

接口说明

  • 需要购买 DataWorks 基础版及以上版本才能使用。

  • 该接口创建了数据集成同步任务, 参数包括同步任务的源端配置 SourceDataSourceSettings 和目的端配置 DestinationDataSourceSettings,以及支持的同步类型 MigrationType,同时通过 TransformationRules 定义了对同步表做的如加列,表名替换等映射 rule,在 TableMappings 中定义了要同步的具体表以及表需要使用的映射 rule,在 JobSettings 中定义了任务的列映射和周期设置等。

调试

您可以在OpenAPI Explorer中直接运行该接口,免去您计算签名的困扰。运行成功后,OpenAPI Explorer可以自动生成SDK代码示例。

调试

授权信息

当前API暂无授权信息透出。

请求参数

名称

类型

必填

描述

示例值

DestinationDataSourceType

string

目标端数据源类型,枚举值:Hologres,OSS-HDFS,OSS,MaxCompute,LogHub,StarRocks,DataHub,AnalyticDB_For_MySQL,Kafka,Hive。

Hologres

Description

string

任务的描述

DI Job Demo

SourceDataSourceType

string

源端数据源类型,枚举值: PolarDB,MySQL,Kafka,LogHub,Hologres,Oracle,OceanBase,MongoDB,RedShift,Hive,SQLServer,Doris,ClickHouse。

MySQL

ProjectId

integer

DataWorks 工作空间的 ID。您可以登录 DataWorks 控制台,进入工作空间管理页面获取 ID。

该参数用来确定本次 API 调用操作使用的 DataWorks 工作空间。

10000

Name

string

任务名称。

mysql_to_holo_sync_8772

MigrationType

string

同步类型,可选的枚举值有:

  • FullAndRealtimeIncremental(全量和实时增量,整库实时)

  • RealtimeIncremental(实时增量,单表实时)

  • Full(全量,整库离线)

  • OfflineIncremental(离线增量,整库离线)

  • FullAndOfflineIncremental(全量+离线增量,整库离线)

FullAndRealtimeIncremental

JobType

string

任务类型,可选

  • DatabaseRealtimeMigration(整库实时):将源端多个库的多个表进行流同步,支持仅全量,仅增量,或全量+增量。

  • DatabaseOfflineMigration(整库离线):将源端多个库的多个表进行批同步,支持仅全量,仅增量,或全量+增量。

  • SingleTableRealtimeMigration(单表实时):将源端单个表进行流同步

DatabaseRealtimeMigration

SourceDataSourceSettings

array<object>

源端数据源设置列表

array<object>

单个源端数据源设置

DataSourceName

string

数据源名称

mysql_datasource_1

DataSourceProperties

object

数据源属性

Encoding

string

数据库编码

UTF-8

Timezone

string

时区

Asia/Shanghai

ConnectionProperties

string

与 DataSourceName 数据源名称二选一填写;本参数为自定义数据源连接配置信息,包括实例 id、访问身份、实例 region 等。

本参数仅支持数据源配置为实例模式(ConnectionPropertiesMode),不同数据源具有不同的属性信息规范,请参考数据源连接信息 ConnectionProperties

{ "instanceId": "rm-2ze09gn3x6xxx", "password": "xxxx", "database": "agent", "username": "zmtest" "regionId": "cn-beijing" }

DestinationDataSourceSettings

array<object>

目标端数据源设置列表

array<object>

单个目标端数据源设置

DataSourceName

string

数据源名称

holo_datasource_1

DataSourceProperties

object

数据源属性

ConnectionProperties

string

与 DataSourceName 数据源名称二选一填写;本参数为自定义数据源连接配置信息,包括实例 id、访问身份、实例 region 等。

本参数仅支持数据源配置为实例模式(ConnectionPropertiesMode),不同数据源具有不同的属性信息规范,请参考数据源连接信息 ConnectionProperties

{ "instanceId": "rm-2ze09gn3x6xxx", "password": "xxxx", "database": "agent", "username": "zmtest" "regionId": "cn-beijing" }

ResourceSettings

object

资源设置

OfflineResourceSettings

object

离线同步资源

RequestedCu

number

离线同步使用的数据集成资源组 cu

2

ResourceGroupIdentifier

string

离线同步使用的数据集成资源组名称

S_res_group_111_222

RealtimeResourceSettings

object

实时同步资源

RequestedCu

number

实时同步使用的数据集成资源组 cu

2

ResourceGroupIdentifier

string

实时同步使用的数据集成资源组名称

S_res_group_111_222

ScheduleResourceSettings

object

调度资源

RequestedCu

number

离线同步任务使用的调度资源组 cu

2

ResourceGroupIdentifier

string

离线同步任务使用的调度资源组名称

S_res_group_222_333

TransformationRules

array<object>

同步对象转换规则定义列表

说明

[ { "RuleName":"my_database_rename_rule", "RuleActionType":"Rename", "RuleTargetType":"Schema", "RuleExpression":"{"expression":"${srcDatasoureName}_${srcDatabaseName}"}" } ]

object

单个同步对象转换规则定义,每个元素为一条转换规则定义

RuleActionType

string

动作类型,可选的枚举值:

  • DefinePrimaryKey(定义主键)

  • Rename(重命名)

  • AddColumn(增加列)

  • HandleDml(DML 处理)

  • DefineIncrementalCondition(定义增量条件)

  • DefineCycleScheduleSettings(定义周期调度设置)

  • DefinePartitionKey(定义分区列)

Rename

RuleExpression

string

规则表达式,json string 格式。

  1. 重命名规则(Rename)

  • 示例:{"expression":"${srcDatasourceName}_${srcDatabaseName}_0922" }

  • expression:为重命名转换规则表达式,表达式内支持变量包括:${srcDatasourceName}(源端数据源名)、${srcDatabaseName}(源端库名)、${srcTableName}(源端表名)。

  1. 加列规则(AddColumn)

  • 示例:{"columns":[{"columnName":"my_add_column","columnValueType":"Constant","columnValue":"123"}]}

  • 如不指定,默认规则为不加列不复制。

  • columnName:附加的列名称。

  • columnValueType:附加的列取值类型,包括 Constant(常量)、Variable(变量)。

  • columnValue:附加的列取值。当 columnValueType=Constant 时,value 为自定义常量,String 类型。当 columnValueType=Variable 时,value 为内置变量。内置变量可选值包括:EXECUTE_TIME(执行时间,Long 类型)、DB_NAME_SRC(源端数据库名称,String 类型)、DATASOURCE_NAME_SRC(源端数据源名称,String 类型)、TABLE_NAME_SRC(源端表名,String 类型)、DB_NAME_DEST(目标端数据库名称,String 类型)、DATASOURCE_NAME_DEST(目标端数据源名称,String 类型)、TABLE_NAME_DEST(目标端表名,String 类型)、DB_NAME_SRC_TRANSED(转换后的数据库名称,String 类型)。

  1. 指定目标端表的主键列列名(DefinePrimaryKey)

  • 示例:{"columns":["ukcolumn1","ukcolumn2"]}

  • 如不指定默认使用源端主键列。

  • 当目标端为已有表:数据集成系统不会修改目标端表结构,如果指定的主键列不在目标端的列集合中,任务启动会报错提示。

  • 当目标端为自动建表:数据集成系统会自动创建目标端表结构,表结构包含定义的主键列。当指定的主键列不在目标端的列集合中时,任务启动会报错提示。

  1. DML 处理规则(HandleDml)

  • 示例: {"dmlPolicies":[{"dmlType":"Delete","dmlAction":"Filter","filterCondition":"id > 1"}]}

  • 如不指定,默认规则为 Insert、Update、Delete 均为 Normal

  • dmlType:DML 操作类型,Insert(插入)、Update(更新)、Delete(删除)

  • dmlAction:DML 处理策略,Normal(正常处理)、Ignore(忽略)、Filter(有条件的正常处理,当 dmlType=Update/Delete 时使用)、LogicalDelete(逻辑删除)

  • filterCondition:DML 过滤条件,当 dmlAction=Filter 时使用

  1. 增量条件(DefineIncrementalCondition)

  • 示例: {"where":"id > 0"}

  • 指定增量过滤条件

  1. 周期调度参数(DefineCycleScheduleSettings)

  • 示例: {"cronExpress":" * * * * * *", "cycleType":"1"}

  • 指定周期任务调度参数

  1. 指定分区键(DefinePartitionKey)

  • 示例: {"columns":["id"]}

  • 指定分区键

{ "expression": "${srcDatasoureName}_${srcDatabaseName}" }

RuleName

string

规则名称,当动作类型和动作作用的目标类型相同时,规则名称需保证唯一性,长度不能超过 50 个字符

rename_rule_1

RuleTargetType

string

动作作用的目标类型,可选的枚举值:

  • Table(表)

  • Schema(schema)

  • Database(数据库)

Table

TableMappings

array<object>

同步对象转换映射列表,列表中每个元素描述了一组源端同步对象选择规则列表和在该组同步对象上应用的同步对象转换规则列表

说明

[ { "SourceObjectSelectionRules":[ { "ObjectType":"Database", "Action":"Include", "ExpressionType":"Exact", "Expression":"biz_db" }, { "ObjectType":"Schema", "Action":"Include", "ExpressionType":"Exact", "Expression":"s1" }, { "ObjectType":"Table", "Action":"Include", "ExpressionType":"Exact", "Expression":"table1" } ], "TransformationRuleNames":[ { "RuleName":"my_database_rename_rule", "RuleActionType":"Rename", "RuleTargetType":"Schema" } ] } ]

array<object>

每条规则代表选择需要同步的一个表

SourceObjectSelectionRules

array<object>

每条规则可选择待同步源端对象的集合,多条规则组成选一个表

object

每条规则可选择待同步源端对象的不同对象类型,如选择源端数据库、源端数据表

Action

string

选择动作,取值范围:Include/Exclude

Include

Expression

string

表达式

mysql_table_1

ExpressionType

string

表达式类型,取值范围:Exact/Regex

Exact

ObjectType

string

对象类型,可选的枚举值:

  • Table(表)

  • Schema(schema)

  • Database(数据库)

Table

TransformationRules

array<object>

同步对象转换规则定义列表,列表中每个元素为一条转换规则定义

object

源端对象上应用的转换规则

RuleName

string

规则名称,在一种动作类型+动作作用的目标类型下规则名称唯一,长度不能超过 50 个字符

rename_rule_1

RuleActionType

string

动作类型,可选的枚举值:

  • DefinePrimaryKey(定义主键)

  • Rename(重命名)

  • AddColumn(增加列)

  • HandleDml(DML 处理)

  • DefineIncrementalCondition(定义增量条件)

  • DefineCycleScheduleSettings(定义周期调度设置)

  • DefinePartitionKey(定义分区列)

Rename

RuleTargetType

string

动作作用的目标类型,可选的枚举值:

  • Table(表)

  • Schema(schema)

  • Database(数据库)

Table

JobSettings

object

同步任务维度的设置,含 DDL 处理策略、源端和目标端列数据类型映射策略、任务运行时参数等

ChannelSettings

string

通道相关任务设置,可以对一些特定通道进行特殊配置,目前支持 Holo2Holo(从 holo 同步到 holo),Holo2Kafka(从 Holo 同步到 Kafka)

  1. Holo2Kafka

  • 示例:{"destinationChannelSettings":{"kafkaClientProperties":[{"key":"linger.ms","value":"100"}],"keyColumns":["col3"],"writeMode":"canal"}} kafkaClientProperties:kafka producer 参数,写入 kafka 时使用

  • keyColumns, 需要写入的 kafka 列取值

  • writeMode,kafka 写入格式,目前支持 json/canal

  1. Holo2Holo

  • 示例: {"destinationChannelSettings":{"conflictMode":"replace","dynamicColumnAction":"replay","writeMode":"replay"}}

  • conflictMode: 写入 holo 冲突处理策略,replace-覆盖、ignore-忽略

  • writeMode: 写入 holo 方式,replay-重放、insert-插入

  • dynamicColumnAction:写入 holo 动态列方式 replay-重放、insert-插入,ignore-忽略

{ "structInfo": "MANAGED", "storageType": "TEXTFILE", "writeMode": "APPEND", "partitionColumns": [ { "columnName": "pt", "columnType": "STRING", "comment": "" } ], "fieldDelimiter": "" }

ColumnDataTypeSettings

array<object>

列类型映射数组

说明

["ColumnDataTypeSettings":[ { "SourceDataType":"Bigint", "DestinationDataType":"Text" } ]

object

单条列类型映射

DestinationDataType

string

目标端类型。如 bigint,boolean,string,text,datetime,timestamp,decimal,binary,不同数据源类型会有类型差异

text

SourceDataType

string

源端类型。如 bigint,boolean,string,text,datetime,timestamp,decimal,binary,不同数据源类型会有类型差异

bigint

CycleScheduleSettings

object

周期调度设置

CycleMigrationType

string

需要周期调度的同步类型。取值范围:

  • Full:全量

  • OfflineIncremental:离线增量

Full

ScheduleParameters

string

调度参数

bizdate=$bizdate

DdlHandlingSettings

array<object>

DDL 处理设置数组。

说明

["DDLHandlingSettings":[ { "Type":"Insert", "Action":"Normal" } ]

object

单条 DDL 处理设置

Action

string

处理动作,可选的枚举值:

  • Ignore(忽略)

  • Critical(报错)

  • Normal(正常处理)

Critical

Type

string

DDL 类型,可选的枚举值:

  • RenameColumn(重命名列)

  • ModifyColumn(重命名列)

  • CreateTable(重命名列)

  • TruncateTable(清空表)

  • DropTable(删除表)

  • DropColumn(删除列)

  • AddColumn(新增列)

AddColumn

RuntimeSettings

array<object>

运行时设置

object

Name

string

设置名称,可选的枚举值:

  • src.offline.datasource.max.connection(离线批量任务源端最大连接数)

  • dst.offline.truncate (是否清空目标表)

  • runtime.offline.speed.limit.enable(离线批量任务是否开启限流)

  • runtime.offline.concurrent(离线批量同步任务并发度)

  • runtime.enable.auto.create.schema(是否自动在目标端创建 schema)

  • runtime.realtime.concurrent(实时任务并发度)

  • runtime.realtime.failover.minute.dataxcdc (failover 失败重启等待时间单位分钟)

  • runtime.realtime.failover.times.dataxcdc (failover 失败重启次数)

runtime.offline.concurrent

Value

string

设置取值

1

JobName deprecated

string

该字段已废弃,请使用 Name 字段。

mysql_to_holo_sync_8772

Owner

string

任务责任人。

3726346

FileSpec

string

脚本模式代码内容。

{ "resourceSettings": { "realtimeResourceSettings": { "requestedCu": 2, "resourceGroupIdentifier": "Serverless_res_group_123_456" }, "offlineResourceSettings": { "requestedCu": 2, "resourceGroupIdentifier": "Serverless_res_group_123_456" } }, "tableMappings": [ { "sourceObjectSelectionRules": [ { "expression": "autotest_hologres", "action": "Include", "expressionType": "Exact", "objectType": "Datasource" }, { "expression": "auto_holo_2661647", "action": "Include", "expressionType": "Exact", "objectType": "Table" }, { "expression": "public", "action": "Include", "expressionType": "Exact", "objectType": "Schema" } ], "transformationRules": [ { "ruleTargetType": "Table", "ruleActionType": "SourceSchema", "ruleName": "SourceSchema_Table_BStf8aXPSCJjOWGe" }, { "ruleTargetType": "Schema", "ruleActionType": "Rename", "ruleName": "Rename_Schema_3qWNOIsljtInvKJy" }, { "ruleTargetType": "Table", "ruleActionType": "Rename", "ruleName": "Rename_Table_o3PVQq1aIKDGoVVW" }, { "ruleTargetType": "Table", "ruleActionType": "DefineDstTableSettings", "ruleName": "DefineDstTableSettings_Table_BhJltOmOCIc81fzi" }, { "ruleTargetType": "Table", "ruleActionType": "ColumnMapping", "ruleName": "ColumnMapping_Table_nP4hJPX1wh2W3fpo" } ] } ], "sourceDataSourceSettings": [ { "dataSourceProperties": { "timeZone": "Asia/Shanghai" }, "dataSourceName": "autotest_hologres" } ], "jobSettings": { "runtimeSettings": [ ], "ddlHandlingSettings": [ ], "columnDataTypeSettings": [ ], "cycleScheduleSettings": { }, "channelSettings": { "destinationChannelSettings": { "conflictMode": "replace", "dynamicColumnAction": "replay", "writeMode": "replay" }, "sourceChannelSettings": { } } }, "destinationDataSourceType": "Hologres", "transformationRules": [ { "ruleTargetType": "Table", "ruleName": "SourceSchema_Table_BStf8aXPSCJjOWGe", "ruleActionType": "SourceSchema", "ruleExpression": { "columns": [ { "name": "id", "category": "normal", "type": "BIGINT" }, { "name": "decimal", "category": "normal", "type": "DECIMAL" } ] } }, { "ruleTargetType": "Schema", "ruleName": "Rename_Schema_3qWNOIsljtInvKJy", "ruleActionType": "Rename", "ruleExpression": { "expression": "public" } }, { "ruleTargetType": "Table", "ruleName": "Rename_Table_o3PVQq1aIKDGoVVW", "ruleActionType": "Rename", "ruleExpression": { "expression": "auto_holo_2661647_dst" } }, { "ruleTargetType": "Table", "ruleName": "DefineDstTableSettings_Table_BhJltOmOCIc81fzi", "ruleActionType": "DefineDstTableSettings", "ruleExpression": { "ddlString": "BEGIN; CREATE TABLE IF NOT EXISTS public.auto_holo_2661647_dst ( id BIGINT PRIMARY KEY, "decimal" DECIMAL(38,18) ); CALL SET_TABLE_PROPERTY('public.auto_holo_2661647_dst', 'time_to_live_in_seconds', '3153600000'); CALL SET_TABLE_PROPERTY('public.auto_holo_2661647_dst', 'orientation', 'column'); CALL SET_TABLE_PROPERTY('public.auto_holo_2661647_dst', 'binlog.level', 'replica'); CALL SET_TABLE_PROPERTY('public.auto_holo_2661647_dst', 'binlog.ttl', '2592000'); CALL SET_TABLE_PROPERTY('public.auto_holo_2661647_dst', 'bitmap_columns', '"text","char","varchar"'); CALL SET_TABLE_PROPERTY('public.auto_holo_2661647_dst', 'dictionary_encoding_columns', '"text":auto,"bytea":auto,"char":auto,"varchar":auto'); CALL SET_TABLE_PROPERTY('public.auto_holo_2661647_dst', 'distribution_key', '"id"'); COMMIT; ", "ddlType": "STRUCT" } }, { "ruleTargetType": "Table", "ruleName": "ColumnMapping_Table_nP4hJPX1wh2W3fpo", "ruleActionType": "ColumnMapping", "ruleExpression": { "columnMapping": [ { "sourceColName": "id", "dstColName": "id" }, { "sourceColName": "decimal", "dstColName": "decimal" } ] } } ], "migrationType": "FullAndRealtimeIncremental", "destinationDataSourceSettings": [ { "dataSourceProperties": { }, "dataSourceName": "autotest_hologres" } ], "sourceDataSourceType": "Hologres" }

返回参数

名称

类型

描述

示例值

object

Schema of Response

Id

integer

数据集成任务 ID。

11792

RequestId

string

请求的 ID。用于定位日志,排查问题。

4F6AB6B3-41FB-5EBB-AFB2-0C98D49DA2BB

DIJobId deprecated

integer

该字段已废弃,请使用 Id 字段。

11792

示例

正常返回示例

JSON格式

{
  "Id": 11792,
  "RequestId": "4F6AB6B3-41FB-5EBB-AFB2-0C98D49DA2BB",
  "DIJobId": 11792
}

错误码

访问错误中心查看更多错误码。

变更历史

更多信息,参考变更详情