消息队列RocketMQ版作为事件源通过事件总线EventBridge与函数计算集成后,通过消息队列RocketMQ版触发器(以下简称RocketMQ触发器)能够触发关联函数执行,通过函数可以对发布到消息队列RocketMQ版中的消息进行自定义处理。本文介绍如何在函数计算控制台创建RocketMQ触发器、配置函数入口参数和编写代码并测试。
功能简介
您在函数计算的控制台提交触发器创建请求之后,函数计算会根据触发器的配置信息,自动创建事件总线EventBridge相关的资源。目前提供事件模式和事件流模式两种消息推送模式,每个模式创建的资源如下:
注意事项
- 作为触发源的消息队列RocketMQ版的实例必须和函数计算的函数在相同的地域。
- 创建的自定义总线以及事件规则的数量超过上限后,将无法再创建事件模式的RocketMQ触发器。
- 创建的事件流数量超过上限后,将无法再创建事件流模式的RocketMQ触发器。
在单个阿里云账号单个地域维度下,关于创建触发器涉及的资源数量的限制,请参见使用限制。
前提条件
- 事件总线EventBridge
- 函数计算
- 消息队列RocketMQ版
步骤一:创建触发器
- 登录函数计算控制台,在左侧导航栏,单击服务及函数。
- 在顶部菜单栏,选择地域,然后在服务列表页面,单击目标服务。
- 在函数管理页面,单击目标函数名称。
- 在函数详情页面,单击触发器管理页签,从版本或别名下拉列表选择要创建触发器的版本或别名,然后单击创建触发器。
- 在创建触发器面板,填写相关信息。然后单击确定。基础配置项说明如下所示。
配置项 操作 本文示例 触发器类型 选择消息队列 RocketMQ 版。 消息队列 RocketMQ 版 名称 填写自定义的触发器名称。 rocketmq-trigger 版本或别名 默认值为LATEST,如果您需要创建其他版本或别名的触发器,首先需要在函数详情页的右上角切换到该版本或别名。关于版本和别名的简介,请参见管理版本和管理别名。 LATEST RocketMQ 实例 选择已创建的消息队列RocketMQ版的实例。 MQ_INST_164901546557****_BX7**** Topic 选择已创建的消息队列RocketMQ版实例的Topic。 topic1 Tag 填写消息过滤标签。 只有收到包含此处设置的过滤标签字符串的消息时,才会触发函数执行。
tag Group ID 选择已创建的消息队列RocketMQ版实例的Group ID。 GID_group1 消费位点 选择消息的消费位点,即消息队列RocketMQ版从事件总线开始拉取消息的位置。取值说明如下。 - 最新位点:从最新位点开始消费。
- 最早位点:从最早位点开始消费。
- 指定时间戳:从指定时间戳开始消费。
最新位点 调用方式 选择函数调用方式。 同步调用 消息推送模式 消息数据推送到函数计算时的底层应用模式。 取值说明如下。- 事件流模式:会根据您的攒批配置将一个或多个消息事件以批的形式推送到函数中进行处理,适合端到端的流式数据处理场景。
- 事件模式:每次会将单个消息作为事件参数传入函数中,事件遵循CloudEvents规范。消息内容和CloudEvents的关系,请参见步骤二:配置函数入口参数。
事件模 触发器启用状态 创建触发器后是否立即启用。默认勾选启用触发器,即创建触发器后立即启用触发器。 不涉及 关于推送配置、重试和死信等高级配置项说明,请参见触发器高级功能。
创建完成后,在触发器名称列表中显示已创建的触发器。如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理。
步骤二:配置函数入口参数
消息队列RocketMQ版事件源会以event
的形式作为输入参数传递给函数,您可以手动将event
传给函数模拟触发事件,测试函数代码是否正确。
- 在函数详情页面,单击函数代码页签,然后单击图标,从下拉列表中,选择配置测试参数。
- 在配置测试参数面板,选择创建新测试事件或编辑已有测试事件页签,填写事件名称和事件内容。然后单击确定。事件模式的
event
格式如下所示。{ "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****", "source":"RocketMQ-Function-rocketmq-trigger", "specversion":"1.0", "type":"mq:Topic:SendMessage", "datacontenttype":"application/json; charset=utf-8", "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName", "time":"2021-04-08T06:01:20.766Z", "aliyunaccountid":"164901546557****", "aliyunpublishtime":"2021-10-15T02:05:16.791Z", "aliyunoriginalaccountid":"164901546557****", "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger", "aliyunregionid":"cn-chengdu", "aliyunpublishaddr":"42.120.XX.XX", "data":{ "topic":"TopicName", "systemProperties":{ "MIN_OFFSET":"0", "TRACE_ON":"true", "MAX_OFFSET":"8", "MSG_REGION":"cn-hangzhou", "KEYS":"systemProperties.KEYS", "CONSUME_START_TIME":1628577790396, "TAGS":"systemProperties.TAGS", "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi" }, "userProperties":{ }, "body":"TEST" } }
事件流模式的event
格式如下所示。[ { "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****", "source":"RocketMQ-Function-rocketmq-trigger", "specversion":"1.0", "type":"mq:Topic:SendMessage", "datacontenttype":"application/json; charset=utf-8", "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName", "time":"2021-04-08T06:01:20.766Z", "aliyunaccountid":"164901546557****", "aliyunpublishtime":"2021-10-15T02:05:16.791Z", "aliyunoriginalaccountid":"164901546557****", "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger", "aliyunregionid":"cn-chengdu", "aliyunpublishaddr":"42.120.XX.XX", "data":{ "topic":"TopicName", "systemProperties":{ "MIN_OFFSET":"0", "TRACE_ON":"true", "MAX_OFFSET":"8", "MSG_REGION":"cn-hangzhou", "KEYS":"systemProperties.KEYS", "CONSUME_START_TIME":1628577790396, "TAGS":"systemProperties.TAGS", "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi" }, "userProperties":{ }, "body":"TEST" } }, { "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****", "source":"RocketMQ-Function-rocketmq-trigger", "specversion":"1.0", "type":"mq:Topic:SendMessage", "datacontenttype":"application/json; charset=utf-8", "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName", "time":"2021-04-08T06:01:20.766Z", "aliyunaccountid":"164901546557****", "aliyunpublishtime":"2021-10-15T02:05:16.791Z", "aliyunoriginalaccountid":"164901546557****", "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger", "aliyunregionid":"cn-chengdu", "aliyunpublishaddr":"42.120.XX.XX", "data":{ "topic":"TopicName", "systemProperties":{ "MIN_OFFSET":"0", "TRACE_ON":"true", "MAX_OFFSET":"8", "MSG_REGION":"cn-hangzhou", "KEYS":"systemProperties.KEYS", "CONSUME_START_TIME":1628577790396, "TAGS":"systemProperties.TAGS", "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi" }, "userProperties":{ }, "body":"TEST" } } ]
data字段包含的参数解释如下表所示。关于CloudEvents规范中定义的参数解释,请参见事件概述。参数 类型 示例值 描述 topic String TopicName Topic名称。 systemProperties Map 系统属性。 MIN_OFFSET Int 0 最低位点。 TRACE_ON Boolean true 是否有消息轨迹。取值说明如下: - true:有消息轨迹。
- false:无消息轨迹。
MAX_OFFSET Int 8 最高位点。 MSG_REGION String cn-hangzhou 发送消息的地域。 KEYS String systemProperties.KEYS 过滤属性。 CONSUME_START_TIME Long 1628577790396 开始消费时间。单位:毫秒。 UNIQ_KEY String AC14C305069E1B28CDFA3181CDA2**** 消息唯一键。 TAGS String systemProperties.TAGS 过滤属性。 INSTANCE_ID String MQ_INST_123456789098****_BXhFHryi 实例ID。 userProperties Map 无 用户属性。 body String TEST 消息内容。
步骤三:编写函数代码并测试
完成触发器创建后,您可以开始编写并测试函数代码,以验证代码的正确性。在实际操作过程中,当消息队列RocketMQ版事件通过事件总线EventBridge投递到函数计算时,触发器会自动触发函数的执行。
- 在函数详情页面,单击函数代码页签,在代码编辑器中编写代码,然后单击部署代码。本文以Node.js函数代码为例。
'use strict'; /* To enable the initializer feature please implement the initializer function as below: exports.initializer = (context, callback) => { console.log('initializing'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); //解析event参数,对event进行处理。 callback(null, 'return result'); }
- 单击函数代码页签的测试函数。执行完成后,您可以在函数代码页签的上方查看执行结果。
更多信息
如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理。