本文介绍了如何使用任务步骤的等待回调(waitForCallback)模式集成 MNS 主题,并发布消息到主题。MNS 主题接收到消息后,调用工作流 ReportTaskSucceeded 或 ReportTaskFailed API 回调任务状态。

框架原理

应用部署后执行流程如下:

  1. 执行工作流,任务步骤发布消息到 MNS 主题。任务步骤的 TaskToken 放入消息体一起发送到主题。
  2. 工作流任务步骤暂停执行,等待任务回调。
  3. MNS 主题接收到消息后,将消息和 TaskToken 通过 HTTP 推送发送到函数计算 FC 的函数 HTTP 触发器,触发函数执行。
  4. 函数计算函数最终获取到 TaskToken,并调用 ReportTaskSucceeded 来报告任务状态。
  5. 流程继续执行。
11

部署应用

  1. Serverless 工作流控制台创建流程,选择示例项目任务步骤编排 MNS 主题模板,单击下一步
    2
  2. 创建应用页面,创建模板对应的应用,并单击部署
    3

    其中:

    应用名称:自定义参数,同一账号下必须唯一。

    TopicName:自定义参数,如果对应 MNS 主题不存在会自动创建。

    单击部署后,会显示应用下创建的所有资源。4
  3. 执行工作流。

    执行输入

    {
    "messageBody": "hello world"
    }
    5

应用代码

  1. 编排 MNS 主题的工作流。

    将任务步骤回调的 TaskToken 封装在消息的 MessageBody 中,用于后续的回调。outputMappings 中读取 ReportTaskSucceeded 设置的 output

    version: v1
    type: flow
    steps:
      - type: task
        name: mns-topic-task
        resourceArn: acs:mns:::/topics/<topic>/messages
        pattern: waitForCallback
        inputMappings:
          - target: messageBody
            source: $input.messageBody
          - target: taskToken
            source: $context.task.token
        outputMappings:
          - target: status
            source: $local.status
        serviceParams:
          MessageBody: $
  2. 回调任务步骤的 FC 函数。

    读取 MessageBody中封装的 TaskToken 回调任务状态设置 output{"status":"success"}

    def handler(environ, start_response):
        # Get request body
        try:
            request_body_size = int(environ.get('CONTENT_LENGTH',
    0))
        except ValueError:
            request_body_size = 0
        request_body =
    environ['wsgi.input'].read(request_body_size)
        print('Request body:
    {}'.format(request_body))
    
        body = json.loads(request_body)
        message_body_str =
    body['Message']
    
        # Read MessageBody and TaskToken from
    message body
        message_body =
    json.loads(message_body_str)
        task_token =
    message_body['taskToken']
        ori_message_body =
    message_body['messageBody']
        print('Task token: {}\norigin message
    body: {}'.format(task_token, ori_message_body))
    
        # Init fnf client use sts token
        context = environ['fc.context']
        creds = context.credentials
        sts_creds =
    StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token)
        fnf_client =
    AcsClient(credential=sts_creds, region_id=context.region)
    
        # Report task succeeded to serverless
    workflow
        req =
    ReportTaskSucceededRequest()
        req.set_TaskToken(task_token)
        req.set_Output('{"status":
    "success"}')
        resp =
    fnf_client.do_action_with_exception(req)
        print('Report task response:
    {}'.format(resp))
    
        # Response to http request
        status = '200 OK'
        response_headers = [('Content-type',
    'text/plain')]
        start_response(status,
    response_headers)
        return [b'OK']

更多信息

任务步骤编排 MNS 主题应用的更多信息,请参见 task-mns-topics 应用代码