文档

集成轻量消息队列(原 MNS)队列

更新时间:

Serverless 工作流任务(Task)步骤支持向某个指定的轻量消息队列(原 MNS)队列中发送消息,本文介绍了轻量消息队列(原 MNS)队列集成的使用场景、通用参数、集成模式以及权限配置。

使用场景

任务步骤对轻量消息队列(原 MNS)队列的集成有请求响应(requestResponse)模式和等待回调(waitForCallback)模式,分别适用于不同场景。

集成模式

模式(pattern)参数

适用场景

说明

请求响应模式

requestResponse

事件通知

持久化通知工作流执行外的服务,流程执行不关心被通知者如何处理该消息。

等待回调模式

waitForCallback

编排自定义任务类型

发送消息到队列,任意环境中的任务执行者(例如ECS虚拟机、自建机房的服务器)收到消息,执行任务后回调Serverless 工作流,更多信息,请参见最佳实践

通用参数

下文示例中,Serverless 工作流会向resourceArn中指定的队列名称(替换{queue-name})发送一条消息。消息正文和参数在serviceParams对象中指定,支持的参数如下所示:

  • (必填)MessageBody:消息正文。MessageBody: $表示通过输入映射(inputMappings)产生消息正文。例如该步骤进入(StepEntered)时:

    • input对象为{"key": "value_1"}

    • 映射后的输入为{"key_1": "value_1", "key_2": "value"}

    • 任务步骤会以{"key_1": "value_1", "key_2": "value"}字符串作为消息正文发送到轻量消息队列(原 MNS)队列。

  • (必填)Priority:消息优先级权值。

  • (可选)DelaySeconds:延迟出现秒数。

更多信息,请参见轻量消息队列(原 MNS) SendMessage API

version: v1
type: flow
steps:
 - type: task
   name: Task_1
   resourceArn: acs:mns:::/queues/{queue-name}/messages  # 表示该任务(Task)步骤会向同地域、同账号下的轻量消息队列(原 MNS) {queue-name}队列发送消息。
   pattern: requestResponse                              # 表示该任务步骤在发送轻量消息队列(原 MNS)消息完成后结束。
   inputMappings:
      - target: key_1
        source: $input.key
      - target: key_2
        source: value
   serviceParams:     # 服务集成参数。
       MessageBody: $  # 用映射后的input作为要发送消息的内容。
       DelaySeconds: 0 # 消息延迟出现时间,单位为秒。
       Priority: 1     # 消息队列的优先级。

集成模式

  • 请求响应(requestResponse)模式

    任务步骤中的pattern: requestResponse代表集成为请求响应模式。该模式下,Serverless 工作流会向resourceArn中指定的{queue-name}发送一条消息,发送消息成功后步骤即成功。当发送失败时将根据重试配置决定步骤失败或是重试。

    version: v1
    type: flow
    steps:
     - type: task
       name: mytask
       resourceArn: acs:mns:::/queues/{queue-name}/messages  # 表示该任务(Task)步骤会向同区域,同账号下的轻量消息队列(原 MNS)队列发送消息。
       pattern: requestResponse                              # 表示该任务步骤在发送轻量消息队列(原 MNS)消息完成后结束。
       inputMappings:
          - target: key_3
            source: value
       serviceParams:     # 服务集成参数。
            MessageBody: $  # {"key_3": "value"}会作为消息正文被发送到轻量消息队列(原 MNS)队列。
            DelaySeconds: 0 # 消息延迟出现时间,单位为秒。
            Priority: 1     # 消息队列的优先级。
  • 等待回调(waitForCallback)模式

    任务步骤中的pattern: waitForCallback代表集成为等待回调模式。该模式下,Serverless 工作流会向resourceArn中指定的{queue-name}发送一条消息,发送消息成功后步骤会暂停并且等待一个回调。任务执行者需要使用该任务令牌(taskToken)回调Serverless 工作流。当回调结果为成功时(调用ReportTaskSuccceeded)则该任务步骤成功,否则(调用了ReportTaskFailed)将根据重试配置决定步骤失败或是重试。

    任务令牌(taskToken):任务令牌可以从该步骤的上下文(context)对象中获取,通过输入映射赋值给消息正文JSON对象的某个字段。下文示例中,假设该步骤目前的context对象是{"task":{"token":"my-token-1"}},则映射后的任务输入为{"task_token": "my-token-1", "key": "value"},该输入也会被作为消息正文发送到轻量消息队列(原 MNS)队列。

    version: v1
    type: flow
    steps:
     - type: task
       name: mytask
       resourceArn: acs:mns:::/queues/{queue-name}/messages  # 表示该任务(Task)步骤会向同区域,同账号下的轻量消息队列(原 MNS) {queue-name} 队列发送消息。
       pattern: waitForCallback  # 表示该任务步骤在发送轻量消息队列(原 MNS)消息成功后会暂停,直到收到回调。
       inputMappings:
          - target: task_token
            source: $context.task.token  # 从context对象中获取标识该任务的令牌 (task token)。
          - target: key
            source: value
       serviceParams:     # 服务集成参数。
            MessageBody: $  # 用映射后的input作为要发送消息的内容。
            Priority: 1     # 消息队列的优先级。

流程角色配置

Serverless 工作流通过扮演流程(Flow)指定的RAM角色(RAM role)获取您的临时AccessKey代表您访问轻量消息队列(原 MNS),因此需要为流程角色配置可以向轻量消息队列(原 MNS)发送消息(SendMessage)的权限。您可以选择系统权限策略或者自定义权限策略中的任意一种配置方法授权Serverless 工作流向您的队列发送消息。

  • 系统权限策略:向流程角色授予AliyunMNSFullAccess系统权限。

  • 自定义权限策略:如果需要更细粒度的控制只允许向{queue-name}队列发送消息,创建以下策略绑定在流程角色上。

    {
      "Version": "1",
      "Statement": [
        {
          "Action": "mns:SendMessage",
          "Resource": "acs:mns:*:*:/queues/{queue-name}/messages",
          "Effect": "Allow"
        }
      ]
    }