快速开始使用EMR Kafka

本文为您介绍如何创建E-MapReduce(简称EMR)Kafka集群、Kafka访问的设置,使用Kafka Topic和Kafka Connect服务,帮您快速了解和上手使用EMR Kafka。

注意事项

创建EMR Kafka集群前,您需要根据业务的预估负载,选择合适的ECS实例机型以及Broker实例个数。由于业务场景差异很大,所以无法给出通用的集群规划,您需要根据您的实际环境创建集群。通常,建议您选择机型时考虑以下配置:

  • Broker机型的CPU和内存配比为1:4。

  • 选择云盘作为数据存储盘。

  • 充分考虑云盘的I/O吞吐率以及网卡带宽之间的关系。

在部署参数上,考虑以下因素:

  • 由于EMR Kafka版本仍依赖于Zookeeper,且Zookeeper的可用性直接关系到Kafka服务的高可用,因此,建议您创建集群时,选择高可用的部署方式。启用高可用后,将创建3个节点的Zookeeper服务。

  • 如果Master机器组只部署Zookeeper,则Master机器组只需要配置1块数据盘即可。

更详细的评估建议,请参见集群资源规格评估建议

创建EMR Kafka集群

该部分内容为您简单介绍如何创建Kafka集群,更详细的创建操作,请参见创建集群

  1. 进入创建集群页面。

    1. 登录E-MapReduce控制台

    2. 单击上方的创建集群

  2. 软件配置阶段,您可以根据需要的Kafka版本,选择对应的EMR版本。

    打开服务高可用开关,创建3节点的ZooKeeper集群。软件配置

    重要

    启用高可用后,将在Master机器组上部署3个节点的Zookeeper服务。由于EMR Kafka版本的服务可用性仍依赖于Zookeeper,所以建议您创建集群时,选择高可用的部署方式。

  3. 硬件配置阶段,选择合适的ECS实例机型以及节点数量。

    • 机型:Core节点组选择CPU和内存配比为1 Core:4 GB的机型。

    • 节点数量:Core节点组选择比Kafka分区副本数多1的节点数量以保持足够的冗余。例如,如果规划副本数为3,则节点数选择为4。

    硬件配置

  4. 基础配置阶段,根据需求填写相关参数。

    基础配置

  5. 确认订单页面,选中E-MapReduce服务条款复选框,单击创建

(可选)Kafka访问设置

默认情况下,Kafka不启用登录鉴权等安全设置。您可以根据实际情况选择是否执行以下操作启用相关配置。

启用默认SSL加密功能

您可以执行以下步骤使用默认证书配置SSL。更多SSL的配置,请参见使用SSL加密Kafka链接

  1. 进入服务的配置页面。

    1. 登录E-MapReduce控制台

    2. 在顶部菜单栏处,根据实际情况选择地域和资源组

    3. 单击目标集群操作列的集群服务

    4. 集群服务页面,单击Kafka服务区域的配置

  2. 修改配置项。

    1. 配置页面,单击server.properties页签。

    2. 修改kafka.ssl.config.type的参数值为DEFAULT

      修改参数

  3. 保存配置。

    1. 单击保存

    2. 在弹出的对话框中,输入执行原因,单击保存

  4. 重启Kafka服务。

    1. 在Kafka服务的配置页面,选择更多操作 > 重启

    2. 在弹出的对话框中,输入执行原因,单击确定

    3. 确认对话框中,单击确定

配置SASL登录认证功能

该部分内容为您介绍如何启用SASL/SCRAM-SHA-512认证机制。

  1. 创建Kafka服务管理用户。

    1. 使用SSH方式登录Kafka集群,详情请参见登录集群

    2. 执行以下命令,创建用户kafka。

      sudo su - kafka
      kafka-configs.sh --bootstrap-server core-1-1:9092 --alter --add-config 'SCRAM-SHA-256=[password=kafka-secret],SCRAM-SHA-512=[password=kafka-secret]' --entity-type users --entity-name kafka
  2. 修改配置项。

    1. 在EMR控制台的集群服务页面,单击Kafka服务区域的配置

    2. 配置页面,单击server.properties页签。

    3. 配置过滤中,输入配置项kafka.sasl.config.type,单击Search图标。

    4. 修改kafka.sasl.config.type的参数值为CUSTOM,单击保存

    5. 在弹出的对话框中,输入执行原因,单击确认

  3. 新增配置项。

    1. 在Kafka服务配置页面的server.properties页签,添加配置项。

    2. 单击自定义配置

    3. 新增配置项对话框中,添加以下配置。

      参数

      参数值

      sasl.mechanism.inter.broker.protocol

      SCRAM-SHA-512

      sasl.enabled.mechanisms

      SCRAM-SHA-512

      listener.name.${listener}.sasl.enabled.mechanisms

      SCRAM-SHA-512

      listener.name.${listener}.scram-sha-512.sasl.jaas.config

      org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-secret" ;

      说明

      ${listener}需要替换成具体的listener的名称,例如sasl_ssl。

    4. 单击确定

    5. 在弹出的对话框中,输入执行原因,单击保存

  4. 配置客户端JAAS文件内容。

    通过kafka_client_jaas.conf配置文件的kafka.client.jaas.content配置项,配置Kafka客户端JAAS,该配置将会用于启动Kafka Schema Registry以及Kafka Rest Proxy组件。

    1. 在Kafka服务配置页面,修改以下配置项。

      页签

      参数

      参数值

      kafka_client_jaas.conf

      kafka.client.jaas.content

      KafkaClient {
      org.apache.kafka.common.security.scram.ScramLoginModule required
      username="admin"
      password="admin-secret";
      };

      schema-registry.properties

      schema_registry_opts

      -Djava.security.auth.login.config=/etc/taihao-apps/kafka-conf/kafka-conf/kafka_client_jaas.conf

      kafka-rest.properties

      kafkarest_opts

      -Djava.security.auth.login.config=/etc/taihao-apps/kafka-conf/kafka-conf/kafka_client_jaas.conf

    2. 服务配置区域,单击保存

    3. 在弹出的对话框中,输入执行原因,单击保存

  5. 重启Kafka服务。

    1. 在Kafka服务的配置页面,选择更多操作 > 重启

    2. 在弹出的对话框中,输入执行原因,单击确定

    3. 确认对话框中,单击确定

使用Kafka Topic

创建EMR Kafka集群并进行相关安全配置等设置之后,您可以开始使用Kafka Topic进行数据的生产消费。该部分内容为您介绍如何使用Kafka自带命令操作Kafka Topic。实际业务场景,您也可以使用Kafka Manager或Cruise Control等软件管理集群。

  1. 创建客户端配置文件。

    1. 使用SSH方式登录Kafka集群,详情请参见登录集群

    2. 创建配置文件。

      1. 执行以下命令,创建配置文件client.properties

        vim client.properties
      2. 添加以下内容至配置文件client.properties中。

        bootstrap.server=core-1-1:9092
        security.protocol=SASL_SSL
        sasl.mechanism=SCRAM-SHA-512
        sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-secret";
        ssl.truststore.location=/var/taihao-security/ssl/ssl/truststore
        ssl.truststore.password=${password}
        ssl.keystore.location=/var/taihao-security/ssl/ssl/keystore
        ssl.keystore.password=${password}
        ssl.endpoint.identification.algorithm=
        说明

        上面示例文件中的部分参数,需要您根据实际情况修改。

        • usernamepassword:访问Kafka服务的用户名和密码。

        • ssl.truststore.passwordssl.keystore.password:如果Kafka集群使用默认SSL配置,则可以在EMR控制台Kafka服务的server.properties配置页面,查看ssl.truststore.passwordssl.key.password参数的参数值。

  2. 执行以下命令,创建Kafka Topic。

    sudo su - kafka
    kafka-topics.sh --partitions 10 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic test --create
  3. 执行以下命令,查看Kafka topic详细信息。

    kafka-topics.sh --bootstrap-server core-1-1:9092 --command-config client.properties --topic test --describe
  4. 执行以下命令,生产数据。

     kafka-console-producer.sh  --broker-list core-1-1:9092 --producer.config client.properties --topic test
  5. 执行以下命令,消费数据。

    kafka-console-consumer.sh --bootstrap-server core-1-1:9092 --consumer.config  client.properties --topic test --from-beginning --group test-consumer-group

使用Kafka Connect服务

EMR-3.41.0之后版本、EMR-5.7.0之后版本支持Kafka Connect组件的部署。该部分内容为您介绍如何使用Kafka Connect服务。

  1. 进入节点管理页面。

    1. 登录E-MapReduce控制台

    2. 在顶部菜单栏处,根据实际情况选择地域和资源组

    3. 单击目标集群操作列的节点管理

  2. 创建Kafka Connect节点组。

    Connect安装在EMR Task节点组。在EMR Kafka集群创建Task节点组后,EMR会自动在该节点组创建Kafka Connect集群。

    1. 新建EMR Task节点组

      可以在节点管理页面,单击新增节点组,新建Task节点组。具体操作,请参见新增节点组

    2. 扩容Task节点组

      根据您的实际需求,可以扩容Task机器组实例数量。具体操作,请参见扩容集群

  3. 查看KafkaConnect服务状态,确保Kafka Connect集群已经启动。

    您可以在Kafka服务的状态页面的组件列表区域,查看KafkaConnect的组件状态,确保组件在运行中。KafkaConnect

  4. 检查Kafka Connect Rest服务状态。

    1. 使用SSH方式登录Kafka集群,详情请参见登录集群

    2. 执行以下命令,检查Kafka Connect Rest服务状态。

      curl -X GET http://task-1-1:8083| jq .

      您会看到返回以下类似信息。

        % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                       Dload  Upload   Total   Spent    Left  Speed
      100    91  100    91    0     0  13407      0 --:--:-- --:--:-- --:--:-- 15166
      {
        "version": "2.4.1",
        "commit": "42ce056344c5625a",
        "kafka_cluster_id": "6Z7IdHW4SVO1Pbql4c****"
      }
  5. 使用Kafka Connect迁移数据。

    您可以在Kafka集群中启动MirrorMaker任务,进行数据复制与迁移。具体操作,请参见使用MirrorMaker 2(on Connect)跨集群同步数据