本文介绍如何在事件总线EventBridge控制台添加数据传输服务DTS作为事件流中的事件提供方。
前提条件
- 在数据传输服务DTS控制台创建数据订阅任务且任务状态为正常。详细操作,请参见数据订阅操作指导。
- 在创建的数据订阅任务中新增消费组。
- 开通事件总线EventBridge并授权。
支持地域
支持将事件流中的事件提供方设置为数据传输服务DTS的地域有华东1(杭州)、华东2(上海)、华北1(青岛)、华北2(北京)、华南1(深圳)、华南3(广州)、西南1(成都)、中国香港。
操作步骤
重要 事件总线EventBridge的事件流仅中转操作类型为INSERT、DELETE、UPDATE和DDL的DTS数据。
- 登录事件总线EventBridge控制台,在左侧导航栏,单击事件流。
- 在顶部菜单栏,选择地域,然后单击创建事件流。
- 在创建事件流面板,配置以下参数,然后单击创建。
事件示例
在DTS数据库创建数据订阅任务时,当订阅的实例类型为MySQL实例时,事件示例如下:
{
"data": {
"id": 321****,
"topicPartition": {
"hash": 0,
"partition": 0,
"topic": "cn_hangzhou_rm_1234****_test_version2"
},
"offset": 3218099,
"sourceTimestamp": 1654847757,
"operationType": "UPDATE",
"schema": {
"recordFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
{
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
],
"nameIndex": {
"id": {
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
"topic": {
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
},
"schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
"databaseName": "hangzhou--test-db",
"tableName": "message_info",
"primaryIndexInfo": {
"indexType": "PrimaryKey",
"indexFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
}
],
"cardinality": 0,
"nullable": true,
"isFirstUniqueIndex": false
},
"uniqueIndexInfo": [],
"foreignIndexInfo": [],
"normalIndexInfo": [],
"databaseInfo": {
"databaseType": "MySQL",
"version": "5.7.35-log"
},
"totalRows": 0
},
"beforeImage": {
"recordSchema": {
"recordFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
{
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
],
"nameIndex": {
"id": {
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
"topic": {
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
},
"schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
"databaseName": "hangzhou-test-db",
"tableName": "message_info",
"primaryIndexInfo": {
"indexType": "PrimaryKey",
"indexFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
}
],
"cardinality": 0,
"nullable": true,
"isFirstUniqueIndex": false
},
"uniqueIndexInfo": [],
"foreignIndexInfo": [],
"normalIndexInfo": [],
"databaseInfo": {
"databaseType": "MySQL",
"version": "5.7.35-log"
},
"totalRows": 0
},
"values": [
{
"data": 115
},
{
"data": {
"hb": [
104,
101,
108,
108,
111
],
"offset": 0,
"isReadOnly": false,
"bigEndian": true,
"nativeByteOrder": false,
"mark": -1,
"position": 0,
"limit": 9,
"capacity": 9,
"address": 0
},
"charset": "utf8mb4"
}
],
"size": 45
},
"afterImage": {
"recordSchema": {
"recordFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
{
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
],
"nameIndex": {
"id": {
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
"topic": {
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
},
"schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
"databaseName": "hangzhou-test-db",
"tableName": "message_info",
"primaryIndexInfo": {
"indexType": "PrimaryKey",
"indexFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
}
],
"cardinality": 0,
"nullable": true,
"isFirstUniqueIndex": false
},
"uniqueIndexInfo": [],
"foreignIndexInfo": [],
"normalIndexInfo": [],
"databaseInfo": {
"databaseType": "MySQL",
"version": "5.7.35-log"
},
"totalRows": 0
},
"values": [
{
"data": 115
},
{
"data": {
"hb": [
98,
121,
101
],
"offset": 0,
"isReadOnly": false,
"bigEndian": true,
"nativeByteOrder": false,
"mark": -1,
"position": 0,
"limit": 11,
"capacity": 11,
"address": 0
},
"charset": "utf8mb4"
}
],
"size": 47
}
},
"id": "12f701a43741d404fa9a7be89d9acae0-321****",
"source": "DTSstreamDemo",
"specversion": "1.0",
"type": "dts:ConsumeMessage",
"datacontenttype": "application/json; charset=utf-8",
"time": "2022-06-10T07:55:57Z",
"subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro"
}
CloudEvents规范中定义的参数解释,请参见事件概述。
data字段包含的参数解释如下表所示。
参数 | 类型 | 说明 |
---|---|---|
id | String | DTS数据ID。 |
topicPartition | Array | Topic的分区信息。 |
hash | String | DTS底层存储参数。 |
partition | String | Topic的分区。 |
topic | String | Topic的名称。 |
offset | Int | DTS数据对应的消息存储位点。 |
sourceTimestamp | Int | DTS数据生成时间戳。 |
operationType | String | DTS数据的操作类型。 |
schema | Array | 数据库表结构信息。 |
recordFields | Array | 字段详情记录。 |
fieldName | String | 字段名称。 |
rawDataTypeNum | Int | 字段类型映射值。 |
isPrimaryKey | Boolean | 字段是否是主键。 |
isUniqueKey | Boolean | 字段是否是唯一值。 |
fieldPosition | String | 字段位置。 |
nameIndex | Array | 命名索引。 |
schemaId | String | 数据库表结构信息的ID。 |
databaseName | String | 数据库名称。 |
tableName | String | 数据表名称。 |
primaryIndexInfo | String | 主键索引。 |
indexType | String | 主键索引类型。 |
indexFields | Array | 主键索引字段内容。 |
cardinality | String | 主键基数。 |
nullable | Boolean | 主键是否可为null。 |
isFirstUniqueIndex | Boolean | 是否是第一个唯一索引。 |
uniqueIndexInfo | String | 唯一索引。 |
foreignIndexInfo | String | 外键索引。 |
normalIndexInfo | String | 普通索引。 |
databaseInfo | Array | 数据库信息。 |
databaseType | String | 数据库类型。 |
version | String | 数据库版本。 |
totalRows | Int | 数据表的总行数。 |
beforeImage | String | 操作前记录字段内容镜像。 |
values | String | 记录字段的值。 |
size | Int | 记录字段大小。 |
afterImage | String | 操作后记录字段内容镜像。 |