本文介绍如何通过Spark Streaming消费轻量消息队列(原 MNS) SMQ(Simple Message Queue (formerly MNS))中的数据,并统计每个Batch内的单词个数。
Spark接入SMQ
示例代码如下。
val conf = new SparkConf().setAppName("Test MNS Streaming")
val batchInterval = Seconds(10)
val ssc = new StreamingContext(conf, batchInterval)
val queuename = "queuename"
val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
// Endpoint以华东1(杭州)为例,其它Region请按实际情况填写。
val endpoint = "https://oss-cn-hangzhou.aliyuncs.com"
val mnsStream = MnsUtils.createPullingStreamAsRawBytes(ssc, queuename, accessKeyId, accessKeySecret, endpoint,
StorageLevel.MEMORY_ONLY)
mnsStream.foreachRDD( rdd => {
rdd.map(bytes => new String(bytes)).flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
})
ssc.start()
ssc.awaitTermination()
说明
运行代码示例前必须先配置环境变量。关于如何配置环境变量,请参见配置环境变量。
支持MetaService
上面的示例是显式地将AccessKey传入到轻量消息队列(原 MNS)中。从E-MapReduce SDK 1.3.2版本开始,Spark Streaming可以基于MetaService实现免AccessKey处理轻量消息队列(原 MNS)数据。具体可以参见E-MapReduce SDK中的MnsUtils类说明。
MnsUtils.createPullingStreamAsBytes(ssc, queueName, endpoint, storageLevel)
MnsUtils.createPullingStreamAsRawBytes(ssc, queueName, endpoint, storageLevel)
附录
完整示例代码,请参见SparkMNSDemo。
文档内容是否对您有帮助?