DTS触发器

DTS(Data Transmission Service)作为事件源通过事件总线EventBridge与函数计算集成后,通过DTS触发器能够触发关联函数执行。通过函数可以对DTS数据订阅获取的数据库实时增量数据进行自定义处理。本文介绍如何在函数计算控制台创建DTS触发器、配置入口参数以及编写代码并测试代码。

功能简介

您在函数计算的控制台提交触发器创建请求之后,函数计算会根据触发器的配置信息,自动在事件总线EventBridge侧创建事件流资源。

创建完成后,您可以在函数计算控制台查看触发器信息,同时也可以在事件总线EventBridge控制台查看自动创建的资源信息。当DTS数据订阅捕捉到数据库的增量数据后,将会触发函数执行,触发时会根据您的攒批配置将一个或多个消息事件以批的形式推送到函数中进行处理。

注意事项

  • 作为触发源的DTS数据订阅任务必须和函数计算的函数在相同的地域。

  • 创建的事件流数量超过上限后,将无法再创建DTS触发器。关于事件流的数量限制,请参见使用限制

前提条件

步骤一:创建DTS触发器

  1. 登录函数计算控制台,在左侧导航栏,单击函数

  2. 在顶部菜单栏,选择地域,然后在函数页面,单击目标函数。

  3. 在函数配置页面,选择配置页签,在左侧导航栏,单击触发器,然后单击创建触发器

  4. 在创建触发器面板,填写相关信息,然后单击确定

    基础配置项说明如下所示。

    配置项

    取值说明

    本文示例

    触发器类型

    触发器类型。关于支持的触发器类型,请参见触发器简介

    DTS

    名称

    自定义的触发器名称。

    dts-trigger

    版本或别名

    默认值为LATEST,如果您需要创建其他版本或别名的触发器,请先在函数详情页的右上角切换到该版本或别名。关于版本和别名的简介,请参见管理版本管理别名

    LATEST

    数据订阅任务

    已创建的数据订阅任务名称。

    dtsqntc2***

    消费组

    已创建的用于消费订阅任务数据的消费组名称。

    重要

    请确保该消费组没有在其他客户端的实例上运行,否则可能导致传入的消费位点失效。

    test

    账号

    创建消费组时设置的账号。

    test

    密码

    创建消费组时设置的密码。

    ******

    消费位点

    期望消费第一条数据的时间戳。消费位点必须在订阅实例的数据范围之内。

    说明

    消费位点仅在新消费组第一次运行时生效。若后续任务重启,会基于上次消费位点继续消费。

    2022-06-21 00:00:00

    调用方式

    选择函数调用方式。

    取值说明如下。

    • 同步调用:适用于顺序调用场景。单个(批)事件触发函数调用,等待函数执行完成返回结果后,再由下一个(批)事件继续触发函数调用。同步调用请求正文有效负载最大为32 MB。更多信息,请参见同步调用

    • 异步调用:可以快速消费事件。单个(批)事件触发函数调用,函数计算会立刻返回响应,再由下一个(批)事件继续触发函数调用。该过程中函数会异步执行。异步调用请求正文有效负载最大为128 KB。更多信息,请参见功能概览

    同步调用

    触发器启用状态

    创建触发器后是否立即启用。默认勾选启用触发器,即创建触发器后立即启用触发器。

    不涉及

    关于推送配置、重试和死信等高级配置项说明,请参见触发器高级功能

    创建完成后,在触发器名称列表中显示已创建的触发器。如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理

步骤二:配置函数入口参数

DTS事件源会以event的形式作为输入参数传递给函数,您可以手动将event传给函数模拟触发事件。

  1. 在函数配置页面的代码页签,单击测试函数右侧的image.png图标,从下拉列表中,选择配置测试参数

  2. 配置测试参数面板,选择创建新测试事件编辑已有测试事件,填写事件名称和事件内容,然后单击确定。

    event格式如下所示:

    [
      {
        "data": {
          "id": 321****,
          "topicPartition": {
            "hash": 0,
            "partition": 0,
            "topic": "cn_hangzhou_rm_1234****_test_version2"
          },
          "offset": 3218099,
          "sourceTimestamp": 1654847757,
          "operationType": "UPDATE",
          "schema": {
            "recordFields": [
              {
                "fieldName": "id",
                "rawDataTypeNum": 8,
                "isPrimaryKey": true,
                "isUniqueKey": false,
                "fieldPosition": 0
              },
              {
                "fieldName": "topic",
                "rawDataTypeNum": 253,
                "isPrimaryKey": false,
                "isUniqueKey": false,
                "fieldPosition": 1
              }
            ],
            "nameIndex": {
              "id": {
                "fieldName": "id",
                "rawDataTypeNum": 8,
                "isPrimaryKey": true,
                "isUniqueKey": false,
                "fieldPosition": 0
              },
              "topic": {
                "fieldName": "topic",
                "rawDataTypeNum": 253,
                "isPrimaryKey": false,
                "isUniqueKey": false,
                "fieldPosition": 1
              }
            },
            "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
            "databaseName": "hangzhou--test-db",
            "tableName": "message_info",
            "primaryIndexInfo": {
              "indexType": "PrimaryKey",
              "indexFields": [
                {
                  "fieldName": "id",
                  "rawDataTypeNum": 8,
                  "isPrimaryKey": true,
                  "isUniqueKey": false,
                  "fieldPosition": 0
                }
              ],
              "cardinality": 0,
              "nullable": true,
              "isFirstUniqueIndex": false
            },
            "uniqueIndexInfo": [],
            "foreignIndexInfo": [],
            "normalIndexInfo": [],
            "databaseInfo": {
              "databaseType": "MySQL",
              "version": "5.7.35-log"
            },
            "totalRows": 0
          },
          "beforeImage": {
            "recordSchema": {
              "recordFields": [
                {
                  "fieldName": "id",
                  "rawDataTypeNum": 8,
                  "isPrimaryKey": true,
                  "isUniqueKey": false,
                  "fieldPosition": 0
                },
                {
                  "fieldName": "topic",
                  "rawDataTypeNum": 253,
                  "isPrimaryKey": false,
                  "isUniqueKey": false,
                  "fieldPosition": 1
                }
              ],
              "nameIndex": {
                "id": {
                  "fieldName": "id",
                  "rawDataTypeNum": 8,
                  "isPrimaryKey": true,
                  "isUniqueKey": false,
                  "fieldPosition": 0
                },
                "topic": {
                  "fieldName": "topic",
                  "rawDataTypeNum": 253,
                  "isPrimaryKey": false,
                  "isUniqueKey": false,
                  "fieldPosition": 1
                }
              },
              "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
              "databaseName": "hangzhou-test-db",
              "tableName": "message_info",
              "primaryIndexInfo": {
                "indexType": "PrimaryKey",
                "indexFields": [
                  {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  }
                    ],
                    "cardinality": 0,
                    "nullable": true,
                    "isFirstUniqueIndex": false
                  },
                    "uniqueIndexInfo": [],
                    "foreignIndexInfo": [],
                    "normalIndexInfo": [],
                    "databaseInfo": {
                    "databaseType": "MySQL",
                    "version": "5.7.35-log"
                  },
                    "totalRows": 0
                  },
                    "values": [
                    {
                    "data": 115
                  },
                    {
                    "data": {
                    "hb": [
                    104,
                    101,
                    108,
                    108,
                    111
                    ],
                    "offset": 0,
                    "isReadOnly": false,
                    "bigEndian": true,
                    "nativeByteOrder": false,
                    "mark": -1,
                    "position": 0,
                    "limit": 9,
                    "capacity": 9,
                    "address": 0
                  },
                    "charset": "utf8mb4"
                  }
                    ],
                    "size": 45
                  },
                    "afterImage": {
                    "recordSchema": {
                    "recordFields": [
                    {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  },
                    {
                    "fieldName": "topic",
                    "rawDataTypeNum": 253,
                    "isPrimaryKey": false,
                    "isUniqueKey": false,
                    "fieldPosition": 1
                  }
                    ],
                    "nameIndex": {
                    "id": {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  },
                    "topic": {
                    "fieldName": "topic",
                    "rawDataTypeNum": 253,
                    "isPrimaryKey": false,
                    "isUniqueKey": false,
                    "fieldPosition": 1
                  }
                  },
                    "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
                    "databaseName": "hangzhou-test-db",
                    "tableName": "message_info",
                    "primaryIndexInfo": {
                    "indexType": "PrimaryKey",
                    "indexFields": [
                    {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  }
                    ],
                    "cardinality": 0,
                    "nullable": true,
                    "isFirstUniqueIndex": false
                  },
                    "uniqueIndexInfo": [],
                    "foreignIndexInfo": [],
                    "normalIndexInfo": [],
                    "databaseInfo": {
                    "databaseType": "MySQL",
                    "version": "5.7.35-log"
                  },
                    "totalRows": 0
                  },
                    "values": [
                    {
                    "data": 115
                  },
                    {
                    "data": {
                    "hb": [
                    98,
                    121,
                    101
                    ],
                    "offset": 0,
                    "isReadOnly": false,
                    "bigEndian": true,
                    "nativeByteOrder": false,
                    "mark": -1,
                    "position": 0,
                    "limit": 11,
                    "capacity": 11,
                    "address": 0
                  },
                    "charset": "utf8mb4"
                  }
                    ],
                    "size": 47
                  }
                  },
        "id": "12f701a43741d404fa9a7be89d9acae0-321****",
        "source": "DTSstreamDemo",
        "specversion": "1.0",
        "type": "dts:ConsumeMessage",
        "datacontenttype": "application/json; charset=utf-8",
        "time": "2022-06-10T07:55:57Z",
        "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro"
      }
    ]

    CloudEvents规范中定义的参数解释,请参见事件概述

    data字段包含的参数解释如下表所示。

    参数

    类型

    说明

    id

    String

    DTS数据ID。

    topicPartition

    Array

    Topic的分区信息。

    hash

    String

    DTS底层存储参数。

    partition

    String

    Topic的分区。

    topic

    String

    Topic的名称。

    offset

    Int

    DTS数据对应的消息存储位点。

    sourceTimestamp

    Int

    DTS数据生成时间戳。

    operationType

    String

    DTS数据的操作类型。

    schema

    Array

    数据库表结构信息。

    recordFields

    Array

    字段详情记录。

    fieldName

    String

    字段名称。

    rawDataTypeNum

    Int

    字段类型映射值。

    该值对应从数据订阅通道中获取的增量数据反序列化后的dataTypeNumber字段值,详情请参见使用Kafka客户端消费订阅数据

    isPrimaryKey

    Boolean

    字段是否是主键。

    isUniqueKey

    Boolean

    字段是否是唯一值。

    fieldPosition

    String

    字段位置。

    nameIndex

    Array

    命名索引。

    schemaId

    String

    数据库表结构信息的ID。

    databaseName

    String

    数据库名称。

    tableName

    String

    数据表名称。

    primaryIndexInfo

    String

    主键索引。

    indexType

    String

    主键索引类型。

    indexFields

    Array

    主键索引字段内容。

    cardinality

    String

    主键基数。

    nullable

    Boolean

    主键是否可为null。

    isFirstUniqueIndex

    Boolean

    是否是第一个唯一索引。

    uniqueIndexInfo

    String

    唯一索引。

    foreignIndexInfo

    String

    外键索引。

    normalIndexInfo

    String

    普通索引。

    databaseInfo

    Array

    数据库信息。

    databaseType

    String

    数据库类型。

    version

    String

    数据库版本。

    totalRows

    Int

    数据表的总行数。

    beforeImage

    String

    操作前记录字段内容镜像。

    values

    String

    记录字段的值。

    size

    Int

    记录字段大小。

    afterImage

    String

    操作后记录字段内容镜像。

步骤三:编写函数代码并测试

触发器创建完成后,您可以开始编写并测试函数代码,以验证代码的正确性。在实际操作过程中,当DTS数据订阅捕捉到数据库的增量数据后,触发器会自动触发函数的执行。

  1. 在函数配置页面的代码页签,在代码编辑器中编写代码,然后单击部署代码

    本文以Node.js函数代码为例。

    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      //解析event参数,对event进行处理。
      callback(null, 'return result');
    }
  2. 单击测试函数

更多信息

除了函数计算控制台,您还可通过以下方式配置触发器:

  • 通过SDK配置触发器。更多操作,请参见SDK列表

如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理