快速开始使用EMR Kafka

本文为您介绍如何创建E-MapReduce(简称EMR)Kafka集群、使用Kafka TopicKafka Connect服务,帮您快速了解和上手使用EMR Kafka。

注意事项

创建EMR Kafka集群前,您需要根据业务的预估负载,选择合适的ECS实例机型以及Broker实例个数。由于业务场景差异很大,所以无法给出通用的集群规划,您需要根据您的实际环境创建集群。通常,建议您选择机型时考虑以下配置:

  • Broker机型的CPU和内存配比为1:4。

  • 选择云盘作为数据存储盘。

  • 充分考虑云盘的I/O吞吐率以及网卡带宽之间的关系。

在部署参数上,考虑以下因素:

  • 由于EMR Kafka版本仍依赖于Zookeeper,且Zookeeper的可用性直接关系到Kafka服务的高可用,因此,建议您创建集群时,选择高可用的部署方式。启用高可用后,将创建3个节点的Zookeeper服务。

  • 如果Master机器组只部署Zookeeper,则Master机器组只需要配置1块数据盘即可。

更详细的评估建议,请参见集群资源规格评估建议

创建EMR Kafka集群

该部分内容为您简单介绍如何创建Kafka集群。如需更详细的创建操作,请参见创建集群

  1. 进入创建集群页面。

    1. 登录E-MapReduce控制台

    2. 单击上方的创建集群

  2. 软件配置阶段,您可以根据需要的Kafka版本,选择对应的EMR版本。

    打开服务高可用开关,创建3节点的ZooKeeper集群。软件配置

    重要

    启用高可用后,将在Master机器组上部署3个节点的Zookeeper服务。由于EMR Kafka版本的服务可用性仍依赖于Zookeeper,所以建议您创建集群时,选择高可用的部署方式。

  3. 硬件配置阶段,选择合适的ECS实例机型以及节点数量。

    • 机型:Core节点组选择CPU和内存配比为1 Core:4 GB的机型。

    • 节点数量:Core节点组选择比Kafka分区副本数多1的节点数量以保持足够的冗余。例如,如果规划副本数为3,则节点数选择为4。

    硬件配置

  4. 请根据需求填写相关参数,以完成集群的创建。

使用Kafka Topic

该部分内容为您介绍如何使用Kafka Topic进行数据的生产消费。实际业务场景,您也可以使用Kafka ManagerCruise Control等软件管理集群。

  1. 使用SSH方式登录Kafka集群的Master节点,详情请参见登录集群

  2. 执行以下命令,创建Kafka Topic。

    sudo su - kafka
    kafka-topics.sh --partitions 10 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic test --create
  3. 执行以下命令,查看Kafka topic详细信息。

    kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test --describe
  4. 执行以下命令,生产数据。

     kafka-console-producer.sh  --broker-list core-1-1:9092 --topic test

    在此命令执行后,您可以输入消息并按回车键将其发送到Topic。

  5. 请新开一个终端窗口,执行以下命令,以消费数据。

    kafka-console-consumer.sh --bootstrap-server core-1-1:9092 --topic test --from-beginning --group test-consumer-group

使用Kafka Connect服务

EMR-3.41.0之后版本、EMR-5.7.0之后版本支持Kafka Connect组件的部署。该部分内容为您介绍如何使用Kafka Connect服务。

  1. 进入节点管理页面。

    1. 登录E-MapReduce控制台

    2. 在顶部菜单栏处,根据实际情况选择地域和资源组

    3. 单击目标集群操作列的节点管理

  2. 创建Kafka Connect节点组。

    Connect安装在EMR Task节点组。在EMR Kafka集群创建Task节点组后,EMR会自动在该节点组创建Kafka Connect集群。

    1. 新建EMR Task节点组

      可以在节点管理页面,单击新增节点组,新建Task节点组。具体操作,请参见新增节点组

    2. 扩容Task节点组

      根据您的实际需求,可以扩容Task机器组实例数量。具体操作,请参见扩容集群

  3. 查看KafkaConnect服务状态,确保Kafka Connect集群已经启动。

    您可以在Kafka服务的状态页面的组件列表区域,查看KafkaConnect的组件状态,确保组件在运行中。KafkaConnect

  4. 检查Kafka Connect Rest服务状态。

    1. 使用SSH方式登录Kafka集群的Master节点,详情请参见登录集群

    2. 执行以下命令,检查Kafka Connect Rest服务状态。

      curl -X GET http://task-1-1:8083| jq .

      您会看到返回以下类似信息。

        % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                       Dload  Upload   Total   Spent    Left  Speed
      100    91  100    91    0     0  13407      0 --:--:-- --:--:-- --:--:-- 15166
      {
        "version": "2.4.1",
        "commit": "42ce056344c5625a",
        "kafka_cluster_id": "6Z7IdHW4SVO1Pbql4c****"
      }
  5. 使用Kafka Connect迁移数据。

    您可以在Kafka集群中启动MirrorMaker任务,进行数据复制与迁移。具体操作,请参见使用MirrorMaker 2(on Connect)跨集群同步数据

相关文档