Workflow scheduling

A workflow can be triggered through the CloudFlow console, by invoking an SDK, or by an event source. Workflow scheduling automates workflow execution by connecting event sources to workflows. It provides a centralized and unified way to manage different event sources. Define a set of trigger rules so that when an incoming event matches those rules, the event source automatically starts the associated workflow -- no manual invocation required.

Use cases

Automated image processing -- When a new image is uploaded to an Object Storage Service (OSS) bucket, a trigger starts a workflow that downloads the image for processing and stores the result in OSS or another downstream service.

Incremental log analysis -- When logs are updated in Simple Log Service (SLS), a trigger starts a workflow that queries and analyzes the new log entries.

Periodic data collection -- A time-based schedule runs a workflow at a fixed interval, such as once per hour, to collect and process data without manual intervention.

Trigger categories

CloudFlow organizes triggers into two categories based on how the event source integrates with the workflow.

Two-way integration triggers

You configure scheduling in both the workflow and the event source. The following trigger types use two-way integration:

| Trigger type | Standard mode | Express mode |
| --- | --- | --- |
| Time-based scheduling | Asynchronous invocation | Synchronous invocation |
| Simple Log Service | Asynchronous invocation | Synchronous invocation |
| MNS | Asynchronous invocation | Synchronous invocation |
| ApsaraMQ for Kafka | Asynchronous invocation | Synchronous invocation |
| ApsaraMQ for RocketMQ | Asynchronous invocation | Synchronous invocation |
| ApsaraMQ for RabbitMQ | Asynchronous invocation | Synchronous invocation |
| HTTP | Asynchronous invocation | Synchronous invocation |

Cloud service event triggers

You configure scheduling in the workflow and create trigger rules in EventBridge. No configuration is required on the event source side.

| Trigger type | Standard mode | Express mode |
| --- | --- | --- |
| Alibaba Cloud service event | Asynchronous invocation | Synchronous invocation |

Synchronous and asynchronous invocation

The invocation method determines when you receive a response:

| Invocation method | Behavior |
| --- | --- |
| Synchronous | Returns the execution result after the workflow finishes processing the event. |
| Asynchronous | Returns a response immediately after the event is written to the workflow's internal queue. The workflow system then ensures that the queued event is processed. |
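
The difference between the two methods can be pictured with a toy in-memory model. The sketch below is only an illustration: `run_workflow`, `invoke_sync`, `invoke_async`, and `drain` are hypothetical names, and the in-process queue stands in for the workflow's internal queue. It does not reflect how CloudFlow is implemented.

```python
import queue

# Hypothetical stand-in for a workflow execution.
def run_workflow(event: dict) -> dict:
    return {"processed": event["id"]}

# Hypothetical stand-in for the workflow's internal queue.
internal_queue = queue.Queue()

def invoke_sync(event: dict) -> dict:
    # The caller waits and receives the execution result.
    return run_workflow(event)

def invoke_async(event: dict) -> dict:
    # The caller gets an acknowledgement as soon as the event is queued.
    internal_queue.put(event)
    return {"accepted": True}

def drain() -> list:
    # Later, the queued events are processed independently of the caller.
    results = []
    while not internal_queue.empty():
        results.append(run_workflow(internal_queue.get()))
    return results

print(invoke_sync({"id": "evt-1"}))
print(invoke_async({"id": "evt-2"}))
print(drain())
```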

Event payload reference

Each trigger type passes a CloudEvents-compliant JSON payload to the workflow. To extract the Event parameter from the payload using an expression, see Inputs and outputs.

The following sections document the data field structure for each trigger type. Parameters outside the data field follow the CloudEvents specification.
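
To make the split between the CloudEvents attributes and the data field concrete, here is a minimal Python sketch. The helper name `split_event` is an assumption made for illustration, and the sample values are copied from the time-based scheduling payload in this topic.

```python
import json

def split_event(raw: str):
    """Return (attributes, data) for one CloudEvents-style JSON object."""
    event = json.loads(raw)
    # Everything except "data" is treated as a CloudEvents attribute.
    attributes = {key: value for key, value in event.items() if key != "data"}
    return attributes, event.get("data", {})

sample = json.dumps({
    "specversion": "1.0",
    "type": "eventbridge:Events:ScheduledEvent",
    "source": "housekeeping.scheduledevent",
    "data": {
        "TimeZone": "GMT+0:00",
        "Schedule": "0/30 * * * * ?",
        "UserData": {"key": "value"},
    },
})

attributes, data = split_event(sample)
print(attributes["type"])
print(data["UserData"])
```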

Time-based scheduling

{
    "datacontenttype":"application/json;charset=utf-8",
    "aliyunaccountid":"143998900779****",
    "aliyunpublishtime":"2022-09-21T05:00:00.035Z",
    "data":{
        "TimeZone":"GMT+0:00",
        "Schedule":"0/30 * * * * ?",
        "UserData":{
            "key":"value"
        }
    },
    "specversion":"1.0",
    "aliyuneventbusname":"Housekeeping-Bus",
    "id":"d100262d-90c7-4caf-a3b5-813f3526a1f7-****",
    "source":"housekeeping.scheduledevent",
    "time":"2022-09-21T05:00:00Z",
    "aliyunregionid":"cn-beijing",
    "type":"eventbridge:Events:ScheduledEvent"
}

data parameters

| Parameter | Type | Example | Description |
| --- | --- | --- | --- |
| TimeZone | String | GMT+8:00 | The time zone. |
| Schedule | String | 0 */10 * * * * | The cron expression. Used when Trigger Period is set to Fixed Period. |
| UserData | Object | {"key":"value"} | Custom parameters in JSON format. |
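
If a workflow step needs the TimeZone value as a real UTC offset, it can be parsed as sketched below. This assumes the value always has the `GMT+H:MM` or `GMT-H:MM` shape shown in the examples; `parse_gmt_offset` is a hypothetical helper, not part of any CloudFlow SDK.

```python
import re
from datetime import timedelta, timezone

def parse_gmt_offset(tz: str) -> timezone:
    """Convert a string such as 'GMT+8:00' into a fixed-offset timezone."""
    match = re.fullmatch(r"GMT([+-])(\d{1,2}):(\d{2})", tz)
    if match is None:
        # Assumption: other formats are not expected, so fail loudly.
        raise ValueError(f"unexpected TimeZone value: {tz!r}")
    sign = 1 if match.group(1) == "+" else -1
    offset = timedelta(hours=int(match.group(2)), minutes=int(match.group(3)))
    return timezone(sign * offset)

data = {"TimeZone": "GMT+8:00", "Schedule": "0 */10 * * * *", "UserData": {"key": "value"}}
tz = parse_gmt_offset(data["TimeZone"])
print(tz.utcoffset(None))
```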

HTTP

{
    "datacontenttype": "application/json",
    "aliyunaccountid": "164901546557****",
    "data": {
        "headers": {
            "content-length": "0",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
            "Host": "164901546557****.eventbridge.cn-hangzhou.aliyuncs.com",
            "Accept-Encoding": "gzip, deflate",
            "X-Forwarded-Port": "80",
            "Upgrade-Insecure-Requests": "1",
            "X-Forwarded-For": "183.247.0.***",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "X-Real-IP": "183.247.0.***",
            "X-Scheme": "http"
        },
        "path": "/webhook/putEvents",
        "body": "",
        "httpMethod": "GET",
        "queryString": {}
    },
    "subject": "acs:eventbridge:cn-hangzhou:164901546557****:eventbus/eventbus-created-by-fnf-466ccc7e-418a-403f-8d96-2d73a8e****/eventsource/httpschedule",
    "aliyunoriginalaccountid": "164901546557****",
    "source": "httpschedule",
    "type": "eventbridge:Events:HTTPEvent",
    "aliyunpublishtime": "2023-08-06T18:37:01.666Z",
    "specversion": "1.0",
    "aliyuneventbusname": "eventbus-created-by-fnf-466ccc7e-418a-403f-8d96-2d73a8e****",
    "id": "6751261d-e496-4b36-a707-3c087bf3****",
    "time": "2023-08-07T02:37:01.666+08:00",
    "aliyunregionid": "cn-hangzhou",
    "aliyunpublishaddr": "183.247.0.***"
}

data parameters

| Parameter | Type | Example | Description |
| --- | --- | --- | --- |
| headers | Object | {"Accept": "application/json"} | The HTTP request headers. |
| path | String | /webhook/putEvents | The request path. |
| body | Object | {"filePath": "/tmp/uploader"} | The HTTP request body. |
| httpMethod | String | GET | The HTTP method. |
| queryString | String | username=leo | The query string from the HTTP URL. Excludes the token used to request the webhook. |
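
The sample payload mixes header-name casing (`content-length` next to `Accept`) and shows queryString as an empty object, while the table lists it as a string. A defensive reader might therefore handle both shapes. The following is a sketch under those assumptions; `get_header` and `normalize_query` are hypothetical helper names, and the host value is a placeholder.

```python
from urllib.parse import parse_qs

def get_header(headers: dict, name: str, default=None):
    """Look up an HTTP header without depending on letter case."""
    wanted = name.lower()
    for key, value in headers.items():
        if key.lower() == wanted:
            return value
    return default

def normalize_query(query) -> dict:
    """Accept either a raw query string or an already-parsed object."""
    if isinstance(query, str):
        # parse_qs returns a list per key; keep the first value only.
        return {key: values[0] for key, values in parse_qs(query).items()}
    return dict(query or {})

data = {
    "headers": {"content-length": "0", "Host": "example.invalid", "X-Scheme": "http"},
    "path": "/webhook/putEvents",
    "body": "",
    "httpMethod": "GET",
    "queryString": "username=leo",
}

print(get_header(data["headers"], "Content-Length"))
print(normalize_query(data["queryString"]))
```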

Alibaba Cloud service event

{
    "id":"c2g71017-6f65-fhcf-a814-a396fc8d****",
    "source":"OSS-FunctionFlow-osstrigger",
    "specversion":"1.0",
    "type":"oss:PutObject",
    "datacontenttype":"application/json; charset=utf-8",
    "subject":"acs:mns:cn-hangzhou:164901546557****:queues/zeus",
    "time":"2021-04-08T06:28:17.093Z",
    "aliyunaccountid":"164901546557****",
    "aliyunpublishtime":"2021-10-15T07:06:34.028Z",
    "aliyunoriginalaccountid":"164901546557****",
    "aliyuneventbusname":"OSS-FunctionFlow-osstrigger",
    "aliyunregionid":"cn-chengdu",
    "aliyunpublishaddr":"42.120.XX.XX",
    "data":{
      "*** The content varies based on the event source. ***"
    }
}

The data content varies by Alibaba Cloud service. For event types and formats of each service, see Alibaba Cloud service event sources.

Simple Log Service

[
    {
        "datacontenttype": "application/json;charset=utf-8",
        "aliyunaccountid": "164901546557****",
        "data": {
            "key1": "value1",
            "key2": "value2",
            "__topic__": "test_topic",
            "__source__": "test_source",
            "__client_ip__": "122.231.XX.XX",
            "__receive_time__": "1663487595",
            "__pack_id__": "59b662b2257796****"
        },
        "subject": "acs:log:cn-qingdao:164901546557****:project/qiingdaoproject/logstore/qingdao-logstore-1",
        "aliyunoriginalaccountid": "164901546557****",
        "source": "SLS-FunctionFlow-slstrigger",
        "type": "sls:connector",
        "aliyunpublishtime": "2022-09-18T07:53:15.387Z",
        "specversion": "1.0",
        "aliyuneventbusname": "SLS-FunctionFlow-slstrigger",
        "id": "qiingdaoproject-qingdao-logstore-1-1-MTY2MzExODM5ODY4NjAxOTQyMw****",
        "time": "2022-09-18T07:53:12Z",
        "aliyunregionid": "cn-qingdao",
        "aliyunpublishaddr": "10.50.XX.XX"
    }
]

data parameters

Fields with __ prefixes and suffixes are Simple Log Service system fields. For details, see Reserved fields.

| Parameter | Type | Example | Description |
| --- | --- | --- | --- |
| key1 | String | testKey | A user-defined field in the log entry. |
| __topic__ | String | testTopic | The log topic. |
| __source__ | String | testSource | The device from which the log is collected. |
| __client_ip__ | String | 122.231.XX.XX | The IP address of the host where the log resides. |
| __receive_time__ | String | 1663487595 | The time when the log arrived at the server. UNIX timestamp. |
| __pack_id__ | String | 59b662b2257796**** | The unique ID of the log group to which the log belongs. |
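
Because system fields are wrapped in double underscores, they can be separated from user-defined fields by name alone. The sketch below assumes that naming rule is sufficient and that `__receive_time__` is a UNIX timestamp in seconds, as in the sample; `split_log_fields` is a hypothetical helper.

```python
from datetime import datetime, timezone

def split_log_fields(data: dict):
    """Separate system fields (wrapped in double underscores) from user fields."""
    system, user = {}, {}
    for key, value in data.items():
        is_system = len(key) > 4 and key.startswith("__") and key.endswith("__")
        (system if is_system else user)[key] = value
    return system, user

data = {
    "key1": "value1",
    "key2": "value2",
    "__topic__": "test_topic",
    "__source__": "test_source",
    "__receive_time__": "1663487595",
}

system, user = split_log_fields(data)
# The timestamp arrives as a string, so convert it before building a datetime.
received = datetime.fromtimestamp(int(system["__receive_time__"]), tz=timezone.utc)
print(sorted(user))
print(received.isoformat())
```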

MNS

[
    {
        "id":"c2g71017-6f65-fhcf-a814-a396fc8d****",
        "source":"MNS-FunctionFlow-mnstrigger",
        "specversion":"1.0",
        "type":"mns:Queue:SendMessage",
        "datacontenttype":"application/json; charset=utf-8",
        "subject":"acs:mns:cn-hangzhou:164901546557****:queues/zeus",
        "time":"2023-04-08T06:28:17.093Z",
        "aliyunaccountid":"164901546557****",
        "aliyunpublishtime":"2023-10-15T07:06:34.028Z",
        "aliyunoriginalaccountid":"164901546557****",
        "aliyuneventbusname":"MNS-Function-mnstrigger",
        "aliyunregionid":"cn-chengdu",
        "aliyunpublishaddr":"42.120.XX.XX",
        "data":{
            "requestId":"606EA3074344430D4C81****",
            "messageId":"C6DB60D1574661357FA227277445****",
            "messageBody":"TEST"
        }
    }
]

data parameters

| Parameter | Type | Example | Description |
| --- | --- | --- | --- |
| requestId | String | 606EA3074344430D4C81**** | The request ID. Each request has a unique ID. |
| messageId | String | C6DB60D1574661357FA227277445**** | The message ID. Each message has a unique ID. |
| messageBody | String | TEST | The message body. |
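
The payload is an array, so one invocation can carry several messages. As an illustration, a workflow step that wants to ignore repeated messageId values within a batch could do something like the following. `unique_bodies` is a hypothetical helper, the message IDs are made up, and whether duplicates can actually occur is not stated in this topic.

```python
def unique_bodies(events: list) -> list:
    """Collect messageBody values, skipping messageId values already seen."""
    seen = set()
    bodies = []
    for event in events:
        data = event["data"]
        if data["messageId"] in seen:
            continue  # same message ID appeared earlier in this batch
        seen.add(data["messageId"])
        bodies.append(data["messageBody"])
    return bodies

batch = [
    {"data": {"requestId": "r-1", "messageId": "m-1", "messageBody": "TEST"}},
    {"data": {"requestId": "r-2", "messageId": "m-1", "messageBody": "TEST"}},
    {"data": {"requestId": "r-3", "messageId": "m-2", "messageBody": "OTHER"}},
]

print(unique_bodies(batch))
```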

ApsaraMQ for RocketMQ

[
    {
        "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
        "source":"RocketMQ-rocketmq-schedule",
        "specversion":"1.0",
        "type":"mq:Topic:SendMessage",
        "datacontenttype":"application/json; charset=utf-8",
        "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
        "time":"2023-04-08T06:01:20.766Z",
        "aliyunaccountid":"164901546557****",
        "aliyunpublishtime":"2023-10-15T02:05:16.791Z",
        "aliyunoriginalaccountid":"164901546557****",
        "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger",
        "aliyunregionid":"cn-chengdu",
        "aliyunpublishaddr":"42.120.XX.XX",
        "data":{
            "topic":"TopicName",
            "systemProperties":{
                "MIN_OFFSET":"0",
                "TRACE_ON":"true",
                "MAX_OFFSET":"8",
                "MSG_REGION":"cn-hangzhou",
                "KEYS":"systemProperties.KEYS",
                "CONSUME_START_TIME":1628577790396,
                "TAGS":"systemProperties.TAGS",
                "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi"
            },
            "userProperties":{

            },
            "body":"TEST"
        }
    }
]

data parameters

| Parameter | Type | Example | Description |
| --- | --- | --- | --- |
| topic | String | TopicName | The topic name. |
| systemProperties | Map | | The system properties. |
| systemProperties.MIN_OFFSET | Int | 0 | The earliest offset. |
| systemProperties.TRACE_ON | Boolean | true | Whether to include a message trace. Valid values: true, false. |
| systemProperties.MAX_OFFSET | Int | 8 | The latest offset. |
| systemProperties.MSG_REGION | String | cn-hangzhou | The region from which the message was sent. |
| systemProperties.KEYS | String | systemProperties.KEYS | The keys used to filter the message. |
| systemProperties.CONSUME_START_TIME | Long | 1628577790396 | The consumption start time. UNIX timestamp in milliseconds. |
| systemProperties.UNIQ_KEY | String | AC14C305069E1B28CDFA3181CDA2**** | The unique key of the message. |
| systemProperties.TAGS | String | systemProperties.TAGS | The tags used to filter the message. |
| systemProperties.INSTANCE_ID | String | MQ_INST_123456789098****_BXhFHryi | The ApsaraMQ for RocketMQ instance ID. |
| userProperties | Map | | The user-defined properties. |
| body | String | TEST | The message body. |
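
In the sample payload, MIN_OFFSET, MAX_OFFSET, and TRACE_ON arrive as strings even though the table lists them as Int and Boolean. A consumer may want to normalize them before use. The sketch below is one hedged way to do that; `normalize_system_properties` and the output key names are assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime.fromtimestamp(0, tz=timezone.utc)

def normalize_system_properties(props: dict) -> dict:
    """Coerce selected systemProperties values to native Python types."""
    def as_bool(value) -> bool:
        # Accept both a real boolean and the string form seen in the sample.
        return value if isinstance(value, bool) else str(value).lower() == "true"

    return {
        "min_offset": int(props["MIN_OFFSET"]),
        "max_offset": int(props["MAX_OFFSET"]),
        "trace_on": as_bool(props["TRACE_ON"]),
        # CONSUME_START_TIME is a UNIX timestamp in milliseconds.
        "consume_start": EPOCH + timedelta(milliseconds=int(props["CONSUME_START_TIME"])),
    }

props = {
    "MIN_OFFSET": "0",
    "TRACE_ON": "true",
    "MAX_OFFSET": "8",
    "CONSUME_START_TIME": 1628577790396,
}

normalized = normalize_system_properties(props)
print(normalized["max_offset"] - normalized["min_offset"])
print(normalized["consume_start"].isoformat())
```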

ApsaraMQ for RabbitMQ

[
    {
        "id":"bj694332-4cj1-389e-9d8c-b137h30b****",
        "source":"RabbitMQ-Function-rabbitmq-trigger",
        "specversion":"1.0",
        "type":"amqp:Queue:SendMessage",
        "datacontenttype":"application/json;charset=utf-8",
        "subject":"acs:amqp:cn-hangzhou:164901546557****:/instances/amqp-cn-tl32e756****/vhosts/eb-connect/queues/housekeeping",
        "time":"2023-08-12T06:56:40.709Z",
        "aliyunaccountid":"164901546557****",
        "aliyunpublishtime":"2023-10-15T08:58:55.140Z",
        "aliyunoriginalaccountid":"164901546557****",
        "aliyuneventbusname":"RabbitMQ-Function-rabbitmq-trigger",
        "aliyunregionid":"cn-chengdu",
        "aliyunpublishaddr":"42.120.XX.XX",
        "data":{
            "envelope":{
                "deliveryTag":98,
                "exchange":"",
                "redeliver":false,
                "routingKey":"housekeeping"
            },
            "body":{
                "Hello":"RabbitMQ"
            },
            "props":{
                "contentEncoding":"UTF-8",
                "messageId":"f7622d51-e198-41de-a072-77c1ead7****"
            }
        }
    }
]

data parameters

| Parameter | Type | Example | Description |
| --- | --- | --- | --- |
| envelope | Map | | The message envelope. |
| envelope.deliveryTag | Int | 98 | The message delivery tag. |
| envelope.exchange | String | | The exchange that sent the message. |
| envelope.redeliver | Boolean | false | Whether the message can be redelivered. Valid values: true, false. |
| envelope.routingKey | String | housekeeping | The routing key for the message. |
| body | Map | | The message body. |
| props | Map | | The message properties. |
| props.contentEncoding | String | utf-8 | The encoding format of the message body. |
| props.messageId | String | f7622d51-e198-41de-a072-77c1ead7**** | The message ID. Each message has a unique ID. |
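
One way a workflow step might use these fields is to pick a handler based on envelope.routingKey. The following sketch is purely illustrative: `route` and the handler table are hypothetical, and events without a registered handler are silently skipped by design choice here.

```python
def route(events: list, handlers: dict) -> list:
    """Call the handler registered for each event's routing key."""
    results = []
    for event in events:
        data = event["data"]
        handler = handlers.get(data["envelope"]["routingKey"])
        if handler is None:
            continue  # no handler registered for this routing key
        results.append(handler(data["body"]))
    return results

batch = [
    {"data": {
        "envelope": {"deliveryTag": 98, "exchange": "", "redeliver": False,
                     "routingKey": "housekeeping"},
        "body": {"Hello": "RabbitMQ"},
        "props": {"contentEncoding": "UTF-8"},
    }},
]

# Hypothetical handler table keyed by routing key.
handlers = {"housekeeping": lambda body: sorted(body)}

print(route(batch, handlers))
```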

ApsaraMQ for Kafka

[
    {
        "specversion":"1.0",
        "id":"8e215af8-ca18-4249-8645-f96c1026****",
        "source":"acs:alikafka",
        "type":"alikafka:Topic:Message",
        "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic",
        "datacontenttype":"application/json; charset=utf-8",
        "time":"2023-06-23T02:49:51.589Z",
        "aliyunaccountid":"164901546557****",
        "data":{
            "topic":"****",
            "partition":7,
            "offset":25,
            "timestamp":1655952591589,
            "headers":{
                "headers":[

                ],
                "isReadOnly":false
            },
            "key":"keytest",
            "value":"hello kafka msg"
        }
    }
]

data parameters

| Parameter | Type | Example | Description |
| --- | --- | --- | --- |
| topic | String | TopicName | The topic name. |
| partition | Int | 1 | The partition on the ApsaraMQ for Kafka instance. |
| offset | Int | 0 | The message offset on the ApsaraMQ for Kafka instance. |
| timestamp | String | 1655952591589 | The timestamp when message consumption started. |
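
When a batch contains messages from several partitions, it can be useful to know the highest offset seen per topic and partition, for example for logging progress. The sketch below assumes the array shape shown above; `latest_offsets` is a hypothetical helper and the topic name is a placeholder.

```python
def latest_offsets(events: list) -> dict:
    """Return the highest offset seen for each (topic, partition) pair."""
    latest = {}
    for event in events:
        data = event["data"]
        key = (data["topic"], data["partition"])
        latest[key] = max(latest.get(key, -1), data["offset"])
    return latest

batch = [
    {"data": {"topic": "mytopic", "partition": 7, "offset": 25, "value": "hello kafka msg"}},
    {"data": {"topic": "mytopic", "partition": 7, "offset": 26, "value": "second"}},
    {"data": {"topic": "mytopic", "partition": 1, "offset": 3, "value": "third"}},
]

print(latest_offsets(batch))
```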

Data Transmission Service (DTS)

{
    "data": {
        "id": 321****,
        "topicPartition": {
            "hash": 0,
            "partition": 0,
            "topic": "cn_hangzhou_rm_1234****_test_version2"
        },
        "offset": 3218099,
        "sourceTimestamp": 1654847757,
        "operationType": "UPDATE",
        "schema": {
            "recordFields": [
                {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                },
                {
                    "fieldName": "topic",
                    "rawDataTypeNum": 253,
                    "isPrimaryKey": false,
                    "isUniqueKey": false,
                    "fieldPosition": 1
                }
            ],
            "nameIndex": {
                "id": {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                },
                "topic": {
                    "fieldName": "topic",
                    "rawDataTypeNum": 253,
                    "isPrimaryKey": false,
                    "isUniqueKey": false,
                    "fieldPosition": 1
                }
            },
            "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
            "databaseName": "hangzhou-test-db",
            "tableName": "message_info",
            "primaryIndexInfo": {
                "indexType": "PrimaryKey",
                "indexFields": [
                    {
                        "fieldName": "id",
                        "rawDataTypeNum": 8,
                        "isPrimaryKey": true,
                        "isUniqueKey": false,
                        "fieldPosition": 0
                    }
                ],
                "cardinality": 0,
                "nullable": true,
                "isFirstUniqueIndex": false
            },
            "uniqueIndexInfo": [],
            "foreignIndexInfo": [],
            "normalIndexInfo": [],
            "databaseInfo": {
                "databaseType": "MySQL",
                "version": "5.7.35-log"
            },
            "totalRows": 0
        },
        "beforeImage": {
            "recordSchema": {
                "recordFields": [
                    {
                        "fieldName": "id",
                        "rawDataTypeNum": 8,
                        "isPrimaryKey": true,
                        "isUniqueKey": false,
                        "fieldPosition": 0
                    },
                    {
                        "fieldName": "topic",
                        "rawDataTypeNum": 253,
                        "isPrimaryKey": false,
                        "isUniqueKey": false,
                        "fieldPosition": 1
                    }
                ],
                "nameIndex": {
                    "id": {
                        "fieldName": "id",
                        "rawDataTypeNum": 8,
                        "isPrimaryKey": true,
                        "isUniqueKey": false,
                        "fieldPosition": 0
                    },
                    "topic": {
                        "fieldName": "topic",
                        "rawDataTypeNum": 253,
                        "isPrimaryKey": false,
                        "isUniqueKey": false,
                        "fieldPosition": 1
                    }
                },
                "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
                "databaseName": "hangzhou-test-db",
                "tableName": "message_info",
                "primaryIndexInfo": {
                    "indexType": "PrimaryKey",
                    "indexFields": [
                        {
                            "fieldName": "id",
                            "rawDataTypeNum": 8,
                            "isPrimaryKey": true,
                            "isUniqueKey": false,
                            "fieldPosition": 0
                        }
                    ],
                    "cardinality": 0,
                    "nullable": true,
                    "isFirstUniqueIndex": false
                },
                "uniqueIndexInfo": [],
                "foreignIndexInfo": [],
                "normalIndexInfo": [],
                "databaseInfo": {
                    "databaseType": "MySQL",
                    "version": "5.7.35-log"
                },
                "totalRows": 0
            },
            "values": [
                {
                    "data": 115
                },
                {
                    "data": {
                        "hb": [104, 101, 108, 108, 111],
                        "offset": 0,
                        "isReadOnly": false,
                        "bigEndian": true,
                        "nativeByteOrder": false,
                        "mark": -1,
                        "position": 0,
                        "limit": 9,
                        "capacity": 9,
                        "address": 0
                    },
                    "charset": "utf8mb4"
                }
            ],
            "size": 45
        },
        "afterImage": {
            "recordSchema": {
                "recordFields": [
                    {
                        "fieldName": "id",
                        "rawDataTypeNum": 8,
                        "isPrimaryKey": true,
                        "isUniqueKey": false,
                        "fieldPosition": 0
                    },
                    {
                        "fieldName": "topic",
                        "rawDataTypeNum": 253,
                        "isPrimaryKey": false,
                        "isUniqueKey": false,
                        "fieldPosition": 1
                    }
                ],
                "nameIndex": {
                    "id": {
                        "fieldName": "id",
                        "rawDataTypeNum": 8,
                        "isPrimaryKey": true,
                        "isUniqueKey": false,
                        "fieldPosition": 0
                    },
                    "topic": {
                        "fieldName": "topic",
                        "rawDataTypeNum": 253,
                        "isPrimaryKey": false,
                        "isUniqueKey": false,
                        "fieldPosition": 1
                    }
                },
                "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
                "databaseName": "hangzhou-test-db",
                "tableName": "message_info",
                "primaryIndexInfo": {
                    "indexType": "PrimaryKey",
                    "indexFields": [
                        {
                            "fieldName": "id",
                            "rawDataTypeNum": 8,
                            "isPrimaryKey": true,
                            "isUniqueKey": false,
                            "fieldPosition": 0
                        }
                    ],
                    "cardinality": 0,
                    "nullable": true,
                    "isFirstUniqueIndex": false
                },
                "uniqueIndexInfo": [],
                "foreignIndexInfo": [],
                "normalIndexInfo": [],
                "databaseInfo": {
                    "databaseType": "MySQL",
                    "version": "5.7.35-log"
                },
                "totalRows": 0
            },
            "values": [
                {
                    "data": 115
                },
                {
                    "data": {
                        "hb": [98, 121, 101],
                        "offset": 0,
                        "isReadOnly": false,
                        "bigEndian": true,
                        "nativeByteOrder": false,
                        "mark": -1,
                        "position": 0,
                        "limit": 11,
                        "capacity": 11,
                        "address": 0
                    },
                    "charset": "utf8mb4"
                }
            ],
            "size": 47
        }
    },
    "id": "12f701a43741d404fa9a7be89d9acae0-321****",
    "source": "DTSstreamDemo",
    "specversion": "1.0",
    "type": "dts:ConsumeMessage",
    "datacontenttype": "application/json; charset=utf-8",
    "time": "2022-06-10T07:55:57Z",
    "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro"
}

data parameters

| Parameter | Type | Example | Description |
| --- | --- | --- | --- |
| id | String | 321**** | The data entry ID. |
| topicPartition | Array | | The partition information for the topic to which the event is pushed. |
| topicPartition.hash | String | 0 | The underlying storage parameter of DTS. |
| topicPartition.partition | String | 0 | The partition. |
| topicPartition.topic | String | cn_hangzhou_rm_1234****_test_version2 | The topic name. |
| offset | Int | 3218099 | The offset of the data entry. |
| sourceTimestamp | Int | 1654847757 | The UNIX timestamp when the data entry was generated. |
| operationType | String | UPDATE | The operation type performed on the data entry. |
| schema | Array | | The database schema information. |
| schema.recordFields | Array | | The field details. |
| schema.recordFields[].fieldName | String | id | The field name. |
| schema.recordFields[].rawDataTypeNum | Int | 8 | The mapped value of the field type. |
| schema.recordFields[].isPrimaryKey | Boolean | true | Whether the field is a primary key. |
| schema.recordFields[].isUniqueKey | Boolean | false | Whether the field has a unique key. |
| schema.recordFields[].fieldPosition | String | 0 | The field position. |
| schema.nameIndex | Array | | The field index by field name. |
| schema.schemaId | String | (hangzhou-test-db,hangzhou-test-db,message_info) | The database schema ID. |
| schema.databaseName | String | hangzhou-test-db | The database name. |
| schema.tableName | String | message_info | The table name. |
| schema.primaryIndexInfo | Array | | The primary key index information. |
| schema.primaryIndexInfo.indexType | String | PrimaryKey | The index type. |
| schema.primaryIndexInfo.indexFields | Array | | The indexed fields. |
| schema.primaryIndexInfo.cardinality | String | 0 | The cardinality of the primary keys. |
| schema.primaryIndexInfo.nullable | Boolean | true | Whether the primary keys can be null. |
| schema.primaryIndexInfo.isFirstUniqueIndex | Boolean | false | Whether this is the first unique index. |
| schema.uniqueIndexInfo | String | [] | The unique indexes. |
| schema.foreignIndexInfo | String | [] | The foreign key indexes. |
| schema.normalIndexInfo | String | [] | The regular indexes. |
| schema.databaseInfo | Array | | The database engine information. |
| schema.databaseInfo.databaseType | String | MySQL | The database engine type. |
| schema.databaseInfo.version | String | 5.7.35-log | The database engine version. |
| schema.totalRows | Int | 0 | The total number of rows in the table. |
| beforeImage | Object | | The field values before the operation. |
| beforeImage.values | Array | | The recorded field values. |
| beforeImage.size | Int | 45 | The size of the recorded fields. |
| afterImage | Object | | The field values after the operation. |
| afterImage.values | Array | | The recorded field values. |
| afterImage.size | Int | 47 | The size of the recorded fields. |
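
In the sample, string columns appear inside values as an object whose `hb` array holds byte values, together with a `charset`. The sketch below shows one way to decode those values and list the columns that changed between beforeImage and afterImage. It rests on several assumptions that this topic does not confirm: that `hb` contains the raw bytes of the value, that `utf8` and `utf8mb4` can be decoded as UTF-8, and that values are ordered like schema.recordFields. `decode_value` and `changed_fields` are hypothetical helpers.

```python
# Assumption: MySQL charset names that can be decoded as UTF-8 in Python.
CHARSET_ALIASES = {"utf8": "utf-8", "utf8mb4": "utf-8"}

def decode_value(value: dict):
    """Decode one entry of beforeImage.values or afterImage.values."""
    data = value["data"]
    if isinstance(data, dict) and "hb" in data:
        charset = value.get("charset", "utf8")
        return bytes(data["hb"]).decode(CHARSET_ALIASES.get(charset, charset))
    return data  # numbers and other plain values are returned unchanged

def changed_fields(event_data: dict) -> dict:
    """Map each changed column name to its (before, after) pair."""
    names = [field["fieldName"] for field in event_data["schema"]["recordFields"]]
    before = [decode_value(v) for v in event_data["beforeImage"]["values"]]
    after = [decode_value(v) for v in event_data["afterImage"]["values"]]
    return {n: (b, a) for n, b, a in zip(names, before, after) if b != a}

# Trimmed version of the sample payload's data field.
event_data = {
    "operationType": "UPDATE",
    "schema": {"recordFields": [{"fieldName": "id"}, {"fieldName": "topic"}]},
    "beforeImage": {"values": [
        {"data": 115},
        {"data": {"hb": [104, 101, 108, 108, 111]}, "charset": "utf8mb4"},
    ]},
    "afterImage": {"values": [
        {"data": 115},
        {"data": {"hb": [98, 121, 101]}, "charset": "utf8mb4"},
    ]},
}

print(changed_fields(event_data))
```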

ApsaraMQ for MQTT

[
    {
        "specversion":"1.0",
        "id":"AC1EC0C950650816F27D46F7D7CA****",
        "source":"acs:mqtt",
        "type":"mqtt:Topic:SendMessage",
        "subject":"acs:mq:cn-hangzhou:143998900779****:topic/mqtt-cn-2r42qam****/housekee****",
        "datacontenttype":"application/json; charset=utf-8",
        "time":"2022-06-22T03:53:47.959Z",
        "aliyunaccountid":"143998900779****",
        "data":{
            "props":{
                "firstTopic":"housekee****",
                "secondTopic":"/testMq4****",
                "clientId":"GID_****"
            },
            "body":"TEST"
        }
    }
]

data parameters

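| Parameter | Type | Example | Description |
| --- | --- | --- | --- |
| props | Map | | The message properties. |
| props.firstTopic | String | housekee**** | The parent topic for sending and receiving messages. |
| props.secondTopic | String | /testMq4**** | The child topic. |
| props.clientId | String | GID_**** | The client ID. |
| body | String | TEST | The message body. |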
bodyStringTESTThe message body.