当E-MapReduce(简称EMR)的DataFlow集群没有安装Ranger Kafka插件,或当前集群中的Ranger Kafka插件版本与实际使用的Ranger服务不兼容时,您需要进行手动集成。本文介绍如何手动安装Ranger Kafka插件以及配置生效Ranger Kafka插件。
前提条件
已创建DataFlow集群。具体操作,请参见创建集群。
集群已启用SASL登录认证功能。开启功能的具体操作,请参见使用SASL登录认证Kafka服务。
重要配置SASL时,您需要确保Kafka服务内部通信使用的用户具有所有Resource的权限,内部组件使用的Client用户具有相应的权限(建议您内部组件Client与Kafka服务使用相同的用户)。
已创建外部Ranger服务。
已在Ranger中创建了Kafka管理用户,并且管理用户拥有所有Kafka Resource的权限。
说明建议管理用户的名称为kafka。
使用限制
本文仅适用于EMR 3.45.0及以下版本(3.x系列)的EMR集群。
由于EMR 5.x系列的Kafka不支持Ranger鉴权,所以本文档不适用于5.x系列的EMR集群。
操作步骤
本文示例中的Ranger代码包以Ranger 2.1.0为例,实际场景请您根据对应的Ranger服务以及Kafka版本选取相应Ranger版本的代码包。
建议Ranger Kafka插件选择与Ranger服务相同的版本。
通过SSH方式连接集群,详情请参见登录集群。
执行以下命令,下载Ranger插件。
wget https://dlcdn.apache.org/ranger/2.1.0/apache-ranger-2.1.0.tar.gz
您也可以从Ranger官网下载所需版本的Ranger代码包。
执行以下命令,构建Ranger Kafka插件安装包。
tar xvf apache-ranger-2.1.0.tar.gz cd apache-ranger-2.1.0 mvn clean compile package assembly:assembly install -DskipTests -Drat.skip=true cd target ls -lrt ranger-2.1.0-kafka-plugin.tar.gz
上传Ranger Kafka插件安装包到所有Kafka Broker节点的固定安装目录,本文安装根目录以
/opt/apps/ranger-plugin
为例。准备脚本文件install.properties,并将该文件放置到安装目录
/opt/apps/ranger-plugin/ranger-2.1.0-kafka-plugin
下。请您根据实际场景来配置Ranger插件的安装配置文件install.properties。
配置项说明
参数名称
描述
是否必填
COMPONENT_INSTALL_DIR_NAME
Kafka安装路径。
EMR Kafka固定安装在/opt/apps/KAFKA/kafka-current目录。
是
POLICY_MGR_URL
Ranger策略库URL。
根据Ranger服务的地址配置,格式可以参见install.properties配置文件。
是
REPOSITORY_NAME
使用的策略库名称。
根据Ranger服务的地址配置,格式可以参见install.properties配置文件。
是
XAAUDIT.SUMMARY.ENABLE
是否启用审计。默认值为true。
是
XAAUDIT.SOLR.ENABLE
是否启用Solr审计。默认值为true。
是
XAAUDIT.SOLR.URL
Solr地址。
根据实际情况填写,格式可以参见install.properties配置文件。
是
XAAUDIT.SOLR.USER
Solr访问用户。
是
XAAUDIT.SOLR.PASSWORD
Solr访问密码。
是
XAAUDIT.SOLR.ZOOKEEPER
SolrCloud ZooKeeper访问地址。
是
XAAUDIT.SOLR.FILE_SPOOL_DIR
审计日志存储路径。
根据实际情况填写,格式可以参见install.properties配置文件。
是
代码示例
以下为EMR 3.43.1版本的代码示例。
# Location of component folder COMPONENT_INSTALL_DIR_NAME=/opt/apps/KAFKA/kafka-current # # Location of Policy Manager URL # # Example: # POLICY_MGR_URL=http://policymanager.xasecure.net:6080 # 按照实际场景修改 POLICY_MGR_URL=http://master-1-1.c-590b6062db9d****.cn-hangzhou.emr.aliyuncs.com:6080 # # This is the repository name created within policy manager # # Example: # REPOSITORY_NAME=kafkadev # 按照实际场景修改 REPOSITORY_NAME=kafkadev # AUDIT configuration with V3 properties #Should audit be summarized at source XAAUDIT.SUMMARY.ENABLE=true # Enable audit logs to Solr #Example #XAAUDIT.SOLR.ENABLE=true #XAAUDIT.SOLR.URL=http://localhost:6083/solr/ranger_audits #XAAUDIT.SOLR.ZOOKEEPER= #XAAUDIT.SOLR.FILE_SPOOL_DIR=/var/log/kafka/audit/solr/spool # 按照实际场景修改 XAAUDIT.SOLR.ENABLE=true XAAUDIT.SOLR.URL=http://master-1-1.c-590b6062db9d****.cn-hangzhou.emr.aliyuncs.com:6083/solr/ranger_audits XAAUDIT.SOLR.USER=NONE XAAUDIT.SOLR.PASSWORD=NONE XAAUDIT.SOLR.ZOOKEEPER=NONE XAAUDIT.SOLR.FILE_SPOOL_DIR=/var/log/taihao-apps/kafka/audit/spool # Enable audit logs to ElasticSearch #Example #XAAUDIT.ELASTICSEARCH.ENABLE=true #XAAUDIT.ELASTICSEARCH.URL=localhost #XAAUDIT.ELASTICSEARCH.INDEX=audit XAAUDIT.ELASTICSEARCH.ENABLE=false XAAUDIT.ELASTICSEARCH.URL=NONE XAAUDIT.ELASTICSEARCH.USER=NONE XAAUDIT.ELASTICSEARCH.PASSWORD=NONE XAAUDIT.ELASTICSEARCH.INDEX=NONE XAAUDIT.ELASTICSEARCH.PORT=NONE XAAUDIT.ELASTICSEARCH.PROTOCOL=NONE # Enable audit logs to HDFS #Example #XAAUDIT.HDFS.ENABLE=true #XAAUDIT.HDFS.HDFS_DIR=hdfs://node-1.example.com:8020/ranger/audit # If using Azure Blob Storage #XAAUDIT.HDFS.HDFS_DIR=wasb[s]://<containername>@<accountname>.blob.core.windows.net/<path> #XAAUDIT.HDFS.HDFS_DIR=wasb://ranger_audit_cont****@my-azure-account.blob.core.windows.net/ranger/audit #XAAUDIT.HDFS.FILE_SPOOL_DIR=/var/log/kafka/audit/hdfs/spool XAAUDIT.HDFS.ENABLE=false XAAUDIT.HDFS.HDFS_DIR=hdfs://__REPLACE__NAME_NODE_HOST:8020/ranger/audit XAAUDIT.HDFS.FILE_SPOOL_DIR=/var/log/kafka/audit/hdfs/spool # Following additional propertis are needed When auditing to Azure Blob Storage via HDFS # Get these values from your /etc/hadoop/conf/core-site.xml #XAAUDIT.HDFS.HDFS_DIR=wasb[s]://<containername>@<accountname>.blob.core.windows.net/<path> XAAUDIT.HDFS.AZURE_ACCOUNTNAME=__REPLACE_AZURE_ACCOUNT_NAME XAAUDIT.HDFS.AZURE_ACCOUNTKEY=__REPLACE_AZURE_ACCOUNT_KEY XAAUDIT.HDFS.AZURE_SHELL_KEY_PROVIDER=__REPLACE_AZURE_SHELL_KEY_PROVIDER XAAUDIT.HDFS.AZURE_ACCOUNTKEY_PROVIDER=__REPLACE_AZURE_ACCOUNT_KEY_PROVIDER #Log4j Audit Provider XAAUDIT.LOG4J.ENABLE=false XAAUDIT.LOG4J.IS_ASYNC=false XAAUDIT.LOG4J.ASYNC.MAX.QUEUE.SIZE=10240 XAAUDIT.LOG4J.ASYNC.MAX.FLUSH.INTERVAL.MS=30000 XAAUDIT.LOG4J.DESTINATION.LOG4J=true XAAUDIT.LOG4J.DESTINATION.LOG4J.LOGGER=xaaudit # End of V3 properties # # Audit to HDFS Configuration # # If XAAUDIT.HDFS.IS_ENABLED is set to true, please replace tokens # that start with __REPLACE__ with appropriate values # XAAUDIT.HDFS.IS_ENABLED=true # XAAUDIT.HDFS.DESTINATION_DIRECTORY=hdfs://__REPLACE__NAME_NODE_HOST:8020/ranger/audit/%app-type%/%time:yyyyMMdd% # XAAUDIT.HDFS.LOCAL_BUFFER_DIRECTORY=__REPLACE__LOG_DIR/kafka/audit # XAAUDIT.HDFS.LOCAL_ARCHIVE_DIRECTORY=__REPLACE__LOG_DIR/kafka/audit/archive # # Example: # XAAUDIT.HDFS.IS_ENABLED=true # XAAUDIT.HDFS.DESTINATION_DIRECTORY=hdfs://namenode.example.com:8020/ranger/audit/%app-type%/%time:yyyyMMdd% # XAAUDIT.HDFS.LOCAL_BUFFER_DIRECTORY=/var/log/kafka/audit # XAAUDIT.HDFS.LOCAL_ARCHIVE_DIRECTORY=/var/log/kafka/audit/archive # XAAUDIT.HDFS.IS_ENABLED=false XAAUDIT.HDFS.DESTINATION_DIRECTORY=hdfs://__REPLACE__NAME_NODE_HOST:8020/ranger/audit/%app-type%/%time:yyyyMMdd% XAAUDIT.HDFS.LOCAL_BUFFER_DIRECTORY=__REPLACE__LOG_DIR/kafka/audit XAAUDIT.HDFS.LOCAL_ARCHIVE_DIRECTORY=__REPLACE__LOG_DIR/kafka/audit/archive XAAUDIT.HDFS.DESTINTATION_FILE=%hostname%-audit.log XAAUDIT.HDFS.DESTINTATION_FLUSH_INTERVAL_SECONDS=900 XAAUDIT.HDFS.DESTINTATION_ROLLOVER_INTERVAL_SECONDS=86400 XAAUDIT.HDFS.DESTINTATION_OPEN_RETRY_INTERVAL_SECONDS=60 XAAUDIT.HDFS.LOCAL_BUFFER_FILE=%time:yyyyMMdd-HHmm.ss%.log XAAUDIT.HDFS.LOCAL_BUFFER_FLUSH_INTERVAL_SECONDS=60 XAAUDIT.HDFS.LOCAL_BUFFER_ROLLOVER_INTERVAL_SECONDS=600 XAAUDIT.HDFS.LOCAL_ARCHIVE_MAX_FILE_COUNT=10 #Solr Audit Provider XAAUDIT.SOLR.IS_ENABLED=false XAAUDIT.SOLR.MAX_QUEUE_SIZE=1 XAAUDIT.SOLR.MAX_FLUSH_INTERVAL_MS=1000 XAAUDIT.SOLR.SOLR_URL=http://localhost:6083/solr/ranger_audits # End of V2 properties # # SSL Client Certificate Information # # Example: # SSL_KEYSTORE_FILE_PATH=/etc/hadoop/conf/ranger-plugin-keystore.jks # SSL_KEYSTORE_PASSWORD=none # SSL_TRUSTSTORE_FILE_PATH=/etc/hadoop/conf/ranger-plugin-truststore.jks # SSL_TRUSTSTORE_PASSWORD=none # # You do not need use SSL between agent and security admin tool, please leave these sample value as it is. # SSL_KEYSTORE_FILE_PATH=/etc/hadoop/conf/ranger-plugin-keystore.jks SSL_KEYSTORE_PASSWORD=myKeyFilePassword SSL_TRUSTSTORE_FILE_PATH=/etc/hadoop/conf/ranger-plugin-truststore.jks SSL_TRUSTSTORE_PASSWORD=changeit # # Custom component user # CUSTOM_COMPONENT_USER=<custom-user> # keep blank if component user is default CUSTOM_USER=kafka # # Custom component group # CUSTOM_COMPONENT_GROUP=<custom-group> # keep blank if component group is default CUSTOM_GROUP=hadoop
执行以下命令,安装Ranger Kafka插件。
sudo su - root cd /opt/apps/ranger-plugin/ranger-2.1.0-kafka-plugin ./enable-kafka-plugin.sh ./install.properties
在EMR控制台修改Kafka的配置文件server.properties。
在EMR控制台Kafka服务的配置页签。
单击server.properties页签。
修改以下参数信息。
kafka_server_start_cmd_addition_args:在参数值中追加内容CLASSPATH=$CLASSPATH:/opt/apps/KAFKA/kafka-current/config。
说明如果EMR控制台Kafka服务不支持kafka_server_start_cmd_addition_args配置,则无法使用该方式修改CLASSPATH变量,此时您可以在Kafka Broker实例执行以下命令达到添加Ranger Kafka插件配置文件的目的。
cd /opt/apps/KAFKA/kafka-current/libs sudo ln -s /opt/apps/KAFKA/kafka-current/config kafka-conf
authorizer.class.name:修改参数值为org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer。
修改完后,保存配置。
单击保存。
在弹出的对话框中,输入执行原因,单击保存。
在EMR控制台重启Kafka Broker服务。
在EMR控制台Kafka服务的状态页面,单击KafkaBroker操作列的重启。
在弹出的对话框中,输入执行原因,单击确定。
在弹出的确认对话框中,单击确定。
相关文档
除了使用本文的手动集成Ranger Kafka插件的方式,您还可以根据实际情况决定是否使用EMR的脚本操作功能来批量安装部署Ranger Kafka插件。具体操作,请参见手动执行脚本。
如果您的集群是EMR 3.45.0及以上版本,则可以在EMR控制台直接开启Ranger权限控制,详情请参见配置Kafka开启Ranger权限控制。