本文介绍了任务状态轮询和Serverless工作流实现的具体步骤。
简介
在长时间任务的场景中如果任务结束后没有回调机制,开发者通常会采用轮询的方式来判断任务的结束。可靠的轮询实现需要维护状态的持久化以保证即使当前轮询进程失败退出,进程恢复后轮询也会继续进行。本示例通过一个假设场景:用户调用函数计算提交了一个多媒体处理任务,该任务耗时从1分钟到几小时不等,任务执行状态可以通过API查询,介绍如何使用Serverless工作流实现一个通用可靠的任务轮询工作流。
Serverless工作流实现
下面的教程会将两个FC函数编排成一个任务轮询工作流,该示例需要以下3个步骤:
步骤1:创建FC函数
首先创建一个名为fnf-demo
的FC服务,并在该服务下创建两个Python3.10的函数,详细步骤,请参见快速创建函数。
StartJob函数:模拟通过调用API开始一个长时间的任务,返回一个任务ID。
import logging import uuid def handler(event, context): logger = logging.getLogger() id = uuid.uuid4() logger.info('Started job with ID %s' % id) return {"job_id": str(id)}
GetJobStatus函数:模拟通过调用API获取指定任务的执行结果,比较当前的时间和函数第一次执行的时间的差值和输入中
delay
的值,返回不同的状态:“success”或“running”。import logging import uuid import time import json start_time = int(time.time()) def handler(event, context): evt = json.loads(event) logger = logging.getLogger() job_id = evt["job_id"] logger.info('Started job with ID %s' % job_id) now = int(time.time()) status = "running" delay = 60 if "delay" in evt: delay = evt["delay"] if now - start_time > delay: status = "success" try_count = 0 if "try_count" in evt: try_count = evt["try_count"] try_count = try_count + 1 logger.info('Job %s, status %s, try_count %d' % (job_id, status, try_count)) return {"job_id": job_id, "job_status":status, "try_count":try_count}
步骤2:创建Serverless工作流流程
在云工作流CloudFlow控制台的流程中单击创建流程,根据引导只填写基本信息和流程定义,其它参数默认即可。
该流程的主要逻辑描述如下:
StartJob步骤:调用
StartJob
函数开始一个任务。Wait10s步骤:等待10秒。
GetJobStatus步骤:调用
GetJobStatus
。CheckJobComplete步骤:检查
GetJobStatus
函数返回的结果:如果返回"success"整个流程执行成功。
如果轮询尝试次数大于3次,认为任务执行失败,流程执行失败。
如果返回"running"则跳回到
Wait10s
步骤,继续执行。
version: v1 type: flow steps: - type: task name: StartJob resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/StartJob - type: pass name: Init outputMappings: - target: try_count source: 0 - type: wait name: Wait10s duration: 10 - type: task name: GetJobStatus resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/GetJobStatus inputMappings: - target: job_id source: $local.job_id - target: delay source: $input.delay - target: try_count source: $local.try_count - type: choice name: CheckJobComplete inputMappings: - target: status source: $local.job_status - target: try_count source: $local.try_count choices: - condition: $.status == "success" goto: JobSucceeded - condition: $.try_count > 3 goto: JobFailed - condition: $.status == "running" goto: Wait10s - type: succeed name: JobSucceeded - type: fail name: JobFailed
步骤3:开始执行并查看结果
在控制台创建好的流程中单击新执行并提供以下JSON对象作为输入,其中delay
字段的值模拟任务完成需要的时间,这里预期任务在开始20秒后,GetJobStatus
函数返回“success”,在此之前均返回“running”,您可以调整delay
的值观察不同的执行结果。
{
"delay": 20
}
下图展示的是轮询从开始到结束的流程执行可视化。
下图展示的是任务需要20秒完成,可以看到流程执行历史中第一次
GetJobStatus
返回“running”因此CheckJobComplete
的后续步骤眺回到Wait10s
进行等待和下一次查询,第二次查询返回“success”,流程执行结束。