异步任务回调
本文介绍了Serverless 工作流的回调功能。相比较轮询,使用回调有效地降低了延迟、减少了轮询对服务器造成的不必要压力。另外,回调功能配合队列可以实现对非FC任务的编排,将Serverless 工作流的编排范围扩展到任意类型的计算资源。
简介
长时间执行的任务通常会采用异步提交任务并返回任务标识(ID),判断异步任务结束的方法通常有两种:轮询(polling)和回调(callback),在任务状态轮询中我们介绍了使用轮询来判断任务结束。Serverless 工作流的回调(callback)功能,覆盖以下的痛点或场景:
- 消除轮询周期长带来的不必要延迟。
- 消除大流量场景下高并发的轮询造成不必要的服务器资源压力和浪费。
- 编排非FC Function的任务,例如运行在自建机房或ECS上的进程。
- 需要人工干预的步骤,例如通知审批通过。
下图展示了使用MNS队列服务集成结合回调API编排自建资源,拓宽Serverless 工作流的适用场景。
回调使用详解
在Task步骤中指定pattern: waitForCallback
,如下图状态机所示:该步骤会在提交resourceArn
指定的任务后(如FC invocation)该步骤会将一个taskToken
存入到该步骤的context
对象并进入一个暂停的状态,直到Serverless 工作流收到回调或指定的任务超时。将taskToken
传入ReportTaskSucceed
或ReportTaskFailed
接口去回调会使得该步骤继续执行。
- type: task
name: mytask
resourceArn: acs:fc:::services/{fc-service}/functions/{fc-function}
pattern: waitForCallback # 指定该Task步骤在提交任务后等待回调。
inputMappings:
- target: taskToken
source: $context.task.token # 将context中的taskToken作为input传入resourceArn指定的函数。
outputMappings:
- target: k
source: $local.key # 将ReportTaskSucceeded中output {"key": "value"}映射成 {"k": "value"}并作为该步骤的输出。
示例
该示例共分为以下3个步骤:
步骤1:准备Task Function
创建下面一个简单的函数,该函数会将输入直接返回。
- 服务:fnf-demo。
- 函数:echo。
- 运行环境:python2.7。
- 函数入口:index.handler。
#!/usr/bin/env python
import json
def handler(event, context):
return event
步骤2:开始工作流
创建流程,并开始执行。
- 流程名称:fnf-demo-callback。
- 流程角色:配置一个有FC Invocation权限的角色。
version: v1
type: flow
steps:
- type: task
name: mytask
resourceArn: acs:fc:::services/fnf-demo/functions/echo
pattern: waitForCallback
inputMappings:
- target: taskToken
source: $context.task.token
outputMappings:
- target: s
source: $local.status
流程开始后可以看到mytask
步骤暂停在TaskSubmitted
事件,等待回调。该事件的output
中含有taskToken
作为回调任务的标识。
步骤3:回调
使用Serverless 工作流的Python SDK在本地(或其他可以运行Python的环境)运行
callback.py
脚本,将{task-token}替换为TaskSubmitted
事件中的值。cd /tmp
mkdir fnf-demo-callback
cd fnf-demo-callback
# 在虚拟环境中,安装fnf python SDK。
virtualenv env
source env/bin/activate
pip install -t . aliyun-python-sdk-core
pip install -t . aliyun-python-sdk-fnf
# 执行worker进程。
export ACCOUNT_ID={your-account-id}; export AK_ID={your-ak-id}; export AK_SECRET={your-ak-secret}
python worker.py {task-token-from-TaskSubmitted}
# worker.py 代码:
import os
import sys
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.client import AcsClient
from aliyunsdkfnf.request.v20190315 import ReportTaskSucceededRequest
def main():
account_id = os.environ['ACCOUNT_ID']
akid = os.environ['AK_ID']
ak_secret = os.environ['AK_SECRET']
fnf_client = AcsClient(akid, ak_secret, "cn-hangzhou")
task_token = sys.argv[1]
print "task token " + task_token
try:
request = ReportTaskSucceededRequest.ReportTaskSucceededRequest()
request.set_Output("{\"status\": \"ok\"}")
request.set_TaskToken(task_token)
resp = fnf_client.do_action_with_exception(request)
print "Report task succeeded finished"
except ServerException as e:
print(e)
if __name__ == '__main__':
main()
上述脚本回调成功后可以看到
mytask
步骤继续执行,ReportTaskSucceeded
中指定的输出"{"status": "ok"}"经过outputMappings的映射后变成"{"s": "ok"}"。