本文介绍了 Serverless 工作流的回调功能。相比较轮询,使用回调有效地降低了延迟、减少了轮询对服务器造成的不必要压力。另外,回调功能配合队列可以实现对非 FC 任务的编排,将 Serverless 工作流的编排范围扩展到任意类型的计算资源。

简介

长时间执行的任务通常会采用异步提交任务并返回任务标识 (ID),判断异步任务结束的方法通常有两种:轮询 (polling)和回调 (callback),在任务状态轮询中我们介绍了使用轮询来判断任务结束。Serverless 工作流的回调 (callback)功能,覆盖以下的痛点或场景:

  • 消除轮询周期长带来的不必要延迟。
  • 消除大流量场景下高并发的轮询造成不必要的服务器资源压力和浪费 。
  • 编排非 FC Function 的任务,例如运行在自建机房或 ECS 上的进程 。
  • 需要人工干预的步骤,例如通知审批通过。

下图展示了使用 MNS 队列服务集成结合回调 API 编排自建资源,拓宽了 Serverless 工作流的适用场景。

fnf-doc-service-integration-mns-queue

回调使用详解

Task 步骤中指定 pattern: waitForCallback,如下图状态机所示:该步骤会在提交 resourceArn 指定的任务后(如 FC invocation)该步骤会将一个 taskToken 存入到该步骤的 context 对象并进入一个暂停的状态,直到 Serverless 工作流收到回调或指定的任务超时。将 taskToken 传入 ReportTaskSucceedReportTaskFailed 接口去回调会使得该步骤继续执行。

fnf-doc-callback-state-machine
  - type: task
      name: mytask
      resourceArn: acs:fc:::services/{fc-service}/functions/{fc-function}
      pattern: waitForCallback  # 指定该 Task 步骤在提交任务后等待回调。
      inputMappings:
        - target: taskToken
          source: $context.task.token  # 将 context 中的 taskToken 作为 input 传入 resourceArn 指定的函数。
      outputMappings:
        - target: k
          source: $local.key  # 将 ReportTaskSucceeded 中 output {"key": "value"} 映射成 {"k": "value"} 并作为该步骤的输出。         

示例

该示例共分为以下 3 个步骤:

  1. 准备 Task Function
  2. 开始工作流
  3. 回调

步骤 1:准备 Task Function

创建下面一个简单的函数,该函数会将输入直接返回。
  • 服务:fnf-demo。
  • 函数:echo。
  • 运行环境:python2.7。
  • 函数入口:index.handler。
#!/usr/bin/env python
import json

def handler(event, context):
    return event          

步骤 2:开始工作流

Serverless 工作流控制台创建下面的流程,并开始执行。
  • 流程名称:fnf-demo-callback。
  • 流程角色:配置一个有 FC Invocation 权限的角色。
version: v1
type: flow
steps:
  - type: task
    name: mytask
    resourceArn: acs:fc:::services/fnf-demo/functions/echo
    pattern: waitForCallback
    inputMappings:
      - target: taskToken
        source: $context.task.token
    outputMappings:
      - target: s
        source: $local.status         

流程开始后可以看到 mytask 步骤暂停在 TaskSubmitted 事件,等待回调。该事件的 output 中含有 taskToken 作为回调任务的标识。

Screen Shot 2019-08-15 at 11.00.09 AM

步骤 3:回调

使用 Serverless 工作流Python SDK 在本地(或其他可以运行 Python 的环境)运行 callback.py 脚本,将 {task-token} 替换为 TaskSubmitted 事件中的值。
cd /tmp
mkdir fnf-demo-callback
cd fnf-demo-callback

# 在虚拟环境中,安装 fnf python SDK。
virtualenv env
source env/bin/activate
pip install -t . aliyun-python-sdk-core
pip install -t . aliyun-python-sdk-fnf

# 执行 worker 进程。
export ACCOUNT_ID={your-account-id}; export AK_ID={your-ak-id}; export AK_SECRET={your-ak-secret}
python worker.py {task-token-from-TaskSubmitted}
						
# worker.py 代码:

import os
import sys
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.client import AcsClient
from aliyunsdkfnf.request.v20190315 import ReportTaskSucceededRequest

def main():
  account_id = os.environ['ACCOUNT_ID']
  akid = os.environ['AK_ID']
  ak_secret = os.environ['AK_SECRET']

  fnf_client = AcsClient(akid, ak_secret, "cn-hangzhou")

  task_token = sys.argv[1]
  print "task token " + task_token
  try:
    request = ReportTaskSucceededRequest.ReportTaskSucceededRequest()
    request.set_Output("{\"status\": \"ok\"}")
    request.set_TaskToken(task_token)
    resp = fnf_client.do_action_with_exception(request)
    print "Report task succeeded finished"
  except ServerException as e:
    print(e)

if __name__ == '__main__':
    main()                      
上述脚本回调成功后可以看到 mytask 步骤继续执行, ReportTaskSucceeded 中指定的输出 "{"status": "ok"}" 经过 outputMappings 的映射后变成 "{"s": "ok"}"。