文档

通过开源Kafka命令行工具访问消息引擎

更新时间:

Lindorm消息引擎完全兼容开源Kafka API,您可以通过开源Kafka命令行工具,实现基础的Topic或Group的信息查看与运维管理。本文介绍如何通过开源Kafka命令行工具访问消息引擎。

前提条件

  • 已安装Java环境,要求JDK为1.8及以上版本。

  • 已将客户端IP添加至Lindorm白名单。如何添加,请参见设置白名单

注意事项

  • Lindorm消息引擎仅支持通过专有网络访问

  • 如果您的应用部署在ECS实例,通过专有网络访问Lindorm实例前,需确保Lindorm实例和ECS实例满足以下条件,以保证网络的连通性。

    • 所在地域相同,并建议所在可用区相同(以减少网络延时)。

    • ECS实例与Lindorm实例属于同一专有网络。

操作步骤

以Linux操作系统为例。

下载命令行工具

  1. 下载开源Kafka命令行工具。

    wget 'https://hbaseuepublic.oss-cn-beijing.aliyuncs.com/kafka_2.12-2.7.1.tgz'
  2. 解压Kafka命令行工具。

    tar -xvf kafka_2.12-2.7.1.tgz
  3. 跳转至解压后的目录下。

    cd kafka_2.12-2.7.1

查看与管理消息表

假设Topic名称为test_log

  • 创建Topic。

    bin/kafka-topics.sh --bootstrap-server <Lindorm消息引擎Kafka地址> --create --topic <topic名称> --partitions 100

    Lindorm消息引擎Kafka地址的获取方式,请参见访问实例

    执行成功将返回以下结果:

    Created topic test_log.
  • 查看Topics列表。

    bin/kafka-topics.sh --bootstrap-server <Lindorm消息引擎Kafka地址> --list

    返回结果:

    __consumer_offsets
    __transaction_state
    _schemas
    ksql__commands
    test_log
  • 查看Topic详情。

    bin/kafka-topics.sh --bootstrap-server <Lindorm消息引擎Kafka地址> --describe --topic <topic名称>

    返回结果将根据创建时设置的partitions的值逐条显示。

  • 删除已有Topic。

    bin/kafka-topics.sh --bootstrap-server <Lindorm消息引擎Kafka地址> --delete --topic <topic名称>

    可使用查看Topics列表的语句查询是否已成功删除。

读写数据

  1. 写入数据。

    • 逐条写入数据。

      执行以下命令后,逐条将数据写入Kafka Topic中。

      bin/kafka-console-producer.sh --bootstrap-server <Lindorm消息引擎Kafka地址> --topic <topic名称>
    • 以文件形式写入数据。

      将数据保存为本地文件,执行以下命令将标准数据流转发至对应Topic。

      cat <本地文件> | bin/kafka-console-producer.sh --bootstrap-server <Lindorm消息引擎Kafka地址> --topic <topic名称>
  2. 持续读取对应Topic中的新增数据。您可以随时使用组合键Ctrl+C中断数据读取。

    bin/kafka-console-consumer.sh --bootstrap-server <Lindorm消息引擎Kafka地址> --describe --topic <topic名称> [--from-beginning]

    其中,--from-beginning为可选参数,如果语句中携带此参数,表示从最早的数据开始读起。

查看消费组信息

读取数据库,系统会自动创建一个消费组,您可以通过以下语句,查看消费组信息。

  • 查看消费组列表。

    bin/kafka-consumer-groups.sh --bootstrap-server <Lindorm消息引擎Kafka地址> --list
  • 查看消费组详情。

    bin/kafka-consumer-groups.sh --bootstrap-server <Lindorm消息引擎Kafka地址> --describe --group <group名称>
  • 本页导读 (1)