本文介绍如何在事件总线EventBridge控制台添加数据传输服务DTS作为事件流中的事件提供方。
前提条件
在数据传输服务DTS控制台创建数据订阅任务且任务状态为正常。详细操作,请参见数据订阅操作指导。
在创建的数据订阅任务中新增消费组。
支持地域
支持将事件流中的事件提供方设置为数据传输服务DTS的地域有华东1(杭州)、华东2(上海)、华北1(青岛)、华北2(北京)、华南1(深圳)、华南3(广州)、西南1(成都)、中国香港。
操作步骤
事件总线EventBridge的事件流仅中转操作类型为INSERT、DELETE、UPDATE和DDL的DTS数据。
- 登录事件总线EventBridge控制台,在左侧导航栏,单击事件流。
- 在顶部菜单栏,选择地域,然后单击创建事件流。
在创建事件流面板,设置任务名称和描述,配置以下参数,然后单击保存。
任务创建
在Source(源)配置向导,选择数据提供方为数据库 DTS,设置以下参数,然后单击下一步。
参数
说明
示例
数据订阅任务
选择您在数据传输服务DTS控制台上创建的数据订阅任务ID。
dts8jqe****
接入方式
默认为创建的数据订阅任务的接入方式且不可更改。
RDS
实例ID
默认为创建数据订阅任务时订阅的实例且不可更改。
rm-bp18mj3q2dzyb****
消费组
在前提条件中创建的用于消费订阅任务数据的消费组名称。
说明请确保该消费组没有在其他客户端的实例上运行,否则可能导致传入的消费位点失效。
test
账号
创建消费组时设置的账号。
test
密码
创建消费组时设置的密码。
******
消费位点
期望消费第一条数据的时间戳。消费位点必须在订阅实例的数据范围之内。
说明消费位点仅在新消费组第一次运行时生效,若后续任务重启,则会基于上次消费位点继续消费。
2022-06-21 00:00:00
批量推送
批量推送可帮您批量聚合多个事件,当批量推送条数和批量推送间隔(单位:秒)两者条件达到其一时即会触发批量推送。
例如:您设置的推送条数为100 条,间隔时间为15 s,在10 s内消息条数已达到100条,那么该次推送则不会等15 s后再推送。
开启
批量推送条数
调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。
100
批量推送间隔(单位:秒)
调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。
3
在Filtering(过滤)、Transform(转换)及Sink(目标)配置向导,设置事件过滤、转换规则及事件目标。事件转换的配置说明,请参见使用函数计算实现消息数据清洗。
任务属性
设置事件流的重试策略及死信队列。更多信息,请参见重试和死信。
返回事件流页面,找到创建好的事件流,在其右侧操作栏,单击启用。
启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。
事件示例
在DTS数据库创建数据订阅任务时,当订阅的实例类型为MySQL实例时,事件示例如下:
{
"data": {
"id": 321****,
"topicPartition": {
"hash": 0,
"partition": 0,
"topic": "cn_hangzhou_rm_1234****_test_version2"
},
"offset": 3218099,
"sourceTimestamp": 1654847757,
"operationType": "UPDATE",
"schema": {
"recordFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
{
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
],
"nameIndex": {
"id": {
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
"topic": {
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
},
"schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
"databaseName": "hangzhou--test-db",
"tableName": "message_info",
"primaryIndexInfo": {
"indexType": "PrimaryKey",
"indexFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
}
],
"cardinality": 0,
"nullable": true,
"isFirstUniqueIndex": false
},
"uniqueIndexInfo": [],
"foreignIndexInfo": [],
"normalIndexInfo": [],
"databaseInfo": {
"databaseType": "MySQL",
"version": "5.7.35-log"
},
"totalRows": 0
},
"beforeImage": {
"recordSchema": {
"recordFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
{
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
],
"nameIndex": {
"id": {
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
"topic": {
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
},
"schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
"databaseName": "hangzhou-test-db",
"tableName": "message_info",
"primaryIndexInfo": {
"indexType": "PrimaryKey",
"indexFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
}
],
"cardinality": 0,
"nullable": true,
"isFirstUniqueIndex": false
},
"uniqueIndexInfo": [],
"foreignIndexInfo": [],
"normalIndexInfo": [],
"databaseInfo": {
"databaseType": "MySQL",
"version": "5.7.35-log"
},
"totalRows": 0
},
"values": [
{
"data": 115
},
{
"data": {
"hb": [
104,
101,
108,
108,
111
],
"offset": 0,
"isReadOnly": false,
"bigEndian": true,
"nativeByteOrder": false,
"mark": -1,
"position": 0,
"limit": 9,
"capacity": 9,
"address": 0
},
"charset": "utf8mb4"
}
],
"size": 45
},
"afterImage": {
"recordSchema": {
"recordFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
{
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
],
"nameIndex": {
"id": {
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
"topic": {
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
},
"schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
"databaseName": "hangzhou-test-db",
"tableName": "message_info",
"primaryIndexInfo": {
"indexType": "PrimaryKey",
"indexFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
}
],
"cardinality": 0,
"nullable": true,
"isFirstUniqueIndex": false
},
"uniqueIndexInfo": [],
"foreignIndexInfo": [],
"normalIndexInfo": [],
"databaseInfo": {
"databaseType": "MySQL",
"version": "5.7.35-log"
},
"totalRows": 0
},
"values": [
{
"data": 115
},
{
"data": {
"hb": [
98,
121,
101
],
"offset": 0,
"isReadOnly": false,
"bigEndian": true,
"nativeByteOrder": false,
"mark": -1,
"position": 0,
"limit": 11,
"capacity": 11,
"address": 0
},
"charset": "utf8mb4"
}
],
"size": 47
}
},
"id": "12f701a43741d404fa9a7be89d9acae0-321****",
"source": "DTSstreamDemo",
"specversion": "1.0",
"type": "dts:ConsumeMessage",
"datacontenttype": "application/json; charset=utf-8",
"time": "2022-06-10T07:55:57Z",
"subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro"
}
CloudEvents规范中定义的参数解释,请参见事件概述。
data字段包含的参数解释如下表所示。
参数 | 类型 | 说明 |
id | String | DTS数据ID。 |
topicPartition | Array | Topic的分区信息。 |
hash | String | DTS底层存储参数。 |
partition | String | Topic的分区。 |
topic | String | Topic的名称。 |
offset | Int | DTS数据对应的消息存储位点。 |
sourceTimestamp | Int | DTS数据生成时间戳。 |
operationType | String | DTS数据的操作类型。 |
schema | Array | 数据库表结构信息。 |
recordFields | Array | 字段详情记录。 |
fieldName | String | 字段名称。 |
rawDataTypeNum | Int | 字段类型映射值。 该值对应从数据订阅通道中获取的增量数据反序列化后的dataTypeNumber字段值,详情请参见使用Kafka客户端消费订阅数据。 |
isPrimaryKey | Boolean | 字段是否是主键。 |
isUniqueKey | Boolean | 字段是否是唯一值。 |
fieldPosition | String | 字段位置。 |
nameIndex | Array | 命名索引。 |
schemaId | String | 数据库表结构信息的ID。 |
databaseName | String | 数据库名称。 |
tableName | String | 数据表名称。 |
primaryIndexInfo | String | 主键索引。 |
indexType | String | 主键索引类型。 |
indexFields | Array | 主键索引字段内容。 |
cardinality | String | 主键基数。 |
nullable | Boolean | 主键是否可为null。 |
isFirstUniqueIndex | Boolean | 是否是第一个唯一索引。 |
uniqueIndexInfo | String | 唯一索引。 |
foreignIndexInfo | String | 外键索引。 |
normalIndexInfo | String | 普通索引。 |
databaseInfo | Array | 数据库信息。 |
databaseType | String | 数据库类型。 |
version | String | 数据库版本。 |
totalRows | Int | 数据表的总行数。 |
beforeImage | String | 操作前记录字段内容镜像。 |
values | String | 记录字段的值。 |
size | Int | 记录字段大小。 |
afterImage | String | 操作后记录字段内容镜像。 |