
Spark + HBase

Last updated: 2017-06-07 13:26:11


Connecting Spark to HBase

The following example shows how to write data to HBase from Spark. Note that the compute cluster must be in the same security group as the HBase cluster; otherwise the two clusters cannot reach each other over the network. When creating the cluster in E-MapReduce, make sure to select the security group that the HBase cluster belongs to.

    import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
    import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
    import org.apache.hadoop.hbase.util.Bytes
    import scala.collection.JavaConverters._

    // The object is initialized once per executor JVM, so all tasks on an
    // executor share a single HBase connection.
    object ConnectionUtil extends Serializable {
      private val conf = HBaseConfiguration.create()
      // ZooKeeper quorum of the HBase cluster
      conf.set(HConstants.ZOOKEEPER_QUORUM, "ecs1,ecs2,ecs3")
      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase")
      private val connection = ConnectionFactory.createConnection(conf)
      def getDefaultConn: Connection = connection
    }

    // unionStreams is the input DStream created earlier in the job;
    // tname, COLUMN_FAMILY_BYTES and COLUMN_QUALIFIER_BYTES are defined
    // in the complete example (see the appendix).
    unionStreams.foreachRDD(rdd => {
      rdd.map(bytes => new String(bytes))
        .flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
        .mapPartitions { words => {
          val conn = ConnectionUtil.getDefaultConn
          val tableName = TableName.valueOf(tname)
          val t = conn.getTable(tableName)
          try {
            // Write in batches of 100 Puts to cut down on RPC round-trips
            words.sliding(100, 100).foreach(slice => {
              val puts = slice.map(word => {
                println(s"word: $word")
                // Row key: word plus current timestamp, so later batches
                // do not overwrite earlier counts
                val put = new Put(Bytes.toBytes(word._1 + System.currentTimeMillis()))
                put.addColumn(COLUMN_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES,
                  System.currentTimeMillis(), Bytes.toBytes(word._2))
                put
              }).toList
              t.put(puts.asJava)
            })
          } finally {
            t.close()
          }
          Iterator.empty
        }}.count() // count() forces the otherwise-lazy mapPartitions to run
    })
    ssc.start()
    ssc.awaitTermination()
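The word-count and batching logic at the heart of the example can be exercised without a Spark or HBase cluster. Below is a minimal sketch in plain Scala collections; `BatchingSketch` and its method names are illustrative and not part of the example above, but `batches` mirrors the `sliding(100, 100)` grouping used before each `t.put` call:

```scala
object BatchingSketch {
  // Count words in the input lines, as the map/flatMap/reduceByKey chain does
  def wordCounts(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(_.split(" "))
      .groupBy(identity)
      .map { case (w, occurrences) => (w, occurrences.size) }

  // Split an iterator of records into fixed-size batches; the final batch
  // may be smaller, just as with words.sliding(100, 100) above
  def batches[A](records: Iterator[A], size: Int): List[List[A]] =
    records.sliding(size, size).map(_.toList).toList
}
```

Batching the `Put`s this way trades a little latency for far fewer RPC round-trips than issuing one `put` per word.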

Appendix

For the complete sample code, see:
