本文介绍如何在事件总线EventBridge控制台添加消息队列Kafka版类型的自定义事件源。

前提条件

操作步骤

  1. 登录事件总线EventBridge控制台,在左侧导航栏,单击事件总线
  2. 在顶部菜单栏,选择地域,在事件总线页面,单击目标总线名称。
  3. 在左侧导航栏,单击事件源
  4. 事件源页面,单击添加事件源
  5. 添加自定义事件源面板,输入名称描述,将事件提供方选择为消息队列Kafka版,配置以下参数,然后单击确认
    参数说明示例
    Kafka实例选择前提条件中已创建的消息队列Kafka版实例。alikafka_post-cn-20p31hkm****
    Topic选择当前实例中的Topic。test_topic
    Group ID
    • 快速创建:自动创建以GID_EVENTBRIDGE_xxx命名的Group ID。
    • 使用已有:选择当前实例中已创建的Group,请不要与已有业务的Group混用,以免影响已有的消息收发。
    test_group
    并发配额(消费者数)当前Group中的Consumer数量。1
    消费位点开始消费的位置。
    • 最新位点:从最新位点开始消费。
    • 最早位点:从最初位点开始消费。
    最新位点
    网络配置事件总线EventBridge集成消息队列Kafka版数据时支持的网络连接方式。
    • 默认网络
    • 自建公网
    默认网络
    专有网络VPC选择VPC ID。当网络配置选择为自建公网时需要配置此参数。vpc-bp17fapfdj0dwzjkd****
    交换机选择vSwitch ID。当网络配置选择为自建公网时需要配置此参数。vsw-bp1gbjhj53hdjdkg****
    安全组选择安全组。当网络配置选择为自建公网时需要配置此参数。vpc-bp1syi9jrmx3x****

事件示例

{
    "id":"94ebc15f-f0db-4bbe-acce-sf6778fs****",
    "source":"acs:alikafka",
    "specversion":"1.0",
    "type":"alikafka:Topic:SendMessage",
    "datacontenttype":"application/json; charset=utf-8",
    "subject":"acs:alikafka:cn-hangzhou:123456789098****:topics/TopicName",
    "time":"2022-04-08T06:01:20.766Z",
    "aliyunpublishtime":"2022-04-08T06:01:20.725Z",
    "aliyuneventbusname":"BusName",
    "data":{
        "InstanceId":"alikafka_post-cn-****",
        "ConsumerGroup":"GID-test-group",
        "Topic":"test-topic",
        "RegionId":"cn-hangzhou",
        "OffsetReset":"earliest",
        "MaximumTasks":1,
        "Network":"PublicNetwork",
        "SecurityGroupId":"vpc-bp1syi9jrmx3x****",
        "VpcId":"vpc-bp1syi9jrmx3xwnw****",
        "VSwitchIds":"vsw-bp1q0tnb71j8qzxg****"
    }
}

CloudEvents规范中定义的参数解释,请参见事件概述

data字段包含的参数解释如下表所示。

参数类型示例值描述
InstanceIdStringalikafka_post-cn-****消息队列Kafka版实例ID。
ConsumerGroupStringGID-test-groupGroup ID。
TopicStringtest-topicTopic名称。
RegionIdStringcn-hangzhou消息队列Kafka版实例所在的地域。
OffsetResetStringearliest消费位点。
MaximumTasksString1并发配额(消费者数量)。
NetworkStringPublicNetwork网络配置。取值如下:
  • Default:默认网络。
  • PublicNetwork:自建公网。
SecurityGroupIdStringvpc-bp1syi9jrmx3x****安全组ID。
VpcIdStringvpc-bp1syi9jrmx3xwnw****专有网络ID。
VSwitchIdsStringvsw-bp1q0tnb71j8qzxg****交换机ID。