Access preparation

Users of the HBase API can access Lindorm through Spark. You only need to use the HBase client released by Alibaba Cloud; for details, see HBase Java SDK Installation & Upgrade.
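
For a Maven project this usually means depending on the Alibaba Cloud HBase client artifact instead of the open-source hbase-client. The snippet below is only a minimal sketch; the alihbase-client coordinates and the version number are assumptions, so verify them against the HBase Java SDK Installation & Upgrade page:

<!-- Minimal sketch of the Maven dependency for the Alibaba Cloud HBase client.
     The version number is a placeholder; use the one listed in HBase Java SDK Installation & Upgrade. -->
<dependency>
    <groupId>com.aliyun.hbase</groupId>
    <artifactId>alihbase-client</artifactId>
    <version>2.8.7</version>
</dependency>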

Obtain the connection address

See Connect to a cluster.

Obtain the username and password

See Connect to a cluster.

Add the Lindorm access configuration

Method 1: Configuration file
Add the following configuration items to hbase-site.xml:
<configuration>
    <!--
    The connection address of the cluster, obtained on the Database Connection page in the console (note the difference between the public endpoint and the VPC endpoint)
    -->
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020</value>
    </property>
    <!--
    Set the username and password. The default is root:root; adjust them as needed.
    -->
    <property>
        <name>hbase.client.username</name>
        <value>root</value>
    </property>
    <property>
        <name>hbase.client.password</name>
        <value>root</value>
    </property>
    <!--
    If you depend on the Alibaba Cloud HBase client directly, you do not need to set the connection.impl parameter; if you depend on alihbase-connector, you must set it.
    -->
    <!--property>
        <name>hbase.client.connection.impl</name>
        <value>org.apache.hadoop.hbase.client.AliHBaseUEClusterConnection</value>
    </property-->
</configuration>
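
With this file on the application classpath, the client picks the settings up automatically, so no explicit conf.set(...) calls are needed. A minimal Scala sketch of that path (the connection simply inherits the quorum address and credentials from hbase-site.xml):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.ConnectionFactory

// HBaseConfiguration.create() reads hbase-site.xml from the classpath,
// so the quorum address and credentials configured above are already set.
val conf = HBaseConfiguration.create()
val conn = ConnectionFactory.createConnection(conf)
// ... use the connection ...
conn.close()
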
Method 2: Code

Add the parameters to the Configuration in code:

// Create a new Configuration
Configuration conf = HBaseConfiguration.create();
// The connection address of the cluster, obtained on the Database Connection page in the console (note the difference between the public endpoint and the VPC endpoint)
conf.set("hbase.zookeeper.quorum", "ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020");
// Set the username and password. The default is root:root; adjust them as needed.
conf.set("hbase.client.username", "root");
conf.set("hbase.client.password", "root");

// If you depend on the Alibaba Cloud HBase client directly, you do not need to set the connection.impl parameter; if you depend on alihbase-connector, you must set it.
//conf.set("hbase.client.connection.impl", AliHBaseUEClusterConnection.class.getName());
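
Before moving on to Spark, it can help to verify the address and credentials with a plain HBase connection. The following is a minimal, self-contained Scala sketch (the object name ConnectivityCheck and the address and credentials are placeholders for illustration):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.ConnectionFactory

object ConnectivityCheck {
  def main(args: Array[String]): Unit = {
    // Same settings as above; replace the address and credentials with your own
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020")
    conf.set("hbase.client.username", "root")
    conf.set("hbase.client.password", "root")

    // Open a connection and list the visible tables; if this succeeds,
    // the address and credentials are correct.
    val conn = ConnectionFactory.createConnection(conf)
    val admin = conn.getAdmin
    admin.listTableNames().foreach(t => println(t.getNameAsString))
    admin.close()
    conn.close()
  }
}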

Spark access example

import java.util

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, KeyValue, NamespaceDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

test("test the spark sql count result") {
  // 1. Add the HBase UE access configuration
  val conf = HBaseConfiguration.create
  conf.set("hbase.zookeeper.quorum", "ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020")
  conf.set("hbase.client.username", "test_user")
  conf.set("hbase.client.password", "password")

  // 2. Create the HBase table
  val hbaseTableName = "testTable"
  val cf = "f"
  val column1 = cf + ":a"
  val column2 = cf + ":b"
  val namespace = "spark_test"
  val conn = ConnectionFactory.createConnection(conf)
  val admin = conn.getAdmin
  // Create the namespace first if it does not exist yet
  if (!admin.listNamespaceDescriptors().exists(_.getName == namespace)) {
    admin.createNamespace(NamespaceDescriptor.create(namespace).build())
  }
  val tableName = TableName.valueOf(namespace, hbaseTableName)
  val htd = new HTableDescriptor(tableName)
  htd.addFamily(new HColumnDescriptor(cf))
  admin.createTable(htd)

  // 3. Insert test data (10 rows, two columns per row)
  val k: Array[Byte] = new Array[Byte](3)
  val famAndQf1 = KeyValue.parseColumn(Bytes.toBytes(column1))
  val famAndQf2 = KeyValue.parseColumn(Bytes.toBytes(column2))
  val puts = new util.ArrayList[Put]()
  var i = 0
  for (b1 <- 'a' to 'z'; b2 <- 'a' to 'z'; b3 <- 'a' to 'z') {
    if (i < 10) {
      k(0) = b1.toByte
      k(1) = b2.toByte
      k(2) = b3.toByte
      val put = new Put(k)
      put.addColumn(famAndQf1(0), famAndQf1(1), Bytes.toBytes("value_a_" + b1 + b2 + b3))
      put.addColumn(famAndQf2(0), famAndQf2(1), Bytes.toBytes("value_b_" + b1 + b2 + b3))
      puts.add(put)
      i = i + 1
    }
  }
  val rowsCount = puts.size()
  val table = conn.getTable(tableName)
  table.put(puts)

  // 4. Create the Spark table that maps to the HBase table
  val sparkTableName = "spark_hbase"
  val createCmd = s"""CREATE TABLE ${sparkTableName} USING org.apache.hadoop.hbase.spark
                     |    OPTIONS ('catalog'=
                     |    '{"table":{"namespace":"${namespace}", "name":"${hbaseTableName}"},
                     |    "rowkey":"rowkey",
                     |    "columns":{
                     |    "col0":{"cf":"rowkey", "col":"rowkey", "type":"string"},
                     |    "col1":{"cf":"${cf}", "col":"a", "type":"string"},
                     |    "col2":{"cf":"${cf}", "col":"b", "type":"string"}}}'
                     |    )""".stripMargin

  println("createCmd:\n" + createCmd + "\nrows inserted: " + rowsCount)
  sparkSession.sql(createCmd)

  // 5. Run the count SQL and compare it with the number of inserted rows
  val result = sparkSession.sql("select count(*) from " + sparkTableName)
  val sparkCounts = result.collect().apply(0).getLong(0)
  println("sparkCounts: " + sparkCounts)
  assert(sparkCounts == rowsCount)

  table.close()
  admin.close()
  conn.close()
}
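
If the test table should not be kept after the run, it can be removed with the same Admin API. A minimal, self-contained cleanup sketch (assuming the same placeholder address, credentials, namespace, and table name as in the example above):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CleanupTestTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020")
    conf.set("hbase.client.username", "test_user")
    conf.set("hbase.client.password", "password")

    val conn = ConnectionFactory.createConnection(conf)
    val admin = conn.getAdmin
    val tableName = TableName.valueOf("spark_test", "testTable")

    // A table must be disabled before it can be deleted; then drop the namespace.
    admin.disableTable(tableName)
    admin.deleteTable(tableName)
    admin.deleteNamespace("spark_test")
    admin.close()
    conn.close()
  }
}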