Orchestrate tasks with Simple Message Queue (formerly MNS) integration and callbacks

The service integration feature of Serverless Workflow simplifies interaction with cloud services. Use the Simple Message Queue (formerly MNS) queue integration, combined with the callback pattern, to orchestrate a wide range of computing tasks that run outside of Function Compute.

Background

Serverless Workflow is not limited to orchestrating Function Compute (FC) FaaS functions. It can orchestrate any general computing task. The earlier best practice document Asynchronous Task Callback describes a method in which a Function Compute function sends a message to a Simple Message Queue (formerly MNS) queue, and a worker in a custom environment receives the message and uses a callback to notify Serverless Workflow of the task execution result. With the Simple Message Queue (formerly MNS) queue integration, Serverless Workflow sends the message directly, which further simplifies the orchestration of custom task types. Compared with using a Function Compute function to send messages, the integration provides the following benefits:

  • Eliminates the need to develop a Function Compute function for sending messages, which reduces development, testing, and maintenance costs.

  • Reduces message delivery latency, eliminates an extra API call, and avoids Function Compute cold starts.

  • Removes a service dependency and improves fault tolerance.

Serverless Workflow will support more cloud service integrations in the future to simplify the orchestration of flows that use different task types.

How it works

In the following diagram, Serverless Workflow sends three sequential tasks as messages to a user-specified Simple Message Queue (formerly MNS) queue. After successfully sending a message, the flow execution pauses at that step and waits for a callback. A worker, which can be an ECS instance, a container, or a machine in an on-premises data center, polls the queue for messages by calling the Simple Message Queue (formerly MNS) ReceiveMessage API. After receiving a message, the worker processes the corresponding task based on the message content. When the task is complete, the worker calls the Serverless Workflow ReportTaskSucceeded/Failed API to report the result. Upon receiving the result, Serverless Workflow resumes the flow from the paused step. After successfully reporting the task result, the worker deletes the message from the Simple Message Queue (formerly MNS) queue.

[Diagram: fnf-docs-service-integration]
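The sequence described above can be sketched with a small in-memory simulation. This is only an illustration under stated assumptions: FlowSimulator, send_task, report_task_succeeded, and worker_poll_once are hypothetical names, and a standard-library queue.Queue stands in for the queue service. None of it calls the real Serverless Workflow or Simple Message Queue (formerly MNS) APIs.

```python
import json
import queue
import uuid


class FlowSimulator:
    """Hypothetical stand-in for Serverless Workflow; not the real service API."""

    def __init__(self, mq):
        self.mq = mq
        self.waiting = {}   # task_token -> step name paused on a callback
        self.results = {}   # step name -> output reported by the worker

    def send_task(self, step_name, payload):
        # Send the task as a message, then leave the step paused until a callback arrives.
        token = uuid.uuid4().hex
        self.waiting[token] = step_name
        self.mq.put(json.dumps(dict(payload, task_token=token)))

    def report_task_succeeded(self, token, output):
        # Resume the paused step that owns this token.
        step_name = self.waiting.pop(token)
        self.results[step_name] = output


def worker_poll_once(mq, flow):
    """Receive one message, do the work, report the result, then drop the message."""
    body = json.loads(mq.get(timeout=1))     # stands in for ReceiveMessage
    output = {"echo": body["key"]}           # the actual task work would go here
    flow.report_task_succeeded(body["task_token"], output)
    mq.task_done()                           # stands in for deleting the message


mq = queue.Queue()
flow = FlowSimulator(mq)
for step in ("Task_1", "Task_2", "Task_3"):
    flow.send_task(step, {"key": step.lower()})
    worker_poll_once(mq, flow)               # each step resumes before the next is sent

print(flow.results)
```

The point of the sketch is the ordering: the result is reported first, and the message is removed only afterward, so a worker that crashes mid-task leaves the message available for another attempt.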

Procedure

Follow these steps:

  1. Prerequisites

  2. Define the flow

  3. Write the worker

  4. Run the flow and view the result

Step 1: Prerequisites

  1. Create a Simple Message Queue (formerly MNS) queue in the Simple Message Queue (formerly MNS) console. For detailed instructions, see Create a queue.

  2. Serverless Workflow assumes the Execution Role (a RAM role) that you specify in the flow to send messages to the Simple Message Queue (formerly MNS) queue in your account. Therefore, you must grant the Simple Message Queue (formerly MNS) message-sending permission to this RAM role. The following code shows an example of a fine-grained policy. If you do not require fine-grained permissions, you can go to the Serverless Workflow console and attach the System Policy AliyunMNSFullAccess to the flow RAM role.

{
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "mns:SendMessage"
            ],
            "Resource": [
                "acs:mns:$region:$account_id:/queues/$queue_name/messages"
            ]
        }
    ],
    "Version": "1"
}         
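If it helps to generate the policy document rather than edit the placeholders by hand, a minimal sketch could look like the following. The function name build_send_message_policy and the sample region, account ID, and queue name are illustrative assumptions, not values from your account.

```python
import json


def build_send_message_policy(region, account_id, queue_name):
    """Return the fine-grained policy shown above with the placeholders filled in."""
    resource = "acs:mns:%s:%s:/queues/%s/messages" % (region, account_id, queue_name)
    return {
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["mns:SendMessage"],
                "Resource": [resource],
            }
        ],
        "Version": "1",
    }


# Placeholder values for illustration only.
policy = build_send_message_policy("cn-hangzhou", "1234567890123456", "fnf-demo")
print(json.dumps(policy, indent=4))
```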

Step 2: Define the flow

The following FDL (Flow Definition Language) example defines a task step that sends a message to the fnf-demo Simple Message Queue (formerly MNS) queue and then waits for a callback.

version: v1
type: flow
steps:
 - type: task
   name: Task_1
   resourceArn: acs:mns:::/queues/fnf-demo/messages  # This task step sends a message to the fnf-demo Simple Message Queue (formerly MNS) queue in the same region and within the same account.
   pattern: waitForCallback  # The task step pauses after the message is sent and waits for a callback.
   inputMappings:
      - target: task_token
        source: $context.task.token  # Get the unique task token from the context object.
      - target: key
        source: value
   serviceParams:  # Service integration parameters.
     MessageBody: $  # Use the mapped input as the message body.
     Priority: 1  # The message priority in the queue.
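To make the effect of the input mappings concrete, the sketch below shows what a worker might do with the resulting message body. It assumes the mapped input arrives as a JSON object with the keys task_token and key. The token value and the helper name parse_task_message are made up for illustration; the real token is generated by Serverless Workflow.

```python
import json

# Hypothetical message body; the real task token is generated by Serverless Workflow.
message_body = '{"task_token": "example-token", "key": "value"}'


def parse_task_message(raw_body):
    """Split a message body into the callback token and the remaining task input."""
    body = json.loads(raw_body)
    task_token = body.pop("task_token")  # needed later to report the task result
    return task_token, body


task_token, task_input = parse_task_message(message_body)
print(task_token, task_input)
```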

Step 3: Write the worker

The following Python 2.7 code simulates a worker that executes tasks. It can run in any environment with access to Serverless Workflow and Simple Message Queue (formerly MNS). The worker uses long polling to call the Simple Message Queue (formerly MNS) ReceiveMessage API. When a task step with the Simple Message Queue (formerly MNS) configuration is executed, Serverless Workflow sends a message to the fnf-demo queue. After processing the task, the worker calls the Serverless Workflow ReportTaskSucceeded/Failed API to report the result. After Serverless Workflow receives the result, it resumes the flow, and the worker can then delete the message.

  1. In a virtual environment, install the Python SDKs for Serverless Workflow and Simple Message Queue (formerly MNS).

    cd /tmp; mkdir -p fnf-demo-callback; cd fnf-demo-callback
    virtualenv env; source env/bin/activate
    pip install -t . aliyun-python-sdk-core aliyun-python-sdk-fnf aliyun-mns

  2. Write the code for the local task worker and save it as worker.py.

    import json
    import os

    from aliyunsdkcore.client import AcsClient
    from aliyunsdkcore.acs_exception.exceptions import ServerException
    from aliyunsdkfnf.request.v20190315 import ReportTaskSucceededRequest
    from mns.account import Account  # pip install aliyun-mns
    from mns.queue import *  # provides MNSExceptionBase


    def main():
      # Read the connection settings from environment variables.
      region = os.environ['REGION']
      account_id = os.environ['ACCOUNT_ID']
      ak_id = os.environ['AK_ID']
      ak_secret = os.environ['AK_SECRET']
      queue_name = "fnf-demo"
      # Client used to report task results to Serverless Workflow.
      fnf_client = AcsClient(ak_id, ak_secret, region)
      mns_endpoint = "https://%s.mns.%s.aliyuncs.com" % (account_id, region)
      my_account = Account(mns_endpoint, ak_id, ak_secret)
      my_queue = my_account.get_queue(queue_name)
      my_queue.set_encoding(False)  # Do not Base64-encode or decode message bodies.
      wait_seconds = 10  # Long-polling wait time for each receive call.
      while True:
        try:
          print("Receiving messages")
          recv_msg = my_queue.receive_message(wait_seconds)
          print("Received message %s, body %s" % (recv_msg.message_id, recv_msg.message_body))
          # The message body carries the task token set by the input mappings.
          body = json.loads(recv_msg.message_body)
          task_token = body["task_token"]
          output = "{\"key\": \"value\"}"
          # Report the result so that the paused flow step resumes.
          request = ReportTaskSucceededRequest.ReportTaskSucceededRequest()
          request.set_Output(output)
          request.set_TaskToken(task_token)
          resp = fnf_client.do_action_with_exception(request)
          print("Report task succeeded finished")
          # Delete the message only after the result is reported.
          my_queue.delete_message(recv_msg.receipt_handle)
          print("Deleted message " + recv_msg.message_id)
        except MNSExceptionBase as e:
          # Raised by queue calls, for example when a long poll returns no message.
          print(e)
        except ServerException as e:
          print(e)
          # The task was already reported, so the message is safe to delete.
          if e.error_code == 'TaskAlreadyCompleted':
            my_queue.delete_message(recv_msg.receipt_handle)
            print("Task already completed, deleted message " + recv_msg.message_id)


    if __name__ == '__main__':
      main()

  3. Start the worker. It uses long polling to monitor the fnf-demo queue. When it receives a message, it calls the Serverless Workflow API to report the task result.

    # Run the worker process.
    export REGION={your-region}
    export ACCOUNT_ID={your-account-id}
    export AK_ID={your-ak-id}
    export AK_SECRET={your-ak-secret}
    python worker.py           
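Because the worker reads all of its settings from environment variables, a missing variable only surfaces as a KeyError at startup. One possible refinement, sketched here with the hypothetical helper load_worker_config and placeholder values, is to validate the variables up front and build the endpoint in one place. The endpoint format is the one worker.py uses.

```python
import os

REQUIRED_VARS = ("REGION", "ACCOUNT_ID", "AK_ID", "AK_SECRET")


def load_worker_config(env=None):
    """Read the worker settings, failing early with the names of any missing variables."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise ValueError("Missing environment variables: " + ", ".join(missing))
    config = {name.lower(): env[name] for name in REQUIRED_VARS}
    # Same endpoint format that worker.py builds.
    config["mns_endpoint"] = "https://%s.mns.%s.aliyuncs.com" % (
        config["account_id"], config["region"])
    return config


# Example with placeholder values instead of real credentials.
example = load_worker_config({
    "REGION": "cn-hangzhou",
    "ACCOUNT_ID": "1234567890123456",
    "AK_ID": "example-id",
    "AK_SECRET": "example-secret",
})
print(example["mns_endpoint"])
```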

Step 4: Run the flow and view the result

Start an execution of the flow in the Serverless Workflow console. With the worker running, the flow executes successfully.

The execution details page displays basic information such as the execution name, creation time, end time, and status. The Flow Definition and Visual Workflow panel shows the execution path, which runs from Start through Task_1 to End. The Step Information panel on the right displays the JSON input and output data for each step, including fields such as input and local.