数据传输服务DTS

本文介绍如何在事件总线EventBridge控制台添加数据传输服务DTS作为事件流中的事件提供方。

前提条件

支持地域

支持将事件流中的事件提供方设置为数据传输服务DTS的地域有华东1(杭州)、华东2(上海)、华北1(青岛)、华北2(北京)、华南1(深圳)、华南3(广州)、西南1(成都)、中国香港。

操作步骤

重要

事件总线EventBridge的事件流仅中转操作类型为INSERT、DELETE、UPDATE和DDL的DTS数据。

  1. 登录事件总线EventBridge控制台,在左侧导航栏,单击事件流
  2. 在顶部菜单栏,选择地域,然后单击创建事件流
  3. 创建事件流面板,设置任务名称描述,然后配置以下参数,最后单击保存

    • 任务创建:

      1. Source(源)配置向导,选择数据提供方数据库传输服务 DTS(数据库),设置以下参数,然后单击下一步

        参数

        说明

        示例

        数据订阅任务

        选择您在数据传输服务DTS控制台上创建的数据订阅任务名称。

        dts8jqe****

        接入方式

        默认为创建的数据订阅任务的接入方式且不可更改。

        RDS

        实例ID

        默认为创建数据订阅任务时订阅的实例且不可更改。

        rm-bp18mj3q2dzyb****

        消费组

        在前提条件中创建的用于消费订阅任务数据的消费组名称。

        说明

        请确保该消费组没有在其他客户端的实例上运行,否则可能导致传入的消费位点失效。

        test

        账号

        创建消费组时设置的账号。

        test

        密码

        创建消费组时设置的密码。

        ******

        消费位点

        期望消费第一条数据的时间戳。消费位点必须在订阅实例的数据范围之内。

        说明

        消费位点仅在新消费组第一次运行时生效,若后续任务重启,则会基于上次消费位点继续消费。

        2022-06-21 00:00:00

        批量推送条数

        调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。

        100

        批量推送间隔(单位:秒)

        调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。

        3

      2. Filtering(过滤)Transform(转换)Sink(目标)配置向导,设置事件过滤、转换规则及事件目标。事件转换的配置说明,请参见使用函数计算实现消息数据清洗

    • 任务属性

      设置事件流的重试策略及死信队列。更多信息,请参见重试和死信

  4. 返回事件流页面,找到创建好的事件流,在其右侧操作栏,单击启用

    启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。

事件示例

在DTS数据库创建数据订阅任务时,当订阅的实例类型为MySQL实例时,事件示例如下:

{
  "data": {
    "id": 321****,
    "topicPartition": {
      "hash": 0,
      "partition": 0,
      "topic": "cn_hangzhou_rm_1234****_test_version2"
    },
    "offset": 3218099,
    "sourceTimestamp": 1654847757,
    "operationType": "UPDATE",
    "schema": {
      "recordFields": [
        {
          "fieldName": "id",
          "rawDataTypeNum": 8,
          "isPrimaryKey": true,
          "isUniqueKey": false,
          "fieldPosition": 0
        },
        {
          "fieldName": "topic",
          "rawDataTypeNum": 253,
          "isPrimaryKey": false,
          "isUniqueKey": false,
          "fieldPosition": 1
        }
      ],
      "nameIndex": {
        "id": {
          "fieldName": "id",
          "rawDataTypeNum": 8,
          "isPrimaryKey": true,
          "isUniqueKey": false,
          "fieldPosition": 0
        },
        "topic": {
          "fieldName": "topic",
          "rawDataTypeNum": 253,
          "isPrimaryKey": false,
          "isUniqueKey": false,
          "fieldPosition": 1
        }
      },
      "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
      "databaseName": "hangzhou--test-db",
      "tableName": "message_info",
      "primaryIndexInfo": {
        "indexType": "PrimaryKey",
        "indexFields": [
          {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          }
        ],
        "cardinality": 0,
        "nullable": true,
        "isFirstUniqueIndex": false
      },
      "uniqueIndexInfo": [],
      "foreignIndexInfo": [],
      "normalIndexInfo": [],
      "databaseInfo": {
        "databaseType": "MySQL",
        "version": "5.7.35-log"
      },
      "totalRows": 0
    },
    "beforeImage": {
      "recordSchema": {
        "recordFields": [
          {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          },
          {
            "fieldName": "topic",
            "rawDataTypeNum": 253,
            "isPrimaryKey": false,
            "isUniqueKey": false,
            "fieldPosition": 1
          }
        ],
        "nameIndex": {
          "id": {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          },
          "topic": {
            "fieldName": "topic",
            "rawDataTypeNum": 253,
            "isPrimaryKey": false,
            "isUniqueKey": false,
            "fieldPosition": 1
          }
        },
        "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
        "databaseName": "hangzhou-test-db",
        "tableName": "message_info",
        "primaryIndexInfo": {
          "indexType": "PrimaryKey",
          "indexFields": [
            {
              "fieldName": "id",
              "rawDataTypeNum": 8,
              "isPrimaryKey": true,
              "isUniqueKey": false,
              "fieldPosition": 0
            }
          ],
          "cardinality": 0,
          "nullable": true,
          "isFirstUniqueIndex": false
        },
        "uniqueIndexInfo": [],
        "foreignIndexInfo": [],
        "normalIndexInfo": [],
        "databaseInfo": {
          "databaseType": "MySQL",
          "version": "5.7.35-log"
        },
        "totalRows": 0
      },
      "values": [
        {
          "data": 115
        },
        {
          "data": {
            "hb": [
              104,
              101,
              108,
              108,
              111
            ],
            "offset": 0,
            "isReadOnly": false,
            "bigEndian": true,
            "nativeByteOrder": false,
            "mark": -1,
            "position": 0,
            "limit": 9,
            "capacity": 9,
            "address": 0
          },
          "charset": "utf8mb4"
        }
      ],
      "size": 45
    },
    "afterImage": {
      "recordSchema": {
        "recordFields": [
          {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          },
          {
            "fieldName": "topic",
            "rawDataTypeNum": 253,
            "isPrimaryKey": false,
            "isUniqueKey": false,
            "fieldPosition": 1
          }
        ],
        "nameIndex": {
          "id": {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          },
          "topic": {
            "fieldName": "topic",
            "rawDataTypeNum": 253,
            "isPrimaryKey": false,
            "isUniqueKey": false,
            "fieldPosition": 1
          }
        },
        "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
        "databaseName": "hangzhou-test-db",
        "tableName": "message_info",
        "primaryIndexInfo": {
          "indexType": "PrimaryKey",
          "indexFields": [
            {
              "fieldName": "id",
              "rawDataTypeNum": 8,
              "isPrimaryKey": true,
              "isUniqueKey": false,
              "fieldPosition": 0
            }
          ],
          "cardinality": 0,
          "nullable": true,
          "isFirstUniqueIndex": false
        },
        "uniqueIndexInfo": [],
        "foreignIndexInfo": [],
        "normalIndexInfo": [],
        "databaseInfo": {
          "databaseType": "MySQL",
          "version": "5.7.35-log"
        },
        "totalRows": 0
      },
      "values": [
        {
          "data": 115
        },
        {
          "data": {
            "hb": [
              98,
              121,
              101
            ],
            "offset": 0,
            "isReadOnly": false,
            "bigEndian": true,
            "nativeByteOrder": false,
            "mark": -1,
            "position": 0,
            "limit": 11,
            "capacity": 11,
            "address": 0
          },
          "charset": "utf8mb4"
        }
      ],
      "size": 47
    }
  },
  "id": "12f701a43741d404fa9a7be89d9acae0-321****",
  "source": "DTSstreamDemo",
  "specversion": "1.0",
  "type": "dts:ConsumeMessage",
  "datacontenttype": "application/json; charset=utf-8",
  "time": "2022-06-10T07:55:57Z",
  "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro"
}

CloudEvents规范中定义的参数解释,请参见事件概述

data字段包含的参数解释如下表所示。

参数

类型

说明

id

String

DTS数据ID。

topicPartition

Array

Topic的分区信息。

hash

String

DTS底层存储参数。

partition

String

Topic的分区。

topic

String

Topic的名称。

offset

Int

DTS数据对应的消息存储位点。

sourceTimestamp

Int

DTS数据生成时间戳。

operationType

String

DTS数据的操作类型。

schema

Array

数据库表结构信息。

recordFields

Array

字段详情记录。

fieldName

String

字段名称。

rawDataTypeNum

Int

字段类型映射值。

该值对应从数据订阅通道中获取的增量数据反序列化后的dataTypeNumber字段值,详情请参见使用Kafka客户端消费订阅数据

isPrimaryKey

Boolean

字段是否是主键。

isUniqueKey

Boolean

字段是否是唯一值。

fieldPosition

String

字段位置。

nameIndex

Array

命名索引。

schemaId

String

数据库表结构信息的ID。

databaseName

String

数据库名称。

tableName

String

数据表名称。

primaryIndexInfo

String

主键索引。

indexType

String

主键索引类型。

indexFields

Array

主键索引字段内容。

cardinality

String

主键基数。

nullable

Boolean

主键是否可为null。

isFirstUniqueIndex

Boolean

是否是第一个唯一索引。

uniqueIndexInfo

String

唯一索引。

foreignIndexInfo

String

外键索引。

normalIndexInfo

String

普通索引。

databaseInfo

Array

数据库信息。

databaseType

String

数据库类型。

version

String

数据库版本。

totalRows

Int

数据表的总行数。

beforeImage

String

操作前记录字段内容镜像。

values

String

记录字段的值。

size

Int

记录字段大小。

afterImage

String

操作后记录字段内容镜像。