本文介绍了任务状态轮询和 Serverless 工作流实现的具体步骤。

简介

在长时间任务的场景中如果任务结束后没有回调机制,开发者通常会采用轮询的方式来判断任务的结束。可靠的轮询实现需要维护状态的持久化以保证即使当前轮询进程失败退出,进程恢复后轮询也会继续进行。本示例通过一个假设场景:用户调用函数计算提交了一个多媒体处理任务,该任务耗时从 1 分钟到几小时不等,任务执行状态可以通过 API 查询,介绍如何使用 Serverless 工作流实现一个通用可靠的任务轮询工作流。

Serverless 工作流实现

下面的教程会将两个 FC 函数编排成一个任务轮询工作流,该示例需要以下 3 个步骤:

  1. 创建 FC 函数
  2. 创建 Serverless 工作流流程
  3. 开始执行并查看结果

步骤1:创建 FC 函数

首先创建一个名为 fnf-demo 的 FC 服务,并在该服务下创建两个 Python2.7 的函数,详细步骤请参见 FC 文档
  • StartJob 函数:模拟通过调用 API 开始一个长时间的任务,返回一个任务 ID。
    import logging
    import uuid
    
    def handler(event, context):
      logger = logging.getLogger()
      id = uuid.uuid4()
      logger.info('Started job with ID %s' % id)
      return {"job_id": str(id)}           
  • GetJobStatus 函数: 模拟通过调用 API 获取指定任务的执行结果,比较当前的时间和函数第一次执行的时间的差值和输入中 delay 的值,返回不同的状态:“success” 或 “running”。
    import logging
    import uuid
    import time
    import json
    
    start_time = int(time.time())
    
    def handler(event, context):
      evt = json.loads(event)
      logger = logging.getLogger()
      job_id = evt["job_id"]
      logger.info('Started job with ID %s' % job_id)
    
      now = int(time.time())
      status = "running"
    
      delay = 60
      if "delay" in evt:
        delay = evt["delay"]
    
      if now - start_time > delay:
        status = "success"
    
      try_count = 0
      if "try_count" in evt:
        try_count = evt["try_count"]
    
      try_count = try_count + 1
      logger.info('Job %s, status %s, try_count %d' % (job_id, status, try_count))
      return {"job_id": job_id, "job_status":status, "try_count":try_count}         

步骤 2:创建 Serverless 工作流流程

该流程的主要逻辑描述如下:

  1. StartJob 步骤: 调用 StartJob 函数开始一个任务。
  2. Wait10s 步骤: 等待 10 秒。
  3. GetJobStatus 步骤: 调用 GetJobStatus 函数查询当前任务状态。
  4. CheckJobComplete 步骤: 检查 GetJobStatus 函数返回的结果:
    • 如果返回 "success" 整个流程执行成功。
    • 如果轮询尝试次数大于 3 次,认为任务执行失败,流程执行失败。
    • 如果返回 "running" 则跳回到 Wait10s 步骤,继续执行。
    version: v1
    type: flow
    steps:
      - type: task
        name: StartJob
        resourceArn: acs:fc:cn-hangzhou:{accountID}:services/fnf-demo/functions/StartJob
      - type: pass
        name: Init
        outputMappings:
          - target: try_count
            source: 0
      - type: wait
        name: Wait10s
        duration: 10
      - type: task
        name: GetJobStatus
        resourceArn: acs:fc:cn-hangzhou:{accountID}:services/fnf-demo/functions/GetJobStatus
        inputMappings:
          - target: job_id
            source: $local.job_id
          - target: delay
            source: $input.delay
          - target: try_count
            source: $local.try_count
      - type: choice
        name: CheckJobComplete
        inputMappings:
          - target: status
            source: $local.job_status
          - target: try_count
            source: $local.try_count
        choices:
            - condition: $.status == "success"
              goto: JobSucceeded
            - condition: $.try_count > 3
              goto: JobFailed
            - condition: $.status == "running"
              goto: Wait10s
      - type: succeed
        name: JobSucceeded
      - type: fail
        name: JobFailed           

步骤 3:开始执行并查看结果

在控制台创建好的流程中单击 新执行 并提供以下 JSON 对象作为输入,其中 delay 字段的值模拟任务完成需要的时间,这里预期任务在开始 20秒 后, GetJobStatus 函数返回 “success”,在此之前均返回 “running”,您可以调整 delay 的值观察不同的执行结果。

{
  "delay": 20
}            
  • 下图展示的是轮询从开始到结束的流程执行可视化。Screen Shot 2019-06-26 at 12.30.01 PM
  • 下图展示的是任务需要 20 秒完成,可以看到流程执行历史中第一次 GetJobStatus 返回 “running” 因此 CheckJobComplete 的后续步骤眺回到 Wait10s 进行等待和下一次查询,第二次查询返回 “success”,流程执行结束。Screen Shot 2019-06-26 at 12.39.26 PM