This section describes how Spark Streaming consumes data from MNS and counts the occurrences of each word within every batch.

Connecting Spark to MNS

The sample code is as follows:
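    import java.nio.charset.StandardCharsets

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    // MnsUtils is provided by the E-MapReduce SDK; the package path may differ between SDK versions.
    import org.apache.spark.streaming.aliyun.mns.MnsUtils

    val conf = new SparkConf().setAppName("Test MNS Streaming")
    val batchInterval = Seconds(10)  // length of each batch
    val ssc = new StreamingContext(conf, batchInterval)
    val queuename = "queuename"
    val accessKeyId = "<accessKeyId>"
    val accessKeySecret = "<accessKeySecret>"
    val endpoint = "http://xxx.yyy.zzzz/abc"
    // Pull messages from the MNS queue; each record is a raw message body (Array[Byte]).
    val mnsStream = MnsUtils.createPullingStreamAsRawBytes(ssc, queuename, accessKeyId, accessKeySecret, endpoint,
      StorageLevel.MEMORY_ONLY)
    // For each batch: decode the bodies (assumed to be UTF-8 text), split them into words,
    // count each word, and print the counts on the driver.
    mnsStream.foreachRDD(rdd => {
      rdd.map(bytes => new String(bytes, StandardCharsets.UTF_8)).flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
    })
    ssc.start()
    ssc.awaitTermination()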
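The per-batch logic inside foreachRDD can be checked without a cluster or an MNS queue. The following is a minimal plain-Scala sketch, not part of the E-MapReduce SDK: it replaces the RDD with a local Seq[Array[Byte]] standing in for one batch of raw MNS message bodies, assumes the bodies are UTF-8 text, and splits on single spaces as the example above does. The names BatchWordCount and countWords are hypothetical.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical local stand-in for the body of foreachRDD in the MNS example.
// Assumes each message body is UTF-8 text.
object BatchWordCount {
  // Decode each raw message body, split on single spaces, and count each word.
  def countWords(batch: Seq[Array[Byte]]): Map[String, Int] =
    batch
      .map(bytes => new String(bytes, StandardCharsets.UTF_8))
      .flatMap(line => line.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit = {
    // One simulated batch containing two messages.
    val batch = Seq("hello world", "hello mns").map(_.getBytes(StandardCharsets.UTF_8))
    countWords(batch).foreach { case (word, cnt) => println(s"word: $word, cnt: $cnt") }
  }
}
```

Here groupBy followed by a size count plays the role of map(word => (word, 1)) plus reduceByKey(_ + _). On a real RDD, reduceByKey is the better choice because it combines counts within each partition before shuffling data across the cluster.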

MetaService support

In the example above, the AccessKey (AK) is passed to the interface explicitly. Starting with E-MapReduce SDK 1.3.2, Spark Streaming can process MNS data without an AK by using MetaService. For details, see the description of the MnsUtils class in the E-MapReduce SDK:
    MnsUtils.createPullingStreamAsBytes(ssc, queueName, endpoint, storageLevel)
    MnsUtils.createPullingStreamAsRawBytes(ssc, queueName, endpoint, storageLevel)

Appendix

For the complete sample code, see: