Serverless 工作流任务(Task)步骤支持向某个指定的 MNS 队列中发送消息,本文介绍了 MNS 队列集成的使用场景、通用参数、集成模式以及权限配置。

使用场景

任务步骤对 MNS 队列的集成有请求响应(requestResponse)模式和等待回调(waitForCallback)模式,分别适用于不同场景。

集成模式 模式 (pattern)参数 适用场景 说明
请求响应模式 requestResponse 事件通知 持久化通知工作流执行外的服务,流程执行不关心被通知者如何处理该消息。
等待回调模式 waitForCallback 编排自定义任务类型 发送消息到队列,任意环境中的任务执行者(例如 ECS 虚拟机、自建机房的服务器)收到消息,执行任务后回调 Serverless 工作流,详情请参见最佳实践

通用参数

下文示例中,Serverless 工作流会向 resourceArn 中指定的队列名称(替换 {queue-name})发送一条消息。消息正文和参数在 serviceParams 对象中指定,支持的参数如下所示:

  • MessageBody:消息正文,必填。MessageBody: $ 表示通过输入映射 (inputMappings)产生消息正文。例如该步骤进入(StepEntered)时:
    • input 对象为 {"key": "value_1"}
    • 映射后的输入为 {"key_1": "value_1", "key_2": "value"}
    • 任务步骤会以 {"key_1": "value_1", "key_2": "value"} 字符串作为消息正文发送到 MNS 队列。
  • Priority:消息优先级权值,必填。
  • DelaySeconds:延迟出现秒数,选填。

更多参数信息,请参见 MNS SendMessage API。

version: v1
type: flow
steps:
 - type: task
   name: Task_1
   resourceArn: acs:mns:::/queues/{queue-name}/messages  # 表示该任务(Task)步骤会向同地域、同账号下的 MNS {queue-name} 队列发送消息。
   pattern: requestResponse                              # 表示该任务步骤在发送 MNS 消息完成后结束。
   inputMappings:
      - target: key_1
        source: $input.key
      - target: key_2
        source: value
   serviceParams:     # 服务集成参数。
      MessageBody: $  # 用映射后的 input 作为要发送消息的内容。
      DelaySeconds: 0 # 消息延迟出现时间,单位为秒。
      Priority: 1     # 消息队列的优先级。

集成模式

  • 请求响应(requestResponse)模式

    任务步骤中的 pattern: requestResponse 代表集成为请求响应模式。该模式下,Serverless 工作流会向 resourceArn 中指定的 {queue-name} 发送一条消息,发送消息成功后步骤即成功。当发送失败时将根据重试配置决定步骤失败或是重试。

    version: v1
    type: flow
    steps:
     - type: task
       name: mytask
       resourceArn: acs:mns:::/queues/{queue-name}/messages  # 表示该任务(Task)步骤会向同区域,同账号下的 MNS 队列发送消息。
       pattern: requestResponse                              # 表示该任务步骤在发送 MNS 消息完成后结束。
       inputMappings:
          - target: key_3
            source: value
       serviceParams:     # 服务集成参数。
          MessageBody: $  # {"key_3": "value"} 会作为消息正文被发送到 MNS 队列。
          DelaySeconds: 0 # 消息延迟出现时间,单位为秒。
          Priority: 1     # 消息队列的优先级。
  • 等待回调(waitForCallback)模式

    任务步骤中的 pattern: waitForCallback 代表集成为等待回调模式。该模式下,Serverless 工作流会向 resourceArn 中指定的 {queue-name} 发送一条消息,发送消息成功后步骤会暂停并且等待一个回调。任务执行者需要使用该任务令牌(taskToken)回调 Serverless 工作流。当回调结果为成功时(调用 ReportTaskSuccceeded)则该任务步骤成功,否则(调用了 ReportTaskFailed)将根据重试配置决定步骤失败或是重试。

    任务令牌(taskToken):任务令牌可以从该步骤的上下文(context)对象中获取,通过输入映射赋值给消息正文 JSON 对象的某个字段。下文示例中,假设该步骤目前的 context 对象是 {"task":{"token":"my-token-1"}},则映射后的任务输入为 {"task_token": "my-token-1", "key": "value"},该输入也会被作为消息正文发送到 MNS 队列。

    version: v1
    type: flow
    steps:
     - type: task
       name: mytask
       resourceArn: acs:mns:::/queues/{queue-name}/messages  # 表示该任务(Task)步骤会向同区域,同账号下的 MNS {queue-name} 队列发送消息。
       pattern: waitForCallback  # 表示该任务步骤在发送 MNS 消息成功后会暂停,直到收到回调。
       inputMappings:
          - target: task_token
            source: $context.task.token  # 从 context 对象中获取标识该任务的令牌 (task token)。
          - target: key
            source: value
       serviceParams:     # 服务集成参数。
          MessageBody: $  # 用映射后的 input 作为要发送消息的内容。
          Priority: 1     # 消息队列的优先级。

流程角色配置

Serverless 工作流通过扮演流程(Flow)指定的 RAM 角色(RAM role)获取您的临时 AccessKey 代表您访问 MNS,因此需要为流程角色配置可以向 MNS 发送消息(SendMessage)的权限。您可以选择系统权限策略或者自定义权限策略中的任意一种配置方法授权 Serverless 工作流向您的 MNS 队列发送消息。

  • 系统权限策略:向流程角色授予 AliyunMNSFullAccess 系统权限。
  • 自定义权限策略:如果需要更细粒度的控制只允许向 {queue-name} 队列发送消息,创建以下策略绑定在流程角色上。
    {
      "Version": "1",
      "Statement": [
        {
          "Action": "mns:SendMessage", 
          "Resource": "acs:mns:*:*:/queues/{queue-name}/messages",
          "Effect": "Allow"
        }
      ]
    }