Kafka数据迁移Datahub方案

DataHub支持使用Kafka Mirror Maker工具将Kafka数据迁移至DataHub。

前提

已经创建好ProjectTopic,创建方式详情见创建Topic示例

说明
  • 目前仅支持Kafka迁移数据至DataHub,不支持DataHub迁移数据至Kafka

  • DataHub目前不支持事务、幂等,需在DataHub目标端配置中禁用幂等配置

操作步骤

  1. 上传kafka_mirror_datahub.tgz至源端Kafka服务器并解压

    tar -zxvf kafka_mirror_datahub.tgz
  1. 进入config目录,修改源端/目标端配置,配置文件说明如下:

    1. consumer.properties 源端kafka配置

    # 源端kafka server配置
    bootstrap.servers=xx:9092
    # consumer group id
    group.id=test-consumer-group
    auto.offset.reset=earliest
    session.timeout.ms=60000
    heartbeat.interval.ms=40000
    ssl.endpoint.identification.algorithm=
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

    b. producer.properties 目标端DataHub配置

    bootstrap.servers=dh-cn-zhangjiakou-pre.aliyuncs.com:9092
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required\nusername=\"accessid\"\npassword=\"accesskey\";
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    compression.type=lz4
    # 禁用幂等性配置
    enable.idempotence=false

    参数说明

    • 目标端bootstrap.servers 域名列表 参照兼容Kafka

    • sasl.jaas.config 用于指定 Kafka 客户端或 Broker SASL 登录所需的登录模块、用户名、密码等敏感信 accessid、accesskey替换为用户使用的ak信息。

    • 更多配置项信息请参照兼容Kafka

  1. topic-map.properties topic 映射配置

    左边为源端kafka topic名称, 右边为要写入的DataHub topic,多组映射关系以行分割,KafkaTopic映射之后为DataHubproject+topic, projecttopic以 “.”分割 即project.topic

    topicname=testproject.testtopic
    topicname1=testproject1.testtopic1
  1. 配置log4j.properties 进行日志输出

    1. 新建log4j.properties文件

    2. 配置模板

      1. log4j.rootLogger=INFO, stdout, file
          
        log4j.appender.stdout=org.apache.log4j.ConsoleAppender
        log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
        log4j.appender.stdout.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss}] [%p] %m (%c:%L)%n
        
        log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
        log4j.appender.file.DatePattern='.'yyyy-MM-dd
        log4j.appender.file.File=/opt/logs/mm1.log
        log4j.appender.file.layout=org.apache.log4j.PatternLayout
        log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss}] [%p] %m (%c:%L)%n
        
        log4j.logger.kafka=INFO
        log4j.logger.org.apache.kafka=INFO
        log4j.logger.kafka.tools.MirrorMaker=INFO
        log4j.logger.org.apache.zookeeper=WARN
  2. 执行迁移脚本

    kafka根目录下执行以下脚本,观察是否有日志。

    参数说明

    • --consumer.config :源端kafka配置文件。

    • --producer.config: 目标端DataHub配置文件。

    • --whitelist: 源端topic名称,多个topic以|隔开 示例 "a|b|c"。

    • --topic.mapping.file: topic 映射配置文件。

    • KAFKA_LOG4J_OPTS: 日志输出配置文件路径

    nohup KAFKA_LOG4J_OPTS="log4j.properties"  bin/kafka-mirror-maker.sh   --consumer.config config/consumer.properties   --producer.config config/producer.properties   --whitelist "mirrortest"   --topic.mapping.file /opt/kafka_2.12-3.7.2/config/topic-map.properties ... > /dev/null 2>&1 &
  3. 查看日志,观察是否有报错。启动成功示例如下:

    1. image

  4. 登录DataHub管控台,查看DataHub端数据是否写入

    image.png