This topic describes how to write data from Spark to HBase.
Note
The compute cluster must be in the same security group as the HBase cluster; otherwise, the two clusters cannot reach each other over the network. When you create a cluster in E-MapReduce, make sure that you select the security group that the HBase cluster belongs to.
Connect Spark to HBase
The sample code is as follows.
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._

object ConnectionUtil extends Serializable {
  private val conf = HBaseConfiguration.create()
  // ZooKeeper quorum of the HBase cluster.
  conf.set(HConstants.ZOOKEEPER_QUORUM, "ecs1,ecs2,ecs3")
  conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase")
  // Create one connection per JVM and share it across tasks;
  // HBase connections are heavyweight and should not be created per record.
  private val connection = ConnectionFactory.createConnection(conf)
  def getDefaultConn: Connection = connection
}
// unionStreams is the input DStream (for example, the union of several receiver streams).
unionStreams.foreachRDD(rdd => {
  rdd.map(bytes => new String(bytes))
    .flatMap(line => line.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)
    .mapPartitions { words => {
      // Reuse the shared connection; only the Table handle is opened per partition.
      val conn = ConnectionUtil.getDefaultConn
      val tableName = TableName.valueOf(tname)
      val t = conn.getTable(tableName)
      try {
        // Batch the puts in groups of 100 to reduce the number of RPCs.
        words.sliding(100, 100).foreach(slice => {
          val puts = slice.map(word => {
            println(s"word: $word")
            // Row key: the word plus the current timestamp.
            val put = new Put(Bytes.toBytes(word._1 + System.currentTimeMillis()))
            put.addColumn(COLUMN_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES,
              System.currentTimeMillis(), Bytes.toBytes(word._2))
            put
          }).toList
          // Table.put expects a java.util.List, hence .asJava.
          t.put(puts.asJava)
        })
      } finally {
        t.close()
      }
      Iterator.empty
    }}.count() // count() forces evaluation of the otherwise lazy mapPartitions.
})
ssc.start()
ssc.awaitTermination()
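
The snippet above assumes surrounding driver code that defines ssc, unionStreams, tname, COLUMN_FAMILY_BYTES, and COLUMN_QUALIFIER_BYTES. The following is a minimal sketch of that setup; the socket source, table name, and column names are illustrative placeholders, not part of the original example.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.hadoop.hbase.util.Bytes

object SparkWriteHBaseDemo {
  // Hypothetical table and column names; replace with your own.
  val tname = "wordcount"
  val COLUMN_FAMILY_BYTES: Array[Byte] = Bytes.toBytes("f")
  val COLUMN_QUALIFIER_BYTES: Array[Byte] = Bytes.toBytes("count")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkWriteHBaseDemo")
    val ssc = new StreamingContext(conf, Seconds(10))
    // For illustration, union two socket text streams into a single
    // DStream[Array[Byte]], matching the element type the snippet expects.
    val streams = Seq(9999, 9998).map { port =>
      ssc.socketTextStream("localhost", port).map(_.getBytes("UTF-8"))
    }
    val unionStreams = ssc.union(streams)

    // ... foreachRDD write logic from the example above ...

    ssc.start()
    ssc.awaitTermination()
  }
}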
Appendix
For the complete sample code, see Connect Spark to HBase.