本文介绍如何在E-MapReduce的Hadoop集群运行Spark Streaming作业,处理Kafka集群的数据。
背景信息
E-MapReduce上的Hadoop集群和Kafka集群都是基于纯开源软件,相关编程使用方法可参见官方相应文档。
- Spark官方文档:streaming-kafka-integration和structured-streaming-kafka-integration。
- E-MapReduce-demo:github地址。
访问Kerberos Kafka集群
E-MapReduce支持创建基于Kerberos认证的Kafka集群。当Hadoop集群作业需要访问Kerberos Kafka集群时,有以下两种使用方式:
- 非Kerberos Hadoop集群:提供用于Kafka集群的Kerberos认证的kafka_client_jaas.conf和krb5.conf文件。
- Kerberos Hadoop集群:基于Kerberos集群跨域互信,提供用于Hadoop集群的Kerberos认证的kafka_client_jaas.conf和krb5.conf文件。
跨域互信详细信息,请参见跨域互信。
以上两种方式都需要运行作业时提供kafka_client_jaas.conf和krb5.conf文件,用于Kerberos认证。
- kafka_client_jaas.conf文件格式如下。
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true serviceName="kafka" keyTab="/path/to/kafka.keytab" principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM"; };
说明 keytab文件的获取,请参见兼容MIT Kerberos认证。 - krb5.conf文件,请从Kafka集群的/etc/目录下获取。
Spark Streaming访问Kerberos Kafka集群
添加Kafka集群各个节点的长域名和IP信息至Hadoop集群各个节点的/etc/hosts中。长域名和IP信息,您可以在/etc/hosts中获取,长域名形式为emr-xxx-x.cluster-xxx
。
当运行Spark Streaming作业访问Kerberos kafka时,可以在spark-submit命令行参数中提供所需的kafka_client_jaas.conf和kafka.keytab文件。
spark-submit --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}/kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}//kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" --files /local/path/to/kafka_client_jaas.conf,/local/path/to/kafka.keytab,/local/path/to/krb5.conf --class xx.xx.xx.KafkaSample --num-executors 2 --executor-cores 2 --executor-memory 1g --master yarn-cluster xxx.jar arg1 arg2 arg3
kafka_client_jaas.conf文件中,keytab文件路径需要写相对路径,请严格按照如下keyTab配置项写法。
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
serviceName="kafka"
keyTab="kafka.keytab"
principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM";
};
spark-sql访问Kafka
示例代码如下。
spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*
说明
/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*
中包含Kafka DataSource类型。如果您EMR集群使用的是Spark2,则应修改上面命令中的spark3
应该换成spark2
。
创建和查询示例如下。
create table test_kafka
using loghub
options(kafka.bootstrap.servers='alikafka-post-cn-7mz2sqqr****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-7mz2sqqr****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-7mz2sqqr****-3-vpc.alikafka.aliyuncs.com:9092',
subscribe='test_topic',
startingoffsets='earliest'
)
select * from test_kafka;
附录
- 示例代码,请参见Spark对接Kafka。
- 更多参数的配置和含义,请参见Structured Streaming + Kafka Integration Guide。