本文介绍如何在事件总线EventBridge控制台添加云消息队列 Kafka 版作为事件流中的事件提供方。
前提条件
-
您已购买并部署云消息队列 Kafka 版实例,且实例处于服务中状态。具体步骤,请参见购买和部署实例。
操作步骤
- 登录事件总线EventBridge控制台,在左侧导航栏,单击事件流。
- 在顶部菜单栏,选择地域,然后单击创建事件流。
-
在创建事件流面板,设置任务名称和描述,配置以下参数,然后单击保存。
-
任务创建
-
在Source (源)配置向导,选择数据提供方为消息队列 Kafka 版,设置以下参数,然后单击下一步。
-
在Filtering(过滤)、Transform(转换)及Sink(目标)配置向导,设置事件过滤、转换规则及事件目标。事件转换的配置说明,请参见使用函数计算实现消息数据清洗。
参数
说明
示例
地域
选择云消息队列 Kafka 版源实例所在的地域。
华北2(北京)
kafka 实例
选择生产云消息队列 Kafka 版消息的源实例。
MQ_INST_115964845466****_ByBeUp3p
Topic
选择生产云消息队列 Kafka 版消息的Topic。
topic
Group ID
选择源实例的消费组名称。请使用独立的消费组来创建事件源,不要和已有的业务混用消费组,以免影响已有的消息收发。
GID_http_1
消费位点
选择开始消费消息的位点。
最新位点
网络配置
选择路由消息的网络类型。
默认网络
专有网络VPC
选择VPC ID。当网络配置设置为自建公网时需要设置此参数。
vpc-bp17fapfdj0dwzjkd****
交换机
选择vSwitch ID。当网络配置设置为自建公网时需要设置此参数。
vsw-bp1gbjhj53hdjdkg****
安全组
选择安全组。当网络配置设置为自建公网时需要设置此参数。
alikafka_pre-cn-7mz2****
批量推送
批量推送可帮您批量聚合多个事件,当批量推送条数和批量推送间隔(单位:秒)两者条件达到其一时即会触发批量推送。
例如:您设置的推送条数为100 条,间隔时间为15 s,在10 s内消息条数已达到100条,那么该次推送则不会等15 s后再推送。
开启
批量推送条数
调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。
100
批量推送间隔(单位:秒)
调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。
3
-
-
任务属性
设置事件流的重试策略及死信队列。更多信息,请参见重试和死信。
-
-
返回事件流页面,找到创建好的事件流,在其右侧操作栏,单击启用。
启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。
事件示例
{
"specversion": "1.0",
"id": "8e215af8-ca18-4249-8645-f96c1026****",
"source": "acs:alikafka",
"type": "alikafka:Topic:Message",
"subject": "acs:alikafka:alikafka_pre-cn-i7m2msb9****:topic:****",
"datacontenttype": "application/json; charset=utf-8",
"time": "2022-06-23T02:49:51.589Z",
"aliyunaccountid": "182572506381****",
"data": {
"topic": "****",
"partition": 7,
"offset": 25,
"timestamp": 1655952591589,
"headers": {
"headers": [],
"isReadOnly": false
},
"key": "keytest",
"value": "hello kafka msg"
}
}
CloudEvents规范中定义的参数解释,请参见事件概述。
data字段包含的参数解释如下表所示。
|
参数 |
类型 |
示例值 |
描述 |
|
topic |
String |
TopicName |
Topic的名称。 |
|
partition |
Int |
1 |
云消息队列 Kafka 版的消费分区信息。 |
|
offset |
Int |
0 |
云消息队列 Kafka 版的消息位点。 |
|
timestamp |
String |
1655952591589 |
开始消费时间戳。 |
|
headers.headers |
List |
[header1, header2] |
消息 header |
|
headers.isReadOnly |
Boolean |
false |
此字段仅保留,无实际意义 |
|
key |
String |
dataKey |
消息 key |
|
value |
String |
dataValue |
消息 value。具体内容格式和任务配置的数据格式相关。
|