本文介绍如何在事件总线EventBridge控制台添加数据传输服务DTS作为事件流中的事件提供方。

前提条件

支持地域

支持将事件流中的事件提供方设置为数据传输服务DTS的地域有华东1(杭州)、华东2(上海)、华北1(青岛)、华北2(北京)、华南1(深圳)、华南3(广州)、西南1(成都)、中国香港。

操作步骤

重要 事件总线EventBridge的事件流仅中转操作类型为INSERT、DELETE、UPDATE和DDL的DTS数据。
  1. 登录事件总线EventBridge控制台,在左侧导航栏,单击事件流
  2. 在顶部菜单栏,选择地域,然后单击创建事件流
  3. 创建事件流面板,配置以下参数,然后单击创建
    1. 基本信息页签,设置事件流名称描述,单击下一步
    2. 事件源页签,选择事件提供方数据库 DTS,设置以下参数,然后单击下一步
      参数说明示例
      数据订阅任务选择您在数据传输服务DTS控制台上创建的数据订阅任务ID。dts8jqe****
      接入方式默认为创建的数据订阅任务的接入方式且不可更改。RDS
      实例ID默认为创建数据订阅任务时订阅的实例且不可更改。rm-bp18mj3q2dzyb****
      消费组在前提条件中创建的用于消费订阅任务数据的消费组名称。
      说明 请确保该消费组没有在其他客户端的实例上运行,否则可能导致传入的消费位点失效。
      test
      账号创建消费组时设置的账号。test
      密码创建消费组时设置的密码。******
      消费位点期望消费第一条数据的时间戳。消费位点必须在订阅实例的数据范围之内。
      说明 消费位点仅在新消费组第一次运行时生效,若后续任务重启,则会基于上次消费位点继续消费。
      2022-06-21 00:00:00
    3. 规则目标页签,分别设置事件规则和事件目标。
      说明 如果您的业务需要保证顺序消费,那么需要在设置事件目标时将容错策略设置为禁止容错。在这种情况下,若您的事件流目标端消费消息时出现异常,事件流将暂停,直至目标端恢复正常。
    4. 返回事件流页面,找到创建好的事件流,在其右侧操作栏,单击启用
      启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。

事件示例

在DTS数据库创建数据订阅任务时,当订阅的实例类型为MySQL实例时,事件示例如下:

{
  "data": {
    "id": 321****,
    "topicPartition": {
      "hash": 0,
      "partition": 0,
      "topic": "cn_hangzhou_rm_1234****_test_version2"
    },
    "offset": 3218099,
    "sourceTimestamp": 1654847757,
    "operationType": "UPDATE",
    "schema": {
      "recordFields": [
        {
          "fieldName": "id",
          "rawDataTypeNum": 8,
          "isPrimaryKey": true,
          "isUniqueKey": false,
          "fieldPosition": 0
        },
        {
          "fieldName": "topic",
          "rawDataTypeNum": 253,
          "isPrimaryKey": false,
          "isUniqueKey": false,
          "fieldPosition": 1
        }
      ],
      "nameIndex": {
        "id": {
          "fieldName": "id",
          "rawDataTypeNum": 8,
          "isPrimaryKey": true,
          "isUniqueKey": false,
          "fieldPosition": 0
        },
        "topic": {
          "fieldName": "topic",
          "rawDataTypeNum": 253,
          "isPrimaryKey": false,
          "isUniqueKey": false,
          "fieldPosition": 1
        }
      },
      "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
      "databaseName": "hangzhou--test-db",
      "tableName": "message_info",
      "primaryIndexInfo": {
        "indexType": "PrimaryKey",
        "indexFields": [
          {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          }
        ],
        "cardinality": 0,
        "nullable": true,
        "isFirstUniqueIndex": false
      },
      "uniqueIndexInfo": [],
      "foreignIndexInfo": [],
      "normalIndexInfo": [],
      "databaseInfo": {
        "databaseType": "MySQL",
        "version": "5.7.35-log"
      },
      "totalRows": 0
    },
    "beforeImage": {
      "recordSchema": {
        "recordFields": [
          {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          },
          {
            "fieldName": "topic",
            "rawDataTypeNum": 253,
            "isPrimaryKey": false,
            "isUniqueKey": false,
            "fieldPosition": 1
          }
        ],
        "nameIndex": {
          "id": {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          },
          "topic": {
            "fieldName": "topic",
            "rawDataTypeNum": 253,
            "isPrimaryKey": false,
            "isUniqueKey": false,
            "fieldPosition": 1
          }
        },
        "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
        "databaseName": "hangzhou-test-db",
        "tableName": "message_info",
        "primaryIndexInfo": {
          "indexType": "PrimaryKey",
          "indexFields": [
            {
              "fieldName": "id",
              "rawDataTypeNum": 8,
              "isPrimaryKey": true,
              "isUniqueKey": false,
              "fieldPosition": 0
            }
          ],
          "cardinality": 0,
          "nullable": true,
          "isFirstUniqueIndex": false
        },
        "uniqueIndexInfo": [],
        "foreignIndexInfo": [],
        "normalIndexInfo": [],
        "databaseInfo": {
          "databaseType": "MySQL",
          "version": "5.7.35-log"
        },
        "totalRows": 0
      },
      "values": [
        {
          "data": 115
        },
        {
          "data": {
            "hb": [
              104,
              101,
              108,
              108,
              111
            ],
            "offset": 0,
            "isReadOnly": false,
            "bigEndian": true,
            "nativeByteOrder": false,
            "mark": -1,
            "position": 0,
            "limit": 9,
            "capacity": 9,
            "address": 0
          },
          "charset": "utf8mb4"
        }
      ],
      "size": 45
    },
    "afterImage": {
      "recordSchema": {
        "recordFields": [
          {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          },
          {
            "fieldName": "topic",
            "rawDataTypeNum": 253,
            "isPrimaryKey": false,
            "isUniqueKey": false,
            "fieldPosition": 1
          }
        ],
        "nameIndex": {
          "id": {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          },
          "topic": {
            "fieldName": "topic",
            "rawDataTypeNum": 253,
            "isPrimaryKey": false,
            "isUniqueKey": false,
            "fieldPosition": 1
          }
        },
        "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
        "databaseName": "hangzhou-test-db",
        "tableName": "message_info",
        "primaryIndexInfo": {
          "indexType": "PrimaryKey",
          "indexFields": [
            {
              "fieldName": "id",
              "rawDataTypeNum": 8,
              "isPrimaryKey": true,
              "isUniqueKey": false,
              "fieldPosition": 0
            }
          ],
          "cardinality": 0,
          "nullable": true,
          "isFirstUniqueIndex": false
        },
        "uniqueIndexInfo": [],
        "foreignIndexInfo": [],
        "normalIndexInfo": [],
        "databaseInfo": {
          "databaseType": "MySQL",
          "version": "5.7.35-log"
        },
        "totalRows": 0
      },
      "values": [
        {
          "data": 115
        },
        {
          "data": {
            "hb": [
              98,
              121,
              101
            ],
            "offset": 0,
            "isReadOnly": false,
            "bigEndian": true,
            "nativeByteOrder": false,
            "mark": -1,
            "position": 0,
            "limit": 11,
            "capacity": 11,
            "address": 0
          },
          "charset": "utf8mb4"
        }
      ],
      "size": 47
    }
  },
  "id": "12f701a43741d404fa9a7be89d9acae0-321****",
  "source": "DTSstreamDemo",
  "specversion": "1.0",
  "type": "dts:ConsumeMessage",
  "datacontenttype": "application/json; charset=utf-8",
  "time": "2022-06-10T07:55:57Z",
  "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro"
}

CloudEvents规范中定义的参数解释,请参见事件概述

data字段包含的参数解释如下表所示。

参数类型说明
idStringDTS数据ID。
topicPartitionArrayTopic的分区信息。
hashStringDTS底层存储参数。
partitionStringTopic的分区。
topicStringTopic的名称。
offsetIntDTS数据对应的消息存储位点。
sourceTimestampIntDTS数据生成时间戳。
operationTypeStringDTS数据的操作类型。
schemaArray数据库表结构信息。
recordFieldsArray字段详情记录。
fieldNameString字段名称。
rawDataTypeNumInt字段类型映射值。
isPrimaryKeyBoolean字段是否是主键。
isUniqueKeyBoolean字段是否是唯一值。
fieldPositionString字段位置。
nameIndexArray命名索引。
schemaIdString数据库表结构信息的ID。
databaseNameString数据库名称。
tableNameString数据表名称。
primaryIndexInfoString主键索引。
indexTypeString主键索引类型。
indexFieldsArray主键索引字段内容。
cardinalityString主键基数。
nullableBoolean主键是否可为null。
isFirstUniqueIndexBoolean是否是第一个唯一索引。
uniqueIndexInfoString唯一索引。
foreignIndexInfoString外键索引。
normalIndexInfoString普通索引。
databaseInfoArray数据库信息。
databaseTypeString数据库类型。
versionString数据库版本。
totalRowsInt数据表的总行数。
beforeImageString操作前记录字段内容镜像。
valuesString记录字段的值。
sizeInt记录字段大小。
afterImageString操作后记录字段内容镜像。