本文介绍如何使用MirrorMaker将自建Kafka集群的数据迁移到云消息队列 Kafka 版集群。
前提条件
您已完成以下操作:背景信息
Kafka的镜像特性可实现Kafka集群的数据备份。实现这一特性的工具就是MirrorMaker。您可以使用MirrorMaker将源集群中的数据镜像拷贝到目标集群。如下图所示,Mirror Maker使用一个内置的Consumer从源自建Kafka集群消费消息,然后再使用一个内置的Producer将这些消息重新发送到目标云消息队列 Kafka 版集群。
更多信息,请参见Apache Kafka MirrorMaker。
注意事项
- Topic名称必须一致。
- 分区数量可以不一致。
- 在同一个分区中的数据迁移后并不保证依旧在同一个分区中。
- 默认情况下,Key相同的消息会分布在同一分区中。
- 普通消息在宕机时可能会乱序,分区顺序消息在宕机时依然保持顺序。
VPC接入
- 配置consumer.properties。
## 自建Kafka集群的接入点 bootstrap.servers=XXX.XXX.XXX.XXX:9092 ## 消费者分区分配策略 partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor ## Group的名称 group.id=test-consumer-group
- 配置producer.properties。
## 云消息队列 Kafka 版集群的默认接入点(可在云消息队列 Kafka 版控制台获取) bootstrap.servers=XXX.XXX.XXX.XXX:9092 ## 数据压缩方式 compression.type=none
- 执行以下命令开启迁移进程。
sh bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --whitelist topicName
公网接入
- 下载kafka.client.truststore.jks。
- 配置kafka_client_jaas.conf。
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="your username" password="your password"; };
- 配置consumer.properties。
## 自建Kafka集群的接入点 bootstrap.servers=XXX.XXX.XXX.XXX:9092 ## 消费者分区分配策略 partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor ## Group名称 group.id=test-consumer-group
- 配置producer.properties。
## 云消息队列 Kafka 版集群的SSL接入点(可在云消息队列 Kafka 版控制台获取) bootstrap.servers=XXX.XXX.XXX.XXX:9093 ## 数据压缩方式 compression.type=none ## truststore(使用步骤1下载的文件) ssl.truststore.location=kafka.client.truststore.jks ssl.truststore.password=KafkaOnsClient security.protocol=SASL_SSL sasl.mechanism=PLAIN ## 云消息队列 Kafka 版2.X版本在配置SASL接入时需要做以下配置,2.X以下版本不需要配置。 ssl.endpoint.identification.algorithm=
- 设置java.security.auth.login.config。
export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"
- 执行以下命令开启迁移进程。
sh bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --whitelist topicName
结果验证
您可通过以下任一方法验证MirrorMaker是否运行成功。
- 通过
kafka-consumer-groups.sh
查看自建集群消费进度。bin/kafka-consumer-groups.sh --new-consumer --describe --bootstrap-server自建集群接入点 --group test-consumer-group
- 往自建集群中发送消息,在云消息队列 Kafka 版控制台中查看Topic的分区状态,确认当前服务器上消息总量是否正确。您还可以通过云消息队列 Kafka 版控制台来查看具体消息内容。具体操作,请参见查询消息。