E-MapReduce(简称EMR) 从EMR-3.16.0版本开始支持Apache Flume。本文介绍如何使用Flume将数据从EMR Kafka集群同步至EMR Hadoop集群的HDFS、Hive、HBase以及阿里云OSS。
前提条件
- 已开通OSS服务,详情请参见开通OSS服务。
- 已创建Hadoop集群,并且选择了Flume和HBase服务,详情请参见创建集群。
说明 Flume软件安装目录在/usr/lib/flume-current下,其他常用文件路径获取方式请参见常用文件路径。
- 已创建Kafka集群,详情请参见创建集群。
说明
- 如果创建的是Hadoop高安全集群,消费标准Kafka集群的数据,在Hadoop集群配置Kerberos认证,详情请参见兼容MIT Kerberos认证。
- 如果创建的是Kafka高安全集群,通过Flume将数据写入标准Hadoop集群,请参见 Kerberos Kafka Source。
- 如果创建的Hadoop集群和Kafka集群都是高安全集群,需配置跨域互信,详情请参见跨域互信,其它配置请参见跨域互信使用Flume。
同步Kafka数据至HDFS
同步Kafka数据至Hive
同步Kafka数据至HBase
同步Kafka数据至OSS
Kerberos Kafka source
消费高安全Kafka集群的数据时,需要完成额外的配置:
- 在Kafka集群配置Kerberos认证,将生成的keytab文件test.keytab拷贝至Hadoop集群的/etc/ecm/flume-conf路径下,详情请参见兼容MIT Kerberos认证;将Kafka集群的/etc/ecm/has-conf/krb5.conf文件拷贝至Hadoop集群的/etc/ecm/flume-conf路径下。
- 配置flume.properties。
在flume.properties中添加如下配置。
a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
- 配置Kafka client。
- 在/etc/ecm/flume-conf下创建文件flume_jaas.conf。
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/ecm/flume-conf/test.keytab" serviceName="kafka" principal="test@EMR.${realm}.COM"; };
${realm} 需要替换为Kafka集群的Kerberos realm。
${realm}获取方式:在Kafka集群执行命令
hostname
,得到形式为emr-header-1.cluster-xxx
的主机名,例如emr-header-1.cluster-123456
,其中数字串123456即为realm。 - 修改/etc/ecm/flume-conf/flume-env.sh。
初始情况下,/etc/ecm/flume-conf/下没有flume-env.sh 文件,需要拷贝flume-env.sh.template并重命名为flume-env.sh。添加如下内容。
export JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/etc/ecm/flume-conf/krb5.conf" export JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/etc/ecm/flume-conf/flume_jaas.conf"
- 在/etc/ecm/flume-conf下创建文件flume_jaas.conf。
- 设置域名。
将Kafka集群各节点的长域名和IP的绑定信息添加到Hadoop集群的/etc/hosts。长域名的形式例如emr-header-1.cluster-xxxx。说明 图中标注①表示的是Hadoop集群的域名;图中标注②表示新增加的Kafka集群域名。
跨域互信使用Flume
在配置了跨域互信后,其他配置如下:
- 在Kafka集群配置Kerberos认证,将生成的keytab文件test.keytab拷贝至Hadoop集群的/etc/ecm/flume-conf路径下,详情请参见兼容MIT Kerberos认证。
- 配置flume.properties。
在flume.properties中添加如下配置。
a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
- 配置Kafka client。
- 在/etc/ecm/flume-conf下创建文件flume_jaas.conf,内容如下。
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/ecm/flume-conf/test.keytab" serviceName="kafka" principal="test@EMR.${realm}.COM"; };
${realm}替换为Kafka集群的Kerberos realm。
${realm}获取方式:在Kafka集群执行命令
hostname
,得到形式为emr-header-1.cluster-xxx
的主机名,例如emr-header-1.cluster-123456
,其中数字串123456即为realm。 - 修改/etc/ecm/flume-conf/flume-env.sh。
初始情况下,/etc/ecm/flume-conf/下没有flume-env.sh文件,需要拷贝flume-env.sh.template并重命名为flume-env.sh。添加如下内容。
export JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/etc/ecm/flume-conf/flume_jaas.conf""
- 在/etc/ecm/flume-conf下创建文件flume_jaas.conf,内容如下。
在文档使用中是否遇到以下问题
更多建议
匿名提交