Spark on MaxCompute如何访问HBase

本文为您介绍Spark on MaxCompute访问云数据库HBase的配置方法。

背景信息

Spark on MaxCompute可以访问位于阿里云VPC内的实例(ECS、HBase、RDS等)。MaxCompute底层网络和外网默认是隔离的,Spark on MaxCompute提供了一种方案通过配置spark.hadoop.odps.cupid.eni.info=regionid:vpc id来访问阿里云的VPC网络环境的HBase。HBase标准版和增强版(Lindorm)的配置不同,详情如下。

前提条件

在实践之前,您需要提前做好以下准备工作:

Spark on MaxCompute访问阿里云HBase标准版

  1. HBase客户端,执行如下语句创建HBase表。

    create 'test','cf'
    说明

    更多HBase使用命令,请参见HBase Shell使用介绍

  2. IDEA编译工具编写Spark代码逻辑并打包。

    1. 使用Scala编程语言,按如下代码示例编写Spark代码逻辑。

      object App {
        def main(args: Array[String]) {
          val spark = SparkSession
            .builder()
            .appName("HbaseTest")
            .config("spark.sql.catalogImplementation", "odps")
            .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
            .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
            .getOrCreate()
      
          val sc = spark.sparkContext
          val config = HBaseConfiguration.create()
          //HBase集群的ZooKeeper连接地址。
          val zkAddress = "hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181"
          config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress);
          val jobConf = new JobConf(config)
          jobConf.setOutputFormat(classOf[TableOutputFormat])
          //HBase表名。
          jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test")
      
          try{
            import spark._
            //将MaxCompute表中数据写入HBase表。以下查询MaxCompute表语句以常量为例,实际开发环境需替换。
            spark.sql("select '7', 88 ").rdd.map(row => {
              val name= row(0).asInstanceOf[String]
              val id = row(1).asInstanceOf[Integer]
              val put = new Put(Bytes.toBytes(id))
              put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
              (new ImmutableBytesWritable, put)
            }).saveAsHadoopDataset(jobConf)
          } finally {
            sc.stop()
          }
        }
      }
      说明

      您可以通过登录HBase控制台,在HBase集群实例详情页的数据库连接页面获取ZooKeeper的连接地址。

      对应的HBase依赖文件如下。

      <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.0.2</version>
          </dependency>
           <dependency>
            <groupId>com.aliyun.hbase</groupId>
            <artifactId>alihbase-client</artifactId>
            <version>2.0.5</version>
          </dependency>
    2. IDEA中将代码以及依赖文件打成JAR包,并通过MaxCompute客户端上传至MaxCompute项目环境中。详情请参见添加资源

      说明

      由于DatadWork界面方式上传JAR包有50 MB的限制,因此采用MaxCompute客户端上传JAR包。

  3. DataWorks上创建ODPS Spark节点并配置。

    1. DataWorks上,选择对应的MaxCompute项目环境,将上传的JAR包添加到数据开发环境中。详情请参见创建并使用MaxCompute资源

    2. 新建ODPS Spark,并设置任务参数。详情请参见开发ODPS Spark任务

      提交Spark任务的配置参数如下。

      spark.hadoop.odps.cupid.eni.enable = true
      spark.hadoop.odps.cupid.eni.info=cn-beijing:vpc-2zeaeq21mb1dmkqh0****

Spark on MaxCompute访问阿里云HBase增强版(Lindorm)

  1. HBase客户端,执行如下语句创建HBase表。

    create 'test','cf'
    说明

    更多HBase使用命令,请参见HBase Shell使用介绍

  2. IDEA编译工具编写Spark代码逻辑并打包。

    1. 使用Scala编程语言,按照如下示例编写Spark代码逻辑。

      object McToHbase {
        def main(args: Array[String]) {
          val spark = SparkSession
            .builder()
            .appName("spark_sql_ddl")
            .config("spark.sql.catalogImplementation", "odps")
            .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
            .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
            .getOrCreate()
      
            val sc = spark.sparkContext
      
          try{
            //将MaxCompute表中数据写入HBase表。以下查询MaxCompute表语句以常量为例,实际开发环境需替换。
            spark.sql("select '7', 'long'").rdd.foreachPartition { iter =>
              val config = HBaseConfiguration.create()
              //ZooKeeper集群的连接地址(VPC内网地址)
              config.set("hbase.zookeeper.quorum", "<ZooKeeper连接地址>:30020");
              import spark._
              //HBase用户名和密码
              config.set("hbase.client.username", "<用户名>");
              config.set("hbase.client.password", "<密码>");
              //HBase表名
              val tableName = TableName.valueOf( "test")
              val conn = ConnectionFactory.createConnection(config)
              val table = conn.getTable(tableName);
              val puts = new util.ArrayList[Put]()
              iter.foreach(
                row => {
                  val id = row(0).asInstanceOf[String]
                  val name = row(1).asInstanceOf[String]
                  val put = new Put(Bytes.toBytes(id))
                  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
                  puts.add(put)
                  table.put(puts)
                }
              )
            }
        } finally {
          sc.stop()
        }
        }
      }
      说明

      您可以通过登录HBase控制台,在HBase集群实例详情页的数据库连接页面获取ZooKeeper的连接地址以及HBase用户名和密码。

      对应的HBase依赖文件如下。

      <dependency>
            <groupId>com.aliyun.hbase</groupId>
            <artifactId>alihbase-client</artifactId>
            <version>2.0.8</version>
          </dependency>
    2. IDEA中将代码以及依赖文件打成JAR包,并通过MaxCompute客户端上传至MaxCompute项目环境中。详情请参见添加资源

      说明

      由于DatadWork界面方式上传JAR包有50 MB的限制,因此采用MaxCompute客户端上传JAR包。

  3. DataWorks上创建ODPS Spark节点并配置。

    1. DataWorks上,选择对应的MaxCompute项目环境,将上传的JAR包添加到数据开发环境中。详情请参见创建并使用MaxCompute资源

    2. 新建ODPS Spark,并设置任务参数。详情请参见开发ODPS Spark任务

      提交Spark任务的配置参数如下。

      spark.hadoop.odps.cupid.eni.enable = true
      spark.hadoop.odps.cupid.eni.info=cn-beijing:vpc-2zeaeq21mb1dmkqh0****