全部产品
E-MapReduce

Spark + MNS

更新时间:2017-06-07 13:26:11   分享:   

Spark + MNS

Spark 接入 MNS

下面这个例子演示了 Spark Streaming 如何消费 MNS 中的数据,统计每个 batch 内的单词个数。

  1. val conf = new SparkConf().setAppName("Test MNS Streaming")
  2. val batchInterval = Seconds(10)
  3. val ssc = new StreamingContext(conf, batchInterval)
  4. val queuename = "queuename"
  5. val accessKeyId = "<accessKeyId>"
  6. val accessKeySecret = "<accessKeySecret>"
  7. val endpoint = "http://xxx.yyy.zzzz/abc"
  8. val mnsStream = MnsUtils.createPullingStreamAsRawBytes(ssc, queuename, accessKeyId, accessKeySecret, endpoint,
  9. StorageLevel.MEMORY_ONLY)
  10. mnsStream.foreachRDD( rdd => {
  11. rdd.map(bytes => new String(bytes)).flatMap(line => line.split(" "))
  12. .map(word => (word, 1))
  13. .reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
  14. })
  15. ssc.start()
  16. ssc.awaitTermination()

支持MetaService

上面的例子中,我们都是显式地将AK传入到接口中。不过从E-MapReduce SDK 1.3.2版本开始,Spark Streaming可以基于MetaService实现免AK处理MNS数据。具体可以参考E-MapReduce SDK中的MnsUtils类说明:

  1. MnsUtils.createPullingStreamAsBytes(ssc, queueName, endpoint, storageLevel)
  2. MnsUtils.createPullingStreamAsRawBytes(ssc, queueName, endpoint, storageLevel)

附录

完整示例代码请看:

本文导读目录
本文导读目录
以上内容是否对您有帮助?