DataHub支持使用Kafka Mirror Maker工具将Kafka数据迁移至DataHub。
前提
已经创建好Project和Topic,创建方式详情见创建Topic示例
-
目前仅支持Kafka迁移数据至DataHub,不支持DataHub迁移数据至Kafka
-
DataHub目前不支持事务、幂等,需在DataHub目标端配置中禁用幂等配置
操作步骤
-
上传kafka_mirror_datahub.tgz至源端Kafka服务器并解压
tar -zxvf kafka_mirror_datahub.tgz
-
进入config目录,修改源端/目标端配置,配置文件说明如下:
-
consumer.properties 源端kafka配置
# 源端kafka server配置 bootstrap.servers=xx:9092 # consumer group id group.id=test-consumer-group auto.offset.reset=earliest session.timeout.ms=60000 heartbeat.interval.ms=40000 ssl.endpoint.identification.algorithm= key.deserializer=org.apache.kafka.common.serialization.StringDeserializer value.deserializer=org.apache.kafka.common.serialization.StringDeserializerb. producer.properties 目标端DataHub配置
bootstrap.servers=dh-cn-zhangjiakou-pre.aliyuncs.com:9092 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required\nusername=\"accessid\"\npassword=\"accesskey\"; security.protocol=SASL_SSL sasl.mechanism=PLAIN key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer compression.type=lz4 # 禁用幂等性配置 enable.idempotence=false参数说明
-
-
topic-map.properties topic 映射配置
左边为源端kafka topic名称, 右边为要写入的DataHub topic,多组映射关系以行分割,Kafka的Topic映射之后为DataHub的project+topic, project和topic以 “.”分割 即project.topic
topicname=testproject.testtopic topicname1=testproject1.testtopic1
-
配置log4j.properties 进行日志输出
-
新建log4j.properties文件
-
配置模板
log4j.rootLogger=INFO, stdout, file log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss}] [%p] %m (%c:%L)%n log4j.appender.file=org.apache.log4j.DailyRollingFileAppender log4j.appender.file.DatePattern='.'yyyy-MM-dd log4j.appender.file.File=/opt/logs/mm1.log log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss}] [%p] %m (%c:%L)%n log4j.logger.kafka=INFO log4j.logger.org.apache.kafka=INFO log4j.logger.kafka.tools.MirrorMaker=INFO log4j.logger.org.apache.zookeeper=WARN
-
-
执行迁移脚本
在kafka根目录下执行以下脚本,观察是否有日志。
参数说明
-
--consumer.config :源端kafka配置文件。
-
--producer.config: 目标端DataHub配置文件。
-
--whitelist: 源端topic名称,多个topic以|隔开 示例 "a|b|c"。
-
--topic.mapping.file: topic 映射配置文件。
-
KAFKA_LOG4J_OPTS: 日志输出配置文件路径
nohup KAFKA_LOG4J_OPTS="log4j.properties" bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --whitelist "mirrortest" --topic.mapping.file /opt/kafka_2.12-3.7.2/config/topic-map.properties ... > /dev/null 2>&1 & -
-
查看日志,观察是否有报错。启动成功示例如下:
-
[2025-08-06 17:27:41] [INFO] Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$:31) [2025-08-06 17:27:41] [INFO] Starting mirror maker (kafka.tools.MirrorMaker$:62) [2025-08-06 17:27:41] [INFO] Loaded topic mappings: mirrortest -> test_suyang.mirror (kafka.tools.MirrorMaker$:62) [2025-08-06 17:27:41] [INFO] ProducerConfig values: acks = -1 batch.size = 16384 bootstrap.servers = [xxx] buffer.memory = 33554432 client.dns.lookup = use_all_dns_ips client.id = producer-1 compression.type = lz4 connections.max.idle.ms = 540000 delivery.timeout.ms = 2147483647 enable.idempotence = false interceptor.classes = [] internal.auto.downgrade.txn.commit = false key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer linger.ms = 0 max.block.ms = 9223372036854775807 max.in.flight.requests.per.connection = 1 max.request.size = 1048576 metadata.max.age.ms = 300000 metadata.max.idle.ms = 300000
-
-
登录DataHub管控台,查看DataHub端数据是否写入
在数据总线页面进入目标Project下的目标Topic(如
kafkatest / test),在Shard列表页签确认Shard状态为ACTIVE且存在最新数据时间。单击右侧抽样,在抽样面板中选择Shard ID并设置采样条数,单击抽样按钮,即可在数据预览表格中确认数据已成功写入DataHub。