Serverless 工作流的服务集成功能可以简化用户于云服务的交互。本文示例中,我们使用 MNS 队列集成的功能,结合回调 (callback)完成更多非函数计算的计算任务的编排。

简介

Serverless 工作流的使用场景不仅限于编排函数计算(FC)的 FaaS 函数,也包括广义上任意的计算任务。在上一篇最佳实践文档异步任务回调中介绍了利用 FC 函数向 MNS 队列中发送消息,自定义环境中的任务执行者 (worker) 接收到消息,结合回调(callback)通知 Serverless 工作流任务执行结果。在下文中,将介绍如何使用 Serverless 工作流的新功能 MNS 队列。MNS 队列进一步简化编排自定义任务类型。 Serverless 工作流可以直接向 MNS 的队列发送消息,省去了发送消息的 FC 函数的开发、测试和维护,提高了可用性,降低了延迟。使用 MNS 集成相比通过 FC Function 发送消息到 MNS 的做法有以下好处:

  • 无须为发送消息做 FC 函数的开发,降低了开发、测试和维护成本。
  • 降低了消息传递的延时、少了一次远程访问、避免了函数计算的冷启动。
  • 去除了一个服务依赖、提高了容错性。

Serverless 工作流未来会推出更多的云服务集成,让不同类型任务组成的工作流编排变得更加容易。

服务集成功能

下图中 3 个串行的任务由 Serverless 工作流负责依次发送至用户指定的 MNS 队列中。消息发送成功之后 Serverless 工作流将会在该步骤暂定等待回调。用户在自定义环境中的 worker(例如 ECS VM、容器、自建机房内的机器)调用 MNS ReceiveMessage 接口拉取消息。收到消息后,worker 根据消息内容执行相应的任务。任务结束后,调用 Serverless 工作流ReportTaskSucceeded/Failed 接口,Serverless 工作流收到任务结果后继续该步骤执行。Worker 在汇报任务结果成功后删除 MNS 队列消息。

fnf-docs-service-integration

步骤详解

下文将详细介绍使用该功能的步骤。

  1. 准备工作
  2. 编写流程 (Flow)
  3. 编写 worker
  4. 执行并查看结果

步骤 1:准备工作

  1. 通过 MNS 控制台创建 MNS 队列,详细步骤请参见创建队列
  2. Serverless 工作流需要扮演用户在 Flow 中指定的执行角色 (RAM role)向用户账号下的 MNS 队列发送消息,因此需要为该 RAM role 添加 MNS SendMessage 相关的权限策略 (policy),细粒度的策略示例如下。如没有细粒度权限控制的需求,可以通过 Serverless 工作流控制台Flow RAM role 添加系统策略AliyunMNSFullAccess
{
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "mns:SendMessage"
            ],
            "Resource": [
                "acs:mns:$region:$account_id:/queues/$queue_name/messages"
            ]
        }
    ],
    "Version": "1"
}         

步骤 2:编写流程 (Flow)

下面的 FDL 是一个可以向 fnf-demo 这个 MNS 队列发送消息并且等待回调的任务(Task)步骤。

version: v1
type: flow
steps:
 - type: task
   name: Task_1
   resourceArn: acs:mns:::/queues/fnf-demo/messages  # 表示该任务(Task)步骤会向同区域, 同账号下的 MNS 队列 fnf-demo 发送消息。
   pattern: waitForCallback  # 表示该任务步骤在发送 MNS 消息成功后会暂停,直到收到回调。
   inputMappings:
      - target: task_token
        source: $context.task.token  # 从 context 对象中获取标识该任务的令牌 (task token)。
      - target: key
        source: value
   serviceParams:  # 服务集成参数。
      MessageBody: $  # 用映射后的 input 作为要发送消息的内容。
      Priority: 1  # 消息队列的优先级。         

步骤 3:编写 worker

下面的 Python 2.7 代码模拟一个执行任务的 worker, 它可以运行在任何可以访问到 Serverless 工作流和 MNS 服务的环境中。该 worker 长轮询调用 MNS ReceiveMessage,当一个带有 MNS 配置的任务步骤进入时,Serverless 工作流会向 fnf-demo 这个队列中发送一个消息。该 worker 执行相应任务结束后回调 (callback)Serverless 工作流ReportTaskSucceeded/Failed 接口,在任务结果汇报完成后 Serverless 工作流会继续当前任务步骤执行,worker 可以删除消息。

  1. 在虚拟环境中,安装 fnf、mns、Python SDK 。
    cd /tmp; mkdir -p fnf-demo-callback; cd fnf-demo-callback
    virtualenv env; source env/bin/activate
    pip install -t . aliyun-python-sdk-core -t . aliyun-python-sdk-fnf -t . aliyun-mns                              
  2. 编写本地任务执行者 worker.py 代码 。
    import json
    import os
    
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkcore.acs_exception.exceptions import ServerException
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkfnf.request.v20190315 import ReportTaskSucceededRequest
    from mns.account import Account # pip install aliyun-mns
    from mns.queue import *
    
    def main():
      region = os.environ['REGION']
      account_id = os.environ['ACCOUNT_ID']
      ak_id = os.environ['AK_ID']
      ak_secret = os.environ['AK_SECRET']
    
      queue_name = "fnf-demo"
      fnf_client = AcsClient(
        ak_id,
        ak_secret,
        region
      )
    
      mns_endpoint = "https://%s.mns.%s.aliyuncs.com" % (account_id, region)
      my_account = Account(mns_endpoint, ak_id, ak_secret)
      my_queue = my_account.get_queue("fnf-demo")
      my_queue.set_encoding(False)
      wait_seconds = 10
    
      try:
        while True:
          try:
            print "Receiving messages"
            recv_msg = my_queue.receive_message(wait_seconds)
    
            print "Received message %s, body %s" % (recv_msg.message_id, recv_msg.message_body)
            body = json.loads(recv_msg.message_body)
            task_token = body["task_token"]
            output = "{\"key\": \"value\"}"
            request = ReportTaskSucceededRequest.ReportTaskSucceededRequest()
            request.set_Output(output)
            request.set_TaskToken(task_token)
            resp = fnf_client.do_action_with_exception(request)
            print "Report task succeeded finished"
            my_queue.delete_message(recv_msg.receipt_handle)
            print "Deleted message " + recv_msg.message_id
          except MNSExceptionBase as e:
            print(e)
          except ServerException as e:
            print(e)
            if e.error_code == 'TaskAlreadyCompleted':
              my_queue.delete_message(recv_msg.receipt_handle)
              print "Task already completed, deleted message " + recv_msg.message_id
      except ServerException as e:
        print(e)
    
    if __name__ == '__main__':
        main()                               
  3. 开启 worker,长轮询 fnf-demo 队列,收到消息后回调 Serverless 工作流汇报结果。
    # 执行 worker 进程。
    export REGION={your-region}
    export ACCOUNT_ID={your-account-id}
    export AK_ID={your-ak-id}
    export AK_SECRET={your-ak-secret}
    
    python worker.py           

步骤 4:执行并查看结果

Serverless 工作流控制台开始一个流程的执行,配合 worker 可以看到流程成功执行。

Screen Shot 2019-09-17 at 7.51.12 PM