本文介绍如何在事件总线EventBridge控制台添加自建Apache Kafka作为事件流中的事件提供方。
前提条件
您已部署Apache Kafka集群,并且在您的阿里云VPC内可以访问此Kafka消息服务。
Apache Kafka版本及配置要求
Apache Kafka版本需要大于0.10.0。
Apache Kafka服务端
advertised.listeners
参数需配置为IP形式,例如advertised.listeners=PLAINTEXT://192.168.XX.XX:9092
。
操作步骤
- 登录事件总线EventBridge控制台,在左侧导航栏,单击事件流。
- 在顶部菜单栏,选择地域,然后单击创建事件流。
在创建事件流面板中,设置任务名称和描述,配置以下参数,然后单击保存。
任务创建
在Source(源)配置向导中,选择数据提供方为
,配置以下参数,然后单击下一步。
参数
说明
示例
接入点
kafka集群broker接入点,由Broker的IP地址和端口号拼接而成,格式为
{Broker的IP地址}:{端口号}
。192.0.XX.XX:9093,198.51.XX.XX:9093,203.0.XX.XX:9093
Topic
topic名称。
testTopic
Group ID
订阅该Topic的消费者所对应的Group ID。
说明Kafka消费组名称,请使用独立的GroupID来创建事件源,不要和已有的业务混用GroupID,以免影响已有的消息收发。
GID_TEST
网络配置
选择网络配置。
专有网络
公网网络
公网网络
VPC
选择VPC ID。
vpc-bq1huohcvuo****
交换机
选择vSwitch ID。
vsw-bqu1hdguoo****
安全组
选择实例所在的安全组。
sg-dguigreuohpnv****
认证模式
PLAINTEXT
SASL_PLAINTEXT
选择认证模式。
PLAINTEXT
SASL_PLAINTEXT
用户名:填写SASL用户名。
密码:填写SASL密码。
Sasl鉴权方式:SASL认证机制。可选择PLAIN、SCRAM-SHA-256和SCRAM-SHA-512。
PLAINTEXT
消费位点
选择开始消费消息的位点。
最新位点
最早位点
最新位点
数据格式
数据格式是针对支持二进制传递的数据源端推出的指定内容格式的编码能力。支持多种数据格式编码,如无特殊编码诉求可将格式设置为Json。
Json(默认文本格式编码,二进制数据按照utf8编码为Json格式放入Payload)
Text(二进制数据按照utf8编码为字符串放入Payload)
Binary(二进制格式编码,二进制数据按照Base64编码为字符串放入Payload)
Json(默认文本格式编码,二进制数据按照utf8编码为Json格式放入Payload)
批量推送条数
一次调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为[1,10000]。
100
批量推送间隔(单位:秒)
调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位秒。0秒表示无等待时间,直接投递。
3
在Filtering(过滤)、Transform(转换)及Sink(目标)配置向导,设置事件过滤、转换规则及事件目标。事件转换的配置说明,请参见使用函数计算实现消息数据清洗。
任务属性
设置事件流的重试策略及死信队列。更多信息,请参见重试和死信。
返回事件流页面,单击目标事件流名称,在目标事件流概览页面的右上角,单击启用。
启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。
事件示例
{
"specversion": "1.0",
"id": "8e215af8-ca18-4249-8645-f96c1026****",
"source": "apachekafka",
"type": "apachekafka:Topic:Message",
"subject": "apachekafka:192.0.XX.XX:9093,198.51.XX.XX:9093,203.0.XX.XX:9093:topic:****",
"datacontenttype": "application/json; charset=utf-8",
"time": "2022-06-23T02:49:51.589Z",
"aliyunaccountid": "182572506381****",
"data": {
"topic": "****",
"partition": 7,
"offset": 25,
"timestamp": 1655952591589,
"headers": {
"headers": [
{
"key": "head01",
"value": "this is my header"
}
],
"isReadOnly": false
},
"key": "keytest",
"value": "hello kafka msg"
}
}
CloudEvents规范中定义的参数解释,请参见事件概述。
data字段包含的参数解释如下表所示。
参数 | 类型 | 示例 | 描述 |
topic | String | TopicName | Topic的名称。 |
partition | Int | 1 | Kafka消息的分区信息。 |
offse | Int | 100 | Kafka消息的位点信息。 |
timestamp | String | 1655952591589 | 开始消费时间戳。 |
header | Object | Kafka消息的headers。 | |
headers.headers | Array | Kafka消息的headers内容。 | |
headers.isReadOnly | Boolean | false | 标记Kafka消息的headers是否只读。 |
key | String | keytest | Kafka消息的key。 |
value | String | hello kafka msg | Kafka消息的value。 |