跨集群复制数据

本文描述了如何启动两个Apache Kafka集群,然后启动一个Replicator进程在它们之间复制数据。

背景信息

自Confluent Platform 7.5起,ZooKeeper已不再用于新部署。Confluent建议新部署采用KRaft模式。要了解有关在KRaft模式下运行Kafka的更多信息,请参见KRaft OverviewPlatform Quick Start

本文提供了KRaft模式和ZooKeeper模式的示例。

对于KRaft,本示例展示了组合模式配置,其中每个集群的服务端和控制器都运行在同一台服务器上。目前,组合模式并不打算在生产中使用,为了简化教程,在此进行了展示。如果您想在单独的服务器上运行控制器和服务端,请在隔离模式下使用KRaft。如需了解更多信息,请参见KRaft OverviewKRaft mode

前提条件

  • 安装了Confluent Platform 更多信息,请参见Confluent

  • 安装Java 8、11或17。更多信息,请参见安装JDK

Kafka服务端和Confluent Platform组件的端口

KRaft mode

组件

Origin

Destination

Kafka brokers

9082

9092

KRaft controllers

9071

9093

Metadata server listeners (in brokers)

8091

8090

Connect Replicator worker

8083

Control Center

9021

ZooKeeper mode

组件

Origin

Destination

Kafka brokers

9082

9092

ZooKeeper

2171

2181

Metadata server listeners (in brokers)

8091

8090

Connect Replicator worker

8083

Control Center

9021

启动目标集群

KRaft mode

  1. 进入安装Confluent Platform的位置。

    cd $CONFLUENT_HOME
  2. 创建一个用于存放所有示例文件的目录。

    mkdir my-examples
  3. 将etc/kafka/kraft/server.properties复制到示例目录,并重命名。

    cp etc/kafka/kraft/server.properties my-examples/server_destination.properties
  4. 使用kafka-storage工具生成随机uuid。

    KAFKA_CLUSTER_ID="$(bin/kafka-storage random-uuid)"
    说明

    kafka-storage命令对每个代理、控制器只运行一次。不能使用此命令更新现有集群。如果此时在配置上出错,必须从头开始重新创建目录,并重新完成所有步骤。

  5. 格式化该服务器的日志目录。

    ./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c my-examples/server_destination.properties
  6. 启动目标服务器。

    ./bin/kafka-server-start my-examples/server_destination.properties

    有关启动和运行这些服务的详细信息,请参见Quick Start for Confluent Platform

Zookeeper mode

  1. 切换目录到Confluent Platform安装的位置。

    cd $CONFLUENT_HOME
  2. 创建一个目录,用于存放所有的示例文件。

    mkdir my-examples
  3. 复制etc/kafka/zookeeper.properties到示例目录,并重命名。

    cp etc/kafka/zookeeper.properties my-examples/zookeeper_destination.properties
  4. 复制etc/kafka/server.properties到示例目录,并重命名。

    cp etc/kafka/server.properties my-examples/server_destination.properties
  5. 启动一个ZooKeeper服务器。在本示例中,假设服务将在localhost上运行。通过在它自己的终端运行以下命令来启动ZooKeeper。

    ./bin/zookeeper-server-start my-examples/zookeeper_destination.properties
  6. 启动一个Kafka代理作为目的地的单节点Kafka集群,通过在它自己的终端运行以下命令来启动Kafka。

    ./bin/kafka-server-start my-examples/server_destination.properties                                          ./bin/kafka-server-start my-examples/server_destination.properties                                                            

启动源集群

在新的终端窗口中配置并启动源集群。

KRaft mode

说明

您需要在不同的端口上运行原始节点,以避免冲突。源节点上的Kafka代理配置在9082端口上,控制器配置在9071端口上,如上文端口映射所示。将配置文件复制到一个临时位置,并按照下面所示修改以防止与目的地集群发生冲突。

  1. 切换目录到Confluent Platform安装的位置。

    cd $CONFLUENT_HOME
  2. 将etc/kafka/kraft/server.properties复制到示例目录并重命名。

    cp etc/kafka/kraft/server.properties my-examples/server_origin.properties
  3. 更新端口号。

    sed -i '' -e "s/9093/9071/g" my-examples/server_origin.properties
    sed -i '' -e "s/9092/9082/g" my-examples/server_origin.properties
    sed -i '' -e "s/8090/8091/g" my-examples/server_origin.properties
    sed -i '' -e "s/#confluent.metadata.server.listeners/confluent.metadata.server.listeners/g" my-examples/server_origin.properties
    sed -i '' -e "s/confluent.metrics.reporter.bootstrap.servers=localhost:9092/confluent.metrics.reporter.bootstrap.servers=localhost:9082/g" my-examples/server_origin.properties
  4. 更新数据目录。

    sed -i '' -e "s/kraft-combined-logs/kraft-combined-logs-origin/g" my-examples/server_origin.properties
  5. 使用kafka-storage工具生成一个随机的UUID。

    KAFKA_CLUSTER_ID="$(bin/kafka-storage random-uuid)"
  6. 为这个服务器格式化日志目录。

    ./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c my-examples/server_origin.properties
  7. 接下来,启动一个Kafka代理,作为源Kafka集群服务。通过在它自己的终端运行以下命令来启动Kafka。

    ./bin/kafka-server-start my-examples/server_origin.properties

Zookeeper mode

说明

您需要在不同的端口上运行源集群,如端口映射所示,源集群中的Kafka代理配置在9082端口,ZooKeeper配置在2171端口。将配置文件复制到您的示例目录,并按下文所示进行修改,以防止冲突。

  1. 将配置文件复制到my-examples。

    cp etc/kafka/zookeeper.properties my-examples/zookeeper_origin.properties
    cp etc/kafka/server.properties my-examples/server_origin.properties
  2. 更新端口号。

    sed -i '' -e "s/2181/2171/g" my-examples/zookeeper_origin.properties
    sed -i '' -e "s/9092/9082/g" my-examples/server_origin.properties
    sed -i '' -e "s/2181/2171/g" my-examples/server_origin.properties
    sed -i '' -e "s/#listeners/listeners/g" my-examples/server_origin.properties
    sed -i '' -e "s/8090/8091/g" my-examples/server_origin.properties
    sed -i '' -e "s/#confluent.metadata.server.listeners/confluent.metadata.server.listeners/g" my-examples/server_origin.properties
  3. 更新原始代理的Broker ID。

    sed -i '' -e "s/broker.id=0/broker.id=1/g" my-examples/server_origin.properties
  4. 更新数据目录。

    sed -i '' -e "s/zookeeper/zookeeper_origin/g" my-examples/zookeeper_origin.properties
    sed -i '' -e "s/kafka-logs/kafka-logs-origin/g" my-examples/server_origin.properties
  5. 启动原始集群。

    • 通过在它自己的终端运行以下命令来启动ZooKeeper。

      ./bin/zookeeper-server-start my-examples/zookeeper_origin.properties
    • 通过在它自己的终端运行以下命令来启动Kafka。

      ./bin/kafka-server-start my-examples/server_origin.properties

创建Topic

打开一个新的命令窗口来运行Kafka命令。

  1. 在原始集群中使用以下命令创建一个名为“test-topic”的主题。

    kafka-topics --create --topic test-topic --replication-factor 1 --partitions 1 --bootstrap-server localhost:9082
  2. 验证Topic是否被创建。

    kafka-topics --list --bootstrap-server localhost:9082

    预期输出(_confluent开头的Topic是内部Topic):

    __confluent.support.metrics
    _confluent-command
    test-topic

    当配置并运行Replicator时,“test-topic”将会被复制到目的地集群(在端口2181上)。

配置并运行Replicator

在$CONFLUENT_HOME/my-examples/中创建以下文件。

  1. 在名为consumer.properties的文件中配置源集群。

    cp etc/kafka/consumer.properties my-examples/.

    编辑该文件,确保其中包含源集群的代理地址。默认的代理列表将与您之前启动的源集群相匹配。

    # Origin cluster connection configuration
    bootstrap.servers=localhost:9082
  2. 在名为producer.properties的新文件中配置目标集群。

    cp etc/kafka/producer.properties my-examples/.

    编辑该文件,确保它包含目的地集群代理的地址。默认代理列表将与您之前启动的目的地集群相匹配。

    # Destination cluster connection configuration
    bootstrap.servers=localhost:9092
  3. 在名为replication.properties的新文件中为Connect Worker定义Replicator配置。

    # Replication configuration
    topic.rename.format=${topic}.replica
    replication.factor=1
    config.storage.replication.factor=1
    offset.storage.replication.factor=1
    status.storage.replication.factor=1
    confluent.topic.replication.factor=1
    说明
    • 如果replication.properties中未定义端口,则此Worker将在默认端口8083上运行。

    • 使用复制因子属性(全部设置为1)是因为这些测试集群规模较小。在生产环境中,建议的最小集群大小为3。

启动replicator

执行下面的命令在自己的终端启动Replicator可执行程序。

./bin/replicator --cluster.id replicator --consumer.config my-examples/consumer.properties --producer.config my-examples/producer.properties --replication.config my-examples/replication.properties --whitelist 'test-topic'
  • --cluster.id:用于确定这个可执行文件应该加入哪个Replicator集群的标识符。拥有相同cluster.id的多个Replicator可执行实例将一起工作。

  • --consumer.config:源集群配置的路径。

  • --producer.config:目标集群配置的路径。

  • --replication.config:包含任何非连接特定配置的文件路径。命令行参数将覆盖这些配置。

  • --whitelist:从源集群复制到目标集群的Topic列表。

有关命令行选项的完整列表,请参见Command Line Parameters of Replicator Executable。 查看启动源任务和创建复制Topic相关的成功消息,这些消息表明Replicator已经启动并运行,并且正在复制Topic。

验证跨集群的Topic复制

Replicator完成初始化后,会检查源集群是否有需要复制的主题。

执行以下命令,检查test-topic.replica是否存在。

./bin/kafka-topics --describe --topic test-topic.replica --bootstrap-server localhost:9092

预期输出:

./bin/kafka-topics --describe --topic test-topic.replica --bootstrap-server localhost:9092
Topic: test-topic.replica    PartitionCount: 1       ReplicationFactor: 1    Configs: message.timestamp.type=CreateTime,segment.bytes=1073741824
      Topic: test-topic.replica      Partition: 0    Leader: 0       Replicas: 0     Isr: 0  Offline: 0

您还可以列出并描述目标集群上的主题。例如 test-topic.replica。

./bin/kafka-topics --list --bootstrap-server localhost:9092
说明
  • 要列出原始集群上的主题,请针对localhost:9082运行kafka-topics --list。

  • 要查看原始主题的描述,请运行kafka-topics --describe,并寻找test-topic,目标指向localhost:9082。

在您在源集群中创建主题之后,您可以开始使用Kafka生产者向原始集群中的test-topic发送数据。然后,您可以通过从目标集群中的test-topic.replica消费来确认数据已被复制。例如,要使用Kafka的控制台生产者发送一系列数字,请在新的终端窗口中运行以下命令:

seq 10000 | ./bin/kafka-console-producer --topic test-topic --broker-list localhost:9082

你可以使用控制台消费者在它自己的终端窗口中确认目的地集群的传送情况:

./bin/kafka-console-consumer --from-beginning --topic test-topic.replica --bootstrap-server localhost:9092

如果消费者输出中出现了从1到10,000的数字,这表明您已经成功创建了多集群复制。

使用Control Center监控replicator

  1. 停止源集群和目标集群上的Replicator和代理服务器,然后停止ZooKeeper实例。

  2. 激活Replicator的监控扩展,更多信息,请参见Replicator Monitoring Extension

    • 将replicator-rest-extension-.jar的完整路径添加到你的CLASSPATH中。

    • 将rest.extension.classes=io.confluent.connect.replicator.monitoring.ReplicatorMonitoringExtension添加到my-examples/replication.properties中。

  3. 在目标集群和源集群的Kafka配置文件中取消注释或添加以下行,分别是my-examples/server_destination.properties和my-examples/server_origin.properties。confluent.metrics.reporter.bootstrap.servers的配置必须在两个文件中都指向端口9092的localhost,您需要编辑这两个端口号中的一个或两个。

    # 表明你的部署处于开发模式,使用的复制因子为1。
    confluent.metrics.reporter.topic.replicas=1
    # 在Control Center上启用了指标报告,并提供了访问Confluent内部Topic的权限,该Topic收集并存储监控数据。
    metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
    confluent.metrics.reporter.bootstrap.servers=localhost:9092
  4. 编辑my-examples/producer.properties,为生产者添加监控拦截器。

    # Monitoring interceptor for producer
    interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
  5. 编辑my-examples/consumer.properties,为消费者添加监控拦截器。

    # Monitoring interceptor for consumer
    interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
  6. 编辑etc/confluent-control-center/control-center-dev.properties,添加以下两行,指定Control Center的源引导服务器和目标引导服务器。

    # multi-cluster monitoring
    confluent.controlcenter.kafka.origin.bootstrap.servers=localhost:9082
    confluent.controlcenter.kafka.destination.bootstrap.servers=localhost:9092
    说明
    • Control Center 需要 Connect REST 端点的主机和端口,才能知道在哪里查找 Replicator 监控指标。在本示例使用的配置文件(control-center-dev.properties)中,已为您配置了默认端口,因此开箱即用:

      # 以逗号分隔的连接主机名列表。
      confluent.controlcenter.connect.cluster=http://localhost:8083
    • 生产就绪配置文件(control-center-production.properties)的默认值已被注释掉。如果使用该文件、拥有多个连接程序或希望以不同方式配置连接集群,则必须指定连接端点,方法是取消注释默认值或为自己的连接集群指定主机。要了解更多信息,请参见Control Center Configuration Reference

    • 如果在部署中同时运行Replicator和Connect集群。必须分别指定:

      • Connect

        cluster: confluent.controlcenter.connect.<connect-cluster-name>.cluster=http://connect-host-1:8083
      • Replicator

        confluent.controlcenter.connect.<replicator-name>.cluster=http://replicator-host:8083
  7. 重新启动目标集群和原集群上的ZooKeeper实例。

    ./bin/zookeeper-server-start etc/kafka/zookeeper.properties
    ./bin/zookeeper-server-start my-examples/zookeeper_origin.properties
  8. 重新启动目标集群和原集群上的Broker。

    ./bin/kafka-server-start my-examples/server_destination.properties
    ./bin/kafka-server-start my-examples/server_origin.properties
  9. 重新启动Replicator和Connect worker。

    ./bin/replicator --cluster.id replicator --consumer.config my-examples/consumer.properties --producer.config my-examples/producer.properties --replication.config my-examples/replication.properties --whitelist 'test-topic'
  10. 使用以下命令启动Control Center。

    ./bin/control-center-start etc/confluent-control-center/control-center-dev.properties
  11. 在您的浏览器中打开Control Center,根据配置,集群会以自动生成的名称呈现在Control Center上。image

  12. (可选)在Control Center上,编辑集群名称以适应您的使用情况。

  13. 在Control Center选择目标集群,单击左侧导航栏的Replicators,然后使用Control Center监控replication性能,并深入查看源Topic和复制Topic。image

    要在Control Center查看向源Topic和复制Topic生成的消息,请在命令窗口中尝试使用kafka-consumer-perf-test,以自动向test-topic生成测试数据。

    kafka-producer-perf-test \
       --producer-props bootstrap.servers=localhost:9082 \
       --topic test-topic \
       --record-size 1000 \
       --throughput 1000 \
       --num-records 3600000

    预期输出:

    4999 records sent, 999.8 records/sec (0.95 MB/sec), 1.1 ms avg latency, 240.0 ms max latency.
    5003 records sent, 1000.2 records/sec (0.95 MB/sec), 0.5 ms avg latency, 4.0 ms max latency.
    5003 records sent, 1000.2 records/sec (0.95 MB/sec), 0.6 ms avg latency, 5.0 ms max latency.
    5001 records sent, 1000.2 records/sec (0.95 MB/sec), 0.3 ms avg latency, 3.0 ms max latency.
    5001 records sent, 1000.0 records/sec (0.95 MB/sec), 0.3 ms avg latency, 4.0 ms max latency.
    5000 records sent, 1000.0 records/sec (0.95 MB/sec), 0.8 ms avg latency, 24.0 ms max latency.
    5001 records sent, 1000.2 records/sec (0.95 MB/sec), 0.6 ms avg latency, 3.0 ms max latency.
    ...

    您可以使用kafka-console-consumer从命令行读取这些信息,以验证副本Topic是否接收到了这些信息。

    ./bin/kafka-console-consumer --from-beginning --topic test-topic.replica --bootstrap-server localhost:9092

    您也可以在Control Center上进行验证。image

要了解在Control Center监控复制器的更多信息,请参见Replicators” in Control Center User Guide

完成本教程的实验后,请务必按以下步骤进行清理:

  • 在每个命令窗口中使用Ctrl-C停止任何生产者和消费者。

  • 在每个命令窗口中使用Ctrl-C按启动服务的相反顺序停止每个服务(先停止Control Center,然后停止Replicator、Kafka Brokers,最后停止ZooKeepers)。