文档

Cluster Linking

更新时间:
一键部署

本文将向您介绍如何使用流数据服务Confluent的Cluster Linking。内容主要包括如何远程用Confluent CLI客户端方式创建Cluster Linking以及Cluster Linking的基本管理。查看哪些集群可以使用Cluster Linking,请参阅支持的集群类型

前提条件

  • 您已经准备好满足Cluster Linking使用版本条件的Source集群和Destination集群。Cluster Linking将在Destination集群启动,并复制Source集群的数据到Destination集群。

  • 本文假设Source集群和Destination集群需要用SASL_SSL的方式登录集群,并且假设连接集群的时候,使用了证书进行域名校验。

  • 您在本地安装了Confluent Platform 7.0.0 或更高版本作为客户端,以及 Java 1.8 或 1.11。您可以参考

    部署文档来下载安装Confluent Platform软件包。

Cluster Linking创建

一、环境配置

  1. 配置客户端环境变量

将以下配置加入您的.bashrc或者.bash_profile配置文件里

#CP客户端安装位置
export CONFLUENT_HOME=<CP installation directory>
export CONFLUENT_CONFIG=$CONFLUENT_HOME/etc/kafka
export PATH=${CONFLUENT_HOME}/bin:$PATH
#Source集群的Bootstrap Server地址
export SOURCE_ADDRESS=<your source cluster access address>
#Destination集群的Bootstrap Server地址
export DESTINATION_ADDRESS=<your destination cluster access address>
  1. 配置Source集群访问文件${CONFLUENT_CONFIG}/source.config

bootstrap.servers=<your source cluster access address>
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>'  password='<password>';
sasl.mechanism=PLAIN
ssl.truststore.location=<your source cluster truststore location>
ssl.truststore.password=<your source cluster truststore password>
  1. 配置Destination集群访问文件${CONFLUENT_CONFIG}/destination.config

bootstrap.servers=<your destination cluster access address>
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>'  password='<password>';
sasl.mechanism=PLAIN
ssl.truststore.location=<your destination cluster truststore location>
ssl.truststore.password=<your destination cluster truststore password>
  1. 配置Cluster Linking配置文件${CONFLUENT_CONFIG}/clusterlink.config

bootstrap.servers=<your source cluster access address>
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>'  password='<password>';
sasl.mechanism=PLAIN
ssl.truststore.type=PEM
ssl.truststore.certificates=<your source cluster certificates content>

二、测试数据准备

使用Confluent Platform CLI执行相关命令,在Source集群上准备测试数据。

  1. 在Source集群上创建一个具有单分区的待镜像的topic,以便更容易观察复制消息的顺序。

kafka-topics --create --topic demo-link-topic --partitions 1 --bootstrap-server ${SOURCE_ADDRESS} --command-config ${CONFLUENT_CONFIG}/source.config

确认topic已经创建成功

#list topic
kafka-topics --list --bootstrap-server ${SOURCE_ADDRESS} --command-config ${CONFLUENT_CONFIG}/source.config
#describe topic
kafka-topics --describe --topic demo-link-topic --bootstrap-server ${SOURCE_ADDRESS} --command-config ${CONFLUENT_CONFIG}/source.config
  1. 向Source集群上的测试demo-link-topic发送消息

seq 1 5 | kafka-console-producer --topic demo-link-topic --bootstrap-server ${SOURCE_ADDRESS} --producer.config ${CONFLUENT_CONFIG}/source.config
  1. 从Source集群上的demo-link-topic消费

kafka-console-consumer --topic demo-link-topic --from-beginning --bootstrap-server ${SOURCE_ADDRESS} --consumer.config ${CONFLUENT_CONFIG}/source.config

如果Topic成功消费消息,您的输出将是:

1

2

3

4

5

使用键盘命令 Ctrl+C终止kafka-console-consumer命令。

三、数据同步

本章节描述如何设置和测试Cluster Linking。

  1. 创建从Source集群到Destination集群的link:demo-link。

kafka-cluster-links --command-config $CONFLUENT_CONFIG/destination.config --bootstrap-server ${DESTINATION_ADDRESS} --create --link demo-link --config-file ${CONFLUENT_CONFIG}/clusterlink.config
  1. 使用命令检查link是否存在

kafka-cluster-links --command-config ${CONFLUENT_CONFIG}/destination.config --bootstrap-server ${DESTINATION_ADDRESS} --list
  1. 创建Mirror Topic

kafka-mirrors --create --mirror-topic demo-link-topic --link demo-link --bootstrap-server ${DESTINATION_ADDRESS} --command-config $CONFLUENT_CONFIG/destination.config
  1. 查看Mirror Topic

kafka-mirrors --describe --link demo-link --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config
  1. 在Destination集群消费mirror topic数据

kafka-console-consumer --topic demo-link-topic --from-beginning --bootstrap-server ${DESTINATION_ADDRESS} --consumer.config ${CONFLUENT_CONFIG}/destination.config

如果Topic成功消费消息,您的输出将是:

1

2

3

4

5

使用键盘命令 Ctrl+C终止kafka-console-consumer命令。

  1. 查看Cluster Linking的复制状态

kafka-replica-status --topics demo-link-topic --include-linked --bootstrap-server ${DESTINATION_ADDRESS} --admin.config ${CONFLUENT_CONFIG}/destination.config

四、Cluster Linking管理

  1. Cluster Linking列表

kafka-cluster-links --list --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config
  1. Cluster Linking详情

kafka-configs --describe --cluster-link demo-link --bootstrap-server $DESTINATION_ADDRESS --command-config ${CONFLUENT_CONFIG}/destination.config
  1. Mirror Topic转换为普通Topic

kafka-mirrors --promote --topics demo-link-topic --bootstrap-server $DESTINATION_ADDRESS --command-config ${CONFLUENT_CONFIG}/destination.config

输出结果:

Calculating max offset and ms lag for mirror topics: [demo-link-topic]
Finished calculating max offset lag and max lag ms for mirror topics: [demo-link-topic]
Request for stopping topic demo-link-topic's mirror was successfully scheduled. Please use the describe command with the --pending-stopped-only option to monitor progress.
  1. Cluster Linking删除

kafka-cluster-links --delete --link demo-link --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config

输出结果:

Cluster link 'demo-link' deletion successfully completed.

参考资料

  • 本页导读 (0)