本文为您介绍如何创建E-MapReduce(简称EMR)Kafka集群、Kafka访问的设置,使用Kafka Topic和Kafka Connect服务,帮您快速了解和上手使用EMR Kafka。
注意事项
创建EMR Kafka集群前,您需要根据业务的预估负载,选择合适的ECS实例机型以及Broker实例个数。由于业务场景差异很大,所以无法给出通用的集群规划,您需要根据您的实际环境创建集群。通常,建议您选择机型时考虑以下配置:
Broker机型的CPU和内存配比为1:4。
选择云盘作为数据存储盘。
充分考虑云盘的I/O吞吐率以及网卡带宽之间的关系。
在部署参数上,考虑以下因素:
由于EMR Kafka版本仍依赖于Zookeeper,且Zookeeper的可用性直接关系到Kafka服务的高可用,因此,建议您创建集群时,选择高可用的部署方式。启用高可用后,将创建3个节点的Zookeeper服务。
如果Master机器组只部署Zookeeper,则Master机器组只需要配置1块数据盘即可。
更详细的评估建议,请参见集群资源规格评估建议。
创建EMR Kafka集群
该部分内容为您简单介绍如何创建Kafka集群,更详细的创建操作,请参见创建集群。
进入创建集群页面。
单击上方的创建集群。
在软件配置阶段,您可以根据需要的Kafka版本,选择对应的EMR版本。
打开服务高可用开关,创建3节点的ZooKeeper集群。
重要启用高可用后,将在Master机器组上部署3个节点的Zookeeper服务。由于EMR Kafka版本的服务可用性仍依赖于Zookeeper,所以建议您创建集群时,选择高可用的部署方式。
在硬件配置阶段,选择合适的ECS实例机型以及节点数量。
机型:Core节点组选择CPU和内存配比为1 Core:4 GB的机型。
节点数量:Core节点组选择比Kafka分区副本数多1的节点数量以保持足够的冗余。例如,如果规划副本数为3,则节点数选择为4。
在基础配置阶段,根据需求填写相关参数。
在确认订单页面,选中E-MapReduce服务条款复选框,单击创建。
(可选)Kafka访问设置
默认情况下,Kafka不启用登录鉴权等安全设置。您可以根据实际情况选择是否执行以下操作启用相关配置。
启用默认SSL加密功能
您可以执行以下步骤使用默认证书配置SSL。更多SSL的配置,请参见使用SSL加密Kafka链接。
进入服务的配置页面。
在顶部菜单栏处,根据实际情况选择地域和资源组。
单击目标集群操作列的集群服务。
在集群服务页面,单击Kafka服务区域的配置。
修改配置项。
在配置页面,单击server.properties页签。
修改kafka.ssl.config.type的参数值为DEFAULT。
保存配置。
单击保存。
在弹出的对话框中,输入执行原因,单击保存。
重启Kafka服务。
在Kafka服务的配置页面,选择
。在弹出的对话框中,输入执行原因,单击确定。
在确认对话框中,单击确定。
配置SASL登录认证功能
该部分内容为您介绍如何启用SASL/SCRAM-SHA-512认证机制。
创建Kafka服务管理用户。
使用SSH方式登录Kafka集群,详情请参见登录集群。
执行以下命令,创建用户kafka。
sudo su - kafka kafka-configs.sh --bootstrap-server core-1-1:9092 --alter --add-config 'SCRAM-SHA-256=[password=kafka-secret],SCRAM-SHA-512=[password=kafka-secret]' --entity-type users --entity-name kafka
修改配置项。
在EMR控制台的集群服务页面,单击Kafka服务区域的配置。
在配置页面,单击server.properties页签。
在配置过滤中,输入配置项kafka.sasl.config.type,单击图标。
修改kafka.sasl.config.type的参数值为CUSTOM,单击保存。
在弹出的对话框中,输入执行原因,单击确认。
新增配置项。
在Kafka服务配置页面的server.properties页签,添加配置项。
单击自定义配置。
在新增配置项对话框中,添加以下配置。
参数
参数值
sasl.mechanism.inter.broker.protocol
SCRAM-SHA-512
sasl.enabled.mechanisms
SCRAM-SHA-512
listener.name.${listener}.sasl.enabled.mechanisms
SCRAM-SHA-512
listener.name.${listener}.scram-sha-512.sasl.jaas.config
org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-secret" ;
说明${listener}
需要替换成具体的listener的名称,例如sasl_ssl。单击确定。
在弹出的对话框中,输入执行原因,单击保存。
配置客户端JAAS文件内容。
通过kafka_client_jaas.conf配置文件的kafka.client.jaas.content配置项,配置Kafka客户端JAAS,该配置将会用于启动Kafka Schema Registry以及Kafka Rest Proxy组件。
在Kafka服务配置页面,修改以下配置项。
页签
参数
参数值
kafka_client_jaas.conf
kafka.client.jaas.content
KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret"; };
schema-registry.properties
schema_registry_opts
-Djava.security.auth.login.config=/etc/taihao-apps/kafka-conf/kafka-conf/kafka_client_jaas.conf
kafka-rest.properties
kafkarest_opts
-Djava.security.auth.login.config=/etc/taihao-apps/kafka-conf/kafka-conf/kafka_client_jaas.conf
在服务配置区域,单击保存。
在弹出的对话框中,输入执行原因,单击保存。
重启Kafka服务。
在Kafka服务的配置页面,选择
。在弹出的对话框中,输入执行原因,单击确定。
在确认对话框中,单击确定。
使用Kafka Topic
创建EMR Kafka集群并进行相关安全配置等设置之后,您可以开始使用Kafka Topic进行数据的生产消费。该部分内容为您介绍如何使用Kafka自带命令操作Kafka Topic。实际业务场景,您也可以使用Kafka Manager或Cruise Control等软件管理集群。
创建客户端配置文件。
使用SSH方式登录Kafka集群,详情请参见登录集群。
创建配置文件。
执行以下命令,创建配置文件client.properties。
vim client.properties
添加以下内容至配置文件client.properties中。
bootstrap.server=core-1-1:9092 security.protocol=SASL_SSL sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-secret"; ssl.truststore.location=/var/taihao-security/ssl/ssl/truststore ssl.truststore.password=${password} ssl.keystore.location=/var/taihao-security/ssl/ssl/keystore ssl.keystore.password=${password} ssl.endpoint.identification.algorithm=
说明上面示例文件中的部分参数,需要您根据实际情况修改。
username
和password
:访问Kafka服务的用户名和密码。ssl.truststore.password
和ssl.keystore.password
:如果Kafka集群使用默认SSL配置,则可以在EMR控制台Kafka服务的server.properties配置页面,查看ssl.truststore.password和ssl.key.password参数的参数值。
执行以下命令,创建Kafka Topic。
sudo su - kafka kafka-topics.sh --partitions 10 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic test --create
执行以下命令,查看Kafka topic详细信息。
kafka-topics.sh --bootstrap-server core-1-1:9092 --command-config client.properties --topic test --describe
执行以下命令,生产数据。
kafka-console-producer.sh --broker-list core-1-1:9092 --producer.config client.properties --topic test
执行以下命令,消费数据。
kafka-console-consumer.sh --bootstrap-server core-1-1:9092 --consumer.config client.properties --topic test --from-beginning --group test-consumer-group
使用Kafka Connect服务
EMR-3.41.0之后版本、EMR-5.7.0之后版本支持Kafka Connect组件的部署。该部分内容为您介绍如何使用Kafka Connect服务。
进入节点管理页面。
在顶部菜单栏处,根据实际情况选择地域和资源组。
单击目标集群操作列的节点管理。
创建Kafka Connect节点组。
查看KafkaConnect服务状态,确保Kafka Connect集群已经启动。
您可以在Kafka服务的状态页面的组件列表区域,查看KafkaConnect的组件状态,确保组件在运行中。
检查Kafka Connect Rest服务状态。
使用SSH方式登录Kafka集群,详情请参见登录集群。
执行以下命令,检查Kafka Connect Rest服务状态。
curl -X GET http://task-1-1:8083| jq .
您会看到返回以下类似信息。
% Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 91 100 91 0 0 13407 0 --:--:-- --:--:-- --:--:-- 15166 { "version": "2.4.1", "commit": "42ce056344c5625a", "kafka_cluster_id": "6Z7IdHW4SVO1Pbql4c****" }
使用Kafka Connect迁移数据。
您可以在Kafka集群中启动MirrorMaker任务,进行数据复制与迁移。具体操作,请参见使用MirrorMaker 2(on Connect)跨集群同步数据。