本文为您介绍如何使用开源工具,独立完成将自建Kafka集群的数据迁移至云消息队列 Kafka 版实例。
迁移流程
注意事项
如果是将阿里云上自建的Kafka集群进行迁移,建议在与自建Kafka集群相同地域购买云消息队列 Kafka 版实例,并在部署时选择相同的VPC,采用VPC实例进行内网迁移。
本文以自建的Kafka集群通过公网迁移至云消息队列 Kafka 版公网/VPC实例为例进行说明。
步骤一:规格评估
云消息队列 Kafka 版提供的规格评估功能,根据自建Kafka集群的信息,如集群流量情况、磁盘容量和类型等,评估并推荐需要的云消息队列 Kafka 版实例规格。具体操作请参见评估规格。
步骤二:购买实例
根据评估得出的实例规格,购买云消息队列 Kafka 版实例并进行部署。具体操作公网和VPC接入。
步骤三:迁移Topic和Group
实施迁移
登录自建Kafka集群服务器,下载并安装JDK8或11。
根据以下方式分别迁移Topic和Group。
迁移Topic
在迁移工具所在目录,执行以下命令,对要迁移的Topic进行预检。
java -jar kafka-migration-assessment.jar TopicMigrationFromZk \ --sourceZkConnect 192.168.XX.XX \ --destAk <yourdestAccessKeyId> \ --destSk <yourdestAccessKeySecret> \ --destRegionId <yourdestRegionId> \ --destInstanceId <yourdestInstanceId>
参数
描述
sourceZkConnect
自建的源ZooKeeper集群的IP地址
destAk
目标云消息队列 Kafka 版实例所属阿里云账号的AccessKey ID
destSk
目标云消息队列 Kafka 版实例所属阿里云账号的AccessKey Secret
destRegionId
目标云消息队列 Kafka 版实例的地域ID
destInstanceId
目标云消息队列 Kafka 版实例的ID
待确认的返回结果示例如下:
13:40:08 INFO - Begin to migrate topics:[test] 13:40:08 INFO - Total topic number:1 13:40:08 INFO - Will create topic:test, isCompactTopic:false, partition number:1
执行以下命令,迁移Topic。
java -jar kafka-migration-assessment.jar TopicMigrationFromZk \ --sourceZkConnect 192.168.XX.XX \ --destAk <yourdestAccessKeyId> \ --destSk <yourdestAccessKeySecret> \ --destRegionId <yourdestRegionId> \ --destInstanceId <yourdestInstanceId> \ --commit
参数
描述
commit
提交迁移
提交迁移的返回结果示例如下:
13:51:12 INFO - Begin to migrate topics:[test] 13:51:12 INFO - Total topic number:1 13:51:13 INFO - cmd=TopicMigrationFromZk, request=null, response={"code":200,"requestId":"7F76C7D7-AAB5-4E29-B49B-CD6F1E0F508B","success":true,"message":"operation success"} 13:51:13 INFO - TopicCreate success, topic=test, partition number=1, isCompactTopic=false
迁移Group
创建配置文件kafka.properties。
kafka.properties用于构造Kafka Consumer,从自建Kafka集群获取消费者位点信息。配置文件内容如下:
## 接入点。 bootstrap.servers=localhost:9092 ## Group ID。注意该Group不能有消费者位点信息,以保证能从第一个消息开始消费。 group.id=XXX ## 如果无安全配置,可以不配置以下内容。 ## SASL鉴权方式。 #sasl.mechanism=PLAIN ## 接入协议。 #security.protocol=SASL_SSL ## SSL根证书的路径。 #ssl.truststore.location=/Users/***/Documents/code/aliware-kafka-demos/main/resources/kafka.client.truststore.jks ## SSL密码。 #ssl.truststore.password=*** ## SASL路径。 #java.security.auth.login.config=/Users/***/kafka-java-demo/vpc-ssl/src/main/resources/kafka_client_jaas.conf
在迁移工具所在目录,执行以下命令,对要迁移的Group进行预检。
java -jar kafka-migration-assessment.jar ConsumerGroupMigrationFromTopic \ --propertiesPath /usr/local/kafka_2.12-2.4.0/config/kafka.properties \ --destAk <yourAccessKeyId> \ --destSk <yourAccessKeySecret> \ --destRegionId <yourRegionId> \ --destInstanceId <yourInstanceId>
参数
描述
propertiesPath
配置文件kafka.properties的文件路径
destAk
目标云消息队列 Kafka 版实例所属阿里云账号的AccessKey ID
destSk
目标云消息队列 Kafka 版实例所属阿里云账号的AccessKey Secret
destRegionId
目标云消息队列 Kafka 版实例的地域ID
destInstanceId
目标云消息队列 Kafka 版实例的ID
待确认的返回结果示例如下:
15:29:45 INFO - Will create consumer groups:[XXX, test-consumer-group]
执行以下命令,迁移Group。
java -jar kafka-migration-assessment.jar ConsumerGroupMigrationFromTopic \ --propertiesPath /usr/local/kafka_2.12-2.4.0/config/kafka.properties \ --destAk <yourAccessKeyId> \ --destSk <yourAccessKeySecret> \ --destRegionId <yourRegionId> \ --destInstanceId <yourInstanceId> \ --commit
参数
描述
commit
提交迁移
提交迁移的返回结果示例如下:
15:35:51 INFO - cmd=ConsumerGroupMigrationFromTopic, request=null, response={"code":200,"requestId":"C9797848-FD4C-411F-966D-0D4AB5D12F55","success":true,"message":"operation success"} 15:35:51 INFO - ConsumerCreate success, consumer group=XXX 15:35:57 INFO - cmd=ConsumerGroupMigrationFromTopic, request=null, response={"code":200,"requestId":"3BCFDBF2-3CD9-4D48-92C3-385C8DBB9709","success":true,"message":"operation success"} 15:35:57 INFO - ConsumerCreate success, consumer group=test-consumer-group
查看迁移进度
登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
在左侧导航栏,单击迁移,然后单击元数据导入。
在元数据导入页签,即可看到迁移目标为云消息队列 Kafka 版实例ID的任务,以及Topic 迁移进度和Group 迁移进度。
验证结果
登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
在左侧导航栏,单击实例列表。
在实例列表页面,单击目标实例名称。
查看资源列表。
在左侧导航栏,单击Topic 管理,在Topic 管理页面的Topic列表中查看已创建的Topic。
在左侧导航栏,单击Group 管理,在Group 管理页面的Group列表中查看已创建的Group。
步骤四:(可选)迁移数据
Kafka的镜像特性可实现Kafka集群的数据备份。实现这一特性的工具就是MirrorMaker。您可以使用MirrorMaker将源集群中的数据镜像拷贝到目标集群。如下图所示,Mirror Maker使用一个内置的Consumer从源自建Kafka集群消费消息,然后再使用一个内置的Producer将这些消息重新发送到目标云消息队列 Kafka 版集群。更多信息,请参见Apache Kafka MirrorMaker。
前提条件
已完成Topic迁移
注意事项
Topic名称必须一致。
分区数量可以不一致。
在同一个分区中的数据迁移后并不保证依旧在同一个分区中。
默认情况下,Key相同的消息会分布在同一分区中。
普通消息在宕机时可能会乱序,分区顺序消息在宕机时依然保持顺序。
如果自建Kafka集群与云消息队列 Kafka 版实例都需要密码,且密码不一致,则不支持迁移。
实施迁移
根据实际环境选择VPC接入或公网接入。
公网接入
下载SSL证书mix.4096.client.truststore.jks。
配置
kafka_client_jaas.conf
。KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="your username" password="your password"; };
配置
producer.properties
。## 消息队列Kafka版集群的SSL接入点(可在消息队列Kafka版控制台获取) bootstrap.servers=XXX.XXX.XXX.XXX:9093 ## 数据压缩方式 compression.type=none ## truststore(使用步骤1下载的文件) ssl.truststore.location=kafka.client.truststore.jks ssl.truststore.password=KafkaOnsClient security.protocol=SASL_SSL sasl.mechanism=PLAIN ## 消息队列Kafka版2.X版本在配置SASL接入时需要做以下配置,2.X以下版本不需要配置。 ssl.endpoint.identification.algorithm=
设置
java.security.auth.login.config
。export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"
执行以下命令开启迁移进程。
sh bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --whitelist topicName
VPC接入
配置
consumer.properties
。## 自建Kafka集群的接入点 bootstrap.servers=XXX.XXX.XXX.XXX:9092 ## 消费者分区分配策略 partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor ## Group的名称 group.id=test-consumer-group
配置
producer.properties
。## 消息队列Kafka版集群的默认接入点(可在消息队列Kafka版控制台获取) bootstrap.servers=XXX.XXX.XXX.XXX:9092 ## 数据压缩方式 compression.type=none
执行以下命令开启迁移进程。
sh bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --whitelist topicName
验证结果
您可通过以下任一方法验证MirrorMaker是否运行成功。
通过
kafka-consumer-groups.sh
查看自建集群消费进度。bin/kafka-consumer-groups.sh --new-consumer --describe --bootstrap-server自建集群接入点 --group test-consumer-group
往自建集群中发送消息,在云消息队列 Kafka 版控制台中查看Topic的分区状态,确认当前服务器上消息总量是否正确。您还可以通过云消息队列 Kafka 版控制台来查看具体消息内容。具体操作,请参见消息查询。
后续操作
为云消息队列 Kafka 版实例开启新的Group,准备消费实例的消息。
为云消息队列 Kafka 版实例开启新的Producer,下线旧的Producer,并使旧的Group继续消费自建Kafka集群的消息。
待自建Kafka集群的消息全部被旧的Group消费后,下线旧的Group和自建Kafka集群。