Cluster Linking
本文将向您介绍如何使用流数据服务Confluent的Cluster Linking。内容主要包括如何远程用Confluent CLI客户端方式创建Cluster Linking以及Cluster Linking的基本管理。查看哪些集群可以使用Cluster Linking,请参阅支持的集群类型。
前提条件
您已经准备好满足Cluster Linking使用版本条件的Source集群和Destination集群。Cluster Linking将在Destination集群启动,并复制Source集群的数据到Destination集群。
本文假设Source集群和Destination集群需要用SASL_SSL的方式登录集群,并且假设连接集群的时候,使用了证书进行域名校验。
您在本地安装了Confluent Platform 7.0.0 或更高版本作为客户端,以及 Java 1.8 或 1.11。您可以参考
部署文档来下载安装Confluent Platform软件包。
Cluster Linking创建
一、环境配置
配置客户端环境变量
将以下配置加入您的.bashrc或者.bash_profile配置文件里
#CP客户端安装位置
export CONFLUENT_HOME=<CP installation directory>
export CONFLUENT_CONFIG=$CONFLUENT_HOME/etc/kafka
export PATH=${CONFLUENT_HOME}/bin:$PATH
#Source集群的Bootstrap Server地址
export SOURCE_ADDRESS=<your source cluster access address>
#Destination集群的Bootstrap Server地址
export DESTINATION_ADDRESS=<your destination cluster access address>
配置Source集群访问文件${CONFLUENT_CONFIG}/source.config
bootstrap.servers=<your source cluster access address>
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>' password='<password>';
sasl.mechanism=PLAIN
ssl.truststore.location=<your source cluster truststore location>
ssl.truststore.password=<your source cluster truststore password>
配置Destination集群访问文件${CONFLUENT_CONFIG}/destination.config
bootstrap.servers=<your destination cluster access address>
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>' password='<password>';
sasl.mechanism=PLAIN
ssl.truststore.location=<your destination cluster truststore location>
ssl.truststore.password=<your destination cluster truststore password>
配置Cluster Linking配置文件${CONFLUENT_CONFIG}/clusterlink.config
bootstrap.servers=<your source cluster access address>
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>' password='<password>';
sasl.mechanism=PLAIN
ssl.truststore.type=PEM
ssl.truststore.certificates=<your source cluster certificates content>
二、测试数据准备
使用Confluent Platform CLI执行相关命令,在Source集群上准备测试数据。
在Source集群上创建一个具有单分区的待镜像的topic,以便更容易观察复制消息的顺序。
kafka-topics --create --topic demo-link-topic --partitions 1 --bootstrap-server ${SOURCE_ADDRESS} --command-config ${CONFLUENT_CONFIG}/source.config
确认topic已经创建成功
#list topic
kafka-topics --list --bootstrap-server ${SOURCE_ADDRESS} --command-config ${CONFLUENT_CONFIG}/source.config
#describe topic
kafka-topics --describe --topic demo-link-topic --bootstrap-server ${SOURCE_ADDRESS} --command-config ${CONFLUENT_CONFIG}/source.config
向Source集群上的测试demo-link-topic发送消息
seq 1 5 | kafka-console-producer --topic demo-link-topic --bootstrap-server ${SOURCE_ADDRESS} --producer.config ${CONFLUENT_CONFIG}/source.config
从Source集群上的demo-link-topic消费
kafka-console-consumer --topic demo-link-topic --from-beginning --bootstrap-server ${SOURCE_ADDRESS} --consumer.config ${CONFLUENT_CONFIG}/source.config
如果Topic成功消费消息,您的输出将是:
1 2 3 4 5 |
使用键盘命令 Ctrl+C终止kafka-console-consumer命令。
三、数据同步
本章节描述如何设置和测试Cluster Linking。
创建从Source集群到Destination集群的link:demo-link。
kafka-cluster-links --command-config $CONFLUENT_CONFIG/destination.config --bootstrap-server ${DESTINATION_ADDRESS} --create --link demo-link --config-file ${CONFLUENT_CONFIG}/clusterlink.config
使用命令检查link是否存在
kafka-cluster-links --command-config ${CONFLUENT_CONFIG}/destination.config --bootstrap-server ${DESTINATION_ADDRESS} --list
创建Mirror Topic
kafka-mirrors --create --mirror-topic demo-link-topic --link demo-link --bootstrap-server ${DESTINATION_ADDRESS} --command-config $CONFLUENT_CONFIG/destination.config
查看Mirror Topic
kafka-mirrors --describe --link demo-link --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config
在Destination集群消费mirror topic数据
kafka-console-consumer --topic demo-link-topic --from-beginning --bootstrap-server ${DESTINATION_ADDRESS} --consumer.config ${CONFLUENT_CONFIG}/destination.config
如果Topic成功消费消息,您的输出将是:
1 2 3 4 5 |
使用键盘命令 Ctrl+C终止kafka-console-consumer命令。
查看Cluster Linking的复制状态
kafka-replica-status --topics demo-link-topic --include-linked --bootstrap-server ${DESTINATION_ADDRESS} --admin.config ${CONFLUENT_CONFIG}/destination.config
四、Cluster Linking管理
Cluster Linking列表
kafka-cluster-links --list --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config
Cluster Linking详情
kafka-configs --describe --cluster-link demo-link --bootstrap-server $DESTINATION_ADDRESS --command-config ${CONFLUENT_CONFIG}/destination.config
Mirror Topic转换为普通Topic
kafka-mirrors --promote --topics demo-link-topic --bootstrap-server $DESTINATION_ADDRESS --command-config ${CONFLUENT_CONFIG}/destination.config
输出结果:
Calculating max offset and ms lag for mirror topics: [demo-link-topic]
Finished calculating max offset lag and max lag ms for mirror topics: [demo-link-topic]
Request for stopping topic demo-link-topic's mirror was successfully scheduled. Please use the describe command with the --pending-stopped-only option to monitor progress.
Cluster Linking删除
kafka-cluster-links --delete --link demo-link --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config
输出结果:
Cluster link 'demo-link' deletion successfully completed.
参考资料
- 本页导读 (0)