This topic describes how to use Spark to access data in an EMR HBase cluster.
Examples of accessing HBase from Spark
Important: The compute cluster must be in the same security group as the HBase cluster; otherwise, there is no network connectivity between the two clusters. When you create the compute cluster in the E-MapReduce console, select the security group that the HBase cluster belongs to.
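Besides network connectivity, the Spark job also needs an HBase client configuration that points at the HBase cluster. The examples below create it with HBaseConfiguration.create() or new HBaseConfiguration(), which reads hbase-site.xml from the classpath; if that file is not available on the compute cluster, you can set the HBase cluster's ZooKeeper address explicitly. The following is a minimal Scala sketch, not taken from this document: the ZooKeeper hostnames are placeholders, and the same keys can be set on the Configuration object in the Java example.

    import org.apache.hadoop.hbase.HBaseConfiguration

    // Minimal sketch: point the HBase client at the HBase cluster's ZooKeeper quorum.
    // The hostnames below are placeholders for the HBase cluster's ZooKeeper nodes.
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "zk-host-1,zk-host-2,zk-host-3")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")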
- Java code
    // Assumes sparkConf and tableName are defined by the surrounding application,
    // and that JavaHBaseContext comes from the hbase-spark (hbase-connectors) module.
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    try {
        // Build an RDD of row keys to look up.
        List<byte[]> list = new ArrayList<>();
        list.add(Bytes.toBytes("1"));
        // ...
        list.add(Bytes.toBytes("5"));
        JavaRDD<byte[]> rdd = jsc.parallelize(list);

        Configuration conf = HBaseConfiguration.create();
        JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

        // For each partition, reuse one HBase connection: get each row and, if it
        // exists, write a mutation back through a BufferedMutator.
        hbaseContext.foreachPartition(rdd,
            new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
                public void call(Tuple2<Iterator<byte[]>, Connection> t) throws Exception {
                    Table table = t._2().getTable(TableName.valueOf(tableName));
                    BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
                    while (t._1().hasNext()) {
                        byte[] b = t._1().next();
                        Result r = table.get(new Get(b));
                        if (r.getExists()) {
                            // Note: in a real job the Put must add at least one column
                            // before it is passed to the mutator.
                            mutator.mutate(new Put(b));
                        }
                    }
                    mutator.flush();
                    mutator.close();
                    table.close();
                }
            });
    } finally {
        jsc.stop();
    }
- Scala code
    // Assumes rdd contains (row key, (column family, qualifier, value) triples) pairs
    // and is defined elsewhere; hbaseForeachPartition is provided by the implicit
    // RDD functions of the hbase-spark module.
    val sc = new SparkContext("local", "test")
    val config = new HBaseConfiguration()
    // ...
    val hbaseContext = new HBaseContext(sc, config)

    rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
      // One BufferedMutator per partition batches the writes to table t1.
      val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
      it.foreach((putRecord) => {
        val put = new Put(putRecord._1)
        putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
        bufferedMutator.mutate(put)
      })
      bufferedMutator.flush()
      bufferedMutator.close()
    })
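The examples above look up rows and write mutations; reading a whole table back into Spark goes through the same HBaseContext. The following is a minimal Scala sketch, not from the original document: it assumes the hbaseContext and the table t1 from the Scala example above and uses the hbaseRDD method of HBaseContext from the hbase-spark module to scan the table into an RDD.

    import org.apache.hadoop.hbase.TableName
    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.util.Bytes

    // Minimal read sketch: scan table "t1" into an RDD of (row key, Result) pairs.
    // hbaseContext is the HBaseContext created in the Scala example above.
    val scan = new Scan()
    scan.setCaching(100) // rows fetched per RPC; tune for the workload
    val scanRdd = hbaseContext.hbaseRDD(TableName.valueOf("t1"), scan)

    // Print a few row keys on the driver to verify the scan.
    scanRdd.map { case (rowKey, _) => Bytes.toString(rowKey.copyBytes()) }
      .take(10)
      .foreach(println)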
Related documentation
- For more information about how Spark accesses EMR HBase, see hbase-connectors.
- For more information about HBase, see HBase and Spark.