本文通过示例为您介绍如何通过EMR的集群脚本功能,快速部署使用MirrorMaker 2.0(MM2)服务同步数据。

背景信息

本文的业务场景以EMR DataFlow集群作为目的集群,并且在目的集群中以Dedicated MirrorMaker集群的方式部署MM2,即EMR DataFlow集群既作为目的集群又作为Dedicated MirrorMaker集群。在实际业务场景中,您可以将MirrorMaker集群部署到单独的服务器上。

Kafka MM2适用于下列场景:
  • 远程数据同步:通过MM2,Kafka数据可以在不同地域的集群进行传输复制。
  • 灾备场景:通过MM2,可以构建不同数据中心的主备两个集群容灾架构,MM2实时同步两个集群的数据。当其中一个集群不可用时,可以将上面的应用程序切换到另一个集群,从而实现异地容灾功能。
  • 数据迁移场景:在业务上云、混合云、集群升级等场景,存在数据从旧集群迁移到新集群的需求。此时,您可以使用MM2实现新旧数据的迁移,保证业务的连续性。
  • 聚合数据中心场景:通过MM2,可以将多个Kafka子集群的数据同步到一个中心Kafka集群,实现数据的汇聚。
Kafka MM2作为数据复制工具,具有以下功能:
  • 复制topics数据以及配置信息。
  • 复制consumer groups及其消费topicoffset信息。
  • 复制ACLs。
  • 自动检测新的topic以及partition。
  • 提供MM2metrics。
  • 高可用以及可水平扩展的框架。
MM2任务有以下执行方式:
  • Distributed Connect集群的connector方式(推荐):在已有Connect集群执行MM2 connector任务的方式。具体操作,请参见使用MirrorMaker 2(on Connect)跨集群同步数据
  • Dedicated MirrorMaker集群方式:不需要使用Connect集群执行MM2 connector任务,而是直接通过Driver程序管理MM2的所有任务。

    您可以参照本文通过Driver程序来管理MM2任务。

  • Standalone Connectworker方式:执行单个MirrorSourceConnector任务,适合在测试场景下使用。
说明 推荐在Distributed Connect集群上启动MM2 connector任务,可以借助Connect集群的Rest服务管理MM2任务。

前提条件

  • 已创建两个Kafka集群,一个为源集群,一个为目的集群(EMR DataFlow集群),并选择了Kafka服务,创建DataFlow集群详情请参见创建集群
    说明 本文示例的源和目的集群都以EMR-3.42.0版本的DataFlow集群为例。
  • 已在OSS上创建存储空间,详情请参见创建存储空间

使用限制

EMR DataFlow集群的Kafka软件的版本为2.12_2.4.1及以上。

操作步骤

  1. 准备MM2配置文件mm2.properties并上传到您的OSS存储。
    以下配置内容仅作为参考,您需要替换文本中的源集群和目标集群的src.bootstrap.serversdest.bootstrap.servers,并根据实际业务需求进行相应的配置。MM2配置的详细信息请参见Configuring Geo-Replication
    # see org.apache.kafka.clients.consumer.ConsumerConfig for more details
    
    # Sample MirrorMaker 2.0 top-level configuration file
    # Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties
    
    # specify any number of cluster aliases
    clusters = src, dest
    
    # connection information for each cluster
    src.bootstrap.servers = <your source kafka cluster servers>
    dest.bootstrap.servers = <your destination kafka cluster servers>
    
    # enable and configure individual replication flows
    src->dest.enabled = true
    src->dest.topics = foo-.*
    groups=.*
    topics.blacklist="__.*"
    
    # customize as needed
    replication.factor=3
  2. 准备部署脚本kafka_mm2_deploy.sh并上传到OSS存储。
    #!/bin/bash
    SIGNAL=${SIGNAL:-TERM}
    PIDS=$(ps ax | grep -i 'org.apache.kafka.connect.mirror.MirrorMaker' | grep java | grep -v grep | awk '{print $1}')
    if [ -n "$PIDS" ]; then
      echo "stop the exist mirror maker server."
      kill -s $SIGNAL $PIDS
    fi
    KAFKA_CONF=/etc/taihao-apps/kafka-conf/kafka-conf
    TAIHAO_EXECUTOR=/usr/local/taihao-executor-all/executor/1.0.1
    cd $KAFKA_CONF
    if [ -e "./mm2.properties" ]; then
      mv mm2.properties mm2.properties.bak
    fi
    ${TAIHAO_EXECUTOR}/ossutil64 cp oss://<yourBuket>/mm2.properties ./ -e <yourEndpoint> -i <yourAccessKeyId> -k <yourAccessKeySecret>
    su - kafka <<EOF
    exec connect-mirror-maker.sh -daemon $KAFKA_CONF/mm2.properties
    exit;
    EOF
    涉及替换参数如下。
    参数 描述
    KAFKA_CONF 检查变量路径是否正确,如果不正确,则需要修改为实际的地址。
    TAIHAO_EXECUTOR
    oss://<yourBucket>/mm2.properties 替换为mm2.properties的实际存储路径。
    <yourEndpoint> OSS服务的地址。
    <yourAccessKeyId> 阿里云账号的AccessKey ID。
    <yourAccessKeySecret> 阿里云账号的AccessKey Secret。
  3. EMR控制台执行脚本, 具体操作请参见手动执行脚本
    说明 在创建执行脚本的过程中,您应正确选择脚本的执行节点,通常选择所有的Broker节点。
    执行完成后,即实现了Kafka集群间的数据迁移。