文档

云消息队列 Kafka 版

更新时间:

本文介绍如何在事件总线EventBridge控制台添加云消息队列 Kafka 版作为事件流中的事件提供方。

前提条件

操作步骤

  1. 登录事件总线EventBridge控制台,在左侧导航栏,单击事件流
  2. 在顶部菜单栏,选择地域,然后单击创建事件流
  3. 创建事件流面板,设置任务名称描述,配置以下参数,然后单击保存

    • 任务创建

      1. Source(源)配置向导,选择数据提供方消息队列 Kafka 版,设置以下参数,然后单击下一步

      2. 参数

        说明

        示例

        地域

        选择云消息队列 Kafka 版源实例所在的地域。

        华北2(北京)

        kafka 实例

        选择生产云消息队列 Kafka 版消息的源实例。

        MQ_INST_115964845466****_ByBeUp3p

        Topic

        选择生产云消息队列 Kafka 版消息的Topic。

        topic

        Group ID

        选择源实例的消费组名称。请使用独立的消费组来创建事件源,不要和已有的业务混用消费组,以免影响已有的消息收发。

        GID_http_1

        消费位点

        选择开始消费消息的位点。

        最新位点

        网络配置

        选择路由消息的网络类型。

        默认网络

        专有网络VPC

        选择VPC ID。当网络配置设置为自建公网时需要设置此参数。

        vpc-bp17fapfdj0dwzjkd****

        交换机

        选择vSwitch ID。当网络配置设置为自建公网时需要设置此参数。

        vsw-bp1gbjhj53hdjdkg****

        安全组

        选择安全组。当网络配置设置为自建公网时需要设置此参数。

        alikafka_pre-cn-7mz2****

        批量推送

        批量推送可帮您批量聚合多个事件,当批量推送条数批量推送间隔(单位:秒)两者条件达到其一时即会触发批量推送。

        例如:您设置的推送条数为100 条,间隔时间为15 s,在10 s内消息条数已达到100条,那么该次推送则不会等15 s后再推送。

        开启

        批量推送条数

        调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。

        100

        批量推送间隔(单位:秒)

        调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。

        3

      3. Filtering(过滤)Transform(转换)Sink(目标)配置向导,设置事件过滤、转换规则及事件目标。事件转换的配置说明,请参见使用函数计算实现消息数据清洗

    • 任务属性

      设置事件流的重试策略及死信队列。更多信息,请参见重试和死信

  4. 返回事件流页面,找到创建好的事件流,在其右侧操作栏,单击启用

    启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。

事件示例

{
  "specversion": "1.0",
  "id": "8e215af8-ca18-4249-8645-f96c1026****",
  "source": "acs:alikafka",
  "type": "alikafka:Topic:Message",
  "subject": "acs:alikafka:alikafka_pre-cn-i7m2msb9****:topic:****",
  "datacontenttype": "application/json; charset=utf-8",
  "time": "2022-06-23T02:49:51.589Z",
  "aliyunaccountid": "182572506381****",
  "data": {
    "topic": "****",
    "partition": 7,
    "offset": 25,
    "timestamp": 1655952591589,
    "headers": {
      "headers": [],
      "isReadOnly": false
    },
    "key": "keytest",
    "value": "hello kafka msg"
  }
}

CloudEvents规范中定义的参数解释,请参见事件概述

data字段包含的参数解释如下表所示。

参数

类型

示例值

描述

topic

String

TopicName

Topic的名称。

partition

Int

1

云消息队列 Kafka 版的消费分区信息。

offset

Int

0

云消息队列 Kafka 版的消息位点。

timestamp

String

1655952591589

开始消费时间戳。