本教程介绍如何使用元数据迁移工具将自建Kafka集群的Consumer Group元数据迁移到消息队列Kafka版实例。

前提条件

您已完成以下操作:
注意
  • 迁移不会删除源Kafka的Consumer Group,只是在目标消息队列Kafka版实例创建相同配置的Consumer Group。
  • 迁移内容仅为Consumer Group配置,不包含消费的Topic及位点信息。

操作步骤

  1. 打开本地终端。
  2. 切换到迁移工具所在目录。
  3. 创建配置文件kafka.properties
    kafka.properties用于构造Kafka Consumer,从自建Kafka集群获取消费者位点信息。配置文件内容如下。
    ## 接入点
    bootstrap.servers=localhost:9092
    
    ## Consumer Group,注意该Consumer Group不能有消费者位点信息,以保证能从第一个消息开始消费。
    group.id=XXX
    
    ## 如果无安全配置,可以不配置以下内容
    
    ## SASL鉴权方式
    #sasl.mechanism=PLAIN
    
    ## 接入协议
    #security.protocol=SASL_SSL
    
    ## SSL根证书的路径
    #ssl.truststore.location=/Users/***/Documents/code/aliware-kafka-demos/main/resources/kafka.client.truststore.jks
    
    ## SSL密码
    #ssl.truststore.password=***
    
    ## SASL路径
    #java.security.auth.login.config=/Users/***/kafka-java-demo/vpc-ssl/src/main/resources/kafka_client_jaas.conf
  4. 确认要迁移的Consumer Group元数据。
    java -jar kafka-migration.jar ConsumerGroupMigrationFromTopic --propertiesPath /usr/local/kafka_2.12-2.4.0/config/kafka.properties --destAk LTAI4FwQ5aK1mFYCspJ1*** --destSk wvDxjjRQ1tHPiL0oj7Y2Z7WDNkS*** --destRegionId cn-hangzhou --destInstanceId alikafka_pre-cn-v0h1cng00***
    参数 描述
    propertiesPath 配置文件kafka.properties的文件路径
    destAk 目标消息队列Kafka版实例的AK
    destSk 目标消息队列Kafka版实例的SK
    destRegionId 目标消息队列Kafka版实例的地域ID
    destInstanceId 目标消息队列Kafka版实例ID
    commit 提交迁移
    返回结果如下。
    15:29:45 INFO - Will create consumer groups:[XXX, test-consumer-group]
  5. 提交要迁移的Consumer Group元数据。
    java -jar kafka-migration.jar ConsumerGroupMigrationFromTopic --propertiesPath /usr/local/kafka_2.12-2.4.0/config/kafka.properties --destAk LTAI4FwQ5aK1mFYCspJ1h*** --destSk wvDxjjRQ1tHPiL0oj7Y2Z7WDNkS*** --destRegionId cn-hangzhou --destInstanceId alikafka_pre-cn-v0h1cng00*** --commit
    参数 描述
    commit 提交迁移
    返回结果如下。
    15:35:51 INFO - cmd=ConsumerGroupMigrationFromTopic, request=null, response={"code":200,"requestId":"C9797848-FD4C-411F-966D-0D4AB5D12F55","success":true,"message":"operation success"}
    15:35:51 INFO - ConsumerCreate success, consumer group=XXX
    15:35:57 INFO - cmd=ConsumerGroupMigrationFromTopic, request=null, response={"code":200,"requestId":"3BCFDBF2-3CD9-4D48-92C3-385C8DBB9709","success":true,"message":"operation success"}
    15:35:57 INFO - ConsumerCreate success, consumer group=test-consumer-group
  6. 确认Consumer Group元数据迁移是否成功。
    1. 登录消息队列Kafka版控制台,在顶部菜单栏选择目标地域。
    2. 在左侧导航栏,单击Consumer Group管理
    3. Consumer Group管理页面,选择目标实例。
      Consumer Group名称列表,显示成功创建的Consumer Group。迁移开源consumergroup元数据到消息队列Kafka版