A workflow can be triggered through the CloudFlow console, by invoking an SDK, or by an event source. Workflow scheduling automates workflow execution by connecting event sources to workflows. It provides a centralized and unified way to manage different event sources. Define a set of trigger rules so that when an incoming event matches those rules, the event source automatically starts the associated workflow -- no manual invocation required.
Use cases
Automated image processing -- When a new image is uploaded to an Object Storage Service (OSS) bucket, a trigger starts a workflow that downloads the image for processing and stores the result in OSS or another downstream service.
Incremental log analysis -- When logs are updated in Simple Log Service (SLS), a trigger starts a workflow that queries and analyzes the new log entries.
Periodic data collection -- A time-based schedule runs a workflow at a fixed interval, such as once per hour, to collect and process data without manual intervention.
Trigger categories
CloudFlow organizes triggers into two categories based on how the event source integrates with the workflow.
Two-way integration triggers
You configure scheduling in both the workflow and the event source. The following trigger types use two-way integration:
| Trigger type | Standard mode | Express mode |
|---|---|---|
| Time-based scheduling | Asynchronous invocation | Synchronous invocation |
| Simple Log Service | Asynchronous invocation | Synchronous invocation |
| MNS | Asynchronous invocation | Synchronous invocation |
| ApsaraMQ for Kafka | Asynchronous invocation | Synchronous invocation |
| ApsaraMQ for RocketMQ | Asynchronous invocation | Synchronous invocation |
| ApsaraMQ for RabbitMQ | Asynchronous invocation | Synchronous invocation |
| HTTP | Asynchronous invocation | Synchronous invocation |
Cloud service event triggers
You configure scheduling in the workflow and create trigger rules in EventBridge. No configuration is required on the event source side.
| Trigger type | Standard mode | Express mode |
|---|---|---|
| Alibaba Cloud service event | Asynchronous invocation | Synchronous invocation |
Synchronous and asynchronous invocation
The invocation method determines when you receive a response:
| Invocation method | Behavior |
|---|---|
| Synchronous | Returns the execution result after the workflow finishes processing the event. |
| Asynchronous | Returns a response immediately after the event is written to the workflow's internal queue. The workflow system is then responsible for processing the queued event. |
Event payload reference
Each trigger type passes a CloudEvents-compliant JSON payload to the workflow. To extract the Event parameter from the payload using an expression, see Inputs and outputs.
The following sections document the data field structure for each trigger type. Parameters outside the data field follow the CloudEvents specification.
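Some of the samples below are a single JSON object, while others are a JSON array of events. As a minimal sketch (the helper name `iter_events` is hypothetical and not part of any CloudFlow SDK), the following Python code normalizes both shapes so that downstream code can always read the `data` field the same way:

```python
import json
from typing import Any, Iterator


def iter_events(raw: str) -> Iterator[dict[str, Any]]:
    """Yield each CloudEvents envelope from a raw JSON trigger payload."""
    parsed = json.loads(raw)
    # Accept both a single event object and an array of events.
    events = parsed if isinstance(parsed, list) else [parsed]
    for event in events:
        yield event


# Trimmed-down sample modeled on the MNS payload in this reference.
sample = json.dumps([
    {
        "specversion": "1.0",
        "type": "mns:Queue:SendMessage",
        "data": {"messageBody": "TEST"},
    }
])

for event in iter_events(sample):
    print(event["type"], event["data"])
```

Fields outside `data` (such as `type` and `source`) stay available on each yielded envelope, so the same loop can branch on the trigger type if a workflow receives events from more than one source.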
Time-based scheduling
{
"datacontenttype":"application/json;charset=utf-8",
"aliyunaccountid":"143998900779****",
"aliyunpublishtime":"2022-09-21T05:00:00.035Z",
"data":{
"TimeZone":"GMT+0:00",
"Schedule":"0/30 * * * * ?",
"UserData":{
"key":"value"
}
},
"specversion":"1.0",
"aliyuneventbusname":"Housekeeping-Bus",
"id":"d100262d-90c7-4caf-a3b5-813f3526a1f7-****",
"source":"housekeeping.scheduledevent",
"time":"2022-09-21T05:00:00Z",
"aliyunregionid":"cn-beijing",
"type":"eventbridge:Events:ScheduledEvent"
}
data parameters
| Parameter | Type | Example | Description |
|---|---|---|---|
| TimeZone | String | GMT+8:00 | The time zone. |
| Schedule | String | 0 */10 * * * * | The cron expression. Used when Trigger Period is set to Fixed Period. |
| UserData | Object | {"key":"value"} | Custom parameters in JSON format. |
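The `TimeZone` value can be combined with the envelope's `time` field to get the local trigger time. The sketch below assumes that `TimeZone` always has the `GMT+H:MM` or `GMT-H:MM` shape shown in the samples; the function name `parse_gmt_offset` is hypothetical.

```python
import re
from datetime import datetime, timedelta, timezone


def parse_gmt_offset(value: str) -> timezone:
    """Convert a string such as 'GMT+8:00' into a fixed-offset timezone."""
    match = re.fullmatch(r"GMT([+-])(\d{1,2}):(\d{2})", value)
    if match is None:
        raise ValueError(f"unsupported TimeZone value: {value!r}")
    sign = 1 if match.group(1) == "+" else -1
    offset = timedelta(hours=int(match.group(2)), minutes=int(match.group(3)))
    return timezone(sign * offset)


# Trimmed-down event modeled on the sample above, with the table's example time zone.
event = {
    "time": "2022-09-21T05:00:00Z",
    "data": {
        "TimeZone": "GMT+8:00",
        "Schedule": "0/30 * * * * ?",
        "UserData": {"key": "value"},
    },
}

# Older Python versions reject a trailing "Z" in fromisoformat(), so replace it.
fired_at = datetime.fromisoformat(event["time"].replace("Z", "+00:00"))
local_time = fired_at.astimezone(parse_gmt_offset(event["data"]["TimeZone"]))
print(local_time.isoformat())  # 2022-09-21T13:00:00+08:00
```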
HTTP
{
"datacontenttype": "application/json",
"aliyunaccountid": "164901546557****",
"data": {
"headers": {
"content-length": "0",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"Host": "164901546557****.eventbridge.cn-hangzhou.aliyuncs.com",
"Accept-Encoding": "gzip, deflate",
"X-Forwarded-Port": "80",
"Upgrade-Insecure-Requests": "1",
"X-Forwarded-For": "183.247.0.***",
"Accept-Language": "zh-CN,zh;q=0.9",
"X-Real-IP": "183.247.0.***",
"X-Scheme": "http"
},
"path": "/webhook/putEvents",
"body": "",
"httpMethod": "GET",
"queryString": {}
},
"subject": "acs:eventbridge:cn-hangzhou:164901546557****:eventbus/eventbus-created-by-fnf-466ccc7e-418a-403f-8d96-2d73a8e****/eventsource/httpschedule",
"aliyunoriginalaccountid": "164901546557****",
"source": "httpschedule",
"type": "eventbridge:Events:HTTPEvent",
"aliyunpublishtime": "2023-08-06T18:37:01.666Z",
"specversion": "1.0",
"aliyuneventbusname": "eventbus-created-by-fnf-466ccc7e-418a-403f-8d96-2d73a8e****",
"id": "6751261d-e496-4b36-a707-3c087bf3****",
"time": "2023-08-07T02:37:01.666+08:00",
"aliyunregionid": "cn-hangzhou",
"aliyunpublishaddr": "183.247.0.***"
}
data parameters
| Parameter | Type | Example | Description |
|---|---|---|---|
| headers | Object | {"Accept": "application/json"} | The HTTP request headers. |
| path | String | /webhook/putEvents | The request path. |
| body | Object | {"filePath": "/tmp/uploader"} | The HTTP request body. |
| httpMethod | String | GET | The HTTP method. |
| queryString | String | username=leo | The query string from the HTTP URL. Excludes the token used to request the webhook. |
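The sample shows headers in mixed letter case (`content-length` next to `Accept`), and the body can be an empty string or JSON content. The following sketch shows one defensive way to read both; `get_header` and `decode_body` are hypothetical helper names, and treating the body as possibly JSON-encoded text is an assumption.

```python
import json
from typing import Any, Optional


def get_header(headers: dict[str, str], name: str) -> Optional[str]:
    """Look up an HTTP header without depending on its letter case."""
    wanted = name.lower()
    for key, value in headers.items():
        if key.lower() == wanted:
            return value
    return None


def decode_body(body: Any) -> Any:
    """Decode JSON text when possible; pass every other value through unchanged."""
    if isinstance(body, str) and body.strip():
        try:
            return json.loads(body)
        except json.JSONDecodeError:
            return body
    return body


# Trimmed-down `data` field modeled on the sample above.
data = {
    "headers": {"content-length": "0", "Accept-Encoding": "gzip, deflate"},
    "path": "/webhook/putEvents",
    "body": "",
    "httpMethod": "GET",
    "queryString": {},
}

print(get_header(data["headers"], "Content-Length"))  # 0
print(decode_body('{"filePath": "/tmp/uploader"}'))   # {'filePath': '/tmp/uploader'}
```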
Alibaba Cloud service event
{
"id":"c2g71017-6f65-fhcf-a814-a396fc8d****",
"source":"OSS-FunctionFlow-osstrigger",
"specversion":"1.0",
"type":"oss:PutObject",
"datacontenttype":"application/json; charset=utf-8",
"subject":"acs:mns:cn-hangzhou:164901546557****:queues/zeus",
"time":"2021-04-08T06:28:17.093Z",
"aliyunaccountid":"1649015465574023",
"aliyunpublishtime":"2021-10-15T07:06:34.028Z",
"aliyunoriginalaccountid":"164901546557****",
"aliyuneventbusname":"OSS-FunctionFlow-osstrigger",
"aliyunregionid":"cn-chengdu",
"aliyunpublishaddr":"42.120.XX.XX",
"data":{
"*** The content varies based on the event source. ***"
}
}
The data content varies by Alibaba Cloud service. For event types and formats of each service, see Alibaba Cloud service event sources.
Simple Log Service
[
{
"datacontenttype": "application/json;charset=utf-8",
"aliyunaccountid": "164901546557****",
"data": {
"key1": "value1",
"key2": "value2",
"__topic__": "test_topic",
"__source__": "test_source",
"__client_ip__": "122.231.XX.XX",
"__receive_time__": "1663487595",
"__pack_id__": "59b662b2257796****"
},
"subject": "acs:log:cn-qingdao:164901546557****:project/qiingdaoproject/logstore/qingdao-logstore-1",
"aliyunoriginalaccountid": "164901546557****",
"source": "SLS-FunctionFlow-slstrigger",
"type": "sls:connector",
"aliyunpublishtime": "2022-09-18T07:53:15.387Z",
"specversion": "1.0",
"aliyuneventbusname": "SLS-FunctionFlow-slstrigger",
"id": "qiingdaoproject-qingdao-logstore-1-1-MTY2MzExODM5ODY4NjAxOTQyMw****",
"time": "2022-09-18T07:53:12Z",
"aliyunregionid": "cn-qingdao",
"aliyunpublishaddr": "10.50.XX.XX"
}
]
data parameters
Fields with __ prefixes and suffixes are Simple Log Service system fields. For details, see Reserved fields.
| Parameter | Type | Example | Description |
|---|---|---|---|
| key1 | String | testKey | A user-defined field in the log entry. |
| __topic__ | String | testTopic | The log topic. |
| __source__ | String | testSource | The device from which the log is collected. |
| __client_ip__ | String | 122.231.XX.XX | The IP address of the host where the log resides. |
| __receive_time__ | String | 1663487595 | The time when the log arrived at the server. UNIX timestamp. |
| __pack_id__ | String | 59b662b2257796**** | The unique ID of the log group to which the log belongs. |
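Because system fields and user-defined fields share one `data` object, a workflow step often needs to tell them apart. The sketch below separates the two groups by the double-underscore convention described above and converts `__receive_time__` to a UTC datetime, assuming the value is a UNIX timestamp in seconds as the sample suggests. The function name `split_log_fields` is hypothetical.

```python
from datetime import datetime, timezone


def split_log_fields(data: dict) -> tuple[dict, dict]:
    """Separate Simple Log Service system fields from user-defined fields."""
    system, user = {}, {}
    for key, value in data.items():
        is_system = len(key) > 4 and key.startswith("__") and key.endswith("__")
        (system if is_system else user)[key] = value
    return system, user


# `data` field copied from the sample above.
data = {
    "key1": "value1",
    "key2": "value2",
    "__topic__": "test_topic",
    "__source__": "test_source",
    "__client_ip__": "122.231.XX.XX",
    "__receive_time__": "1663487595",
    "__pack_id__": "59b662b2257796****",
}

system_fields, user_fields = split_log_fields(data)
received_at = datetime.fromtimestamp(int(system_fields["__receive_time__"]), tz=timezone.utc)

print(user_fields)              # {'key1': 'value1', 'key2': 'value2'}
print(received_at.isoformat())  # 2022-09-18T07:53:15+00:00
```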
MNS
[
{
"id":"c2g71017-6f65-fhcf-a814-a396fc8d****",
"source":"MNS-FunctionFlow-mnstrigger",
"specversion":"1.0",
"type":"mns:Queue:SendMessage",
"datacontenttype":"application/json; charset=utf-8",
"subject":"acs:mns:cn-hangzhou:164901546557****:queues/zeus",
"time":"2023-04-08T06:28:17.093Z",
"aliyunaccountid":"1649015465574023",
"aliyunpublishtime":"2023-10-15T07:06:34.028Z",
"aliyunoriginalaccountid":"164901546557****",
"aliyuneventbusname":"MNS-Function-mnstrigger",
"aliyunregionid":"cn-chengdu",
"aliyunpublishaddr":"42.120.XX.XX",
"data":{
"requestId":"606EA3074344430D4C81****",
"messageId":"C6DB60D1574661357FA227277445****",
"messageBody":"TEST"
}
}
]
data parameters
| Parameter | Type | Example | Description |
|---|---|---|---|
| requestId | String | 606EA3074344430D4C81**** | The request ID. Each request has a unique ID. |
| messageId | String | C6DB60D1574661357FA227277445**** | The message ID. Each message has a unique ID. |
| messageBody | String | TEST | The message body. |
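Since each message carries a unique `messageId`, that field can serve as a deduplication key. The following sketch assumes that the same message might be delivered more than once, which this reference does not state; the `requestId` and `messageId` values are made up for the example, and `unique_message_bodies` is a hypothetical helper.

```python
from typing import Iterable


def unique_message_bodies(events: Iterable[dict]) -> list[str]:
    """Return message bodies in arrival order, skipping repeated messageId values."""
    seen: set[str] = set()
    bodies: list[str] = []
    for event in events:
        data = event["data"]
        if data["messageId"] in seen:
            continue  # this message was already handled
        seen.add(data["messageId"])
        bodies.append(data["messageBody"])
    return bodies


batch = [
    {"type": "mns:Queue:SendMessage",
     "data": {"requestId": "req-1", "messageId": "msg-1", "messageBody": "TEST"}},
    # Hypothetical repeated delivery of the same message.
    {"type": "mns:Queue:SendMessage",
     "data": {"requestId": "req-2", "messageId": "msg-1", "messageBody": "TEST"}},
    {"type": "mns:Queue:SendMessage",
     "data": {"requestId": "req-3", "messageId": "msg-2", "messageBody": "SECOND"}},
]

print(unique_message_bodies(batch))  # ['TEST', 'SECOND']
```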
ApsaraMQ for RocketMQ
[
{
"id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
"source":"RocketMQ-rocketmq-schedule",
"specversion":"1.0",
"type":"mq:Topic:SendMessage",
"datacontenttype":"application/json; charset=utf-8",
"subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
"time":"2023-04-08T06:01:20.766Z",
"aliyunaccountid":"164901546557****",
"aliyunpublishtime":"2023-10-15T02:05:16.791Z",
"aliyunoriginalaccountid":"164901546557****",
"aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger",
"aliyunregionid":"cn-chengdu",
"aliyunpublishaddr":"42.120.XX.XX",
"data":{
"topic":"TopicName",
"systemProperties":{
"MIN_OFFSET":"0",
"TRACE_ON":"true",
"MAX_OFFSET":"8",
"MSG_REGION":"cn-hangzhou",
"KEYS":"systemProperties.KEYS",
"CONSUME_START_TIME":1628577790396,
"TAGS":"systemProperties.TAGS",
"INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi"
},
"userProperties":{
},
"body":"TEST"
}
}
]
data parameters
| Parameter | Type | Example | Description |
|---|---|---|---|
| topic | String | TopicName | The topic name. |
| systemProperties | Map | | The system properties. |
| systemProperties.MIN_OFFSET | Int | 0 | The earliest offset. |
| systemProperties.TRACE_ON | Boolean | true | Whether to include a message trace. Valid values: true, false. |
| systemProperties.MAX_OFFSET | Int | 8 | The latest offset. |
| systemProperties.MSG_REGION | String | cn-hangzhou | The region from which the message was sent. |
| systemProperties.KEYS | String | systemProperties.KEYS | The keys used to filter the message. |
| systemProperties.CONSUME_START_TIME | Long | 1628577790396 | The consumption start time. UNIX timestamp in milliseconds. |
| systemProperties.UNIQ_KEY | String | AC14C305069E1B28CDFA3181CDA2**** | The unique key of the message. |
| systemProperties.TAGS | String | systemProperties.TAGS | The tags used to filter the message. |
| systemProperties.INSTANCE_ID | String | MQ_INST_123456789098****_BXhFHryi | The ApsaraMQ for RocketMQ instance ID. |
| userProperties | Map | | The user-defined properties. |
| body | String | TEST | The message body. |
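The table lists `systemProperties.UNIQ_KEY`, but the sample payload does not contain it, so it is safer to treat individual system properties as optional. A minimal sketch, with `summarize_message` as a hypothetical helper name:

```python
from typing import Any


def summarize_message(event: dict) -> dict[str, Any]:
    """Flatten the fields a workflow step typically needs into one record."""
    data = event["data"]
    props = data.get("systemProperties", {})
    return {
        "topic": data["topic"],
        # .get() returns None instead of raising when a property is absent.
        "tags": props.get("TAGS"),
        "unique_key": props.get("UNIQ_KEY"),
        "user_properties": data.get("userProperties", {}),
        "body": data["body"],
    }


# Trimmed-down event modeled on the sample above.
event = {
    "type": "mq:Topic:SendMessage",
    "data": {
        "topic": "TopicName",
        "systemProperties": {
            "MIN_OFFSET": "0",
            "MAX_OFFSET": "8",
            "TAGS": "systemProperties.TAGS",
        },
        "userProperties": {},
        "body": "TEST",
    },
}

summary = summarize_message(event)
print(summary["topic"], summary["tags"], summary["unique_key"])
```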
ApsaraMQ for RabbitMQ
[
{
"id":"bj694332-4cj1-389e-9d8c-b137h30b****",
"source":"RabbitMQ-Function-rabbitmq-trigger",
"specversion":"1.0",
"type":"amqp:Queue:SendMessage",
"datacontenttype":"application/json;charset=utf-8",
"subject":"acs:amqp:cn-hangzhou:164901546557****:/instances/amqp-cn-tl32e756****/vhosts/eb-connect/queues/housekeeping",
"time":"2023-08-12T06:56:40.709Z",
"aliyunaccountid":"164901546557****",
"aliyunpublishtime":"2023-10-15T08:58:55.140Z",
"aliyunoriginalaccountid":"164901546557****",
"aliyuneventbusname":"RabbitMQ-Function-rabbitmq-trigger",
"aliyunregionid":"cn-chengdu",
"aliyunpublishaddr":"42.120.XX.XX",
"data":{
"envelope":{
"deliveryTag":98,
"exchange":"",
"redeliver":false,
"routingKey":"housekeeping"
},
"body":{
"Hello":"RabbitMQ"
},
"props":{
"contentEncoding":"UTF-8",
"messageId":"f7622d51-e198-41de-a072-77c1ead7****"
}
}
}
]
data parameters
| Parameter | Type | Example | Description |
|---|---|---|---|
| envelope | Map | | The message envelope. |
| envelope.deliveryTag | Int | 98 | The message delivery tag. |
| envelope.exchange | String | | The exchange that sent the message. |
| envelope.redeliver | Boolean | false | Whether the message can be redelivered. Valid values: true, false. |
| envelope.routingKey | String | housekeeping | The routing key for the message. |
| body | Map | | The message body. |
| props | Map | | The message properties. |
| props.contentEncoding | String | utf-8 | The encoding format of the message body. |
| props.messageId | String | f7622d51-e198-41de-a072-77c1ead7**** | The message ID. Each message has a unique ID. |
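One way to use `envelope.routingKey` is to choose a handler per routing key. The sketch below is only an illustration: the routing table `HANDLERS` and the handler function are hypothetical and not part of CloudFlow.

```python
from typing import Any, Callable

Handler = Callable[[Any], str]


def handle_housekeeping(body: Any) -> str:
    return f"housekeeping task received: {body}"


# Hypothetical routing table: routing key -> handler function.
HANDLERS: dict[str, Handler] = {"housekeeping": handle_housekeeping}


def dispatch(event: dict) -> str:
    """Pick a handler based on the routing key in the message envelope."""
    data = event["data"]
    routing_key = data["envelope"]["routingKey"]
    handler = HANDLERS.get(routing_key)
    if handler is None:
        return f"no handler for routing key {routing_key!r}"
    return handler(data["body"])


# Trimmed-down event modeled on the sample above.
event = {
    "type": "amqp:Queue:SendMessage",
    "data": {
        "envelope": {"deliveryTag": 98, "exchange": "", "redeliver": False,
                     "routingKey": "housekeeping"},
        "body": {"Hello": "RabbitMQ"},
        "props": {"contentEncoding": "UTF-8"},
    },
}

print(dispatch(event))  # housekeeping task received: {'Hello': 'RabbitMQ'}
```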
ApsaraMQ for Kafka
[
{
"specversion":"1.0",
"id":"8e215af8-ca18-4249-8645-f96c1026****",
"source":"acs:alikafka",
"type":"alikafka:Topic:Message",
"subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic",
"datacontenttype":"application/json; charset=utf-8",
"time":"2023-06-23T02:49:51.589Z",
"aliyunaccountid":"164901546557****",
"data":{
"topic":"****",
"partition":7,
"offset":25,
"timestamp":1655952591589,
"headers":{
"headers":[
],
"isReadOnly":false
},
"key":"keytest",
"value":"hello kafka msg"
}
}
]
data parameters
| Parameter | Type | Example | Description |
|---|---|---|---|
| topic | String | TopicName | The topic name. |
| partition | Int | 1 | The partition on the ApsaraMQ for Kafka instance. |
| offset | Int | 0 | The message offset on the ApsaraMQ for Kafka instance. |
| timestamp | String | 1655952591589 | The timestamp when message consumption started. |
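The sample shows `timestamp` as a 13-digit number while the table lists its type as String, so the sketch below accepts both forms. It assumes the value is a UNIX timestamp in milliseconds. The helper names and the second message in the batch are hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def message_time(data: dict) -> datetime:
    """Convert the millisecond `timestamp` field into an aware UTC datetime."""
    # int() accepts both the numeric form in the sample and a string form.
    return EPOCH + timedelta(milliseconds=int(data["timestamp"]))


def latest_offsets(events: list[dict]) -> dict[tuple[str, int], int]:
    """Return the highest offset seen for each (topic, partition) pair."""
    latest: dict[tuple[str, int], int] = {}
    for event in events:
        data = event["data"]
        key = (data["topic"], data["partition"])
        latest[key] = max(latest.get(key, -1), data["offset"])
    return latest


batch = [
    {"data": {"topic": "mytopic", "partition": 7, "offset": 25,
              "timestamp": 1655952591589, "key": "keytest", "value": "hello kafka msg"}},
    # Hypothetical second message on the same partition.
    {"data": {"topic": "mytopic", "partition": 7, "offset": 26,
              "timestamp": "1655952592000", "key": "keytest", "value": "next"}},
]

print(message_time(batch[0]["data"]).isoformat())  # 2022-06-23T02:49:51.589000+00:00
print(latest_offsets(batch))                       # {('mytopic', 7): 26}
```

Adding a `timedelta` to a fixed epoch avoids the floating-point rounding that can occur when a millisecond value is divided by 1000 before conversion.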
Data Transmission Service (DTS)
{
"data": {
"id": 321****,
"topicPartition": {
"hash": 0,
"partition": 0,
"topic": "cn_hangzhou_rm_1234****_test_version2"
},
"offset": 3218099,
"sourceTimestamp": 1654847757,
"operationType": "UPDATE",
"schema": {
"recordFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
{
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
],
"nameIndex": {
"id": {
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
"topic": {
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
},
"schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
"databaseName": "hangzhou-test-db",
"tableName": "message_info",
"primaryIndexInfo": {
"indexType": "PrimaryKey",
"indexFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
}
],
"cardinality": 0,
"nullable": true,
"isFirstUniqueIndex": false
},
"uniqueIndexInfo": [],
"foreignIndexInfo": [],
"normalIndexInfo": [],
"databaseInfo": {
"databaseType": "MySQL",
"version": "5.7.35-log"
},
"totalRows": 0
},
"beforeImage": {
"recordSchema": {
"recordFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
{
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
],
"nameIndex": {
"id": {
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
"topic": {
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
},
"schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
"databaseName": "hangzhou-test-db",
"tableName": "message_info",
"primaryIndexInfo": {
"indexType": "PrimaryKey",
"indexFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
}
],
"cardinality": 0,
"nullable": true,
"isFirstUniqueIndex": false
},
"uniqueIndexInfo": [],
"foreignIndexInfo": [],
"normalIndexInfo": [],
"databaseInfo": {
"databaseType": "MySQL",
"version": "5.7.35-log"
},
"totalRows": 0
},
"values": [
{
"data": 115
},
{
"data": {
"hb": [104, 101, 108, 108, 111],
"offset": 0,
"isReadOnly": false,
"bigEndian": true,
"nativeByteOrder": false,
"mark": -1,
"position": 0,
"limit": 9,
"capacity": 9,
"address": 0
},
"charset": "utf8mb4"
}
],
"size": 45
},
"afterImage": {
"recordSchema": {
"recordFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
{
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
],
"nameIndex": {
"id": {
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
"topic": {
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
},
"schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
"databaseName": "hangzhou-test-db",
"tableName": "message_info",
"primaryIndexInfo": {
"indexType": "PrimaryKey",
"indexFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
}
],
"cardinality": 0,
"nullable": true,
"isFirstUniqueIndex": false
},
"uniqueIndexInfo": [],
"foreignIndexInfo": [],
"normalIndexInfo": [],
"databaseInfo": {
"databaseType": "MySQL",
"version": "5.7.35-log"
},
"totalRows": 0
},
"values": [
{
"data": 115
},
{
"data": {
"hb": [98, 121, 101],
"offset": 0,
"isReadOnly": false,
"bigEndian": true,
"nativeByteOrder": false,
"mark": -1,
"position": 0,
"limit": 11,
"capacity": 11,
"address": 0
},
"charset": "utf8mb4"
}
],
"size": 47
}
},
"id": "12f701a43741d404fa9a7be89d9acae0-321****",
"source": "DTSstreamDemo",
"specversion": "1.0",
"type": "dts:ConsumeMessage",
"datacontenttype": "application/json; charset=utf-8",
"time": "2022-06-10T07:55:57Z",
"subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro"
}
data parameters
| Parameter | Type | Example | Description |
|---|---|---|---|
| id | String | 321**** | The data entry ID. |
| topicPartition | Array | | The partition information for the topic to which the event is pushed. |
| topicPartition.hash | String | 0 | The underlying storage parameter of DTS. |
| topicPartition.partition | String | 0 | The partition. |
| topicPartition.topic | String | cn_hangzhou_rm_1234****_test_version2 | The topic name. |
| offset | Int | 3218099 | The offset of the data entry. |
| sourceTimestamp | Int | 1654847757 | The UNIX timestamp when the data entry was generated. |
| operationType | String | UPDATE | The operation type performed on the data entry. |
| schema | Array | | The database schema information. |
| schema.recordFields | Array | | The field details. |
| schema.recordFields[].fieldName | String | id | The field name. |
| schema.recordFields[].rawDataTypeNum | Int | 8 | The mapped value of the field type. |
| schema.recordFields[].isPrimaryKey | Boolean | true | Whether the field is a primary key. |
| schema.recordFields[].isUniqueKey | Boolean | false | Whether the field has a unique key. |
| schema.recordFields[].fieldPosition | String | 0 | The field position. |
| schema.nameIndex | Array | | The field index by field name. |
| schema.schemaId | String | (hangzhou-test-db,hangzhou-test-db,message_info) | The database schema ID. |
| schema.databaseName | String | hangzhou-test-db | The database name. |
| schema.tableName | String | message_info | The table name. |
| schema.primaryIndexInfo | Array | | The primary key index information. |
| schema.primaryIndexInfo.indexType | String | PrimaryKey | The index type. |
| schema.primaryIndexInfo.indexFields | Array | | The indexed fields. |
| schema.primaryIndexInfo.cardinality | String | 0 | The cardinality of the primary keys. |
| schema.primaryIndexInfo.nullable | Boolean | true | Whether the primary keys can be null. |
| schema.primaryIndexInfo.isFirstUniqueIndex | Boolean | false | Whether this is the first unique index. |
| schema.uniqueIndexInfo | String | [] | The unique indexes. |
| schema.foreignIndexInfo | String | [] | The foreign key indexes. |
| schema.normalIndexInfo | String | [] | The regular indexes. |
| schema.databaseInfo | Array | | The database engine information. |
| schema.databaseInfo.databaseType | String | MySQL | The database engine type. |
| schema.databaseInfo.version | String | 5.7.35-log | The database engine version. |
| schema.totalRows | Int | 0 | The total number of rows in the table. |
| beforeImage | Object | | The field values before the operation. |
| beforeImage.values | Array | | The recorded field values. |
| beforeImage.size | Int | 45 | The size of the recorded fields. |
| afterImage | Object | | The field values after the operation. |
| afterImage.values | Array | | The recorded field values. |
| afterImage.size | Int | 47 | The size of the recorded fields. |
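For an `UPDATE` event, comparing `beforeImage` with `afterImage` shows which columns changed. The sketch below pairs each entry in `values` with the field name at the same position in `recordSchema.recordFields`. It assumes that the `hb` array holds the raw bytes of a string value and that the whole array belongs to the value; this is inferred from the sample, not documented here. The helper names are hypothetical.

```python
from typing import Any


def decode_value(entry: dict) -> Any:
    """Return a plain Python value for one entry of an image's `values` array."""
    raw = entry["data"]
    if isinstance(raw, dict) and "hb" in raw:
        charset = entry.get("charset", "utf8")
        # MySQL charset names such as utf8mb4 map to Python's utf-8 codec.
        codec = "utf-8" if charset.lower().startswith("utf8") else charset
        return bytes(raw["hb"]).decode(codec)
    return raw


def row_from_image(image: dict) -> dict[str, Any]:
    """Pair field names with decoded values, relying on matching positions."""
    names = [field["fieldName"] for field in image["recordSchema"]["recordFields"]]
    return dict(zip(names, (decode_value(value) for value in image["values"])))


# Trimmed-down images modeled on the sample above.
fields = [{"fieldName": "id", "fieldPosition": 0}, {"fieldName": "topic", "fieldPosition": 1}]
before_image = {
    "recordSchema": {"recordFields": fields},
    "values": [{"data": 115},
               {"data": {"hb": [104, 101, 108, 108, 111]}, "charset": "utf8mb4"}],
}
after_image = {
    "recordSchema": {"recordFields": fields},
    "values": [{"data": 115},
               {"data": {"hb": [98, 121, 101]}, "charset": "utf8mb4"}],
}

before = row_from_image(before_image)
after = row_from_image(after_image)
changed = {name: (before.get(name), after[name])
           for name in after if before.get(name) != after[name]}

print(before)   # {'id': 115, 'topic': 'hello'}
print(after)    # {'id': 115, 'topic': 'bye'}
print(changed)  # {'topic': ('hello', 'bye')}
```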
ApsaraMQ for MQTT
[
{
"specversion":"1.0",
"id":"AC1EC0C950650816F27D46F7D7CA****",
"source":"acs:mqtt",
"type":"mqtt:Topic:SendMessage",
"subject":"acs:mq:cn-hangzhou:143998900779****:topic/mqtt-cn-2r42qam****/housekee****",
"datacontenttype":"application/json; charset=utf-8",
"time":"2022-06-22T03:53:47.959Z",
"aliyunaccountid":"143998900779****",
"data":{
"props":{
"firstTopic":"housekee****",
"secondTopic":"/testMq4****",
"clientId":"GID_****"
},
"body":"TEST"
}
}
]
data parameters
| Parameter | Type | Example | Description |
|---|---|---|---|
| props | Map | | The message properties. |
| props.firstTopic | String | housekee**** | The parent topic for sending and receiving messages. |
| props.secondTopic | String | /testMq4**** | The child topic. |
| props.clientId | String | GID_**** | The client ID. |
| body | String | TEST | The message body. |
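A workflow step may want the complete topic path rather than the separate parent and child parts. The sketch below joins the two with a single slash, assuming the child topic is a path below the parent topic, as the leading slash in the sample suggests. The topic names and the `full_topic` helper are hypothetical.

```python
def full_topic(props: dict) -> str:
    """Join the parent topic and the optional child topic with one slash."""
    first = props["firstTopic"].rstrip("/")
    second = props.get("secondTopic", "")
    if not second:
        return first
    return f"{first}/{second.lstrip('/')}"


print(full_topic({"firstTopic": "sensors", "secondTopic": "/room1/temp"}))  # sensors/room1/temp
print(full_topic({"firstTopic": "sensors"}))                                # sensors
```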