本文为您介绍如何创建E-MapReduce(简称EMR)Kafka集群、Kafka访问的设置,使用Kafka Topic和Kafka Connect服务,帮您快速了解和上手使用EMR Kafka。

创建EMR Kafka集群

该部分内容为您简单介绍如何创建Kafka集群,更详细的创建操作,请参见创建DataFlow Kafka集群

  1. 进入创建集群页面。
    1. 登录EMR on ECS控制台
    2. 单击上方的创建集群
  2. 软件配置阶段,您可以根据需要的Kafka版本,选择对应的EMR版本。
    打开服务高可用开关,创建3节点的ZooKeeper集群。软件配置
  3. 硬件配置阶段,选择合适的ECS实例机型以及节点数量。
    • 机型:Core节点组选择CPU和内存配比为1 Core:4 GB的机型。
    • 节点数量:Core节点组选择比Kafka分区副本数多1的节点数量以保持足够的冗余。例如,如果规划副本数为3,则节点数选择为4。
    硬件配置
  4. 基础配置阶段,根据需求填写相关参数。
    基础配置
  5. 确认订单页面,选中E-MapReduce服务条款复选框,单击创建

(可选)Kafka访问设置

默认情况下,Kafka不启用登录鉴权等安全设置。您可以根据实际情况选择是否执行以下操作启用相关配置。

启用默认SSL加密功能

您可以执行以下步骤使用默认证书配置SSL。更多SSL的配置,请参见使用SSL加密Kafka链接

  1. 进入服务的配置页面。
    1. 登录EMR on ECS控制台
    2. 在顶部菜单栏处,根据实际情况选择地域和资源组
    3. 单击目标集群操作列的集群服务
    4. 集群服务页面,单击Kafka服务区域的配置
  2. 修改配置项。
    1. 配置页面,单击server.properties页签。
    2. 修改kafka.ssl.config.type的参数值为DEFAULT
      修改参数
  3. 保存配置。
    1. 单击保存
    2. 在弹出的对话框中,输入执行原因
    3. 单击确定
  4. 重启Kafka服务。
    1. 在Kafka服务的配置页面,选择更多操作 > 重启
    2. 在弹出的对话框中,输入执行原因,单击确定
    3. 确认对话框中,单击确定

配置SASL登录认证功能

该部分内容为您介绍如何启用SASL/SCRAM-SHA-512认证机制。

  1. 创建Kafka服务管理用户。
    1. 使用SSH方式登录Kafka集群,详情请参见登录集群
    2. 执行以下命令,创建用户kafka。
      sudo su - kafka
      kafka-configs.sh --bootstrap-server core-1-1:9092 --alter --add-config 'SCRAM-SHA-256=[password=kafka-secret],SCRAM-SHA-512=[password=kafka-secret]' --entity-type users --entity-name kafka
  2. 修改配置项。
    1. 在EMR控制台的集群服务页面,单击Kafka服务区域的配置
    2. 配置页面,单击server.properties页签。
    3. 配置过滤中,输入配置项kafka.sasl.config.type,单击Search图标。
    4. 修改kafka.sasl.config.type的参数值为CUSTOM,单击保存
    5. 在弹出的对话框中,输入执行原因,单击确认
  3. 新增配置项。
    1. 在Kafka服务配置页面的server.properties页签,添加配置项。
    2. 单击自定义配置
    3. 新增配置项对话框中,添加以下配置。
      参数参数值
      sasl.mechanism.inter.broker.protocolSCRAM-SHA-512
      sasl.enabled.mechanismsSCRAM-SHA-512
      listener.name.${listener}.sasl.enabled.mechanismsSCRAM-SHA-512
      listener.name.${listener}.scram-sha-512.sasl.jaas.configorg.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-secret" ;
      说明 ${listener}需要替换成具体的listener的名称,例如sasl_ssl。
    4. 单击确定
    5. 确认修改配置对话框中,输入执行原因,打开自动更新配置开关,单击确定
  4. 配置客户端JAAS文件内容。

    通过kafka_client_jaas.conf配置文件的kafka.client.jaas.content配置项,配置Kafka客户端JAAS,该配置将会用于启动Kafka Schema Registry以及Kafka Rest Proxy组件。

    1. 在Kafka服务配置页面,修改以下配置项。
      页签参数参数值
      kafka_client_jaas.confkafka.client.jaas.content
      KafkaClient {
      org.apache.kafka.common.security.scram.ScramLoginModule required
      username="admin"
      password="admin-secret";
      };
      schema-registry.propertiesschema_registry_opts-Djava.security.auth.login.config=/etc/taihao-apps/kafka-conf/kafka-conf/kafka_client_jaas.conf
      kafka-rest.propertieskafkarest_opts-Djava.security.auth.login.config=/etc/taihao-apps/kafka-conf/kafka-conf/kafka_client_jaas.conf
    2. 服务配置区域,单击保存
    3. 确认修改配置对话框中,输入执行原因,打开自动更新配置开关,单击确定
  5. 重启Kafka服务。
    1. 在Kafka服务的配置页面,选择更多操作 > 重启
    2. 在弹出的对话框中,输入执行原因,单击确定
    3. 确认对话框中,单击确定

使用Kafka Topic

创建EMR Kafka集群并进行相关安全配置等设置之后,您可以开始使用Kafka Topic进行数据的生产消费。该部分内容为您介绍如何使用Kafka自带命令操作Kafka Topic。实际业务场景,您也可以使用Kafka Manager或Cruise Control等软件管理集群。

  1. 创建客户端配置文件。
    1. 使用SSH方式登录Kafka集群,详情请参见登录集群
    2. 创建配置文件。
      1. 执行以下命令,创建配置文件client.properties
        vim client.properties
      2. 添加以下内容至配置文件client.properties中。
        bootstrap.server=core-1-1:9092
        security.protocol=SASL_SSL
        sasl.mechanism=SCRAM-SHA-512
        sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-secret";
        ssl.truststore.location=/var/taihao-security/ssl/ssl/truststore
        ssl.truststore.password=${password}
        ssl.keystore.location=/var/taihao-security/ssl/ssl/keystore
        ssl.keystore.password=${password}
        ssl.endpoint.identification.algorithm=
        说明 上面示例文件中的部分参数,需要您根据实际情况修改。
        • usernamepassword:访问Kafka服务的用户名和密码。
        • ssl.truststore.passwordssl.keystore.password:如果Kafka集群使用默认SSL配置,则可以在EMR控制台Kafka服务的server.properties配置页面,查看ssl.truststore.passwordssl.key.password参数的参数值。
  2. 执行以下命令,创建Kafka Topic。
    sudo su - kafka
    kafka-topics.sh --partitions 10 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic test --create
  3. 执行以下命令,查看Kafka topic详细信息。
    kafka-topics.sh --bootstrap-server core-1-1:9092 --command-config client.properties --topic test --describe
  4. 执行以下命令,生产数据。
     kafka-console-producer.sh  --broker-list core-1-1:9092 --producer.config client.properties --topic test
  5. 执行以下命令,消费数据。
    kafka-console-consumer.sh --bootstrap-server core-1-1:9092 --consumer.config  client.properties --topic test --from-beginning --group test-consumer-group

使用Kafka Connect服务

EMR-3.41.0之后版本、EMR-5.7.0之后版本支持Kafka Connect组件的部署。该部分内容为您介绍如何使用Kafka Connect服务。

  1. 进入节点管理页面。
    1. 登录EMR on ECS控制台
    2. 在顶部菜单栏处,根据实际情况选择地域和资源组
    3. 单击目标集群操作列的节点管理
  2. 创建Kafka Connect节点组。
    Connect安装在EMR Task节点组。在EMR Kafka集群创建Task节点组后,EMR会自动在该节点组创建Kafka Connect集群。
    1. 新建EMR Task节点组

      可以在节点管理页面,单击新增节点组,新建Task节点组。具体操作,请参见新增节点组

    2. 扩容Task节点组

      根据您的实际需求,可以扩容Task机器组实例数量。具体操作,请参见扩容集群

  3. 查看KafkaConnect服务状态,确保Kafka Connect集群已经启动。
    您可以在Kafka服务的状态页面的组件列表区域,查看KafkaConnect的组件状态,确保组件在运行中。KafkaConnect
  4. 检查Kafka Connect Rest服务状态。
    1. 使用SSH方式登录Kafka集群,详情请参见登录集群
    2. 执行以下命令,检查Kafka Connect Rest服务状态。
      curl -X GET http://task-1-1:8083| jq .
      您会看到返回以下类似信息。
        % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                       Dload  Upload   Total   Spent    Left  Speed
      100    91  100    91    0     0  13407      0 --:--:-- --:--:-- --:--:-- 15166
      {
        "version": "2.4.1",
        "commit": "42ce056344c5625a",
        "kafka_cluster_id": "6Z7IdHW4SVO1Pbql4c****"
      }
  5. 使用Kafka Connnect迁移数据。
    您可以在Kafka集群中启动MirrorMaker任务,进行数据复制与迁移。具体操作,请参见使用MirrorMaker 2(on Connect)跨集群同步数据