Callbacks for asynchronous tasks

Callbacks in Serverless workflow reduce latency and server load compared to polling. Combined with queues, callbacks extend Serverless workflow orchestration beyond Function Compute to any compute resource.

Why use callbacks

Long-running tasks are typically submitted asynchronously and return a task ID. There are two ways to detect completion: polling (see Poll for task status) and callbacks. Callbacks in Serverless workflow address the following use cases:

  • Eliminate polling delays.
  • Reduce server load from high-concurrency polling.
  • Orchestrate non-Function Compute tasks, such as processes on ECS instances or in on-premises data centers.
  • Automate manual steps, such as approval notifications.
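The first two points can be made concrete with a little arithmetic. The following is a minimal sketch, assuming fixed-interval polling whose first status request is sent one interval after submission; the function name polling_cost and the millisecond figures are illustrative and not part of Serverless workflow.

```python
def polling_cost(completion_ms, interval_ms):
    """Return (status_requests, detection_delay_ms) for fixed-interval polling.

    Assumption: the first status request is sent one interval after the
    task is submitted, and requests repeat every interval_ms after that.
    """
    if completion_ms < 0 or interval_ms <= 0:
        raise ValueError("completion_ms must be >= 0 and interval_ms must be > 0")
    # Ceiling division: index of the first poll at or after completion.
    polls = max(1, -(-completion_ms // interval_ms))
    detected_at_ms = polls * interval_ms
    return polls, detected_at_ms - completion_ms


if __name__ == "__main__":
    polls, delay = polling_cost(completion_ms=2300, interval_ms=1000)
    print("polling: %d status requests, detected %d ms late" % (polls, delay))
    # With a callback, the worker reports completion itself, so no status
    # requests are sent and the flow resumes as soon as the report arrives.
    print("callback: 0 status requests")
```

Shortening the polling interval reduces the detection delay but multiplies the number of status requests, which is the trade-off that callbacks avoid.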

The following diagram shows how Message Service (MNS) queues and the callback API orchestrate tasks on your own resources, extending Serverless workflow use cases.

[Figure: fnf-doc-service-integration-mns-queue]
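The worker side of this pattern might look like the sketch below. It is only an illustration under stated assumptions: Python's in-process queue.Queue stands in for an MNS queue, the message shape {"taskToken": ..., "payload": ...} is hypothetical, and report_succeeded and report_failed are placeholders for calls to ReportTaskSucceeded and ReportTaskFailed.

```python
import json
import queue


def run_worker(task_queue, process, report_succeeded, report_failed):
    """Drain the queue, process each message, and report the result by task token.

    task_queue       -- stand-in for an MNS queue (assumption: queue.Queue)
    process          -- hypothetical function that does the actual work
    report_succeeded -- placeholder for a ReportTaskSucceeded call
    report_failed    -- placeholder for a ReportTaskFailed call
    """
    while True:
        try:
            message = task_queue.get_nowait()
        except queue.Empty:
            return
        body = json.loads(message)
        token = body["taskToken"]  # hypothetical message field
        try:
            result = process(body["payload"])
        except Exception as exc:
            # Report the failure so the paused step does not wait for its timeout.
            report_failed(token, type(exc).__name__, str(exc))
        else:
            report_succeeded(token, json.dumps(result))


if __name__ == "__main__":
    q = queue.Queue()
    q.put(json.dumps({"taskToken": "token-1", "payload": {"n": 2}}))
    run_worker(
        q,
        process=lambda payload: {"n": payload["n"] * 2},
        report_succeeded=lambda token, output: print("succeeded", token, output),
        report_failed=lambda token, error, cause: print("failed", token, error, cause),
    )
```

Because the worker only needs the task token and network access to the callback API, it can run on any compute resource that can read the queue.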

How callbacks work

Set the pattern field to waitForCallback in a task step. As the state machine diagram below shows, the flow invokes the resource specified by resourceArn, generates a unique taskToken in the context object, and pauses. The step stays paused until Serverless workflow receives a callback or the task times out. Call ReportTaskSucceeded or ReportTaskFailed with the taskToken to resume.

[Figure: fnf-doc-callback-state-machine]

  - type: task
    name: mytask
    resourceArn: acs:fc:::services/{fc-service}/functions/{fc-function}
    pattern: waitForCallback  # The step waits for a callback after submitting the task.
    inputMappings:
      - target: taskToken
        source: $context.task.token  # Passes the taskToken from the context object to the function specified by resourceArn.
    outputMappings:
      - target: k
        source: $local.key  # Maps the output {"key": "value"} from ReportTaskSucceeded to {"k": "value"}, which becomes the output of this step.
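The pause-and-resume behavior described above can be modeled in a few lines. This is a toy in-memory sketch, not how Serverless workflow is implemented: the class name, the state names, and the token format are assumptions, timeouts are left out, and only sources of the form $local.<key> are handled.

```python
import uuid


class WaitForCallbackStep:
    """Toy model of a task step with pattern: waitForCallback."""

    def __init__(self, output_mappings):
        # output_mappings: list of {"target": ..., "source": "$local.<key>"}
        self.output_mappings = output_mappings
        self.state = "NotStarted"  # hypothetical state names
        self.task_token = None
        self.output = None

    def start(self, invoke_resource):
        """Generate a token, invoke the resource with it, then pause."""
        self.task_token = uuid.uuid4().hex
        invoke_resource({"taskToken": self.task_token})
        self.state = "Paused"
        return self.task_token

    def report_task_succeeded(self, task_token, output):
        """Resume the step and apply outputMappings to the reported output."""
        self._check(task_token)
        prefix = "$local."
        self.output = {
            m["target"]: output[m["source"][len(prefix):]]
            for m in self.output_mappings
        }
        self.state = "Succeeded"

    def report_task_failed(self, task_token, error, cause):
        """Resume the step as failed, recording the error and its cause."""
        self._check(task_token)
        self.output = {"error": error, "cause": cause}
        self.state = "Failed"

    def _check(self, task_token):
        if self.state != "Paused" or task_token != self.task_token:
            raise ValueError("unknown task token or step is not waiting")


if __name__ == "__main__":
    step = WaitForCallbackStep([{"target": "k", "source": "$local.key"}])
    token = step.start(invoke_resource=lambda event: None)
    step.report_task_succeeded(token, {"key": "value"})
    print(step.state, step.output)
```

The token check mirrors the role of taskToken in the text: a callback resumes only the step that issued that token, and only while the step is still waiting.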

Example: callback with Function Compute

This example has three steps:

  1. Prepare the task function
  2. Start the flow
  3. Send the callback

Step 1: Prepare the task function

Create a function that returns the input as-is.
  • Service: fnf-demo.
  • Function: echo.
  • Runtime environment: Python 2.7.
  • Entry point: index.handler.
#!/usr/bin/env python
def handler(event, context):
    # Return the event unchanged. The event carries the taskToken passed in by inputMappings.
    return event

Step 2: Start the flow

Create and run the following flow.
  • Flow name: fnf-demo-callback.
  • Flow role: A role with permissions to invoke Function Compute.
version: v1
type: flow
steps:
  - type: task
    name: mytask
    resourceArn: acs:fc:::services/fnf-demo/functions/echo
    pattern: waitForCallback
    inputMappings:
      - target: taskToken
        source: $context.task.token
    outputMappings:
      - target: s
        source: $local.status         

After the flow starts, the mytask step pauses at the TaskSubmitted event and waits for a callback. The event output contains the taskToken that identifies the callback task.

Step 3: Send the callback

Run the worker.py script shown below, which uses the Serverless workflow Python SDK, from any environment that can run Python. Replace {task-token-from-TaskSubmitted} with the taskToken from the TaskSubmitted event output.
cd /tmp
mkdir fnf-demo-callback
cd fnf-demo-callback
# In a virtual environment, install the Serverless workflow Python SDK.
virtualenv env
source env/bin/activate
pip install -t . aliyun-python-sdk-core
pip install -t . aliyun-python-sdk-fnf
# Save the worker.py code shown below in this directory, then run the worker process.
export ACCOUNT_ID={your-account-id}; export AK_ID={your-ak-id}; export AK_SECRET={your-ak-secret}
python worker.py {task-token-from-TaskSubmitted}

# worker.py code:
import os
import sys

from aliyunsdkcore.acs_exception.exceptions import ClientException, ServerException
from aliyunsdkcore.client import AcsClient
from aliyunsdkfnf.request.v20190315 import ReportTaskSucceededRequest


def main():
    # Credentials come from the environment variables exported above.
    akid = os.environ['AK_ID']
    ak_secret = os.environ['AK_SECRET']
    # Use the region in which the flow runs.
    fnf_client = AcsClient(akid, ak_secret, "cn-hangzhou")
    task_token = sys.argv[1]
    # print() with a single argument behaves the same on Python 2 and Python 3.
    print("Task token: " + task_token)
    try:
        request = ReportTaskSucceededRequest.ReportTaskSucceededRequest()
        # The output is a JSON string; outputMappings reads its fields through $local.
        request.set_Output("{\"status\": \"ok\"}")
        request.set_TaskToken(task_token)
        fnf_client.do_action_with_exception(request)
        print("ReportTaskSucceeded finished")
    except (ClientException, ServerException) as e:
        print(e)


if __name__ == '__main__':
    main()

After a successful callback, the mytask step resumes. outputMappings maps the {"status": "ok"} output from ReportTaskSucceeded to {"s": "ok"}.
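If the task fails instead, the worker can report the failure with ReportTaskFailed so that the step does not stay paused until it times out. The sketch below assumes that the SDK provides a ReportTaskFailedRequest class with set_TaskToken, set_Error, and set_Cause setters, mirroring ReportTaskSucceededRequest; check this against the SDK version you install. The helper build_failure_fields is hypothetical.

```python
def build_failure_fields(exc):
    """Turn an exception into the (error, cause) strings sent with the failure report."""
    return type(exc).__name__, str(exc)


def report_task_failed(fnf_client, task_token, exc):
    """Report a failed task for the given task token.

    fnf_client is an AcsClient created as in worker.py.
    """
    # Imported here so that build_failure_fields can be used without the SDK installed.
    # Assumption: this request class and its setters exist in the installed SDK.
    from aliyunsdkfnf.request.v20190315 import ReportTaskFailedRequest

    error, cause = build_failure_fields(exc)
    request = ReportTaskFailedRequest.ReportTaskFailedRequest()
    request.set_TaskToken(task_token)
    request.set_Error(error)
    request.set_Cause(cause)
    return fnf_client.do_action_with_exception(request)
```

In worker.py, this function could be called from an except block that wraps the actual work, passing the same fnf_client and task_token.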