文档

使用LDAP用户鉴权

更新时间:

本文为您介绍EMR Kafka如何启用LDAP用户进行鉴权,以及详细使用的示例。

前提条件

已创建DataFlow集群,且选择了Kafka和OpenLDAP服务。创建集群详情,请参见创建DataFlow Kafka集群

使用限制

  • 适用于EMR-3.44.0之后版本、EMR-5.10.0之后版本的EMR Kafka集群。

  • 集群中已有EMR OpenLDAP服务或者外部LDAP服务。

    本文以EMR OpenLDAP服务为例。

注意事项

如果需要使用用户组鉴权,则需要确保LDAP服务启用memberOf overlay特性,或者LDAP用户支持memberOf属性设置。

启用LDAP用户鉴权

  1. 创建super users。

    说明

    如果您已经创建好super user,则可以跳过此步骤。

    Kafka super user可以访问Kafka的所有资源,本文super user用户用于Broker节点以及组件之间的相互访问。本文以EMR OpenLDAP服务为例,添加Kafka super user用户。

    1. 通过SSH方式连接集群master-1-1节点,详情请参见登录集群

    2. 创建kafka.ldif文件,文件内容如下。

      本文是通过ldap命令,添加uid为kafka,密码为kafka-secret的LDAP用户。

      dn: uid=kafka,ou=people,o=emr
      cn: kafka
      sn: kafka
      objectClass: inetOrgPerson
      userPassword: kafka-secret
      uid: kafka
    3. 执行以下命令,添加LDAP用户。

      ldapadd -H ldap://master-1-1:10389 -f kafka.ldif -D ${uid} -w ${rootDnPW}
      说明
      • ${uid}:为EMR控制台OpenLDAP服务配置页面admin_dn的参数值。

      • ${rootDnPW}:为EMR控制台OpenLDAP服务配置页面admin_pwd的参数值。

      • 10389:为OpenLDAP服务的监听端口。

      添加成功后,您可以执行以下命令,查看该LDAP用户信息。

      ldapsearch -w ${rootDnPW}  -D ${uid} -H ldap://master-1-1:10389 -b uid=kafka,ou=people,o=emr
  2. 进入Kafka服务的配置页面。

    1. 登录EMR on ECS控制台

    2. 在顶部菜单栏处,根据实际情况选择地域和资源组

    3. 单击目标集群操作列的集群服务

    4. 集群服务页面,单击Kafka服务区域的配置

  3. 修改配置项并保存。

    1. 配置页面,单击server.properties页签。

    2. 修改配置项kafka.ssl.config.type的值为CUSTOM

    3. 修改配置项authorizer.class.name的值为kafka.security.ldap.authorizer.SimpleLdapAuthorizer

    4. 保存配置项。

      1. 单击保存

      2. 在弹出的对话框中,输入执行原因

  4. 新增自定义配置项并保存。

    1. server.properties页签,单击新增配置项

    2. 在新增配置项对话框中,添加以下配置。

      参数

      参数值

      说明

      super.users

      User:kafka

      kafka为已创建好的用户名,请根据实际情况替换。

      创建用户详情,请参见步骤1

      listener.name.${listener}.sasl.enabled.mechanisms

      PLAIN

      ${listener}需要替换成具体的listener的名称,例如sasl_ssl。

      listener.name.${listener}.plain.sasl.jaas.config

      org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-secret" emr.kafka.security.ldap.user.name.attribute="uid" emr.kafka.security.ldap.user.base.dn="ou=people,o=emr" emr.kafka.security.ldap.group.name.attribute="cn" emr.kafka.security.ldap.group.base.dn="ou=groups,o=emr" emr.kafka.security.ldap.admin.name.attribute="uid" emr.kafka.security.ldap.admin.base.dn="o=emr" emr.kafka.security.ldap.url="ldaps://master-1-1:10636" emr.kafka.security.ldap.bind.user="admin" emr.kafka.security.ldap.bind.user.password="WMMuhh3P**********" emr.kafka.security.ldap.user.member.of.attribute="memberOf" emr.kafka.security.ldap.group.authorization.support="true" ;

      ${listener}需要替换成具体的listener的名称,例如sasl_ssl。

      参数值也应根据实际情况替换LDAP的相关配置:

      • username:需要使用super user,Kafka服务使用该用户进行节点间访问。

      • mr.kafka.security.ldap.user.name.attribute:LDAP用户名称属性,用于获取用户名。

      • emr.kafka.security.ldap.user.base.dn:LDAP用户base dn。

      • emr.kafka.security.ldap.group.name.attribute:LDAP用户组名称属性,用于获取用户组名称。

      • emr.kafka.security.ldap.group.base.dn:LDAP用户组base dn。

      • emr.kafka.security.ldap.admin.name.attribute:LDAP admin用户属性。

      • emr.kafka.security.ldap.admin.base.dn:LDAP admin用户base dn。

      • emr.kafka.security.ldap.url:LDAP URL。

      • emr.kafka.security.ldap.bind.use:LDAP管理员用户名称,用于用户组鉴权使用。

      • emr.kafka.security.ldap.bind.user.password:LDAP管理员用户密码。

      • emr.kafka.security.ldap.user.member.of.attribute:用户所属组的属性,用于获取用户所属组。

      • emr.kafka.security.ldap.group.authorization.support:是否支持组鉴权。如果支持,则当用户所属组有权限时,该用户继承组的权限。

      listener.name.${listener}.plain.sasl.server.callback.handler.class

      kafka.security.ldap.authenticator.LdapAuthenticateCallbackHandler

      ${listener}需要替换成具体的listener的名称,例如sasl_ssl。

      sasl.mechanism.inter.broker.protocol

      PLAIN

      sasl.enabled.mechanisms

      PLAIN

    3. 修改其他配置项。

      您需要根据业务需求,修改以下配置。

      说明

      需要根据实际情况替换kafka.client.jaas.content参数值中的用户名和密码,建议使用super user用户。

      配置文件

      参数

      参数值

      说明

      kafka_client_jaas.conf

      kafka.client.jaas.content

      KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule  required username="kafka" password="kafka-secret"; };

      末尾需要有英文分号(;)。

      schema-registry.properties

      schema_registry_opts

      -Djava.security.auth.login.config=/etc/taihao-apps/kafka-conf/kafka-conf/kafka_client_jaas.conf

      如果该配置已经有参数值,则将新增的内容追加到已有的参数值之后。

      kafka-rest.properties

      kafkarest_opts

      -Djava.security.auth.login.config=/etc/taihao-apps/kafka-conf/kafka-conf/kafka_client_jaas.conf

      如果该配置已经有参数值,则将新增的内容追加到已有的参数值之后。

  5. 重启Kafka服务。

    1. 在Kafka服务的配置页面,选择更多操作 > 重启

    2. 在弹出的对话框中,输入执行原因,单击确定

    3. 确认对话框中,单击确定

使用示例

本示例操作适用于开源Kafka 2.4.1,其它版本的Kafka操作请参见Apache Kafka

  1. 创建测试用户以及用户组。

    1. 如果OpenLdap没有开启memberOf overlay特性,则需要开启此功能。

      说明
      • 以下配置方式只作为参考,不同的LDAP服务,开启memberOf overlay的方式不同。

      • 需要在所有OpenLDAP节点下配置生效。

      • 以下所有配置文件的参数请根据实际情况修改,例如:

        • olcModulepath:32位OS系统为/usr/lib/openldap。

        • dn: cn=module{0},cn=config:如果/etc/openldap/slapd.d/cn=config目录下已经存在cn=module{0}.ldif文件,则需要修改module后面的数字为1;如果存在cn=module{1}.ldif,则需要修改module后面的数字为2,以此类推。

        • dn: olcOverlay={0}memberof,olcDatabase={2}hdb,cn=config:根据/etc/openldap/slapd.d/cn=config目录olcDatabase的值来配置相应的值。例如,如果存在olcDatabase={2}hdb.ldif,则此处的值应为hdb。

      1. 执行命令vim memberof_config.ldif,新增配置文件memberof_config.ldif,文件内容如下。

        dn: cn=module{0},cn=config
        cn: module{0}
        objectClass: olcModuleList
        objectclass: top
        olcModuleload: memberof.la
        olcModulePath: /usr/lib64/openldap
        
        dn: olcOverlay={0}memberof,olcDatabase={2}hdb,cn=config
        objectClass: olcConfig
        objectClass: olcMemberOf
        objectClass: olcOverlayConfig
        objectClass: top
        olcOverlay: memberof
        olcMemberOfDangling: ignore
        olcMemberOfRefInt: TRUE
        olcMemberOfGroupOC: groupOfNames
        olcMemberOfMemberAD: member
        olcMemberOfMemberOfAD: memberOf
      2. 执行命令vim refint1.ldif,新增配置文件refint1.ldif,文件内容如下。

        dn: cn=module{0},cn=config
        add: olcmoduleload
        olcmoduleload: refint
      3. 执行命令vim refint2.ldif,新增配置文件refint2.ldif,文件内容如下。

        dn: olcOverlay=refint,olcDatabase={2}hdb,cn=config
        objectClass: olcConfig
        objectClass: olcOverlayConfig
        objectClass: olcRefintConfig
        objectClass: top
        olcOverlay: refint
        olcRefintAttribute: memberof member manager owner
      4. 执行以下命令,加载配置文件。

        ldapadd -Q -Y EXTERNAL -H ldapi:/// -f memberof_config.ldif
        ldapmodify -Q -Y EXTERNAL -H ldapi:/// -f refint1.ldif
        ldapadd -Q -Y EXTERNAL -H ldapi:/// -f refint2.ldif
    2. 创建测试用户。

      通过ldapadd命令的方式添加测试用户。ldif文件命名为kafka-users.ldif,文件内容如下。

      dn: uid=kafka-user1,ou=people,o=emr
      cn: kafka-user1
      sn: kafka-user1
      uid: kafka-user1
      objectClass: inetOrgPerson
      userPassword: kafka-secret
      
      dn: uid=kafka-user2,ou=people,o=emr
      cn: kafka-user2
      sn: kafka-user2
      uid: kafka-user2
      objectClass: inetOrgPerson
      userPassword: kafka-secret
    3. 创建测试用户组。

      通过ldapadd命令的方式添加测试用户组。ldif文件命名为kafka-groups.ldif,文件内容如下。

      dn: cn=kafka-group1,ou=groups,o=emr
      cn: kafka-group1
      objectClass: groupOfNames
      member: uid=kafka-user1,ou=people,o=emr
      member: uid=kafka-user2,ou=people,o=emr
      
      dn: cn=kafka-group2,ou=groups,o=emr
      cn: kafka-group2
      objectClass: groupOfNames
      member: uid=kafka-user1,ou=people,o=emr
    4. 在LDAP所在服务器执行以下命令,查看测试用户memberOf属性。

      ldapsearch -Q -Y EXTERNAL -H ldapi:///  -b ou=people,o=emr memberOf

      根据返回信息,可以看见kafka-user1的memberOf属性包含kafka-group1和kafka-group2,kafka-user2的memberOf属性包含kafka-group2。

  2. 创建客户端配置文件。

    1. 创建client.properties客户端配置文件,内容如下所示。

      security.protocol=SASL_PLAINTEXT
      sasl.mechanism=PLAIN
      #根据不同的客户端身份,替换username以及password。
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule  required username="kafka" password="kafka-secret";
    2. 创建kafka-user1.properties客户端配置文件,内容如下所示。

      security.protocol=SASL_PLAINTEXT
      sasl.mechanism=PLAIN
      #根据不同的客户端身份,替换username以及password
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule  required username="kafka-user1" password="kafka-secret";
    3. 创建kafka-user2.properties客户端配置文件,内容如下所示。

      security.protocol=SASL_PLAINTEXT
      sasl.mechanism=PLAIN
      #根据不同的客户端身份,替换username以及password
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule  required username="kafka-user2" password="kafka-secret";
  3. 创建名称为test的测试Topic。

    kafka-topics.sh  --bootstrap-server core-1-1:9092 --command-config  client.properties --create --topic test --replication-factor 3 --partitions 2
  4. 授权操作。

    详细的授权命令操作,请参见Authorization and ACLs

    #kafka-group2权限
    kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-group2 --allow-host "*" --operation All --cluster kafka-cluster
    kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-group2 --allow-host "*" --operation All --topic test
    #kafka-group1权限
    kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --deny-principal User:kafka-group1 --deny-host "*" --operation All --topic test
    #kafka-user1权限
    kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --deny-principal User:kafka-user1 --deny-host "*" --operation Read --topic test
    #kafka-user2权限
    kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-user2 --allow-host "*" --operation Read --topic test

    您可以执行以下命令,查看授权结果。

    kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --list --topic test

    返回信息如下所示。

    Current ACLs for resource `Topic:LITERAL:test`:
            User:kafka-group2 has Allow permission for operations: All from hosts: *
            User:kafka-user2 has Allow permission for operations: Read from hosts: *
            User:kafka-user1 has Deny permission for operations: Read from hosts: *
            User:kafka-group1 has Deny permission for operations: All from hosts: *
  5. 权限验证。

    • kafka-user1通过kafka-group2可以写test。

      kafka-console-producer.sh --broker-list core-1-1:9092 --producer.config ./kafka-user1.properties  --topic test

      在提示符后输入测试消息,可以访问test。

      #提示符后输入测试消息
      >a
      >b
      >c
      >d
    • kafka-user1不能读test。

      #授权consumer group
      kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-user1 --allow-host "*" --operation All --group kka-user1-consumer
      kafka-console-consumer.sh --bootstrap-server core-1-1:9092 --consumer.config ./kafka-user1.properties --topic test --group kafka-user1-consumer

      输出结果提示鉴权不通过无法访问test。

    • kafka-user2不能写test。

      kafka-console-producer.sh --broker-list core-1-1:9092 --producer.config ./kafka-user2.properties  --topic test
      #提示符后输入测试消息
      >a
      #提示鉴权不通过

      输出结果提示鉴权不通过。

    • kafka-user2可以读test。

      #授权consumer group
      kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-user2 --allow-host "*" --operation All --group kafka-user2-consumer
      kafka-console-consumer.sh --bootstrap-server core-1-1:9092 --consumer.config ./kafka-user2.properties --topic test --group kafka-user2-consumer --from-beginning
      #正常消费数据