DTS triggers

Data Transmission Service (DTS) acts as an event source and integrates with Function Compute through EventBridge. After integration, you can use DTS triggers to invoke associated functions. This lets you perform custom processing on real-time incremental data that you retrieve from DTS change tracking tasks. This topic describes how to create a DTS trigger, configure function input parameters, and write and test code in the Function Compute console.

Overview

After you submit a request to create a trigger in the Function Compute console, Function Compute automatically creates event stream resources in EventBridge based on the trigger configuration.

After the resources are created, you can view the trigger information in the Function Compute console. You can also view information about the automatically created resources in the EventBridge console. When a DTS change tracking task captures incremental data from a database, the associated function is invoked. One or more message events are pushed to the function in batches for processing based on your batch configurations.

Precautions

  • The DTS change tracking task that serves as the trigger source must be in the same region as the Function Compute function.

  • If the number of created event streams reaches the limit, you cannot create more DTS triggers. For more information about the event stream limit, see Limits.

Prerequisites

Step 1: Create a DTS trigger

  1. Log on to the Function Compute console. In the left-side navigation pane, click Functions.

  2. In the top navigation bar, select a region. On the Functions page, click the function that you want to manage.

  3. On the function details page, click the Trigger tab and click Create Trigger.

  4. In the Create Trigger panel, configure the parameters and click OK.

    The following table describes the basic parameters.

    Configuration Item

    Description

    Example

    Trigger Type

    The type of the trigger. For more information about the supported trigger types, see Trigger overview.

    DTS

    Name

    A custom name for the trigger.

    dts-trigger

    Version or Alias

    The default value is LATEST. If you want to create a trigger for another version or alias, switch to that version or alias in the upper-right corner of the function details page. For more information about versions and aliases, see Version management and Alias management.

    LATEST

    Change Tracking Task

    The name of the existing change tracking task.

    dtsqntc2***

    Consumer Group

    The name of the consumer group used to consume data from the tracking task.

    Important

    Make sure that no other client instance is running this consumer group. Otherwise, the specified consumer offset may become invalid.

    test

    Account

    The account specified when you created the consumer group.

    test

    Password

    The password specified when you created the consumer group.

    ******

    Consumer Offset

    The timestamp of the first data record that you want to consume. The consumer offset must be within the timestamp range of the tracking task.

    Note

    The consumer offset takes effect only when a new consumer group runs for the first time. If the task restarts later, consumption continues from the last consumer offset.

    2022-06-21 00:00:00

    Invocation Method

    Select a function invocation method.

    Valid values:

    • Sync Invocation: This method is suitable for sequential invocation scenarios. A single event or a batch of events triggers a function invocation. The system waits for the function to execute and return a result before the next event or batch of events triggers another function invocation. The maximum payload of a synchronous invocation request is 32 MB. For more information, see Synchronous invocations.

    • Async Invocation: This method lets you quickly consume events. A single event or a batch of events triggers a function invocation. Function Compute immediately returns a response, and the next event or batch of events triggers another function invocation. During this process, the function is executed asynchronously. The maximum payload of an asynchronous invocation request is 128 KB. For more information, see Overview.

    Sync Invocation

    Trigger State

    Specifies whether to enable the trigger immediately after it is created. By default, Enable Trigger is selected.

    Enabled

    For more information about advanced configurations such as push configurations, retries, and dead-letter queues, see Advanced features.

    After the trigger is created, it is displayed on the Triggers tab. To modify or delete a trigger, see Manage triggers.
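The payload limits quoted for the two invocation methods (32 MB for Sync Invocation, 128 KB for Async Invocation) can be checked locally before you settle on a batch size. The following is a minimal sketch, not part of any Function Compute SDK: the helper name `fitsPayloadLimit` and the method keys `sync` and `async` are hypothetical, and it assumes that 1 KB is 1,024 bytes and that the payload is the JSON-serialized batch.

```javascript
'use strict';

// Limits quoted in the Invocation Method description.
// Assumption: 1 KB = 1024 bytes and 1 MB = 1024 KB.
const PAYLOAD_LIMIT_BYTES = {
  sync: 32 * 1024 * 1024, // Sync Invocation: 32 MB
  async: 128 * 1024,      // Async Invocation: 128 KB
};

// Hypothetical helper: reports whether a batch of events, serialized as
// JSON, stays within the limit of the chosen invocation method.
function fitsPayloadLimit(events, method) {
  const limit = PAYLOAD_LIMIT_BYTES[method];
  if (limit === undefined) {
    throw new Error(`Unknown invocation method: ${method}`);
  }
  const size = Buffer.byteLength(JSON.stringify(events), 'utf8');
  return { size, limit, fits: size <= limit };
}
```

A batch that does not fit the asynchronous limit may still fit the synchronous one, which is one reason to prefer Sync Invocation for large change records.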

Step 2: Configure function input parameters

The DTS event source passes an event as an input parameter to the function. You can manually pass the event to the function to simulate a triggering event.

  1. On the Code tab of the function details page, click the drop-down icon next to Test Function and select Configure Test Parameters from the drop-down list.

  2. In the Configure Test Parameters panel, select Create New Test Event or Modify Existing Test Event. Then, enter the event name and content, and click OK.

    The following is the format of the event:

    [
      {
        "data": {
          "id": 321****,
          "topicPartition": {
            "hash": 0,
            "partition": 0,
            "topic": "cn_hangzhou_rm_1234****_test_version2"
          },
          "offset": 3218099,
          "sourceTimestamp": 1654847757,
          "operationType": "UPDATE",
          "schema": {
            "recordFields": [
              {
                "fieldName": "id",
                "rawDataTypeNum": 8,
                "isPrimaryKey": true,
                "isUniqueKey": false,
                "fieldPosition": 0
              },
              {
                "fieldName": "topic",
                "rawDataTypeNum": 253,
                "isPrimaryKey": false,
                "isUniqueKey": false,
                "fieldPosition": 1
              }
            ],
            "nameIndex": {
              "id": {
                "fieldName": "id",
                "rawDataTypeNum": 8,
                "isPrimaryKey": true,
                "isUniqueKey": false,
                "fieldPosition": 0
              },
              "topic": {
                "fieldName": "topic",
                "rawDataTypeNum": 253,
                "isPrimaryKey": false,
                "isUniqueKey": false,
                "fieldPosition": 1
              }
            },
            "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
            "databaseName": "hangzhou-test-db",
            "tableName": "message_info",
            "primaryIndexInfo": {
              "indexType": "PrimaryKey",
              "indexFields": [
                {
                  "fieldName": "id",
                  "rawDataTypeNum": 8,
                  "isPrimaryKey": true,
                  "isUniqueKey": false,
                  "fieldPosition": 0
                }
              ],
              "cardinality": 0,
              "nullable": true,
              "isFirstUniqueIndex": false
            },
            "uniqueIndexInfo": [],
            "foreignIndexInfo": [],
            "normalIndexInfo": [],
            "databaseInfo": {
              "databaseType": "MySQL",
              "version": "5.7.35-log"
            },
            "totalRows": 0
          },
          "beforeImage": {
            "recordSchema": {
              "recordFields": [
                {
                  "fieldName": "id",
                  "rawDataTypeNum": 8,
                  "isPrimaryKey": true,
                  "isUniqueKey": false,
                  "fieldPosition": 0
                },
                {
                  "fieldName": "topic",
                  "rawDataTypeNum": 253,
                  "isPrimaryKey": false,
                  "isUniqueKey": false,
                  "fieldPosition": 1
                }
              ],
              "nameIndex": {
                "id": {
                  "fieldName": "id",
                  "rawDataTypeNum": 8,
                  "isPrimaryKey": true,
                  "isUniqueKey": false,
                  "fieldPosition": 0
                },
                "topic": {
                  "fieldName": "topic",
                  "rawDataTypeNum": 253,
                  "isPrimaryKey": false,
                  "isUniqueKey": false,
                  "fieldPosition": 1
                }
              },
              "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
              "databaseName": "hangzhou-test-db",
              "tableName": "message_info",
              "primaryIndexInfo": {
                "indexType": "PrimaryKey",
                "indexFields": [
                  {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  }
                ],
                "cardinality": 0,
                "nullable": true,
                "isFirstUniqueIndex": false
              },
              "uniqueIndexInfo": [],
              "foreignIndexInfo": [],
              "normalIndexInfo": [],
              "databaseInfo": {
                "databaseType": "MySQL",
                "version": "5.7.35-log"
              },
              "totalRows": 0
            },
            "values": [
              {
                "data": 115
              },
              {
                "data": {
                  "hb": [
                    104,
                    101,
                    108,
                    108,
                    111
                  ],
                  "offset": 0,
                  "isReadOnly": false,
                  "bigEndian": true,
                  "nativeByteOrder": false,
                  "mark": -1,
                  "position": 0,
                  "limit": 9,
                  "capacity": 9,
                  "address": 0
                },
                "charset": "utf8mb4"
              }
            ],
            "size": 45
          },
          "afterImage": {
            "recordSchema": {
              "recordFields": [
                {
                  "fieldName": "id",
                  "rawDataTypeNum": 8,
                  "isPrimaryKey": true,
                  "isUniqueKey": false,
                  "fieldPosition": 0
                },
                {
                  "fieldName": "topic",
                  "rawDataTypeNum": 253,
                  "isPrimaryKey": false,
                  "isUniqueKey": false,
                  "fieldPosition": 1
                }
              ],
              "nameIndex": {
                "id": {
                  "fieldName": "id",
                  "rawDataTypeNum": 8,
                  "isPrimaryKey": true,
                  "isUniqueKey": false,
                  "fieldPosition": 0
                },
                "topic": {
                  "fieldName": "topic",
                  "rawDataTypeNum": 253,
                  "isPrimaryKey": false,
                  "isUniqueKey": false,
                  "fieldPosition": 1
                }
              },
              "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
              "databaseName": "hangzhou-test-db",
              "tableName": "message_info",
              "primaryIndexInfo": {
                "indexType": "PrimaryKey",
                "indexFields": [
                  {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  }
                ],
                "cardinality": 0,
                "nullable": true,
                "isFirstUniqueIndex": false
              },
              "uniqueIndexInfo": [],
              "foreignIndexInfo": [],
              "normalIndexInfo": [],
              "databaseInfo": {
                "databaseType": "MySQL",
                "version": "5.7.35-log"
              },
              "totalRows": 0
            },
            "values": [
              {
                "data": 115
              },
              {
                "data": {
                  "hb": [
                    98,
                    121,
                    101
                  ],
                  "offset": 0,
                  "isReadOnly": false,
                  "bigEndian": true,
                  "nativeByteOrder": false,
                  "mark": -1,
                  "position": 0,
                  "limit": 11,
                  "capacity": 11,
                  "address": 0
                },
                "charset": "utf8mb4"
              }
            ],
            "size": 47
          }
        },
        "id": "12f701a43741d404fa9a7be89d9acae0-321****",
        "source": "DTSstreamDemo",
        "specversion": "1.0",
        "type": "dts:ConsumeMessage",
        "datacontenttype": "application/json; charset=utf-8",
        "time": "2022-06-10T07:55:57Z",
        "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro"
      }
    ]

    For descriptions of the parameters in the CloudEvents specification, see Event overview.

    The following table describes the parameters in the data field.

    Parameter

    Type

    Description

    id

    String

    The DTS data ID.

    topicPartition

    Array

    Information about the topic partitions.

    hash

    String

    An underlying storage parameter of DTS.

    partition

    String

    The partition of the topic.

    topic

    String

    The name of the topic.

    offset

    Int

    The offset of the DTS data in the log broker.

    sourceTimestamp

    Int

    The time when the DTS data was generated, as a UNIX timestamp in seconds.

    operationType

    String

    The operation type of the DTS data.

    schema

    Array

    Information about the database table schema.

    recordFields

    Array

    Details of the field records.

    fieldName

    String

    The name of the field.

    rawDataTypeNum

    Int

    The mapping value of the field type.

    This value corresponds to the `dataTypeNumber` field value after the incremental data obtained from the change tracking channel is deserialized. For more information, see Use a Kafka client to consume subscribed data.

    isPrimaryKey

    Boolean

    Indicates whether the field is a primary key.

    isUniqueKey

    Boolean

    Indicates whether the field is a unique key.

    fieldPosition

    String

    The position of the field.

    nameIndex

    Array

    An index of the record fields, keyed by field name.

    schemaId

    String

    The ID of the database table schema information.

    databaseName

    String

    The name of the database.

    tableName

    String

    The name of the data table.

    primaryIndexInfo

    String

    The primary key index.

    indexType

    String

    The type of the primary key index.

    indexFields

    Array

    The content of the primary key index fields.

    cardinality

    String

    The cardinality of the primary key.

    nullable

    Boolean

    Indicates whether the primary key can be null.

    isFirstUniqueIndex

    Boolean

    Indicates whether this is the first unique index.

    uniqueIndexInfo

    String

    The unique index.

    foreignIndexInfo

    String

    The foreign key index.

    normalIndexInfo

    String

    The normal index.

    databaseInfo

    Array

    Information about the database.

    databaseType

    String

    The type of the database.

    version

    String

    The version of the database.

    totalRows

    Int

    The total number of rows in the data table.

    beforeImage

    String

    The image of the record field content before the operation.

    values

    String

    The value of the record field.

    size

    Int

    The size of the record field.

    afterImage

    String

    The image of the record field content after the operation.
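The fields described above can be turned into a compact change summary. The following is a minimal sketch based only on the sample event in this topic, not on a DTS SDK: the function names `decodeValue`, `imageToRow`, and `summarizeChange` are hypothetical. It assumes that `hb` holds the raw bytes of a string column, that `utf8mb4` values can be decoded as UTF-8, that `values` is ordered by `fieldPosition`, and that `sourceTimestamp` is in seconds, as it is in the sample.

```javascript
'use strict';

// Decode one entry of "values".
// Assumption: for string columns, data.hb is the raw byte array of the value.
function decodeValue(value) {
  if (value === null || value === undefined) {
    return null;
  }
  const data = value.data;
  if (data !== null && typeof data === 'object' && Array.isArray(data.hb)) {
    // MySQL utf8mb4 is decoded here as UTF-8.
    return Buffer.from(data.hb).toString('utf8');
  }
  return data; // Numbers and other scalars are returned unchanged.
}

// Convert beforeImage or afterImage into a { fieldName: value } object.
// Assumption: values[i] belongs to the field whose fieldPosition is i.
function imageToRow(image) {
  if (!image) {
    return null; // For example, an INSERT may carry no beforeImage.
  }
  const row = {};
  for (const field of image.recordSchema.recordFields) {
    row[field.fieldName] = decodeValue(image.values[field.fieldPosition]);
  }
  return row;
}

// Summarize one CloudEvents record delivered by the DTS trigger.
function summarizeChange(cloudEvent) {
  const data = cloudEvent.data;
  return {
    operation: data.operationType,
    table: `${data.schema.databaseName}.${data.schema.tableName}`,
    // sourceTimestamp is treated as a UNIX timestamp in seconds.
    changedAt: new Date(data.sourceTimestamp * 1000).toISOString(),
    before: imageToRow(data.beforeImage),
    after: imageToRow(data.afterImage),
  };
}
```

Applied to the sample event, the byte arrays in `beforeImage` and `afterImage` decode to `hello` and `bye` for the `topic` field. Verify the decoding against your own data before you rely on it, because the sample does not show how other column types are represented.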

Step 3: Write and test function code

After you create the trigger, you can write and test the function code to verify its correctness. In practice, when the DTS change tracking task captures incremental data from the database, the trigger automatically invokes the function.

  1. On the Code tab of the function details page, write code in the code editor and click Deploy.

    This topic uses Node.js code as an example.

    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      // Parse the event parameters and process the event. 
      callback(null, 'return result');
    };

  2. Click Test Function.
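The placeholder comment in the sample handler can be filled in along the following lines. This is a minimal sketch rather than the official handler: it assumes that the runtime passes the event as a Buffer (or string) that contains the JSON array shown in Step 2, and it only counts the records in a batch by `operationType`. Replace the loop body with your own processing logic.

```javascript
'use strict';

// Sketch of a handler for DTS trigger batches.
// Assumption: "event" is a Buffer or string holding a JSON array of
// CloudEvents records in the format shown in Step 2.
function handler(event, context, callback) {
  let records;
  try {
    records = JSON.parse(event.toString());
  } catch (err) {
    // Report malformed input instead of throwing.
    callback(err);
    return;
  }

  // Accept a single record as well as a batch.
  const batch = Array.isArray(records) ? records : [records];
  const counts = {};

  for (const record of batch) {
    const data = record.data || {};
    const operation = data.operationType || 'UNKNOWN';
    const table = data.schema ? data.schema.tableName : 'unknown';
    counts[operation] = (counts[operation] || 0) + 1;
    console.log('%s on %s (offset %s)', operation, table, data.offset);
  }

  // Return the number of records per operation type as the result.
  callback(null, JSON.stringify(counts));
}

exports.handler = handler;
```

Passing the error to the callback marks the invocation as failed, so the retry and dead-letter settings of the trigger, if configured, can take effect.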

More information

In addition to the Function Compute console, you can configure triggers by using one of the following methods:

  • Use Serverless Devs to configure triggers. For more information, see Common Serverless Devs commands.

  • Use SDKs to configure triggers. For more information, see SDKs.

To modify or delete an existing trigger, see Manage triggers.