使用 Kafka MirrorMaker 将自建 Kafka 集群(源集群)中的 Topic 数据迁移至阿里云消息队列 for Apache Kafka 集群(目标集群)。

背景信息

Kafka 的镜像特性可实现 Kafka 集群的数据备份。实现这一特性的工具就是 MirrorMaker,即您可使用 MirrorMaker 将源集群中的数据镜像拷贝到目标集群。关于 MirrorMaker 的详细介绍,请参见 Kafka mirroring(MirrorMaker)

本文中的源集群为您的自建 Kafka 集群,目标集群为阿里云消息队列 for Apache Kafka 集群。

准备工作

  1. 获取 Kafka MirrorMaker 工具。

    下载 Kafka,本文以 Kafka 2.2.0 版本为例,下载后并解压:

    tar -xzf kafka_2.12-2.2.0.tgz

    cd kafka_2.12-2.2.0

  2. 在阿里云消息队列 for Apache Kafka 中创建需要迁移的目标 Topic。具体步骤请参见步骤三:创建资源中的创建 Topic 的步骤。

    创建时请注意以下几点:

    • Topic 名称必须一致;

    • 分区(Partition)数量可以不一致;

    • 迁移后原先在同一个分区中的数据并不保证迁移到同一个分区中。默认情况下,有相同 Key 的消息会分布在同一分区中。普通消息在宕机时可能会乱序,分区顺序消息在宕机时依然保持顺序。

迁移步骤

  1. 执行以下命令,开启迁移进程:

    sh bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --whitelist topicName

    参数说明请参见文档

  2. 配置 consumer.properties 文件,示例如下:

    ## 自建集群接入点
    bootstrap.servers=XXX.XXX.XXX.XXX:9092
    ## 消费者分区分配策略
    partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
    #consumer group id
    group.id=test-consumer-group
    						

    详情请参见文档

  3. producer.properties 文件的配置因实例接入方式而有所不同,说明如下:

    • VPC 接入

      ## 集群接入点
      ## Kafka接入点,通过控制台获取
      ## 您在控制台获取的默认接入点
      bootstrap.servers=控制台上的接入点IP+Port
      
      ## 数据压缩方式
      compression.type=none
      								
    • 公网接入

      producer.properties 文件

          ## 集群接入点
          ## Kafka接入点,通过控制台获取
          ## 您在控制台获取的默认接入点
          bootstrap.servers=控制台上的接入点IP+Port
      
          ## 数据压缩方式
          compression.type=none
      
          ## truststore,使用本demo给的文件
          ssl.truststore.location=kafka.client.truststore.jks
          ssl.truststore.password=KafkaOnsClient
          security.protocol=SASL_SSL
          sasl.mechanism=PLAIN
      
          ## kafka 2.X版本在配置sasl接入时需要做以下配置。2.X以下版本不需要配置
          ssl.endpoint.identification.algorithm=
      								

      公网接入,在启动 kafka-mirror-maker.sh 前,需要设置java.security.auth.login.config

      > export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"
      								

      kakfa_client_jaas.conf 文件

      KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      ## 控制台上提供的用户名和密码
      username="XXXXXXX"
      password="XXXXXXX";
      };
      								

结果验证

您可通过以下任一方法验证 Kafka MirrorMaker 运行是否成功。

  • 通过 kafka-consumer-groups.sh 查看自建集群消费进度:

    bin/kafka-consumer-groups.sh --new-consumer --describe --bootstrap-server 自建集群接入点 --group test-consumer-group

  • 往自建集群中发送消息,在消息队列 for Apache Kafka 控制台中查看 Topic 的分区状态,确认当前服务器上消息总量是否正确。可通过控制台查询消息来查看具体消息内容。