云消息队列 Kafka 版触发器(以下简称Kafka触发器)是通过事件总线EventBridge将云消息队列 Kafka 版作为事件源与函数计算进行集成。创建完成后,您可以在函数计算控制台和事件总线EventBridge控制台查看创建的信息。当有消息入队时,事件总线EventBridge会根据您的攒批配置将一个或多个消息事件以批的形式推送到函数中进行处理。
注意事项
作为触发源的云消息队列 Kafka 版实例必须和函数计算的函数在相同的地域。
创建的事件流数量超过上限后,将无法再创建Kafka触发器。关于事件流数量的限制,请参见使用限制。
前提条件
事件总线EventBridge
函数计算
云消息队列 Kafka 版
步骤一:创建Kafka触发器
- 登录函数计算控制台,在左侧导航栏,单击服务及函数。
- 在顶部菜单栏,选择地域,然后在服务列表页面,单击目标服务。
在函数管理页面,单击目标函数名称。
在函数详情页面,单击触发器管理页签,从版本或别名下拉列表选择要创建触发器的版本或别名,然后单击创建触发器。
在创建触发器面板,填写相关信息。然后单击确定。
基础配置项说明如下所示。
配置项
操作
本文示例
触发器类型
选择消息队列 Kafka 版。
消息队列 Kafka 版
名称
填写自定义的触发器名称。
kafka-trigger
版本或别名
默认值为LATEST,如果您需要创建其他版本或别名的触发器,首先需要在函数详情页的右上角切换到该版本或别名。关于版本和别名的简介,请参见管理版本和管理别名。
LATEST
Kafka 实例
选择已创建的云消息队列 Kafka 版实例。
alikafka_pre-cn-i7m2t7t1****
Topic
选择已创建的云消息队列 Kafka 版实例的Topic。
topic1
Group ID
选择已创建的云消息队列 Kafka 版实例的Group ID。
说明请使用独立的Group ID来创建触发器,不要与已有的业务混用Group ID,否则会影响已有的消息收发。
GID_group1
消费任务并发数
消费者的并发数量,取值范围为[1,Topic的分区数]。
2
消费位点
选择消息的消费位点,即云消息队列 Kafka 版从事件总线开始拉取消息的位置。
取值说明如下。
最早位点:从最早位点开始消费。
最新位点:从最新位点开始消费。
最新位点
调用方式
选择函数调用方式。
同步调用
投递并发最大值
Kafka消息投递到函数计算的并发最大值,取值范围为1~300。该参数仅对同步调用生效。如果需要更高的并发,请进入EventBridge配额中心申请配额名称为EventStreaming FC Sink 同步投递最大并发数的配额。
1
触发器启用状态
创建触发器后是否立即启用。默认勾选启用触发器,即创建触发器后立即启用触发器。
不涉及
关于推送配置、重试和死信等高级配置项说明,请参见触发器高级功能。
创建完成后,在触发器名称列表中显示已创建的触发器。如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理。
步骤二:配置函数入口参数
云消息队列 Kafka 版事件源会以event
的形式作为输入参数传递给函数,您可以手动将event
传给函数模拟触发事件。
在函数详情页面,单击函数代码页签,然后单击测试函数右侧图标,从下拉列表中,选择配置测试参数。
在配置测试参数面板,选择创建新测试事件或编辑已有测试事件页签,填写事件名称和事件内容。然后单击确定。
event
格式如下所示:[ { "specversion":"1.0", "id":"8e215af8-ca18-4249-8645-f96c1026****", "source":"acs:alikafka", "type":"alikafka:Topic:Message", "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic", "datacontenttype":"application/json; charset=utf-8", "time":"2022-06-23T02:49:51.589Z", "aliyunaccountid":"164901546557****", "data":{ "topic":"****", "partition":7, "offset":25, "timestamp":1655952591589, "headers":{ "headers":[ ], "isReadOnly":false }, "key":"keytest", "value":"hello kafka msg" } }, { "specversion":"1.0", "id":"8e215af8-ca18-4249-8645-f96c1026****", "source":"acs:alikafka", "type":"alikafka:Topic:Message", "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic", "datacontenttype":"application/json; charset=utf-8", "time":"2022-06-23T02:49:51.589Z", "aliyunaccountid":"164901546557****", "data":{ "topic":"****", "partition":7, "offset":25, "timestamp":1655952591589, "headers":{ "headers":[ ], "isReadOnly":false }, "key":"keytest", "value":"hello kafka msg" } } ]
CloudEvents规范中定义的参数解释,请参见事件概述。
data字段包含的参数解释如下表所示。
参数
类型
示例值
描述
topic
String
TopicName
Topic的名称。
partition
Int
1
云消息队列 Kafka 版的消费分区信息。
offset
Int
0
云消息队列 Kafka 版的消息位点。
timestamp
String
1655952591589
开始消费时间戳。
步骤三:编写函数代码并测试
触发器创建完成后,您可以开始编写并测试函数代码,以验证代码的正确性。在实际操作过程中,当云消息队列 Kafka 版事件发生时,触发器会自动触发函数的执行。
在函数详情页面,单击函数代码页签,在代码编辑器中编写代码,然后单击部署代码。
本文以Node.js函数代码为例。
'use strict'; /* To enable the initializer feature please implement the initializer function as below: exports.initializer = (context, callback) => { console.log('initializing'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); //解析event参数,对event进行处理。 callback(null, 'return result'); }
单击函数代码页签的测试函数。
执行完成后,您可以在函数代码页签的上方查看执行结果。