Simple Message Queue (formerly MNS)

This topic shows how to use the wait-for-callback pattern in a task step to integrate a Simple Message Queue (formerly MNS) topic and publish messages to it.

How it works

After you deploy the application, the flow runs as follows:

  1. The flow starts, and a task step publishes a message to the Simple Message Queue (formerly MNS) topic. The message body contains the TaskToken of the task step.

  2. The task step pauses and waits for a callback.

  3. When the Simple Message Queue (formerly MNS) topic receives the message, it pushes the message, including the TaskToken, over HTTP to the HTTP trigger of a Function Compute (FC) function, which triggers the function.

  4. The Function Compute function retrieves the TaskToken and calls the ReportTaskSucceeded API to report the task's status.

  5. The flow resumes execution.
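The sequence above can be sketched as a small in-memory simulation. It only illustrates the wait-for-callback pattern and does not reflect how Serverless Workflow is implemented: the `CallbackFlow` class, its methods, the `run_demo` function, and the token format are hypothetical names introduced for this example.

```python
import json
import uuid


class CallbackFlow:
    """Toy model of a flow with one wait-for-callback task step (hypothetical)."""

    def __init__(self):
        self.pending = set()        # task tokens of steps that are paused
        self.status = "NotStarted"
        self.output = None

    def start(self, flow_input, publish):
        # Steps 1-2: mark the step as paused, then publish the message
        # together with the task token.
        token = uuid.uuid4().hex
        self.pending.add(token)
        self.status = "WaitingForCallback"
        publish(json.dumps({"messageBody": flow_input["messageBody"],
                            "taskToken": token}))

    def report_task_succeeded(self, token, output):
        # Steps 4-5: a valid token resumes the paused step.
        if token not in self.pending:
            raise KeyError("unknown or already used task token")
        self.pending.remove(token)
        self.output = json.loads(output)
        self.status = "Succeeded"


def run_demo():
    flow = CallbackFlow()

    def subscriber(message):
        # Step 3: the topic pushes the message to the function,
        # which reads the token and reports success.
        token = json.loads(message)["taskToken"]
        flow.report_task_succeeded(token, '{"status": "success"}')

    flow.start({"messageBody": "hello world"}, publish=subscriber)
    return flow.status, flow.output
```

In this toy model the subscriber runs synchronously inside `start`. In the real flow the callback arrives later, and the task step stays paused until it does.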

Deploy an application

  1. Log in to the Serverless Workflow console.

  2. On the Flows page, click Create Flow.

  3. On the Create Flow page, select the Sample Project > Task Step Orchestration for Simple Message Queue (formerly MNS) Topic template, then click Next.

  4. On the Create Application page, configure the required parameters, then click Deploy.

    • Application Name: Enter a custom name for the application. The name must be unique within your account.

    • TopicName: Enter a custom name for the topic. If the specified Simple Message Queue (formerly MNS) topic does not exist, it is automatically created.

    After you click Deploy, the page displays all resources created for the application and then redirects to the resource stack details page. On the Overview tab, you can view the creation progress of each resource in the resource list. Resource statuses include Creation in progress and Initialization complete.

  5. Run the flow.

    Run the flow with the following input:

    {
       "messageBody": "hello world"
    }

    After the execution completes, open the execution details page. The status is displayed as Succeeded, and the visual flow shows that the entire path from Start through mns-topic-task to End ran successfully.
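The execution input is a JSON object with a `messageBody` field. If you generate the input programmatically instead of typing it, a minimal sketch could look like the following. The helper name `build_flow_input` is hypothetical and is not part of any SDK.

```python
import json


def build_flow_input(message_body):
    """Return the execution input as a JSON string (hypothetical helper)."""
    return json.dumps({"messageBody": message_body})


print(build_flow_input("hello world"))
```

The printed string can be used as the execution input.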

Application code

  1. This flow orchestrates the Simple Message Queue (formerly MNS) topic.

    The TaskToken of the task step is included in the MessageBody of the published message so that the subscriber can use it for the callback. The outputMappings section reads the output set by ReportTaskSucceeded.

    version: v1
    type: flow
    steps:
      - type: task
        name: mns-topic-task
        resourceArn: acs:mns:::/topics/<topic>/messages
        pattern: waitForCallback
        inputMappings:
          - target: messageBody
            source: $input.messageBody
          - target: taskToken
            source: $context.task.token
        outputMappings:
          - target: status
            source: $local.status
        serviceParams:
          MessageBody: $

  2. This Function Compute function sends a callback to the task step.

    The function reads the TaskToken encapsulated in the MessageBody and reports the task status by setting the output to {"status":"success"}.

    # Requires the aliyun-python-sdk-core and aliyun-python-sdk-fnf packages.
    import json

    from aliyunsdkcore.auth.credentials import StsTokenCredential
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkfnf.request.v20190315.ReportTaskSucceededRequest import ReportTaskSucceededRequest


    def handler(environ, start_response):
        # Get the request body.
        try:
            request_body_size = int(environ.get('CONTENT_LENGTH', 0))
        except ValueError:
            request_body_size = 0
        request_body = environ['wsgi.input'].read(request_body_size)
        print('Request body: {}'.format(request_body))
        body = json.loads(request_body)
        message_body_str = body['Message']

        # Read the original message body and the TaskToken from the message.
        message_body = json.loads(message_body_str)
        task_token = message_body['taskToken']
        ori_message_body = message_body['messageBody']
        print('Task token: {}\norigin message body: {}'.format(task_token, ori_message_body))

        # Initialize the Serverless Workflow client with the STS token from the function context.
        context = environ['fc.context']
        creds = context.credentials
        sts_creds = StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token)
        fnf_client = AcsClient(credential=sts_creds, region_id=context.region)

        # Report to Serverless Workflow that the task succeeded.
        req = ReportTaskSucceededRequest()
        req.set_TaskToken(task_token)
        req.set_Output('{"status": "success"}')
        resp = fnf_client.do_action_with_exception(req)
        print('Report task response: {}'.format(resp))

        # Respond to the HTTP request.
        status = '200 OK'
        response_headers = [('Content-type', 'text/plain')]
        start_response(status, response_headers)
        return [b'OK']
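The handler unwraps two layers of JSON. The outer layer is the HTTP push body, whose `Message` field holds the published message. The inner layer is that message, which holds `messageBody` and `taskToken`. The sketch below isolates this parsing so that it can be tried without Function Compute or the SDK. The helper name `extract_callback_fields` and the sample token are hypothetical, and the simulated push body contains only the `Message` field that the handler reads.

```python
import json


def extract_callback_fields(request_body):
    """Return (task_token, original_message_body) from an HTTP push body."""
    envelope = json.loads(request_body)        # outer layer: push notification
    message = json.loads(envelope["Message"])  # inner layer: published MessageBody
    return message["taskToken"], message["messageBody"]


# Simulated push body (assumption: a real notification may carry more fields).
published = json.dumps({"messageBody": "hello world", "taskToken": "sample-token"})
sample_push = json.dumps({"Message": published}).encode("utf-8")

token, original = extract_callback_fields(sample_push)
print(token, original)
```

Keeping the parsing in a separate function like this makes it possible to unit test the token extraction independently of the ReportTaskSucceeded call.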

More information

For more details about orchestrating a Simple Message Queue (formerly MNS) topic with a task step, see task-mns-topics application code.