本文为您介绍EMR Kafka如何启用LDAP用户进行鉴权,以及详细使用的示例。
前提条件
已创建DataFlow集群,且选择了Kafka和OpenLDAP服务。创建集群详情,请参见创建DataFlow Kafka集群。
使用限制
适用于EMR-3.44.0之后版本、EMR-5.10.0之后版本的EMR Kafka集群。
集群中已有EMR OpenLDAP服务或者外部LDAP服务。
本文以EMR OpenLDAP服务为例。
注意事项
如果需要使用用户组鉴权,则需要确保LDAP服务启用memberOf overlay特性,或者LDAP用户支持memberOf属性设置。
启用LDAP用户鉴权
创建super users。
说明如果您已经创建好super user,则可以跳过此步骤。
Kafka super user可以访问Kafka的所有资源,本文super user用户用于Broker节点以及组件之间的相互访问。本文以EMR OpenLDAP服务为例,添加Kafka super user用户。
通过SSH方式连接集群master-1-1节点,详情请参见登录集群。
创建kafka.ldif文件,文件内容如下。
本文是通过
ldap
命令,添加uid为kafka,密码为kafka-secret的LDAP用户。dn: uid=kafka,ou=people,o=emr cn: kafka sn: kafka objectClass: inetOrgPerson userPassword: kafka-secret uid: kafka
执行以下命令,添加LDAP用户。
ldapadd -H ldap://master-1-1:10389 -f kafka.ldif -D ${uid} -w ${rootDnPW}
说明${uid}
:为EMR控制台OpenLDAP服务配置页面admin_dn的参数值。${rootDnPW}
:为EMR控制台OpenLDAP服务配置页面admin_pwd的参数值。10389
:为OpenLDAP服务的监听端口。
添加成功后,您可以执行以下命令,查看该LDAP用户信息。
ldapsearch -w ${rootDnPW} -D ${uid} -H ldap://master-1-1:10389 -b uid=kafka,ou=people,o=emr
进入Kafka服务的配置页面。
在顶部菜单栏处,根据实际情况选择地域和资源组。
单击目标集群操作列的集群服务。
在集群服务页面,单击Kafka服务区域的配置。
修改配置项并保存。
在配置页面,单击server.properties页签。
修改配置项kafka.ssl.config.type的值为CUSTOM。
修改配置项authorizer.class.name的值为kafka.security.ldap.authorizer.SimpleLdapAuthorizer。
保存配置项。
单击保存。
在弹出的对话框中,输入执行原因。
新增自定义配置项并保存。
在server.properties页签,单击新增配置项。
在新增配置项对话框中,添加以下配置。
参数
参数值
说明
super.users
User:kafka
kafka
为已创建好的用户名,请根据实际情况替换。创建用户详情,请参见步骤1。
listener.name.${listener}.sasl.enabled.mechanisms
PLAIN
${listener}需要替换成具体的listener的名称,例如sasl_ssl。
listener.name.${listener}.plain.sasl.jaas.config
org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-secret" emr.kafka.security.ldap.user.name.attribute="uid" emr.kafka.security.ldap.user.base.dn="ou=people,o=emr" emr.kafka.security.ldap.group.name.attribute="cn" emr.kafka.security.ldap.group.base.dn="ou=groups,o=emr" emr.kafka.security.ldap.admin.name.attribute="uid" emr.kafka.security.ldap.admin.base.dn="o=emr" emr.kafka.security.ldap.url="ldaps://master-1-1:10636" emr.kafka.security.ldap.bind.user="admin" emr.kafka.security.ldap.bind.user.password="WMMuhh3P**********" emr.kafka.security.ldap.user.member.of.attribute="memberOf" emr.kafka.security.ldap.group.authorization.support="true" ;
${listener}需要替换成具体的listener的名称,例如sasl_ssl。
参数值也应根据实际情况替换LDAP的相关配置:
username
:需要使用super user,Kafka服务使用该用户进行节点间访问。mr.kafka.security.ldap.user.name.attribute
:LDAP用户名称属性,用于获取用户名。emr.kafka.security.ldap.user.base.dn
:LDAP用户base dn。emr.kafka.security.ldap.group.name.attribute:LDAP用户组名称属性,用于获取用户组名称。
emr.kafka.security.ldap.group.base.dn
:LDAP用户组base dn。emr.kafka.security.ldap.admin.name.attribute
:LDAP admin用户属性。emr.kafka.security.ldap.admin.base.dn
:LDAP admin用户base dn。emr.kafka.security.ldap.url
:LDAP URL。emr.kafka.security.ldap.bind.use
:LDAP管理员用户名称,用于用户组鉴权使用。emr.kafka.security.ldap.bind.user.password
:LDAP管理员用户密码。emr.kafka.security.ldap.user.member.of.attribute
:用户所属组的属性,用于获取用户所属组。emr.kafka.security.ldap.group.authorization.support
:是否支持组鉴权。如果支持,则当用户所属组有权限时,该用户继承组的权限。
listener.name.${listener}.plain.sasl.server.callback.handler.class
kafka.security.ldap.authenticator.LdapAuthenticateCallbackHandler
${listener}需要替换成具体的listener的名称,例如sasl_ssl。
sasl.mechanism.inter.broker.protocol
PLAIN
无
sasl.enabled.mechanisms
PLAIN
无
修改其他配置项。
您需要根据业务需求,修改以下配置。
说明需要根据实际情况替换kafka.client.jaas.content参数值中的用户名和密码,建议使用super user用户。
配置文件
参数
参数值
说明
kafka_client_jaas.conf
kafka.client.jaas.content
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-secret"; };
末尾需要有英文分号(;)。
schema-registry.properties
schema_registry_opts
-Djava.security.auth.login.config=/etc/taihao-apps/kafka-conf/kafka-conf/kafka_client_jaas.conf
如果该配置已经有参数值,则将新增的内容追加到已有的参数值之后。
kafka-rest.properties
kafkarest_opts
-Djava.security.auth.login.config=/etc/taihao-apps/kafka-conf/kafka-conf/kafka_client_jaas.conf
如果该配置已经有参数值,则将新增的内容追加到已有的参数值之后。
重启Kafka服务。
在Kafka服务的配置页面,选择 。
在弹出的对话框中,输入执行原因,单击确定。
在确认对话框中,单击确定。
使用示例
本示例操作适用于开源Kafka 2.4.1,其它版本的Kafka操作请参见Apache Kafka。
创建测试用户以及用户组。
如果OpenLdap没有开启memberOf overlay特性,则需要开启此功能。
说明以下配置方式只作为参考,不同的LDAP服务,开启memberOf overlay的方式不同。
需要在所有OpenLDAP节点下配置生效。
以下所有配置文件的参数请根据实际情况修改,例如:
olcModulepath
:32位OS系统为/usr/lib/openldap。dn: cn=module{0},cn=config
:如果/etc/openldap/slapd.d/cn=config目录下已经存在cn=module{0}.ldif文件,则需要修改module后面的数字为1;如果存在cn=module{1}.ldif,则需要修改module后面的数字为2,以此类推。dn: olcOverlay={0}memberof,olcDatabase={2}hdb,cn=config
:根据/etc/openldap/slapd.d/cn=config目录olcDatabase的值来配置相应的值。例如,如果存在olcDatabase={2}hdb.ldif
,则此处的值应为hdb。
执行命令
vim memberof_config.ldif
,新增配置文件memberof_config.ldif,文件内容如下。dn: cn=module{0},cn=config cn: module{0} objectClass: olcModuleList objectclass: top olcModuleload: memberof.la olcModulePath: /usr/lib64/openldap dn: olcOverlay={0}memberof,olcDatabase={2}hdb,cn=config objectClass: olcConfig objectClass: olcMemberOf objectClass: olcOverlayConfig objectClass: top olcOverlay: memberof olcMemberOfDangling: ignore olcMemberOfRefInt: TRUE olcMemberOfGroupOC: groupOfNames olcMemberOfMemberAD: member olcMemberOfMemberOfAD: memberOf
执行命令
vim refint1.ldif
,新增配置文件refint1.ldif,文件内容如下。dn: cn=module{0},cn=config add: olcmoduleload olcmoduleload: refint
执行命令
vim refint2.ldif
,新增配置文件refint2.ldif,文件内容如下。dn: olcOverlay=refint,olcDatabase={2}hdb,cn=config objectClass: olcConfig objectClass: olcOverlayConfig objectClass: olcRefintConfig objectClass: top olcOverlay: refint olcRefintAttribute: memberof member manager owner
执行以下命令,加载配置文件。
ldapadd -Q -Y EXTERNAL -H ldapi:/// -f memberof_config.ldif ldapmodify -Q -Y EXTERNAL -H ldapi:/// -f refint1.ldif ldapadd -Q -Y EXTERNAL -H ldapi:/// -f refint2.ldif
创建测试用户。
通过ldapadd命令的方式添加测试用户。ldif文件命名为kafka-users.ldif,文件内容如下。
dn: uid=kafka-user1,ou=people,o=emr cn: kafka-user1 sn: kafka-user1 uid: kafka-user1 objectClass: inetOrgPerson userPassword: kafka-secret dn: uid=kafka-user2,ou=people,o=emr cn: kafka-user2 sn: kafka-user2 uid: kafka-user2 objectClass: inetOrgPerson userPassword: kafka-secret
创建测试用户组。
通过ldapadd命令的方式添加测试用户组。ldif文件命名为kafka-groups.ldif,文件内容如下。
dn: cn=kafka-group1,ou=groups,o=emr cn: kafka-group1 objectClass: groupOfNames member: uid=kafka-user1,ou=people,o=emr member: uid=kafka-user2,ou=people,o=emr dn: cn=kafka-group2,ou=groups,o=emr cn: kafka-group2 objectClass: groupOfNames member: uid=kafka-user1,ou=people,o=emr
在LDAP所在服务器执行以下命令,查看测试用户memberOf属性。
ldapsearch -Q -Y EXTERNAL -H ldapi:/// -b ou=people,o=emr memberOf
根据返回信息,可以看见kafka-user1的memberOf属性包含kafka-group1和kafka-group2,kafka-user2的memberOf属性包含kafka-group2。
创建客户端配置文件。
创建client.properties客户端配置文件,内容如下所示。
security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN #根据不同的客户端身份,替换username以及password。 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-secret";
创建kafka-user1.properties客户端配置文件,内容如下所示。
security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN #根据不同的客户端身份,替换username以及password sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka-user1" password="kafka-secret";
创建kafka-user2.properties客户端配置文件,内容如下所示。
security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN #根据不同的客户端身份,替换username以及password sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka-user2" password="kafka-secret";
创建名称为test的测试Topic。
kafka-topics.sh --bootstrap-server core-1-1:9092 --command-config client.properties --create --topic test --replication-factor 3 --partitions 2
授权操作。
详细的授权命令操作,请参见Authorization and ACLs。
#kafka-group2权限 kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-group2 --allow-host "*" --operation All --cluster kafka-cluster kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-group2 --allow-host "*" --operation All --topic test #kafka-group1权限 kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --deny-principal User:kafka-group1 --deny-host "*" --operation All --topic test #kafka-user1权限 kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --deny-principal User:kafka-user1 --deny-host "*" --operation Read --topic test #kafka-user2权限 kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-user2 --allow-host "*" --operation Read --topic test
您可以执行以下命令,查看授权结果。
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --list --topic test
返回信息如下所示。
Current ACLs for resource `Topic:LITERAL:test`: User:kafka-group2 has Allow permission for operations: All from hosts: * User:kafka-user2 has Allow permission for operations: Read from hosts: * User:kafka-user1 has Deny permission for operations: Read from hosts: * User:kafka-group1 has Deny permission for operations: All from hosts: *
权限验证。
kafka-user1通过kafka-group2可以写test。
kafka-console-producer.sh --broker-list core-1-1:9092 --producer.config ./kafka-user1.properties --topic test
在提示符后输入测试消息,可以访问test。
#提示符后输入测试消息 >a >b >c >d
kafka-user1不能读test。
#授权consumer group kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-user1 --allow-host "*" --operation All --group kka-user1-consumer kafka-console-consumer.sh --bootstrap-server core-1-1:9092 --consumer.config ./kafka-user1.properties --topic test --group kafka-user1-consumer
输出结果提示鉴权不通过无法访问test。
kafka-user2不能写test。
kafka-console-producer.sh --broker-list core-1-1:9092 --producer.config ./kafka-user2.properties --topic test #提示符后输入测试消息 >a #提示鉴权不通过
输出结果提示鉴权不通过。
kafka-user2可以读test。
#授权consumer group kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-user2 --allow-host "*" --operation All --group kafka-user2-consumer kafka-console-consumer.sh --bootstrap-server core-1-1:9092 --consumer.config ./kafka-user2.properties --topic test --group kafka-user2-consumer --from-beginning #正常消费数据