Data Transmission Service (DTS) acts as an event source and integrates with Function Compute through EventBridge. After integration, you can use DTS triggers to invoke associated functions. This lets you perform custom processing on real-time incremental data that you retrieve from DTS change tracking tasks. This topic describes how to create a DTS trigger, configure function input parameters, and write and test code in the Function Compute console.
Overview
After you submit a request to create a trigger in the Function Compute console, Function Compute automatically creates event stream resources in EventBridge based on the trigger configuration.
After the resources are created, you can view the trigger information in the Function Compute console. You can also view information about the automatically created resources in the EventBridge console. When a DTS change tracking task captures incremental data from a database, the associated function is invoked. One or more message events are pushed to the function in batches for processing based on your batch configurations.
Precautions
The DTS change tracking task that serves as the trigger source must be in the same region as the Function Compute function.
If the number of created event streams reaches the limit, you cannot create more DTS triggers. For more information about the event stream limit, see Limits.
Prerequisites
EventBridge
Function Compute
Data Transmission Service (DTS)
Step 1: Create a DTS trigger
Log on to the Function Compute console. In the left-side navigation pane, click Functions.
In the top navigation bar, select a region. On the Functions page, click the function that you want to manage.
On the function details page, click the Trigger tab and click Create Trigger.
In the Create Trigger panel, configure the parameters and click OK.
The following table describes the basic parameters.
| Configuration Item | Description | Example |
| --- | --- | --- |
| Trigger Type | The type of the trigger. For more information about the supported trigger types, see Trigger overview. | DTS |
| Name | A custom name for the trigger. | dts-trigger |
| Version or Alias | The default value is LATEST. If you want to create a trigger for another version or alias, switch to that version or alias in the upper-right corner of the function details page. For more information about versions and aliases, see Version management and Alias management. | LATEST |
| Change Tracking Task | The name of the existing change tracking task. | dtsqntc2*** |
| Consumer Group | The name of the consumer group used to consume data from the tracking task. Important: Make sure that the consumer group is not running on instances of other clients. Otherwise, the specified consumer offset may become invalid. | test |
| Account | The account specified when you created the consumer group. | test |
| Password | The password specified when you created the consumer group. | ****** |
| Consumer Offset | The timestamp when you want to consume the first data record. The consumer offset must be within the timestamp range of the tracking task. Note: The consumer offset takes effect only when a new consumer group runs for the first time. If the task restarts later, consumption continues from the last consumer offset. | 2022-06-21 00:00:00 |
| Invocation Method | The function invocation method. Valid values: (1) Sync Invocation: This method is suitable for sequential invocation scenarios. A single event or a batch of events triggers a function invocation. The system waits for the function to execute and return a result before the next event or batch of events triggers another function invocation. The maximum payload of a synchronous invocation request is 32 MB. For more information, see Synchronous invocations. (2) Async Invocation: This method lets you quickly consume events. A single event or a batch of events triggers a function invocation. Function Compute immediately returns a response, and the next event or batch of events triggers another function invocation. During this process, the function is executed asynchronously. The maximum payload of an asynchronous invocation request is 128 KB. For more information, see Overview. | Sync Invocation |
| Trigger State | Specifies whether to enable the trigger immediately after it is created. By default, Enable Trigger is selected, which means that the trigger is enabled immediately after it is created. | Enabled |
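The payload limits listed for Invocation Method are worth keeping in mind when you tune batch sizes. The following is a minimal sketch, not part of any Function Compute SDK, that checks whether a serialized batch fits under the limit of a given invocation method. The helper name `fitsInvocationLimit`, the `sync`/`async` keys, and the reading of 32 MB and 128 KB as 1024-based units are assumptions made for illustration.

```javascript
'use strict';

// Payload limits quoted for the two invocation methods. Treating MB and KB
// as 1024-based units is an assumption of this sketch.
const PAYLOAD_LIMITS = {
  sync: 32 * 1024 * 1024, // Sync Invocation: 32 MB
  async: 128 * 1024,      // Async Invocation: 128 KB
};

// Hypothetical helper: returns true if the JSON-serialized batch fits
// within the payload limit of the given invocation method.
function fitsInvocationLimit(events, method) {
  const limit = PAYLOAD_LIMITS[method];
  if (limit === undefined) {
    throw new Error(`Unknown invocation method: ${method}`);
  }
  const size = Buffer.byteLength(JSON.stringify(events), 'utf8');
  return size <= limit;
}
```

A check like this can help you decide whether a batch configuration that works with Sync Invocation is also small enough for Async Invocation.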
For more information about advanced configurations such as push configurations, retries, and dead-letter queues, see Advanced features.
After the trigger is created, it is displayed on the Triggers tab. To modify or delete a trigger, see Trigger Management.
Step 2: Configure function input parameters
The DTS event source passes an event as an input parameter to the function. You can manually pass the event to the function to simulate a triggering event.
On the Code tab of the function details page, click the icon next to Test Function and select Configure Test Parameters from the drop-down list. In the Configure Test Parameters panel, select Create New Test Event or Modify Existing Test Event. Then, enter the event name and content, and click OK.
The following is the format of the event:

```json
[
  {
    "data": {
      "id": 321****,
      "topicPartition": { "hash": 0, "partition": 0, "topic": "cn_hangzhou_rm_1234****_test_version2" },
      "offset": 3218099,
      "sourceTimestamp": 1654847757,
      "operationType": "UPDATE",
      "schema": {
        "recordFields": [
          { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 },
          { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 }
        ],
        "nameIndex": {
          "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 },
          "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 }
        },
        "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
        "databaseName": "hangzhou-test-db",
        "tableName": "message_info",
        "primaryIndexInfo": {
          "indexType": "PrimaryKey",
          "indexFields": [
            { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }
          ],
          "cardinality": 0,
          "nullable": true,
          "isFirstUniqueIndex": false
        },
        "uniqueIndexInfo": [],
        "foreignIndexInfo": [],
        "normalIndexInfo": [],
        "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" },
        "totalRows": 0
      },
      "beforeImage": {
        "recordSchema": {
          "recordFields": [
            { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 },
            { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 }
          ],
          "nameIndex": {
            "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 },
            "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 }
          },
          "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
          "databaseName": "hangzhou-test-db",
          "tableName": "message_info",
          "primaryIndexInfo": {
            "indexType": "PrimaryKey",
            "indexFields": [
              { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }
            ],
            "cardinality": 0,
            "nullable": true,
            "isFirstUniqueIndex": false
          },
          "uniqueIndexInfo": [],
          "foreignIndexInfo": [],
          "normalIndexInfo": [],
          "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" },
          "totalRows": 0
        },
        "values": [
          { "data": 115 },
          {
            "data": { "hb": [ 104, 101, 108, 108, 111 ], "offset": 0, "isReadOnly": false, "bigEndian": true, "nativeByteOrder": false, "mark": -1, "position": 0, "limit": 9, "capacity": 9, "address": 0 },
            "charset": "utf8mb4"
          }
        ],
        "size": 45
      },
      "afterImage": {
        "recordSchema": {
          "recordFields": [
            { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 },
            { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 }
          ],
          "nameIndex": {
            "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 },
            "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 }
          },
          "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
          "databaseName": "hangzhou-test-db",
          "tableName": "message_info",
          "primaryIndexInfo": {
            "indexType": "PrimaryKey",
            "indexFields": [
              { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }
            ],
            "cardinality": 0,
            "nullable": true,
            "isFirstUniqueIndex": false
          },
          "uniqueIndexInfo": [],
          "foreignIndexInfo": [],
          "normalIndexInfo": [],
          "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" },
          "totalRows": 0
        },
        "values": [
          { "data": 115 },
          {
            "data": { "hb": [ 98, 121, 101 ], "offset": 0, "isReadOnly": false, "bigEndian": true, "nativeByteOrder": false, "mark": -1, "position": 0, "limit": 11, "capacity": 11, "address": 0 },
            "charset": "utf8mb4"
          }
        ],
        "size": 47
      }
    },
    "id": "12f701a43741d404fa9a7be89d9acae0-321****",
    "source": "DTSstreamDemo",
    "specversion": "1.0",
    "type": "dts:ConsumeMessage",
    "datacontenttype": "application/json; charset=utf-8",
    "time": "2022-06-10T07:55:57Z",
    "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro"
  }
]
```

For descriptions of the parameters in the CloudEvents specification, see Event overview.
The following table describes the parameters in the data field.
| Parameter | Type | Description |
| --- | --- | --- |
| id | String | The DTS data ID. |
| topicPartition | Array | Information about the topic partitions. |
| hash | String | The DTS underlying storage parameter. |
| partition | String | The partition of the topic. |
| topic | String | The name of the topic. |
| offset | Int | The offset of the DTS data in the log broker. |
| sourceTimestamp | Int | The timestamp when the DTS data was generated. |
| operationType | String | The operation type of the DTS data. |
| schema | Array | Information about the database table schema. |
| recordFields | Array | Details of the field records. |
| fieldName | String | The name of the field. |
| rawDataTypeNum | Int | The mapping value of the field type. This value corresponds to the `dataTypeNumber` field value after the incremental data obtained from the change tracking channel is deserialized. For more information, see Use a Kafka client to consume subscribed data. |
| isPrimaryKey | Boolean | Indicates whether the field is a primary key. |
| isUniqueKey | Boolean | Indicates whether the field has a unique value. |
| fieldPosition | String | The position of the field. |
| nameIndex | Array | The named index. |
| schemaId | String | The ID of the database table schema information. |
| databaseName | String | The name of the database. |
| tableName | String | The name of the data table. |
| primaryIndexInfo | String | The primary key index. |
| indexType | String | The type of the primary key index. |
| indexFields | Array | The content of the primary key index fields. |
| cardinality | String | The cardinality of the primary key. |
| nullable | Boolean | Indicates whether the primary key can be null. |
| isFirstUniqueIndex | Boolean | Indicates whether this is the first unique index. |
| uniqueIndexInfo | String | The unique index. |
| foreignIndexInfo | String | The foreign key index. |
| normalIndexInfo | String | The normal index. |
| databaseInfo | Array | Information about the database. |
| databaseType | String | The type of the database. |
| version | String | The version of the database. |
| totalRows | Int | The total number of rows in the data table. |
| beforeImage | String | The image of the record field content before the operation. |
| values | String | The value of the record field. |
| size | Int | The size of the record field. |
| afterImage | String | The image of the record field content after the operation. |
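To make the parameters more concrete, the following sketch turns beforeImage and afterImage into plain objects keyed by field name. It relies on assumptions drawn only from the sample event in this step: that `values[i]` belongs to the field whose `fieldPosition` is `i`, that string columns arrive as an object whose `hb` array holds the raw bytes, and that `sourceTimestamp` is expressed in seconds. The helper names `decodeValue` and `imageToRow` are hypothetical.

```javascript
'use strict';

// Hypothetical helper: decodes one entry of "values".
// Assumption: numeric columns carry a plain number in "data", and string
// columns carry an object whose "hb" array holds the raw bytes. The whole
// array is decoded; "offset", "position", and "limit" are ignored here.
function decodeValue(value) {
  if (value === null || value === undefined) return null;
  const data = value.data;
  if (data !== null && typeof data === 'object' && Array.isArray(data.hb)) {
    // MySQL utf8mb4 is decoded as UTF-8; other charsets are not handled.
    return Buffer.from(data.hb).toString('utf8');
  }
  return data;
}

// Hypothetical helper: maps an image to { fieldName: value }.
// Assumption: values[i] belongs to the field with fieldPosition i.
function imageToRow(image) {
  if (!image) return null;
  const row = {};
  for (const field of image.recordSchema.recordFields) {
    row[field.fieldName] = decodeValue(image.values[field.fieldPosition]);
  }
  return row;
}

// Trimmed-down copy of the "data" field from the sample event.
const fields = [
  { fieldName: 'id', fieldPosition: 0 },
  { fieldName: 'topic', fieldPosition: 1 },
];
const data = {
  sourceTimestamp: 1654847757,
  operationType: 'UPDATE',
  beforeImage: {
    recordSchema: { recordFields: fields },
    values: [{ data: 115 }, { data: { hb: [104, 101, 108, 108, 111] }, charset: 'utf8mb4' }],
  },
  afterImage: {
    recordSchema: { recordFields: fields },
    values: [{ data: 115 }, { data: { hb: [98, 121, 101] }, charset: 'utf8mb4' }],
  },
};

console.log(imageToRow(data.beforeImage)); // { id: 115, topic: 'hello' }
console.log(imageToRow(data.afterImage));  // { id: 115, topic: 'bye' }
// Treating sourceTimestamp as seconds since the Unix epoch:
console.log(new Date(data.sourceTimestamp * 1000).toISOString()); // 2022-06-10T07:55:57.000Z
```

Under these assumptions, the sample UPDATE changes the topic column from hello to bye, and the converted sourceTimestamp matches the time attribute of the sample event.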
Step 3: Write and test function code
After you create the trigger, you can write and test the function code to verify its correctness. In practice, when the DTS change tracking task captures incremental data from the database, the trigger automatically invokes the function.
On the Code tab of the function details page, you can write code in the code editor and click Deploy.
This topic uses Node.js code as an example.
```javascript
'use strict';
/*
To enable the initializer feature
please implement the initializer function as below:
exports.initializer = (context, callback) => {
  console.log('initializing');
  callback(null, '');
};
*/
exports.handler = (event, context, callback) => {
  console.log("event: %s", event);
  // Parse the event parameters and process the event.
  callback(null, 'return result');
};
```

Click Test Function.
More information
In addition to the Function Compute console, you can configure triggers by using one of the following methods:
Use Serverless Devs to configure triggers. For more information, see Common Serverless Devs commands.
Use SDKs to configure triggers. For more information, see SDKs.
To modify or delete an existing trigger, see Manage triggers.