文档

Cluster Linking

更新时间:

Cluster Linking是Confluent Platform提供的一种功能,用于将多个Kafka集群连接在一起。该功能允许不同的Kafka集群之间进行数据的镜像和复制。Cluster Linking将在数据目标(Destination)集群启动,并复制数据源(Source)集群的数据到目标集群。本文将向您介绍如何使用云消息队列 Confluent 版的Cluster Linking。主要包括如何远程使用Confluent CLI客户端方式创建Cluster Linking以及Cluster Linking的基本管理。

前提条件

  • 已经准备好数据源集群和数据目标集群。

  • 安装了Confluent Platform 7.0.0及更高版本客户端,更多信息,请参见Confluent

  • 安装Java 1.8或1.11。更多信息,请参见安装1.8或以上版本JDK

配置客户端环境变量

本示例假设Source集群和Destination集群需要用SASL_SSL的方式登录集群,并且假设连接集群的时候,使用了证书进行域名校验。

  1. 将以下配置加入您的.bashrc或者.bash_profile配置文件。

    export CONFLUENT_HOME=<CP installation directory>
    export CONFLUENT_CONFIG=$CONFLUENT_HOME/etc/kafka
    export PATH=${CONFLUENT_HOME}/bin:$PATH
    #Source集群的Bootstrap Server地址。
    export SOURCE_ADDRESS=<your source cluster access address>
    #Destination集群的Bootstrap Server地址。
    export DESTINATION_ADDRESS=<your destination cluster access address>
  2. 配置Source集群访问文件${CONFLUENT_CONFIG}/source.config

    bootstrap.servers=<your source cluster access address>
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>'  password='<password>';
    sasl.mechanism=PLAIN
    ssl.truststore.location=<your source cluster truststore location>
    ssl.truststore.password=<your source cluster truststore password>
  3. 配置Destination集群访问文件${CONFLUENT_CONFIG}/destination.config

    bootstrap.servers=<your destination cluster access address>
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>'  password='<password>';
    sasl.mechanism=PLAIN
    ssl.truststore.location=<your destination cluster truststore location>
    ssl.truststore.password=<your destination cluster truststore password>
  4. 配置Cluster Linking配置文件${CONFLUENT_CONFIG}/clusterlink.config

    bootstrap.servers=<your source cluster access address>
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>'  password='<password>';
    sasl.mechanism=PLAIN
    ssl.truststore.type=PEM
    ssl.truststore.certificates=<your source cluster certificates content>

测试数据准备

使用Confluent Platform CLI执行相关命令,在Source集群上准备测试数据。

  1. 执行以下命令,在Source集群上创建一个具有单分区的待镜像的Topic,以便更容易观察复制消息的顺序。

    kafka-topics --create --topic demo-link-topic --partitions 1 --bootstrap-server ${SOURCE_ADDRESS} --command-config ${CONFLUENT_CONFIG}/source.config

    使用list topicdescribe topic命令查看Topic详情。

    #list topic
    kafka-topics --list --bootstrap-server ${SOURCE_ADDRESS} --command-config ${CONFLUENT_CONFIG}/source.config
    #describe topic
    kafka-topics --describe --topic demo-link-topic --bootstrap-server ${SOURCE_ADDRESS} --command-config ${CONFLUENT_CONFIG}/source.config
  2. 执行以下命令,向Source集群上的测试demo-link-topic发送消息。

    seq 1 5 | kafka-console-producer --topic demo-link-topic --bootstrap-server ${SOURCE_ADDRESS} --producer.config ${CONFLUENT_CONFIG}/source.config
  3. 消费Source集群中demo-link-topic上的数据。

    kafka-console-consumer --topic demo-link-topic --from-beginning --bootstrap-server ${SOURCE_ADDRESS} --consumer.config ${CONFLUENT_CONFIG}/source.config

    如果成功消费消息,您的输出将是:

    1

    2

    3

    4

    5

数据同步

本章节描述如何设置和测试Cluster Linking。

  1. 执行以下命令,创建从Source集群到Destination集群的link:demo-link

    kafka-cluster-links --command-config $CONFLUENT_CONFIG/destination.config --bootstrap-server ${DESTINATION_ADDRESS} --create --link demo-link --config-file ${CONFLUENT_CONFIG}/clusterlink.config
  2. 执行以下命令,检查link是否存在。

    kafka-cluster-links --command-config ${CONFLUENT_CONFIG}/destination.config --bootstrap-server ${DESTINATION_ADDRESS} --list
  3. 执行以下命令,创建Mirror Topic。

    kafka-mirrors --create --mirror-topic demo-link-topic --link demo-link --bootstrap-server ${DESTINATION_ADDRESS} --command-config $CONFLUENT_CONFIG/destination.config
  4. 执行以下命令,查看Mirror Topic。

    kafka-mirrors --describe --link demo-link --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config
  5. 执行以下命令,在Destination集群消费Mirror Topic数据。

    kafka-console-consumer --topic demo-link-topic --from-beginning --bootstrap-server ${DESTINATION_ADDRESS} --consumer.config ${CONFLUENT_CONFIG}/destination.config

如果Topic成功消费消息,您的输出将是:

1

2

3

4

5

  1. 执行以下命令,查看Cluster Linking的复制状态。

    kafka-replica-status --topics demo-link-topic --include-linked --bootstrap-server ${DESTINATION_ADDRESS} --admin.config ${CONFLUENT_CONFIG}/destination.config

Cluster Linking管理

  1. 查看Cluster Linking列表,命令如下:

    kafka-cluster-links --list --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config
  2. 查看Cluster Linking详情,命令如下:

    kafka-configs --describe --cluster-link demo-link --bootstrap-server $DESTINATION_ADDRESS --command-config ${CONFLUENT_CONFIG}/destination.config
  3. Mirror Topic转换为普通Topic,命令如下:

    kafka-mirrors --promote --topics demo-link-topic --bootstrap-server $DESTINATION_ADDRESS --command-config ${CONFLUENT_CONFIG}/destination.config

    预期输出:

    Calculating max offset and ms lag for mirror topics: [demo-link-topic]
    Finished calculating max offset lag and max lag ms for mirror topics: [demo-link-topic]
    Request for stopping topic demo-link-topic's mirror was successfully scheduled. Please use the describe command with the --pending-stopped-only option to monitor progress.
  4. 删除Cluster Linking,命令如下:

    kafka-cluster-links --delete --link demo-link --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config

    预期输出:

    Cluster link 'demo-link' deletion successfully completed.

相关文档

  • 本页导读 (1)