如果没有开启Kafka认证(如 Kerberos 认证或者简单的用户名密码),即使开启了Kafka授权,用户也可以伪造身份访问服务。所以建议创建高安全模式,即支持Kerberos的Kafka集群。
背景信息
本文的权限配置只针对E-MapReduce的高安全模式集群,即Kafka以Kerberos的方式启动,详见Kerberos简介。
进入配置页面
- 登录阿里云E-MapReduce控制台。
- 在顶部菜单栏处,选择地域(Region)。
- 单击上方的集群管理。
- 在集群管理页面,单击相应集群所在行的详情。
- 在左侧导航栏单击。
- 单击配置页签。
添加配置
- 在服务配置区域,单击server.properties。
- 单击自定义配置,添加如下几个参数。
key |
value |
备注 |
authorizer.class.name |
kafka.security.auth.SimpleAclAuthorizer |
无。 |
super.users |
User:kafka |
User:kafka是必须的,可添加其它用户用分号(;)隔开。 |
说明 zookeeper.set.acl用来设置Kafka在ZooKeeper中数据的权限,E-MapReduce集群中已经设置为true,所以此处不需要再添加该配置。该配置设置为true后,在Kerberos环境中,只有用户名称为Kafka且通过Kerberos
认证后才能执行kafka-topics.sh命令(kafka-topics.sh会直接读写/修改ZooKeeper中的数据)。
重启Kafka服务
- 在页面,单击右上角的。
- 在执行集群操作对话框设置相关参数,然后单击确定。
单击右上角查看操作历史查看任务进度,等待任务完成。
授权(ACL)
- 基本概念
Kafka官方文档定义。
Kafka acls are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H On Resource R"
即ACL过程涉及Principal、Allowed/Denied、Operation、Host 和 Resource。
- Principal:用户名
安全协议 |
value |
PLAINTEXT |
ANONYMOUS |
SSL |
ANONYMOUS |
SASL_PLAINTEXT |
mechanism 为 PLAIN 时,用户名是 client_jaas.conf 指定的用户名,mechanism 为 GSSAPI 时,用户名为 client_jaas.conf
指定的 principal
|
SASL_SSL |
- |
- Allowed/Denied:允许/拒绝。
- Operation: 操作,包括 Read、Write、Create、DeleteAlter、Describe、ClusterAction、AlterConfigs、DescribeConfigs、IdempotentWrite和All。
- Host: 针对的机器。
- Resource: 权限作用的资源对象,包括 Topic、Group、Cluster 和 TransactionalId。
Operation/Resource的一些详细对应关系,如哪些Resource支持哪些Operation的授权,详见KIP-11 - Authorization Interface。
- 授权命令
我们使用脚本kafka-acls.sh (/usr/lib/kafka-current/bin/kafka-acls.sh) 进行Kafka授权,您可以直接执行 kafka-acls.sh --help
查看如何使用该命令。
操作示例
在已经创建的E-MapReduce高安全Kafka集群的master节点上进行相关示例操作。
- 新建用户test,执行以下命令。
useradd test
- 创建topic。
第一节添加配置的备注中提到zookeeper.set.acl=true,kafka-topics.sh需要在kafka账号下执行,而且kafka账号下要通过Kerberos认证。
# kafka_client_jaas.conf中已经设置了kafka的Kerberos认证相关信息
export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf"
# zookeeper地址改成自己集群的对应地址(执行`hostnamed`后即可获取)
kafka-topics.sh --create --zookeeper emr-header-1:2181/kafka-1.0.0 --replication-factor 3 --partitions 1 --topic test
- test用户执行kafka-console-producer.sh。
- 创建test用户的keytab 文件,用于 zookeeper/kafka的认证。
su root
sh /usr/lib/has-current/bin/hadmin-local.sh /etc/ecm/has-conf -k /etc/ecm/has-conf/admin.keytab
HadminLocalTool.local: #直接按回车可以看到一些命令的用法
HadminLocalTool.local: addprinc #输入命令按回车可以看到具体命令的用法
HadminLocalTool.local: addprinc -pw 123456 test #添加test的princippal,密码设置为123456
HadminLocalTool.local: ktadd -k /home/test/test.keytab test #导出keytab文件,后续可使用该文件
- 添加kafka_client_test.conf。
如文件放到 /home/test/kafka_client_test.conf,内容如下。
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
serviceName="kafka"
keyTab="/home/test/test.keytab"
principal="test";
};
// Zookeeper client authentication
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
useTicketCache=false
serviceName="zookeeper"
keyTab="/home/test/test.keytab"
principal="test";
};
- 添加producer.conf。
如文件放到/home/test/producer.conf
,内容如下。
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
- 执行kafka-console-producer.sh。
su test
export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
kafka-console-producer.sh --producer.config /home/test/producer.conf --topic test --broker-list emr-worker-1:9092
由于没有设置ACL,所以上述会报错。
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]
- 设置 ACL。
同样kafka-acls.sh
也需要kafka账号执行。
su kafka
export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf"
kafka-acls.sh --authorizer-properties zookeeper.connect=emr-header-1:2181/kafka-1.0.0 --add --allow-principal User:test --operation Write --topic test
- 再执行kafka-console-producer.sh。
su test
export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
kafka-console-producer.sh --producer.config /home/test/producer.conf --topic test --broker-list emr-worker-1:9092
正常情况下显示如下。
[2018-02-28 22:25:36,178] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
>alibaba
>E-MapReduce
>
- est用户执行
kafka-console-consumer.sh
。
上面成功执行kafka-console-producer.sh,并往 topic 里面写入一些数据后,就可以执行kafka-console-consumer.sh
进行消费测试。
- 添加consumer.conf。
如文件放到/home/test/consumer.conf,内容如下。
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
- 执行kafka-console-consumer.sh。
su test
#kafka_client_test.conf跟上面producer使用的是一样的
export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
kafka-console-consumer.sh --consumer.config consumer.conf --topic test --bootstrap-server emr-worker-1:9092 --group test-group --from-beginning
由于未设置权限,会报以下错误。
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: test-group
- 设置ACL。
su kafka
export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf"
# test-group权限
kafka-acls.sh --authorizer-properties zookeeper.connect=emr-header-1:2181/kafka-1.0.0 --add --allow-principal User:test --operation Read --group test-group
# topic权限
kafka-acls.sh --authorizer-properties zookeeper.connect=emr-header-1:2181/kafka-1.0.0 --add --allow-principal User:test --operation Read --topic test
- 再执行
kafka-console-consumer.sh
。
su test
# kafka_client_test.conf跟上面producer使用的是一样的
export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
kafka-console-consumer.sh --consumer.config consumer.conf --topic test --bootstrap-server emr-worker-1:9092 --group test-group --from-beginning
正常输出。
alibaba
E-MapReduce
在文档使用中是否遇到以下问题
更多建议
匿名提交