本文介绍了任务状态轮询和Serverless工作流实现的具体步骤。

简介

在长时间任务的场景中如果任务结束后没有回调机制,开发者通常会采用轮询的方式来判断任务的结束。可靠的轮询实现需要维护状态的持久化以保证即使当前轮询进程失败退出,进程恢复后轮询也会继续进行。本示例通过一个假设场景:用户调用函数计算提交了一个多媒体处理任务,该任务耗时从1分钟到几小时不等,任务执行状态可以通过API查询,介绍如何使用Serverless工作流实现一个通用可靠的任务轮询工作流。

Serverless工作流实现

下面的教程会将两个FC函数编排成一个任务轮询工作流,该示例需要以下3个步骤:

  1. 创建FC函数
  2. 创建Serverless工作流流程
  3. 开始执行并查看结果

步骤1:创建FC函数

首先创建一个名为fnf-demo的FC服务,并在该服务下创建两个Python2.7的函数,详细步骤请参见FC文档
  • StartJob函数:模拟通过调用API开始一个长时间的任务,返回一个任务ID。
    import logging
    import uuid
    
    def handler(event, context):
      logger = logging.getLogger()
      id = uuid.uuid4()
      logger.info('Started job with ID %s' % id)
      return {"job_id": str(id)}           
  • GetJobStatus函数:模拟通过调用API获取指定任务的执行结果,比较当前的时间和函数第一次执行的时间的差值和输入中delay的值,返回不同的状态:“success”或“running”。
    import logging
    import uuid
    import time
    import json
    
    start_time = int(time.time())
    
    def handler(event, context):
      evt = json.loads(event)
      logger = logging.getLogger()
      job_id = evt["job_id"]
      logger.info('Started job with ID %s' % job_id)
    
      now = int(time.time())
      status = "running"
    
      delay = 60
      if "delay" in evt:
        delay = evt["delay"]
    
      if now - start_time > delay:
        status = "success"
    
      try_count = 0
      if "try_count" in evt:
        try_count = evt["try_count"]
    
      try_count = try_count + 1
      logger.info('Job %s, status %s, try_count %d' % (job_id, status, try_count))
      return {"job_id": job_id, "job_status":status, "try_count":try_count}         

步骤2:创建Serverless工作流流程

该流程的主要逻辑描述如下:

  1. StartJob步骤:调用StartJob函数开始一个任务。
  2. Wait10s步骤:等待10秒。
  3. GetJobStatus步骤:调用GetJobStatus
  4. CheckJobComplete步骤:检查GetJobStatus函数返回的结果:
    • 如果返回"success"整个流程执行成功。
    • 如果轮询尝试次数大于3次,认为任务执行失败,流程执行失败。
    • 如果返回"running"则跳回到Wait10s步骤,继续执行。
    version: v1
    type: flow
    steps:
      - type: task
        name: StartJob
        resourceArn: acs:fc:cn-hangzhou:{accountID}:services/fnf-demo/functions/StartJob
      - type: pass
        name: Init
        outputMappings:
          - target: try_count
            source: 0
      - type: wait
        name: Wait10s
        duration: 10
      - type: task
        name: GetJobStatus
        resourceArn: acs:fc:cn-hangzhou:{accountID}:services/fnf-demo/functions/GetJobStatus
        inputMappings:
          - target: job_id
            source: $local.job_id
          - target: delay
            source: $input.delay
          - target: try_count
            source: $local.try_count
      - type: choice
        name: CheckJobComplete
        inputMappings:
          - target: status
            source: $local.job_status
          - target: try_count
            source: $local.try_count
        choices:
            - condition: $.status == "success"
              goto: JobSucceeded
            - condition: $.try_count > 3
              goto: JobFailed
            - condition: $.status == "running"
              goto: Wait10s
      - type: succeed
        name: JobSucceeded
      - type: fail
        name: JobFailed           

步骤3:开始执行并查看结果

在控制台创建好的流程中单击新执行并提供以下JSON对象作为输入,其中delay字段的值模拟任务完成需要的时间,这里预期任务在开始20秒后,GetJobStatus函数返回“success”,在此之前均返回“running”,您可以调整delay的值观察不同的执行结果。

{
  "delay": 20
}            
  • 下图展示的是轮询从开始到结束的流程执行可视化。Screen Shot 2019-06-26 at 12.30.01 PM
  • 下图展示的是任务需要20秒完成,可以看到流程执行历史中第一次GetJobStatus返回“running”因此CheckJobComplete的后续步骤眺回到Wait10s进行等待和下一次查询,第二次查询返回“success”,流程执行结束。Screen Shot 2019-06-26 at 12.39.26 PM