本文介绍了如何使用任务步骤的等待回调(waitForCallback)模式集成轻量消息队列(原 MNS)主题,并发布消息到主题。轻量消息队列(原 MNS)主题接收到消息后,调用工作流ReportTaskSucceeded或ReportTaskFailed API回调任务状态。
框架原理
应用部署后执行流程如下:
执行工作流,任务步骤发布消息到MNS主题。任务步骤的
TaskToken
会被放入消息体一起发送到主题。工作流任务步骤暂停执行,等待任务回调。
轻量消息队列(原 MNS)主题接收到消息后,将消息和
TaskToken
通过HTTP推送发送到函数计算FC的函数HTTP触发器,触发函数执行。函数计算函数最终获取到
TaskToken
,并调用ReportTaskSucceeded - 汇报指定的任务执行成功来报告任务状态。流程继续执行。
部署应用
在流程页面,单击创建流程。
在创建流程页面,选择 模板,单击下一步。
在创建应用页面,配置相关信息,创建模板对应的应用,并单击部署。
应用名称:自定义参数,同一账号下必须唯一。
TopicName:自定义参数,如果对应轻量消息队列(原 MNS)主题不存在会自动创建。
单击部署后,会显示应用下创建的所有资源。
执行工作流。
执行以下命令。
{ "messageBody": "hello world" }
执行成功后,您可以看到执行结果的状态。
应用代码
编排轻量消息队列(原 MNS)主题的工作流。
将任务步骤回调的
TaskToken
封装在消息的MessageBody
中,用于后续的回调。outputMappings
中读取ReportTaskSucceeded设置的output
。version: v1 type: flow steps: - type: task name: mns-topic-task resourceArn: acs:mns:::/topics/<topic>/messages pattern: waitForCallback inputMappings: - target: messageBody source: $input.messageBody - target: taskToken source: $context.task.token outputMappings: - target: status source: $local.status serviceParams: MessageBody: $
回调任务步骤的FC函数。
读取
MessageBody
中封装的TaskToken
,回调任务状态设置output
为{"status":"success"}
。def handler(environ, start_response): # Get request body try: request_body_size = int(environ.get('CONTENT_LENGTH', 0)) except ValueError: request_body_size = 0 request_body = environ['wsgi.input'].read(request_body_size) print('Request body: {}'.format(request_body)) body = json.loads(request_body) message_body_str = body['Message'] # Read MessageBody and TaskToken from message body message_body = json.loads(message_body_str) task_token = message_body['taskToken'] ori_message_body = message_body['messageBody'] print('Task token: {}\norigin message body: {}'.format(task_token, ori_message_body)) # Init fnf client use sts token context = environ['fc.context'] creds = context.credentials sts_creds = StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token) fnf_client = AcsClient(credential=sts_creds, region_id=context.region) # Report task succeeded to serverless workflow req = ReportTaskSucceededRequest() req.set_TaskToken(task_token) req.set_Output('{"status": "success"}') resp = fnf_client.do_action_with_exception(req) print('Report task response: {}'.format(resp)) # Response to http request status = '200 OK' response_headers = [('Content-type', 'text/plain')] start_response(status, response_headers) return [b'OK']
更多信息
任务步骤编排轻量消息队列(原 MNS)主题应用的更多信息,请参见task-mns-topics应用代码。