This topic describes how Spark can access data in an EMR HBase cluster.

Examples of accessing HBase from Spark

Important: The compute cluster must be in the same security group as the HBase cluster; otherwise, network connectivity between the two clusters cannot be established. When you create the compute cluster in the E-MapReduce console, select the security group that the HBase cluster belongs to.
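
Besides being reachable over the network, the Spark job also needs the HBase cluster's client configuration. HBaseConfiguration.create() in the examples below reads hbase-site.xml from the classpath; if the compute cluster does not carry the HBase cluster's hbase-site.xml, the ZooKeeper quorum of the HBase cluster can be set explicitly instead. The following is a minimal sketch; the ZooKeeper addresses and client port are placeholders to replace with the values of your own HBase cluster.

    import org.apache.hadoop.hbase.HBaseConfiguration

    // Reads hbase-site.xml from the classpath if present.
    val config = HBaseConfiguration.create()
    // Placeholder ZooKeeper addresses and client port of the HBase cluster;
    // replace them with the actual values of your EMR HBase cluster.
    config.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
    config.set("hbase.zookeeper.property.clientPort", "2181")
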
  • Java code
    // Imports assumed by this example.
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.spark.JavaHBaseContext;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.VoidFunction;
    import scala.Tuple2;

    // sparkConf and tableName are assumed to be defined elsewhere in the job.
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    try {
      // Build a small RDD of row keys.
      List<byte[]> list = new ArrayList<>();
      list.add(Bytes.toBytes("1"));
      ...
      list.add(Bytes.toBytes("5"));

      JavaRDD<byte[]> rdd = jsc.parallelize(list);
      // Picks up the HBase client settings (hbase-site.xml) from the classpath.
      Configuration conf = HBaseConfiguration.create();

      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

      // For each partition, hbase-spark hands the function the partition's
      // iterator together with a managed HBase Connection.
      hbaseContext.foreachPartition(rdd,
          new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
            public void call(Tuple2<Iterator<byte[]>, Connection> t) throws Exception {
              Table table = t._2().getTable(TableName.valueOf(tableName));
              BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
              while (t._1().hasNext()) {
                byte[] b = t._1().next();
                // Only check whether the row exists; do not fetch its data.
                Get get = new Get(b);
                get.setCheckExistenceOnly(true);
                Result r = table.get(get);
                if (r.getExists()) {
                  // A Put must contain at least one column; the family,
                  // qualifier and value here are placeholders.
                  Put put = new Put(b);
                  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), b);
                  mutator.mutate(put);
                }
              }

              mutator.flush();
              mutator.close();
              table.close();
            }
          });
    } finally {
      jsc.stop();
    }
  • Scala code
    // Imports assumed by this example. HBaseRDDFunctions._ provides the
    // implicit hbaseForeachPartition method on RDDs.
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
    import org.apache.spark.SparkContext

    val sc = new SparkContext("local", "test")
    val config = HBaseConfiguration.create()

    ...

    val hbaseContext = new HBaseContext(sc, config)

    // rdd is built in the elided section above; each record is a row key plus a
    // sequence of (family, qualifier, value) byte arrays to write for that row.
    rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
      val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
      it.foreach((putRecord) => {
        val put = new Put(putRecord._1)
        putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
        bufferedMutator.mutate(put)
      })
      bufferedMutator.flush()
      bufferedMutator.close()
    })
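
The examples above only write to HBase. If the job also needs to read an HBase table back into an RDD, the same hbase-spark module exposes HBaseContext.hbaseRDD. The following minimal Scala sketch scans the table t1 used above and prints a few row keys; it assumes the SparkContext and HBase configuration are prepared as in the previous examples.

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.SparkContext

    val sc = new SparkContext("local", "test")
    val config = HBaseConfiguration.create()
    val hbaseContext = new HBaseContext(sc, config)

    // Full scan of table t1; each RDD element is (ImmutableBytesWritable, Result).
    val scan = new Scan()
    scan.setCaching(100)
    val scanRDD = hbaseContext.hbaseRDD(TableName.valueOf("t1"), scan)

    // Print the row keys of the first few rows.
    scanRDD.take(5).foreach { case (rowKey, _) =>
      println(Bytes.toString(rowKey.copyBytes()))
    }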

Related documents