本文为您介绍Spark on MaxCompute访问云数据库HBase的配置方法。

背景信息

Spark on MaxCompute可以访问位于阿里云VPC内的实例(ECS、HBase、RDS等)。MaxCompute底层网络和外网默认是隔离的,Spark on MaxCompute提供了一种方案通过配置spark.hadoop.odps.cupid.vpc.domain.list来访问阿里云的VPC网络环境的HBase。HBase标准版和增强版(Lindorm)的配置不同,详情如下。

前提条件

在实践之前,您需要提前做好以下准备工作:
  • 已开通MaxCompute服务并创建MaxCompute项目。详情请参见开通MaxCompute服务创建MaxCompute项目
  • 已开通DataWorks服务。详情请参见DataWorks购买指导
  • 已开通HBase服务,详情请参见HBase购买指导
  • 已开通专有网络VPC,并配置了HBase集群安全组和白名单。详情请参见专有网络连接方案
    说明
    • HBase标准版安全组开放端口为2181、10600、16020,对应MaxCompute IP的白名单为100.104.0.0/16
    • HBase增强版(Lindorm)版安全组开放端口为30020、10600、16020,对应MaxCompute IP的白名单为100.104.0.0/16

Spark on MaxCompute访问阿里云HBase标准版

  1. 在HBase客户端,执行如下语句创建HBase表。
    create 'test','cf'
    说明 更多HBase使用命令,请参见HBase Shell使用介绍
  2. 在IDEA编译工具编写Spark代码逻辑并打包。
    1. 使用Scala编程语言,按如下代码示例编写Spark代码逻辑。
      object App {
        def main(args: Array[String]) {
          val spark = SparkSession
            .builder()
            .appName("HbaseTest")
            .config("spark.sql.catalogImplementation", "odps")
            .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
            .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
            .getOrCreate()
      
          val sc = spark.sparkContext
          val config = HBaseConfiguration.create()
          //HBase集群的ZooKeeper连接地址。
          val zkAddress = "hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181"
          config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress);
          val jobConf = new JobConf(config)
          jobConf.setOutputFormat(classOf[TableOutputFormat])
          //HBase表名。
          jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test")
      
          try{
            import spark._
            //将MaxCompute表中数据写入HBase表。以下查询MaxCompute表语句以常量为例,实际开发环境需替换。
            spark.sql("select '7', 88 ").rdd.map(row => {
              val name= row(0).asInstanceOf[String]
              val id = row(1).asInstanceOf[Integer]
              val put = new Put(Bytes.toBytes(id))
              put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
              (new ImmutableBytesWritable, put)
            }).saveAsHadoopDataset(jobConf)
          } finally {
            sc.stop()
          }
        }
      }
      说明 您可以通过登录HBase控制台,在HBase集群实例详情页的数据库连接页面获取ZooKeeper的连接地址。
      对应的HBase依赖文件如下。
      <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.0.2</version>
          </dependency>
           <dependency>
            <groupId>com.aliyun.hbase</groupId>
            <artifactId>alihbase-client</artifactId>
            <version>2.0.5</version>
          </dependency>
    2. 在IDEA中将代码以及依赖文件打成JAR包,并通过MaxCompute客户端上传至MaxCompute项目环境中。详情请参见添加资源
      说明 由于DatadWork界面方式上传JAR包有50 MB的限制,因此采用MaxCompute客户端上传JAR包。
  3. 在DataWorks上创建ODPS Spark节点并配置。
    1. 在DataWorks上,选择对应的MaxCompute项目环境,将上传的JAR包添加到数据开发环境中。详情请参见创建JAR资源
    2. 新建ODPS Spark,并设置任务参数。详情请参见创建ODPS Spark节点
      提交Spark任务的配置参数如下图所示。节点配置
      对应的spark.hadoop.odps.cupid.vpc.domain.list参数如下所示,请您根据个人HBase集群节点进行配置。
      {
        "regionId":"cn-beijing",
        "vpcs":[
          {
            "vpcId":"vpc-2zeaeq21mb1dmkqh0exox",
            "zones":[
              {
                "urls":[
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":2181
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":16000
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":16020
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":2181
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":16000
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":16020
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":2181
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":16000
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":16020
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
                    "port":16020
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
                    "port":16020
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
                    "port":16020
                  }
                ]
              }
            ]
          }
        ]
      }

Spark on MaxCompute访问阿里云HBase增强版(Lindorm)

  1. 在HBase客户端,执行如下语句创建HBase表。
    create 'test','cf'
    说明 更多HBase使用命令,请参见HBase Shell使用介绍
  2. 在IDEA编译工具编写Spark代码逻辑并打包。
    1. 使用Scala编程语言,按照如下示例编写Spark代码逻辑。
      object McToHbase {
        def main(args: Array[String]) {
          val spark = SparkSession
            .builder()
            .appName("spark_sql_ddl")
            .config("spark.sql.catalogImplementation", "odps")
            .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
            .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
            .getOrCreate()
      
            val sc = spark.sparkContext
      
          try{
            //将MaxCompute表中数据写入HBase表。以下查询MaxCompute表语句以常量为例,实际开发环境需替换。
            spark.sql("select '7', 'long'").rdd.foreachPartition { iter =>
              val config = HBaseConfiguration.create()
              //ZooKeeper集群的连接地址(VPC内网地址)
              config.set("hbase.zookeeper.quorum", "<ZooKeeper连接地址>:30020");
              import spark._
              //HBase用户名和密码
              config.set("hbase.client.username", "<用户名>");
              config.set("hbase.client.password", "<密码>");
              //HBase表名
              val tableName = TableName.valueOf( "test")
              val conn = ConnectionFactory.createConnection(config)
              val table = conn.getTable(tableName);
              val puts = new util.ArrayList[Put]()
              iter.foreach(
                row => {
                  val id = row(0).asInstanceOf[String]
                  val name = row(1).asInstanceOf[String]
                  val put = new Put(Bytes.toBytes(id))
                  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
                  puts.add(put)
                  table.put(puts)
                }
              )
            }
        } finally {
          sc.stop()
        }
        }
      }
      说明 您可以通过登录HBase控制台,在HBase集群实例详情页的数据库连接页面获取ZooKeeper的连接地址以及HBase用户名和密码。
      对应的HBase依赖文件如下。
      <dependency>
            <groupId>com.aliyun.hbase</groupId>
            <artifactId>alihbase-client</artifactId>
            <version>2.0.8</version>
          </dependency>
    2. 在IDEA中将代码以及依赖文件打成JAR包,并通过MaxCompute客户端上传至MaxCompute项目环境中。详情请参见添加资源
      说明 由于DatadWork界面方式上传JAR包有50 MB的限制,因此采用MaxCompute客户端上传JAR包。
  3. 在DataWorks上创建ODPS Spark节点并配置。
    1. 在DataWorks上,选择对应的MaxCompute项目环境,将上传的JAR包添加到数据开发环境中。详情请参见创建JAR资源
    2. 新建ODPS Spark,并设置任务参数。详情请参见创建ODPS Spark节点
      提交Spark任务的配置参数如下图所示。节点配置
      对应的spark.hadoop.odps.cupid.vpc.domain.list参数如下所示,请您根据个人HBase集群节点进行配置。
      {
        "regionId":"cn-beijing",
        "vpcs":[
          {
            "vpcId":"vpc-2zeaeq21mb1dmkqh0exox",
            "zones":[
              {
                "urls":[
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":30020
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":16000
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":16020
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":30020
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":16000
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":16020
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":30020
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":16000
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":16020
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
                    "port":16020
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
                    "port":16020
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
                    "port":16020
                  },
                   {"domain":"172.*.*.10","port":16000}
                ]
              }
            ]
          }
        ]
      }
      说明 172.*.*.10为HBase增强版Java API访问地址。必须采用IP的形式,您可以在所属服务器使用ping命令获取。