本文介绍了Serverless工作流的回调功能。相比较轮询,使用回调有效地降低了延迟、减少了轮询对服务器造成的不必要压力。另外,回调功能配合队列可以实现对非FC任务的编排,将Serverless工作流的编排范围扩展到任意类型的计算资源。

简介

长时间执行的任务通常会采用异步提交任务并返回任务标识(ID),判断异步任务结束的方法通常有两种:轮询(polling)和回调(callback),在任务状态轮询中我们介绍了使用轮询来判断任务结束。Serverless工作流的回调(callback)功能,覆盖以下的痛点或场景:

  • 消除轮询周期长带来的不必要延迟。
  • 消除大流量场景下高并发的轮询造成不必要的服务器资源压力和浪费。
  • 编排非FC Function的任务,例如运行在自建机房或ECS上的进程。
  • 需要人工干预的步骤,例如通知审批通过。

下图展示了使用MNS队列服务集成结合回调API编排自建资源,拓宽Serverless工作流的适用场景。

fnf-doc-service-integration-mns-queue

回调使用详解

Task步骤中指定pattern: waitForCallback,如下图状态机所示:该步骤会在提交resourceArn指定的任务后(如FC invocation)该步骤会将一个taskToken存入到该步骤的context对象并进入一个暂停的状态,直到Serverless工作流收到回调或指定的任务超时。将taskToken传入ReportTaskSucceedReportTaskFailed接口去回调会使得该步骤继续执行。

fnf-doc-callback-state-machine
  - type: task
      name: mytask
      resourceArn: acs:fc:::services/{fc-service}/functions/{fc-function}
      pattern: waitForCallback  # 指定该Task步骤在提交任务后等待回调。
      inputMappings:
        - target: taskToken
          source: $context.task.token  # 将context中的taskToken作为input传入resourceArn指定的函数。
      outputMappings:
        - target: k
          source: $local.key  # 将ReportTaskSucceeded中output {"key": "value"}映射成 {"k": "value"}并作为该步骤的输出。         

示例

该示例共分为以下3个步骤:

  1. 准备Task Function
  2. 开始工作流
  3. 回调

步骤1:准备Task Function

创建下面一个简单的函数,该函数会将输入直接返回。
  • 服务:fnf-demo。
  • 函数:echo。
  • 运行环境:python2.7。
  • 函数入口:index.handler。
#!/usr/bin/env python
import json

def handler(event, context):
    return event          

步骤2:开始工作流

创建流程,并开始执行。
  • 流程名称:fnf-demo-callback。
  • 流程角色:配置一个有FC Invocation权限的角色。
version: v1
type: flow
steps:
  - type: task
    name: mytask
    resourceArn: acs:fc:::services/fnf-demo/functions/echo
    pattern: waitForCallback
    inputMappings:
      - target: taskToken
        source: $context.task.token
    outputMappings:
      - target: s
        source: $local.status         

流程开始后可以看到mytask步骤暂停在TaskSubmitted事件,等待回调。该事件的output中含有taskToken作为回调任务的标识。

Screen Shot 2019-08-15 at 11.00.09 AM

步骤3:回调

使用Serverless工作流Python SDK在本地(或其他可以运行Python的环境)运行callback.py脚本,将{task-token}替换为TaskSubmitted事件中的值。
cd /tmp
mkdir fnf-demo-callback
cd fnf-demo-callback

# 在虚拟环境中,安装fnf python SDK。
virtualenv env
source env/bin/activate
pip install -t . aliyun-python-sdk-core
pip install -t . aliyun-python-sdk-fnf

# 执行worker进程。
export ACCOUNT_ID={your-account-id}; export AK_ID={your-ak-id}; export AK_SECRET={your-ak-secret}
python worker.py {task-token-from-TaskSubmitted}
                        
# worker.py 代码:

import os
import sys
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.client import AcsClient
from aliyunsdkfnf.request.v20190315 import ReportTaskSucceededRequest

def main():
  account_id = os.environ['ACCOUNT_ID']
  akid = os.environ['AK_ID']
  ak_secret = os.environ['AK_SECRET']

  fnf_client = AcsClient(akid, ak_secret, "cn-hangzhou")

  task_token = sys.argv[1]
  print "task token " + task_token
  try:
    request = ReportTaskSucceededRequest.ReportTaskSucceededRequest()
    request.set_Output("{\"status\": \"ok\"}")
    request.set_TaskToken(task_token)
    resp = fnf_client.do_action_with_exception(request)
    print "Report task succeeded finished"
  except ServerException as e:
    print(e)

if __name__ == '__main__':
    main()                      
上述脚本回调成功后可以看到mytask步骤继续执行,ReportTaskSucceeded中指定的输出"{"status": "ok"}"经过outputMappings的映射后变成"{"s": "ok"}"。