Lindorm消息引擎完全兼容开源Kafka API,您可以通过开源Kafka命令行工具,实现基础的Topic或Group的信息查看与运维管理。本文介绍如何通过开源Kafka命令行工具访问消息引擎。
前提条件
已安装Java环境,要求JDK为1.8及以上版本。
已将客户端IP添加至Lindorm白名单。如何添加,请参见设置白名单。
注意事项
Lindorm消息引擎仅支持通过专有网络访问。
如果您的应用部署在ECS实例,通过专有网络访问Lindorm实例前,需确保Lindorm实例和ECS实例满足以下条件,以保证网络的连通性。
所在地域相同,并建议所在可用区相同(以减少网络延时)。
ECS实例与Lindorm实例属于同一专有网络。
操作步骤
以Linux操作系统为例。
下载命令行工具
下载开源Kafka命令行工具。
wget 'https://hbaseuepublic.oss-cn-beijing.aliyuncs.com/kafka_2.12-2.7.1.tgz'
解压Kafka命令行工具。
tar -xvf kafka_2.12-2.7.1.tgz
跳转至解压后的目录下。
cd kafka_2.12-2.7.1
查看与管理消息表
假设Topic名称为test_log
。
创建Topic。
bin/kafka-topics.sh --bootstrap-server <Lindorm消息引擎Kafka地址> --create --topic <topic名称> --partitions 100
Lindorm消息引擎Kafka地址的获取方式,请参见访问实例。
执行成功将返回以下结果:
Created topic test_log.
查看Topics列表。
bin/kafka-topics.sh --bootstrap-server <Lindorm消息引擎Kafka地址> --list
返回结果:
__consumer_offsets __transaction_state _schemas ksql__commands test_log
查看Topic详情。
bin/kafka-topics.sh --bootstrap-server <Lindorm消息引擎Kafka地址> --describe --topic <topic名称>
返回结果将根据创建时设置的partitions的值逐条显示。
删除已有Topic。
bin/kafka-topics.sh --bootstrap-server <Lindorm消息引擎Kafka地址> --delete --topic <topic名称>
可使用查看Topics列表的语句查询是否已成功删除。
读写数据
写入数据。
逐条写入数据。
执行以下命令后,逐条将数据写入Kafka Topic中。
bin/kafka-console-producer.sh --bootstrap-server <Lindorm消息引擎Kafka地址> --topic <topic名称>
以文件形式写入数据。
将数据保存为本地文件,执行以下命令将标准数据流转发至对应Topic。
cat <本地文件> | bin/kafka-console-producer.sh --bootstrap-server <Lindorm消息引擎Kafka地址> --topic <topic名称>
持续读取对应Topic中的新增数据。您可以随时使用组合键Ctrl+C中断数据读取。
bin/kafka-console-consumer.sh --bootstrap-server <Lindorm消息引擎Kafka地址> --describe --topic <topic名称> [--from-beginning]
其中,--from-beginning为可选参数,如果语句中携带此参数,表示从最早的数据开始读起。
查看消费组信息
读取数据库,系统会自动创建一个消费组,您可以通过以下语句,查看消费组信息。
查看消费组列表。
bin/kafka-consumer-groups.sh --bootstrap-server <Lindorm消息引擎Kafka地址> --list
查看消费组详情。
bin/kafka-consumer-groups.sh --bootstrap-server <Lindorm消息引擎Kafka地址> --describe --group <group名称>
反馈
- 本页导读 (1)
文档反馈