全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网
E-MapReduce

Spark + ONS

更新时间:2017-06-07 13:26:11

Spark + ONS

Spark 接入 ONS

下面这个例子演示了 Spark Streaming 如何消费 ONS 中的数据,统计每个 batch 内的单词个数。

  1. val Array(cId, topic, subExpression, parallelism, interval) = args
  2. val accessKeyId = "<accessKeyId>"
  3. val accessKeySecret = "<accessKeySecret>"
  4. val numStreams = parallelism.toInt
  5. val batchInterval = Milliseconds(interval.toInt)
  6. val conf = new SparkConf().setAppName("Test ONS Streaming")
  7. val ssc = new StreamingContext(conf, batchInterval)
  8. def func: Message => Array[Byte] = msg => msg.getBody
  9. val onsStreams = (0 until numStreams).map { i =>
  10. println(s"starting stream $i")
  11. OnsUtils.createStream(
  12. ssc,
  13. cId,
  14. topic,
  15. subExpression,
  16. accessKeyId,
  17. accessKeySecret,
  18. StorageLevel.MEMORY_AND_DISK_2,
  19. func)
  20. }
  21. val unionStreams = ssc.union(onsStreams)
  22. unionStreams.foreachRDD(rdd => {
  23. rdd.map(bytes => new String(bytes)).flatMap(line => line.split(" "))
  24. .map(word => (word, 1))
  25. .reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
  26. })
  27. ssc.start()
  28. ssc.awaitTermination()

附录

示例代码请看:

本文导读目录