DataHub支持使用Kafka Mirror Maker工具将Kafka数据迁移至DataHub。
前提
已经创建好Project和Topic,创建方式详情见创建Topic示例
目前仅支持Kafka迁移数据至DataHub,不支持DataHub迁移数据至Kafka
DataHub目前不支持事务、幂等,需在DataHub目标端配置中禁用幂等配置
操作步骤
上传kafka_mirror_datahub.tgz至源端Kafka服务器并解压
tar -zxvf kafka_mirror_datahub.tgz
进入config目录,修改源端/目标端配置,配置文件说明如下:
consumer.properties 源端kafka配置
# 源端kafka server配置 bootstrap.servers=xx:9092 # consumer group id group.id=test-consumer-group auto.offset.reset=earliest session.timeout.ms=60000 heartbeat.interval.ms=40000 ssl.endpoint.identification.algorithm= key.deserializer=org.apache.kafka.common.serialization.StringDeserializer value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
b. producer.properties 目标端DataHub配置
bootstrap.servers=dh-cn-zhangjiakou-pre.aliyuncs.com:9092 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required\nusername=\"accessid\"\npassword=\"accesskey\"; security.protocol=SASL_SSL sasl.mechanism=PLAIN key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer compression.type=lz4 # 禁用幂等性配置 enable.idempotence=false
参数说明
topic-map.properties topic 映射配置
左边为源端kafka topic名称, 右边为要写入的DataHub topic,多组映射关系以行分割,Kafka的Topic映射之后为DataHub的project+topic, project和topic以 “.”分割 即project.topic
topicname=testproject.testtopic topicname1=testproject1.testtopic1
配置log4j.properties 进行日志输出
新建log4j.properties文件
配置模板
log4j.rootLogger=INFO, stdout, file log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss}] [%p] %m (%c:%L)%n log4j.appender.file=org.apache.log4j.DailyRollingFileAppender log4j.appender.file.DatePattern='.'yyyy-MM-dd log4j.appender.file.File=/opt/logs/mm1.log log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss}] [%p] %m (%c:%L)%n log4j.logger.kafka=INFO log4j.logger.org.apache.kafka=INFO log4j.logger.kafka.tools.MirrorMaker=INFO log4j.logger.org.apache.zookeeper=WARN
执行迁移脚本
在kafka根目录下执行以下脚本,观察是否有日志。
参数说明
--consumer.config :源端kafka配置文件。
--producer.config: 目标端DataHub配置文件。
--whitelist: 源端topic名称,多个topic以|隔开 示例 "a|b|c"。
--topic.mapping.file: topic 映射配置文件。
KAFKA_LOG4J_OPTS: 日志输出配置文件路径
nohup KAFKA_LOG4J_OPTS="log4j.properties" bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --whitelist "mirrortest" --topic.mapping.file /opt/kafka_2.12-3.7.2/config/topic-map.properties ... > /dev/null 2>&1 &
查看日志,观察是否有报错。启动成功示例如下:
登录DataHub管控台,查看DataHub端数据是否写入