借助工具迁移

本文为您介绍如何使用开源工具,独立完成将自建Kafka集群的数据迁移至云消息队列 Kafka 版实例。

迁移流程

image

注意事项

  • 如果是将阿里云上自建的Kafka集群进行迁移,建议在与自建Kafka集群相同地域购买云消息队列 Kafka 版实例,并在部署时选择相同的VPC,采用VPC实例进行内网迁移。

  • 本文以自建的Kafka集群通过公网迁移至云消息队列 Kafka 版公网/VPC实例为例进行说明。

步骤一:规格评估

云消息队列 Kafka 版提供的规格评估功能,根据自建Kafka集群的信息,如集群流量情况、磁盘容量和类型等,评估并推荐需要的云消息队列 Kafka 版实例规格。具体操作请参见评估规格

步骤二:购买实例

根据评估得出的实例规格,购买云消息队列 Kafka 版实例并进行部署。具体操作公网和VPC接入

步骤三:迁移TopicGroup

实施迁移

  1. 登录自建Kafka集群服务器,下载并安装JDK811。

  2. 下载迁移工具kafka-migration-assessment.jar

  3. 根据以下方式分别迁移TopicGroup。

    迁移Topic

    1. 在迁移工具所在目录,执行以下命令,对要迁移的Topic进行预检。

      java -jar kafka-migration-assessment.jar TopicMigrationFromZk  \
      --sourceZkConnect 192.168.XX.XX  \
      --destAk <yourdestAccessKeyId>  \
      --destSk <yourdestAccessKeySecret>  \
      --destRegionId <yourdestRegionId>  \
      --destInstanceId <yourdestInstanceId>

      参数

      描述

      sourceZkConnect

      自建的源ZooKeeper集群的IP地址

      destAk

      目标云消息队列 Kafka 版实例所属阿里云账号的AccessKey ID

      destSk

      目标云消息队列 Kafka 版实例所属阿里云账号的AccessKey Secret

      destRegionId

      目标云消息队列 Kafka 版实例的地域ID

      destInstanceId

      目标云消息队列 Kafka 版实例的ID

      待确认的返回结果示例如下:

      13:40:08 INFO - Begin to migrate topics:[test]
      13:40:08 INFO - Total topic number:1
      13:40:08 INFO - Will create topic:test, isCompactTopic:false, partition number:1
    2. 执行以下命令,迁移Topic。

      java -jar kafka-migration-assessment.jar TopicMigrationFromZk  \
      --sourceZkConnect 192.168.XX.XX  \
      --destAk <yourdestAccessKeyId>  \
      --destSk <yourdestAccessKeySecret>  \
      --destRegionId <yourdestRegionId>  \
      --destInstanceId <yourdestInstanceId>  \
      --commit

      参数

      描述

      commit

      提交迁移

      提交迁移的返回结果示例如下:

      13:51:12 INFO - Begin to migrate topics:[test]
      13:51:12 INFO - Total topic number:1
      13:51:13 INFO - cmd=TopicMigrationFromZk, request=null, response={"code":200,"requestId":"7F76C7D7-AAB5-4E29-B49B-CD6F1E0F508B","success":true,"message":"operation success"}
      13:51:13 INFO - TopicCreate success, topic=test, partition number=1, isCompactTopic=false

    迁移Group

    1. 创建配置文件kafka.properties

      kafka.properties用于构造Kafka Consumer,从自建Kafka集群获取消费者位点信息。配置文件内容如下:

      ## 接入点。
      bootstrap.servers=localhost:9092
      
      ## Group ID。注意该Group不能有消费者位点信息,以保证能从第一个消息开始消费。
      group.id=XXX
      
      ## 如果无安全配置,可以不配置以下内容。
      
      ## SASL鉴权方式。
      #sasl.mechanism=PLAIN
      
      ## 接入协议。
      #security.protocol=SASL_SSL
      
      ## SSL根证书的路径。
      #ssl.truststore.location=/Users/***/Documents/code/aliware-kafka-demos/main/resources/kafka.client.truststore.jks
      
      ## SSL密码。
      #ssl.truststore.password=***
      
      ## SASL路径。
      #java.security.auth.login.config=/Users/***/kafka-java-demo/vpc-ssl/src/main/resources/kafka_client_jaas.conf
    2. 在迁移工具所在目录,执行以下命令,对要迁移的Group进行预检。

      java -jar kafka-migration-assessment.jar ConsumerGroupMigrationFromTopic  \
      --propertiesPath /usr/local/kafka_2.12-2.4.0/config/kafka.properties  \
      --destAk <yourAccessKeyId>  \
      --destSk <yourAccessKeySecret>  \
      --destRegionId <yourRegionId>  \
      --destInstanceId <yourInstanceId>

      参数

      描述

      propertiesPath

      配置文件kafka.properties的文件路径

      destAk

      目标云消息队列 Kafka 版实例所属阿里云账号的AccessKey ID

      destSk

      目标云消息队列 Kafka 版实例所属阿里云账号的AccessKey Secret

      destRegionId

      目标云消息队列 Kafka 版实例的地域ID

      destInstanceId

      目标云消息队列 Kafka 版实例的ID

      待确认的返回结果示例如下:

      15:29:45 INFO - Will create consumer groups:[XXX, test-consumer-group]
    3. 执行以下命令,迁移Group

      java -jar kafka-migration-assessment.jar ConsumerGroupMigrationFromTopic  \
      --propertiesPath /usr/local/kafka_2.12-2.4.0/config/kafka.properties  \
      --destAk <yourAccessKeyId>  \
      --destSk <yourAccessKeySecret>  \
      --destRegionId <yourRegionId>  \
      --destInstanceId <yourInstanceId>  \
      --commit

      参数

      描述

      commit

      提交迁移

      提交迁移的返回结果示例如下:

      15:35:51 INFO - cmd=ConsumerGroupMigrationFromTopic, request=null, response={"code":200,"requestId":"C9797848-FD4C-411F-966D-0D4AB5D12F55","success":true,"message":"operation success"}
      15:35:51 INFO - ConsumerCreate success, consumer group=XXX
      15:35:57 INFO - cmd=ConsumerGroupMigrationFromTopic, request=null, response={"code":200,"requestId":"3BCFDBF2-3CD9-4D48-92C3-385C8DBB9709","success":true,"message":"operation success"}
      15:35:57 INFO - ConsumerCreate success, consumer group=test-consumer-group

查看迁移进度

  1. 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。

  2. 在左侧导航栏,单击迁移,然后单击元数据导入

  3. 元数据导入页签,即可看到迁移目标为云消息队列 Kafka 版实例ID的任务,以及Topic 迁移进度Group 迁移进度

验证结果

  1. 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。

  2. 在左侧导航栏,单击实例列表

  3. 实例列表页面,单击目标实例名称。

  4. 查看资源列表。

    • 在左侧导航栏,单击Topic 管理,在Topic 管理页面的Topic列表中查看已创建的Topic。

    • 在左侧导航栏,单击Group 管理,在Group 管理页面的Group列表中查看已创建的Group。

步骤四:(可选)迁移数据

Kafka的镜像特性可实现Kafka集群的数据备份。实现这一特性的工具就是MirrorMaker。您可以使用MirrorMaker将源集群中的数据镜像拷贝到目标集群。如下图所示,Mirror Maker使用一个内置的Consumer从源自建Kafka集群消费消息,然后再使用一个内置的Producer将这些消息重新发送到目标云消息队列 Kafka 版集群。更多信息,请参见Apache Kafka MirrorMaker

image

前提条件

注意事项

  • Topic名称必须一致。

  • 分区数量可以不一致。

  • 在同一个分区中的数据迁移后并不保证依旧在同一个分区中。

  • 默认情况下,Key相同的消息会分布在同一分区中。

  • 普通消息在宕机时可能会乱序,分区顺序消息在宕机时依然保持顺序。

  • 如果自建Kafka集群与云消息队列 Kafka 版实例都需要密码,且密码不一致,则不支持迁移。

实施迁移

根据实际环境选择VPC接入或公网接入。

公网接入

  1. 下载SSL证书mix.4096.client.truststore.jks

  2. 配置kafka_client_jaas.conf

    KafkaClient {
       org.apache.kafka.common.security.plain.PlainLoginModule required
       username="your username"
       password="your password";
    };
  3. 配置producer.properties

    ## 消息队列Kafka版集群的SSL接入点(可在消息队列Kafka版控制台获取)
    bootstrap.servers=XXX.XXX.XXX.XXX:9093
    
    ## 数据压缩方式
    compression.type=none
    
    ## truststore(使用步骤1下载的文件)
    ssl.truststore.location=kafka.client.truststore.jks
    ssl.truststore.password=KafkaOnsClient
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    
    ## 消息队列Kafka2.X版本在配置SASL接入时需要做以下配置,2.X以下版本不需要配置。
    ssl.endpoint.identification.algorithm=
  4. 设置java.security.auth.login.config

    export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"                              
  5. 执行以下命令开启迁移进程。

    sh bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --whitelist topicName

VPC接入

  1. 配置consumer.properties

    ## 自建Kafka集群的接入点
    bootstrap.servers=XXX.XXX.XXX.XXX:9092
    
    ## 消费者分区分配策略
    partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
    
    ## Group的名称
    group.id=test-consumer-group
  2. 配置producer.properties

    ## 消息队列Kafka版集群的默认接入点(可在消息队列Kafka版控制台获取)
    bootstrap.servers=XXX.XXX.XXX.XXX:9092
    
    ## 数据压缩方式
    compression.type=none                                
  3. 执行以下命令开启迁移进程。

    sh bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --whitelist topicName

验证结果

您可通过以下任一方法验证MirrorMaker是否运行成功。

  • 通过kafka-consumer-groups.sh查看自建集群消费进度。

    bin/kafka-consumer-groups.sh --new-consumer --describe --bootstrap-server自建集群接入点 --group test-consumer-group

  • 往自建集群中发送消息,在云消息队列 Kafka 版控制台中查看Topic的分区状态,确认当前服务器上消息总量是否正确。您还可以通过云消息队列 Kafka 版控制台来查看具体消息内容。具体操作,请参见消息查询

后续操作

  1. 云消息队列 Kafka 版实例开启新的Group,准备消费实例的消息。

  2. 云消息队列 Kafka 版实例开启新的Producer,下线旧的Producer,并使旧的Group继续消费自建Kafka集群的消息。

  3. 待自建Kafka集群的消息全部被旧的Group消费后,下线旧的Group和自建Kafka集群。