本文通过示例为您介绍如何使用MirrorMaker 2(简称MM2)on Kafka Connect跨集群同步数据。
背景信息
使用场景
Kafka MM2适用于下列场景:
- 远程数据同步:通过MM2,Kafka数据可以在不同地域的集群进行传输复制。
- 灾备场景:通过MM2,可以构建不同数据中心的主备两个集群容灾架构,MM2实时同步两个集群的数据。当其中一个集群不可用时,可以将上面的应用程序切换到另一个集群,从而实现异地容灾功能。
- 数据迁移场景:在业务上云、混合云、集群升级等场景,存在数据从旧集群迁移到新集群的需求。此时,您可以使用MM2实现新旧数据的迁移,保证业务的连续性。
- 聚合数据中心场景:通过MM2,可以将多个Kafka子集群的数据同步到一个中心Kafka集群,实现数据的汇聚。
功能
Kafka MM2作为数据复制工具,具有以下功能:
- 复制topics数据以及配置信息。
- 复制consumer groups及其消费topic的offset信息。
- 复制ACLs。
- 自动检测新的topic以及partition。
- 提供MM2的metrics。
- 高可用以及可水平扩展的框架。
任务执行方式
MM2任务有以下执行方式:
- Distributed Connect集群的connector方式(推荐):在已有Connect集群执行MM2 connector任务的方式。您可以参照本文使用Connect集群服务的功能来管理MM2任务。
- Dedicated MirrorMaker集群方式:不需要使用Connect集群执行MM2 connector任务,而是直接通过Driver程序管理MM2的所有任务。具体操作,请参见使用MirrorMaker 2(Dedicated)跨集群同步数据。
- Standalone Connect的worker方式:执行单个MirrorSourceConnector任务,适合在测试场景下使用。
说明 推荐在Distributed Connect集群上启动MM2 connector任务,可以借助Connect集群的Rest服务管理MM2任务。
MM2的详细信息,请参见Apache Kafka。
前提条件
已创建两个Kafka集群,一个为源集群emrsource,一个为目标集群emrdest,并选择了Kafka服务,创建DataFlow集群的具体操作,请参见创建集群。说明 本文示例的源集群和目标集群都以EMR-3.42.0版本,且在同一VPC下的DataFlow集群为例。
使用限制
目标集群的Kafka软件版本为2.12_2.4.1及以上。
操作流程
步骤一:在目标集群创建Kafka Connect集群
- 新增EMR Task机器组。在EMR控制台目标集群emrdest的节点管理页面,创建Task机器组。
- 扩容Task机器组。
- 查看KafkaConnect服务状态,确保Kafka Connect集群已经启动。
- 使用SSH方式登录目标集群emrdest,详情请参见登录集群。
- 执行以下命令,检查Kafka Connect Rest服务状态。
curl -X GET http://task-1-1:8083| jq .
返回以下类似信息。% Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 91 100 91 0 0 13407 0 --:--:-- --:--:-- --:--:-- 15166 { "version": "2.4.1", "commit": "42ce056344c5625a", "kafka_cluster_id": "6Z7IdHW4SVO1Pbql4c****" }
步骤二:使用MirrorMaker2 connector
- 准备MM2 connector配置文件。您需要准备以下文件:
- 准备MirrorSourceConnector配置文件本文示例MirrorSourceConnector配置文件命名为mm2-source-connector.json。按照如下示例并根据实际情况修改相应的参数值。更多配置项详情,请参见KIP-382的相关章节。
{ "name": "mm2-source-connector", "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", "clusters": "emrsource,emrdest", "source.cluster.alias": "emrsource", "target.cluster.alias": "emrdest", "target.cluster.bootstrap.servers": "core-1-1:9092;core-1-2:9092;core-1-3:9092", "source.cluster.bootstrap.servers": "10.0.**.**:9092", "topics": "^foo.*", "tasks.max": "4", "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "replication.factor": "3", "offset-syncs.topic.replication.factor": "3", "sync.topic.acls.interval.seconds": "20", "sync.topic.configs.interval.seconds": "20", "refresh.topics.interval.seconds": "20", "refresh.groups.interval.seconds": "20", "consumer.group.id": "mm2-mirror-source-consumer-group", "producer.enable.idempotence":"true", "source.cluster.security.protocol": "PLAINTEXT", "target.cluster.security.protocol": "PLAINTEXT" }
说明 本文示例代码中参数:source.cluster.bootstrap.servers
:该参数值的IP地址,需要替换为您实际环境源集群emrsource中Kafka服务的访问地址,并且需要确保源Kafka集群和Kafka Connect集群的联通性。topics
:该参数值表示会复制您源集群中以foo开头的Topic。
- 准备MirrorCheckpointConnector配置文件本文示例MirrorCheckpointConnector配置文件命名为mm2-checkpoint-connector.json。按照如下示例并根据实际情况修改相应的参数值。更多配置项详情,请参见KIP-382的相关章节。
{ "name": "mm2-checkpoint-connector", "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector", "clusters": "emrsource,emrdest", "source.cluster.alias": "emrsource", "target.cluster.alias": "emrdest", "target.cluster.bootstrap.servers": "core-1-1:9092;core-1-2:9092;core-1-3:9092", "source.cluster.bootstrap.servers": "10.0.**.**:9092", "tasks.max": "1", "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "replication.factor": "3", "checkpoints.topic.replication.factor": "3", "emit.checkpoints.interval.seconds": "20", "source.cluster.security.protocol": "PLAINTEXT", "target.cluster.security.protocol": "PLAINTEXT" }
- 准备MirrorHeartbeatConnector配置文件本文示例MirrorHeartbeatConnector配置文件命名为mm2-heartbeat-connector.json。按照如下示例并根据实际情况修改相应的参数值。更多配置项详情,请参见KIP-382的相关章节。
{ "name": "mm2-heartbeat-connector", "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector", "clusters": "emrsource,emrdest", "source.cluster.alias": "emrsource", "target.cluster.alias": "emrdest", "target.cluster.bootstrap.servers": "core-1-1:9092;core-1-2:9092;core-1-3:9092", "source.cluster.bootstrap.servers": "10.0.**.**:9092", "tasks.max": "1", "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "replication.factor": "3", "heartbeats.topic.replication.factor": "3", "emit.heartbeats.interval.seconds": "20", "source.cluster.security.protocol": "PLAINTEXT", "target.cluster.security.protocol": "PLAINTEXT" }
- 准备MirrorSourceConnector配置文件
- 使用MirrorSourceConnector。
- 使用MirrorCheckpointConnector。
- 使用MirrorHeartbeatConnector。
- 在目标集群执行以下命令,查看MM2相关topic。
kafka-topics.sh --list --bootstrap-server core-1-1:9092
此时,在目标集群中,您可以看到以下topic已经创建:- emrsource.foo开头的topic:由MirrorSourceConnector创建。
foo开头的topic是您源集群上已有的,需要复制的topic。
- emrsource.checkpoints.internal:由MirrorCheckpointConnector创建,用于存储offset等信息。
- heartbeats:由MirrorHeartbeatConnector创建。
- emrsource.foo开头的topic:由MirrorSourceConnector创建。