文档

自建Apache Kafka

更新时间:

本文介绍如何在事件总线EventBridge控制台添加自建Apache Kafka作为事件流中的事件提供方。

前提条件

Apache Kafka版本及配置要求

  • Apache Kafka版本需要大于0.10.0。

  • Apache Kafka服务端advertised.listeners参数需配置为IP形式,例如advertised.listeners=PLAINTEXT://192.168.XX.XX:9092

操作步骤

  1. 登录事件总线EventBridge控制台,在左侧导航栏,单击事件流
  2. 在顶部菜单栏,选择地域,然后单击创建事件流
  3. 创建事件流面板中,设置任务名称描述,配置以下参数,然后单击保存

    • 任务创建

      1. Source(源)配置向导中,选择数据提供方自建服务 > Apache Kafka,配置以下参数,然后单击下一步

      参数

      说明

      示例

      接入点

      kafka集群broker接入点,由Broker的IP地址和端口号拼接而成,格式为{Broker的IP地址}:{端口号}

      192.0.XX.XX:9093,198.51.XX.XX:9093,203.0.XX.XX:9093

      Topic

      topic名称。

      testTopic

      Group ID

      订阅该Topic的消费者所对应的Group ID。

      说明

      Kafka消费组名称,请使用独立的GroupID来创建事件源,不要和已有的业务混用GroupID,以免影响已有的消息收发。

      GID_TEST

      网络配置

      选择网络配置。

      • 专有网络

      • 公网网络

      公网网络

      VPC

      选择VPC ID。

      vpc-bq1huohcvuo****

      交换机

      选择vSwitch ID。

      vsw-bqu1hdguoo****

      安全组

      选择实例所在的安全组。

      sg-dguigreuohpnv****

      认证模式

      • PLAINTEXT

      • SASL_PLAINTEXT

      选择认证模式。

      • PLAINTEXT

      • SASL_PLAINTEXT

        • 用户名:填写SASL用户名。

        • 密码:填写SASL密码。

        • Sasl鉴权方式:SASL认证机制。可选择PLAIN、SCRAM-SHA-256和SCRAM-SHA-512。

      PLAINTEXT

      消费位点

      选择开始消费消息的位点。

      • 最新位点

      • 最早位点

      最新位点

      数据格式

      数据格式是针对支持二进制传递的数据源端推出的指定内容格式的编码能力。支持多种数据格式编码,如无特殊编码诉求可将格式设置为Json。

      • Json(默认文本格式编码,二进制数据按照utf8编码为Json格式放入Payload)

      • Text(二进制数据按照utf8编码为字符串放入Payload)

      • Binary(二进制格式编码,二进制数据按照Base64编码为字符串放入Payload)

      Json(默认文本格式编码,二进制数据按照utf8编码为Json格式放入Payload)

      批量推送条数

      一次调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为[1,10000]。

      100

      批量推送间隔(单位:秒)

      调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位秒。0秒表示无等待时间,直接投递。

      3

      1. Filtering(过滤)Transform(转换)Sink(目标)配置向导,设置事件过滤、转换规则及事件目标。事件转换的配置说明,请参见使用函数计算实现消息数据清洗

    • 任务属性

      设置事件流的重试策略及死信队列。更多信息,请参见重试和死信

  4. 返回事件流页面,单击目标事件流名称,在目标事件流概览页面的右上角,单击启用

    启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。

事件示例

{
    "specversion": "1.0",
    "id": "8e215af8-ca18-4249-8645-f96c1026****",
    "source": "apachekafka",
    "type": "apachekafka:Topic:Message",
    "subject": "apachekafka:192.0.XX.XX:9093,198.51.XX.XX:9093,203.0.XX.XX:9093:topic:****",
    "datacontenttype": "application/json; charset=utf-8",
    "time": "2022-06-23T02:49:51.589Z",
    "aliyunaccountid": "182572506381****",
    "data": {
        "topic": "****",
        "partition": 7,
        "offset": 25,
        "timestamp": 1655952591589,
        "headers": {
            "headers": [
                {
                    "key": "head01",
                    "value": "this is my header"
                }
            ],
            "isReadOnly": false
        },
        "key": "keytest",
        "value": "hello kafka msg"
    }
}

CloudEvents规范中定义的参数解释,请参见事件概述

data字段包含的参数解释如下表所示。

参数

类型

示例

描述

topic

String

TopicName

Topic的名称。

partition

Int

1

Kafka消息的分区信息。

offse

Int

100

Kafka消息的位点信息。

timestamp

String

1655952591589

开始消费时间戳。

header

Object

Kafka消息的headers。

headers.headers

Array

Kafka消息的headers内容。

headers.isReadOnly

Boolean

false

标记Kafka消息的headers是否只读。

key

String

keytest

Kafka消息的key。

value

String

hello kafka msg

Kafka消息的value。