The service integration feature of Serverless Workflow simplifies interaction with cloud services. Use the Simple Message Queue (formerly MNS) queue integration, combined with the callback pattern, to orchestrate a wide range of computing tasks that run outside of Function Compute.
Background
The use cases for Serverless Workflow are not limited to orchestrating Function Compute (FC) FaaS functions; they include any general computing task. The earlier best practice document Asynchronous Task Callback describes a method in which a Function Compute function sends a message to a Simple Message Queue (formerly MNS) queue. A worker in a custom environment receives the message and uses a callback to notify Serverless Workflow of the task execution result. With the Simple Message Queue (formerly MNS) queue integration, Serverless Workflow sends the message to the queue directly, which further simplifies the orchestration of custom task types. Compared with using a Function Compute function to send messages, the integration provides the following benefits:
- Eliminates the need to develop a Function Compute function for sending messages, which reduces development, testing, and maintenance costs.
- Reduces message delivery latency, eliminates an extra API call, and avoids Function Compute cold starts.
- Removes a service dependency and improves fault tolerance.
Serverless Workflow will support more cloud service integrations in the future to simplify the orchestration of flows that use different task types.
How it works
In the following diagram, Serverless Workflow sends three sequential tasks as messages to a user-specified Simple Message Queue (formerly MNS) queue. After a message is sent successfully, the flow execution pauses at that step and waits for a callback. A worker, which can be an ECS instance, a container, or a machine in an on-premises data center, polls the queue for messages by calling the Simple Message Queue (formerly MNS) ReceiveMessage API. After receiving a message, the worker processes the corresponding task based on the message content. When the task is complete, the worker calls the Serverless Workflow ReportTaskSucceeded or ReportTaskFailed API to report the result. Upon receiving the result, Serverless Workflow resumes the flow from the paused step. After successfully reporting the task result, the worker deletes the message from the queue.
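The message flow described above can be imitated locally to make the ordering of events concrete. The following is a minimal in-memory sketch for Python 3, not the real services: `FlowSimulator`, the `token-` prefix, and the `task` field are hypothetical names, a `queue.Queue` stands in for the message queue, and `task_done()` stands in for deleting the message.

```python
import json
import queue
import threading


class FlowSimulator:
    """In-memory stand-in for flow steps that wait for a callback."""

    def __init__(self, mq):
        self.mq = mq
        self.pending = {}   # task token -> Event that is set when a result arrives
        self.results = {}   # task token -> reported output

    def run_task(self, name, timeout=5):
        token = "token-" + name            # hypothetical token format
        done = threading.Event()
        self.pending[token] = done
        # Send the message, then pause this step until the callback arrives.
        self.mq.put(json.dumps({"task_token": token, "task": name}))
        if not done.wait(timeout):
            raise TimeoutError("no callback received for " + name)
        return self.results[token]

    def report_task_succeeded(self, token, output):
        # Store the result first, then resume the paused step.
        self.results[token] = output
        self.pending.pop(token).set()


def worker(mq, flow, count):
    for _ in range(count):
        body = json.loads(mq.get())        # stands in for ReceiveMessage
        output = {"handled": body["task"]} # "process" the task
        flow.report_task_succeeded(body["task_token"], output)
        mq.task_done()                     # stands in for deleting the message


def run_demo():
    mq = queue.Queue()
    flow = FlowSimulator(mq)
    t = threading.Thread(target=worker, args=(mq, flow, 3), daemon=True)
    t.start()
    # Three sequential tasks: each one blocks until its callback is reported.
    outputs = [flow.run_task(n) for n in ("Task_1", "Task_2", "Task_3")]
    t.join()
    return outputs


if __name__ == "__main__":
    print(run_demo())
```

Each call to `run_task` returns only after the worker has reported a result for that task's token, which mirrors how the flow stays paused at a step until the callback arrives.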

Procedure
Follow these steps:
Step 1: Prerequisites
- Create a Simple Message Queue (formerly MNS) queue in the Simple Message Queue (formerly MNS) console. For detailed instructions, see Create a queue.
- Serverless Workflow assumes the execution role (a RAM role) that you specify in the flow to send messages to the Simple Message Queue (formerly MNS) queue in your account. Therefore, you must grant this RAM role permission to send messages to the queue. If you do not require fine-grained permissions, you can go to the Serverless Workflow console and attach the system policy AliyunMNSFullAccess to the flow RAM role. The following code shows an example of a fine-grained policy.
{
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "mns:SendMessage"
      ],
      "Resource": [
        "acs:mns:$region:$account_id:/queues/$queue_name/messages"
      ]
    }
  ],
  "Version": "1"
}
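The `$region`, `$account_id`, and `$queue_name` placeholders in the policy must be replaced with your own values. As a small sketch, a helper such as the following could fill them in; `build_send_message_policy` is a hypothetical name, and the region and account ID passed to it are illustrative placeholders, not real values.

```python
import json


def build_send_message_policy(region, account_id, queue_name):
    """Return the fine-grained policy as a dict with the placeholders filled in."""
    resource = "acs:mns:%s:%s:/queues/%s/messages" % (region, account_id, queue_name)
    return {
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["mns:SendMessage"],
                "Resource": [resource],
            }
        ],
        "Version": "1",
    }


# Placeholder values for illustration only.
policy = build_send_message_policy("cn-hangzhou", "123456789", "fnf-demo")
print(json.dumps(policy, indent=2))
```

The printed JSON can then be pasted into the policy editor for the flow RAM role.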
Step 2: Define the flow
The following FDL (Flow Definition Language) example defines a task step that sends a message to the fnf-demo Simple Message Queue (formerly MNS) queue and then waits for a callback.
version: v1
type: flow
steps:
  - type: task
    name: Task_1
    resourceArn: acs:mns:::/queues/fnf-demo/messages # This task step sends a message to the fnf-demo Simple Message Queue (formerly MNS) queue in the same region and within the same account.
    pattern: waitForCallback # The task step pauses after the message is sent and waits for a callback.
    inputMappings:
      - target: task_token
        source: $context.task.token # Get the unique task token from the context object.
      - target: key
        source: value
    serviceParams: # Service integration parameters.
      MessageBody: $ # Use the mapped input as the message body.
      Priority: 1 # The message priority in the queue.
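Because `MessageBody: $` uses the mapped input as the message body, the worker can expect the body to carry both `task_token` and `key`. The sketch below assumes the body arrives as a JSON object with those two fields. The sample string is a made-up illustration rather than captured output, and `parse_task_message` is a hypothetical helper name.

```python
import json


def parse_task_message(message_body):
    """Split a message body into the task token and the remaining task input."""
    payload = json.loads(message_body)
    token = payload.pop("task_token", None)
    if not token:
        # Without a token the worker has no way to report a result.
        raise ValueError("message has no task_token")
    return token, payload


# Illustrative body matching the inputMappings above (assumed shape).
sample_body = '{"task_token": "example-token", "key": "value"}'
token, task_input = parse_task_message(sample_body)
print(token, task_input)
```

Validating the token before starting the task avoids doing work whose result could never be reported back to the flow.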
Step 3: Write the worker
The following Python 2.7 code simulates a worker that executes tasks. It can run in any environment with access to Serverless Workflow and Simple Message Queue (formerly MNS). The worker uses long polling to call the Simple Message Queue (formerly MNS) ReceiveMessage API. When a task step configured with Simple Message Queue (formerly MNS) is executed, Serverless Workflow sends a message to the fnf-demo queue. After processing the task, the worker calls the Serverless Workflow ReportTaskSucceeded or ReportTaskFailed API to report the result. After Serverless Workflow receives the result, it resumes the flow, and the worker can then delete the message.
- In a virtual environment, install the Python SDKs for Serverless Workflow and Simple Message Queue (formerly MNS).
cd /tmp; mkdir -p fnf-demo-callback; cd fnf-demo-callback
virtualenv env; source env/bin/activate
pip install -t . aliyun-python-sdk-core -t . aliyun-python-sdk-fnf -t . aliyun-mns
- Write the code for the local task worker and save it as worker.py.
import json
import os

from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkfnf.request.v20190315 import ReportTaskSucceededRequest
from mns.account import Account  # pip install aliyun-mns
from mns.queue import *  # Provides MNSExceptionBase, which is caught below.


def main():
    # Read the region, account ID, and AccessKey pair from the environment.
    region = os.environ['REGION']
    account_id = os.environ['ACCOUNT_ID']
    ak_id = os.environ['AK_ID']
    ak_secret = os.environ['AK_SECRET']

    queue_name = "fnf-demo"
    fnf_client = AcsClient(ak_id, ak_secret, region)

    mns_endpoint = "https://%s.mns.%s.aliyuncs.com" % (account_id, region)
    my_account = Account(mns_endpoint, ak_id, ak_secret)
    my_queue = my_account.get_queue(queue_name)
    my_queue.set_encoding(False)
    wait_seconds = 10  # Long polling wait time for each receive call.

    try:
        while True:
            try:
                print("Receiving messages")
                recv_msg = my_queue.receive_message(wait_seconds)
                print("Received message %s, body %s" % (recv_msg.message_id, recv_msg.message_body))
                body = json.loads(recv_msg.message_body)
                task_token = body["task_token"]
                output = json.dumps({"key": "value"})

                # Report the result so that the paused flow step resumes.
                request = ReportTaskSucceededRequest.ReportTaskSucceededRequest()
                request.set_Output(output)
                request.set_TaskToken(task_token)
                fnf_client.do_action_with_exception(request)
                print("Report task succeeded finished")

                # Delete the message only after the result is reported.
                my_queue.delete_message(recv_msg.receipt_handle)
                print("Deleted message " + recv_msg.message_id)
            except MNSExceptionBase as e:
                print(e)
            except ServerException as e:
                print(e)
                if e.error_code == 'TaskAlreadyCompleted':
                    # The result was already reported, so the message is safe to delete.
                    my_queue.delete_message(recv_msg.receipt_handle)
                    print("Task already completed, deleted message " + recv_msg.message_id)
    except ServerException as e:
        print(e)


if __name__ == '__main__':
    main()
- Start the worker. It uses long polling to monitor the fnf-demo queue. When it receives a message, it calls the Serverless Workflow API to report the task result.
# Run the worker process.
export REGION={your-region}
export ACCOUNT_ID={your-account-id}
export AK_ID={your-ak-id}
export AK_SECRET={your-ak-secret}
python worker.py
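The worker above always reports success. A worker that runs real tasks also needs to choose between ReportTaskSucceeded and ReportTaskFailed. The following sketch shows one way to make that choice without calling any SDK: `build_report` and `failing_handler` are hypothetical names, and the parameter names (`TaskToken`, `Output`, `Error`, `Cause`) are assumptions about the two APIs that should be checked against the Serverless Workflow API reference.

```python
import json


def build_report(task_token, handler, task_input):
    """Run the task handler and return (api_name, params) for the matching report call."""
    try:
        output = handler(task_input)
    except Exception as exc:
        # Assumed ReportTaskFailed parameters: an error name and a human-readable cause.
        return "ReportTaskFailed", {
            "TaskToken": task_token,
            "Error": type(exc).__name__,
            "Cause": str(exc),
        }
    # The worker in this document passes the output as a JSON string.
    return "ReportTaskSucceeded", {
        "TaskToken": task_token,
        "Output": json.dumps(output),
    }


def failing_handler(task_input):
    raise RuntimeError("disk full")


ok = build_report("t1", lambda data: {"key": data["key"]}, {"key": "value"})
failed = build_report("t1", failing_handler, {})
print(ok)
print(failed)
```

In a real worker, the returned parameters would be set on the corresponding SDK request object before calling `do_action_with_exception`, and the message would be deleted only after the report call succeeds.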
Step 4: Run the flow and view the result
Start an execution of the flow in the Serverless Workflow console. With the worker running, the flow executes successfully.
The execution details page displays basic information such as the execution name, creation time, end time, and status. The Flow Definition and Visual Workflow panel shows the execution path, such as Start → Task_1 → End. The Step Information panel on the right displays the JSON input and output data for each step, including fields such as input and local.