Accessing HBase Enhanced Edition from Spark

Last updated: 2020-03-17 11:50:11

Preparation

HBase Enhanced Edition can be accessed from Spark. You only need to add a dependency on the HBase client published by Alibaba Cloud, or on alihbase-connector. For the latest version, see JAVA SDK Installation.
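As a sketch, the connector dependency can be declared in a Maven POM as follows. The coordinates shown are an assumption based on the Alibaba Cloud HBase artifacts, and the version is a placeholder; use the group ID and latest version listed in JAVA SDK Installation:

```xml
<!-- Coordinates are illustrative; the version is a placeholder -->
<dependency>
    <groupId>com.aliyun.hbase</groupId>
    <artifactId>alihbase-connector</artifactId>
    <version>x.y.z</version>
</dependency>
```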

Obtain the access address

See Connect to a Cluster and use the Java API access address; the default port is 30020. For access over the public network, use the public endpoint.

Obtain the username and password

See Connect to a Cluster. The default username and password are both root. Alternatively, if you disable the ACL feature on the cluster management page, no username or password is required.

Add the HBase Enhanced Edition access configuration

Method 1: configuration file

Add the following items to hbase-site.xml:

    <configuration>
      <!--
        Cluster connection address, obtained from the database connection page in the console
        (note the difference between the public endpoint and the VPC endpoint)
      -->
      <property>
        <name>hbase.zookeeper.quorum</name>
        <value>ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020</value>
      </property>
      <!--
        Username and password; both default to root and can be changed as needed
      -->
      <property>
        <name>hbase.client.username</name>
        <value>root</value>
      </property>
      <property>
        <name>hbase.client.password</name>
        <value>root</value>
      </property>
      <!--
        If you depend on the Alibaba Cloud HBase client directly, do not set the connection.impl
        parameter; if you depend on alihbase-connector, this parameter is required
      -->
      <!--property>
        <name>hbase.client.connection.impl</name>
        <value>org.apache.hadoop.hbase.client.AliHBaseUEClusterConnection</value>
      </property-->
    </configuration>

Method 2: code

Set the parameters on the Configuration object in code:

    // Create a new Configuration
    Configuration conf = HBaseConfiguration.create();
    // Cluster connection address, obtained from the database connection page in the console
    // (note the difference between the public endpoint and the VPC endpoint)
    conf.set("hbase.zookeeper.quorum", "ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020");
    // Username and password; both default to root and can be changed as needed
    conf.set("hbase.client.username", "root");
    conf.set("hbase.client.password", "root");
    // If you depend on the Alibaba Cloud HBase client directly, do not set the connection.impl
    // parameter; if you depend on alihbase-connector, this parameter is required
    //conf.set("hbase.client.connection.impl", AliHBaseUEClusterConnection.class.getName());

Spark access example

    test("test the spark sql count result") {
      // 1. Add the HBase UE access configuration
      val conf = HBaseConfiguration.create
      conf.set("hbase.zookeeper.quorum", "ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020")
      conf.set("hbase.client.username", "test_user")
      conf.set("hbase.client.password", "password")
      // 2. Create the table
      val hbaseTableName = "testTable"
      val cf = "f"
      val column1 = cf + ":a"
      val column2 = cf + ":b"
      var rowsCount: Int = -1
      val namespace = "spark_test"
      val conn = ConnectionFactory.createConnection(conf)
      val admin = conn.getAdmin()
      val tableName = TableName.valueOf(namespace, hbaseTableName)
      val htd = new HTableDescriptor(tableName)
      htd.addFamily(new HColumnDescriptor(cf))
      admin.createTable(htd)
      // 3. Insert test data: enumerate 3-letter rowkeys, keeping only the first 10
      val k: Array[Byte] = new Array[Byte](3)
      val famAndQf = KeyValue.parseColumn(Bytes.toBytes(column1))
      val puts = new util.ArrayList[Put]()
      var i = 0
      for (b1 <- ('a' to 'z')) {
        for (b2 <- ('a' to 'z')) {
          for (b3 <- ('a' to 'z')) {
            if (i < 10) {
              k(0) = b1.toByte
              k(1) = b2.toByte
              k(2) = b3.toByte
              val put = new Put(k)
              put.addColumn(famAndQf(0), famAndQf(1), ("value_" + b1 + b2 + b3).getBytes())
              puts.add(put)
              i = i + 1
            }
          }
        }
      }
      val table = conn.getTable(tableName)
      table.put(puts)
      rowsCount = puts.size()
      // 4. Create the Spark table; the catalog must reference the actual
      //    namespace and column family ("f") created above
      val sparkTableName = "spark_hbase"
      val createCmd = s"""CREATE TABLE ${sparkTableName} USING org.apache.hadoop.hbase.spark
                         | OPTIONS ('catalog'=
                         | '{"table":{"namespace":"${namespace}", "name":"${hbaseTableName}"},"rowkey":"rowkey",
                         | "columns":{
                         | "col0":{"cf":"rowkey", "col":"rowkey", "type":"string"},
                         | "col1":{"cf":"f", "col":"a", "type":"string"},
                         | "col2":{"cf":"f", "col":"b", "type":"string"}}}'
                         | )""".stripMargin
      println("createCmd: \n" + createCmd + " rows: " + rowsCount)
      sparkSession.sql(createCmd)
      // 5. Run the count SQL
      val result = sparkSession.sql("select count(*) from " + sparkTableName)
      val sparkCounts = result.collect().apply(0).getLong(0)
      println("sparkCounts: " + sparkCounts)
      conn.close()
    }
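The rowkey generation in step 3 above walks every three-letter combination from "aaa" to "zzz" but keeps only the first 10 rows, so the table ends up containing exactly the keys "aaa" through "aaj". That logic can be sketched standalone, without any HBase dependency; the class and method names here are hypothetical, for illustration only:

```java
import java.util.ArrayList;
import java.util.List;

public class RowkeySketch {
    // Enumerate 3-letter rowkeys "aaa".."zzz", keeping only the first `limit`
    // keys, mirroring the triple loop in step 3 of the Spark test above.
    static List<String> firstRowkeys(int limit) {
        List<String> keys = new ArrayList<>();
        for (char b1 = 'a'; b1 <= 'z'; b1++)
            for (char b2 = 'a'; b2 <= 'z'; b2++)
                for (char b3 = 'a'; b3 <= 'z'; b3++)
                    if (keys.size() < limit)
                        keys.add("" + b1 + b2 + b3);
        return keys;
    }

    public static void main(String[] args) {
        // Prints [aaa, aab, aac, aad, aae, aaf, aag, aah, aai, aaj]
        System.out.println(firstRowkeys(10));
    }
}
```

With 10 rows inserted, the final `select count(*)` is expected to return 10.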