Cluster Linking是Confluent Platform提供的一种功能,用于将多个Kafka集群连接在一起。该功能允许不同的Kafka集群之间进行数据的镜像和复制。Cluster Linking将在数据目标(Destination)集群启动,并复制数据源(Source)集群的数据到目标集群。本文将向您介绍如何使用云消息队列 Confluent 版的Cluster Linking。主要包括如何远程使用Confluent CLI客户端方式创建Cluster Linking以及Cluster Linking的基本管理。
前提条件
已经准备好数据源集群和数据目标集群。
安装了Confluent Platform 7.0.0及更高版本客户端,更多信息,请参见Confluent。
安装Java 1.8或1.11。更多信息,请参见安装1.8或以上版本JDK。
配置客户端环境变量
本示例假设Source集群和Destination集群需要用SASL_SSL的方式登录集群,并且假设连接集群的时候,使用了证书进行域名校验。
将以下配置加入您的.bashrc或者.bash_profile配置文件。
export CONFLUENT_HOME=<CP installation directory> export CONFLUENT_CONFIG=$CONFLUENT_HOME/etc/kafka export PATH=${CONFLUENT_HOME}/bin:$PATH #Source集群的Bootstrap Server地址。 export SOURCE_ADDRESS=<your source cluster access address> #Destination集群的Bootstrap Server地址。 export DESTINATION_ADDRESS=<your destination cluster access address>
配置Source集群访问文件
${CONFLUENT_CONFIG}/source.config
。bootstrap.servers=<your source cluster access address> security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>' password='<password>'; sasl.mechanism=PLAIN ssl.truststore.location=<your source cluster truststore location> ssl.truststore.password=<your source cluster truststore password>
配置Destination集群访问文件
${CONFLUENT_CONFIG}/destination.config
。bootstrap.servers=<your destination cluster access address> security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>' password='<password>'; sasl.mechanism=PLAIN ssl.truststore.location=<your destination cluster truststore location> ssl.truststore.password=<your destination cluster truststore password>
配置Cluster Linking配置文件
${CONFLUENT_CONFIG}/clusterlink.config
。bootstrap.servers=<your source cluster access address> security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>' password='<password>'; sasl.mechanism=PLAIN ssl.truststore.type=PEM ssl.truststore.certificates=<your source cluster certificates content>
测试数据准备
使用Confluent Platform CLI执行相关命令,在Source集群上准备测试数据。
执行以下命令,在Source集群上创建一个具有单分区的待镜像的Topic,以便更容易观察复制消息的顺序。
kafka-topics --create --topic demo-link-topic --partitions 1 --bootstrap-server ${SOURCE_ADDRESS} --command-config ${CONFLUENT_CONFIG}/source.config
使用
list topic
和describe topic
命令查看Topic详情。#list topic kafka-topics --list --bootstrap-server ${SOURCE_ADDRESS} --command-config ${CONFLUENT_CONFIG}/source.config #describe topic kafka-topics --describe --topic demo-link-topic --bootstrap-server ${SOURCE_ADDRESS} --command-config ${CONFLUENT_CONFIG}/source.config
执行以下命令,向Source集群上的测试demo-link-topic发送消息。
seq 1 5 | kafka-console-producer --topic demo-link-topic --bootstrap-server ${SOURCE_ADDRESS} --producer.config ${CONFLUENT_CONFIG}/source.config
消费Source集群中demo-link-topic上的数据。
kafka-console-consumer --topic demo-link-topic --from-beginning --bootstrap-server ${SOURCE_ADDRESS} --consumer.config ${CONFLUENT_CONFIG}/source.config
如果成功消费消息,您的输出将是:
1
2
3
4
5
数据同步
本章节描述如何设置和测试Cluster Linking。
执行以下命令,创建从Source集群到Destination集群的link:
demo-link
。kafka-cluster-links --command-config $CONFLUENT_CONFIG/destination.config --bootstrap-server ${DESTINATION_ADDRESS} --create --link demo-link --config-file ${CONFLUENT_CONFIG}/clusterlink.config
执行以下命令,检查link是否存在。
kafka-cluster-links --command-config ${CONFLUENT_CONFIG}/destination.config --bootstrap-server ${DESTINATION_ADDRESS} --list
执行以下命令,创建Mirror Topic。
kafka-mirrors --create --mirror-topic demo-link-topic --link demo-link --bootstrap-server ${DESTINATION_ADDRESS} --command-config $CONFLUENT_CONFIG/destination.config
执行以下命令,查看Mirror Topic。
kafka-mirrors --describe --link demo-link --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config
执行以下命令,在Destination集群消费Mirror Topic数据。
kafka-console-consumer --topic demo-link-topic --from-beginning --bootstrap-server ${DESTINATION_ADDRESS} --consumer.config ${CONFLUENT_CONFIG}/destination.config
如果Topic成功消费消息,您的输出将是:
1 2 3 4 5 |
执行以下命令,查看Cluster Linking的复制状态。
kafka-replica-status --topics demo-link-topic --include-linked --bootstrap-server ${DESTINATION_ADDRESS} --admin.config ${CONFLUENT_CONFIG}/destination.config
Cluster Linking管理
查看Cluster Linking列表,命令如下:
kafka-cluster-links --list --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config
查看Cluster Linking详情,命令如下:
kafka-configs --describe --cluster-link demo-link --bootstrap-server $DESTINATION_ADDRESS --command-config ${CONFLUENT_CONFIG}/destination.config
Mirror Topic转换为普通Topic,命令如下:
kafka-mirrors --promote --topics demo-link-topic --bootstrap-server $DESTINATION_ADDRESS --command-config ${CONFLUENT_CONFIG}/destination.config
预期输出:
Calculating max offset and ms lag for mirror topics: [demo-link-topic] Finished calculating max offset lag and max lag ms for mirror topics: [demo-link-topic] Request for stopping topic demo-link-topic's mirror was successfully scheduled. Please use the describe command with the --pending-stopped-only option to monitor progress.
删除Cluster Linking,命令如下:
kafka-cluster-links --delete --link demo-link --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config
预期输出:
Cluster link 'demo-link' deletion successfully completed.
相关文档
关于Cluster Linking的更多信息,请参见Cluster Linking for Confluent Platform。
查看集群是否可以使用Cluster Linking,请参见支持的集群类型。
- 本页导读 (1)