本文介绍如何在E-MapReduce的Hadoop集群运行Spark Streaming作业,处理Kafka集群的数据。

背景信息

由于E-MapReduce上的Hadoop集群和Kafka集群都是基于纯开源版本软件,所以在编程使用上参考相应官方文档即可。

访问Kerberos Kafka集群

E-MapReduce支持创建基于Kerberos认证的Kafka集群。当Hadoop集群作业需要访问Kerberos Kafka集群时,有如下两种使用方式:
  • 非Kerberos Hadoop集群:提供用于Kafka集群的Kerberos认证的kafka_client_jaas.confkrb5.conf文件。
  • Kerberos Hadoop 集群: 基于Kerberos集群跨域互信,提供用于Hadoop集群的Kerberos认证的kafka_client_jaas.confkrb5.conf文件。

以上两种方式都需要运行作业时提供kafka_client_jaas.conf文件,用于Kerberos认证。

kafka_client_jaas.conf文件格式如下。
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="/path/to/kafka.keytab"
    principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM";
};

Spark Streaming访问Kerberos Kafka集群

将Kafka集群各个节点的长域名和IP信息,加入Hadoop集群各个节点的/etc/hosts中。长域名和IP信息可在/etc/hosts中获取,长域名形式为emr-xxx-x.cluster-xxx

当运行Spark Streaming作业访问Kerberos kafka时,可以在spark-submit命令行参数中提供所需的kafka_client_jaas.confkafka.keytab文件。
spark-submit --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}/kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}//kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" --files /local/path/to/kafka_client_jaas.conf,/local/path/to/kafka.keytab,/local/path/to/krb5.conf --class  xx.xx.xx.KafkaSample --num-executors 2 --executor-cores 2 --executor-memory 1g --master yarn-cluster xxx.jar arg1 arg2 arg3
kafka_client_jaas.conf文件中,keytab文件路径需要写相对路径,请严格按照如下keyTab配置项写法。
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="kafka.keytab"
    principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM";
};