阿里云首页 云原生多模数据库 Lindorm

通过Spark访问Lindorm

本文主要介绍如何通过Spark访问Lindorm宽表。

访问准备

Lindorm宽表支持从Spark访问,您需要添加alihbase-connector的依赖,具体步骤如下:

  1. 确认Spark环境中依赖的开源HBase客户端版本以及安装目录,可以通过yarn logs -applicationId xxx查看具体执行日志中加载的版本和路径,hbase-client-xxx.jar。

  2. 确认添加alihbase-connector插件的版本,版本对应表请参见历史版本适配(Maven方式)

  3. 下载对应版本的alihbase-connector,下载地址请参见历史版本适配(Jar包替换)

  4. 将alihbase-connector移动至开源HBase客户端目录。

添加Lindorm访问配置

  • 方式一:通过配置文件添加访问配置。

    在配置文件hbase-site.xml中增加下列配置项:

    <configuration>
          <!--
        集群的连接地址,在控制台页面的数据库连接界面获得(注意公网地址和VPC内网地址)。
        -->
        <property>
            <name>hbase.zookeeper.quorum</name>
            <value>ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020</value>
        </property>
        <!--    
       设置用户名密码,默认root:root,可根据实际情况调整。
        -->
        <property>
            <name>hbase.client.username</name>
            <value>root</value>
        </property>
        <property>
            <name>hbase.client.password</name>
            <value>root</value>
        </property>
        <!--
        如果您直接依赖了阿里云HBase客户端,则无需配置connection.impl参数,如果您依赖了alihbase-connector,则需要配置此参数。
        -->
        <!--property>
            <name>hbase.client.connection.impl</name>
           <value>org.apache.hadoop.hbase.client.AliHBaseUEClusterConnection</value>
        </property-->
    </configuration>

  • 方式二:通过代码在Configuration中添加参数。

    // 新建一个Configuration
    Configuration conf = HBaseConfiguration.create();
    // 集群的连接地址,在控制台页面的数据库连接界面获得(注意公网地址和VPC内网地址)。
    conf.set("hbase.zookeeper.quorum", "ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020");
    // 设置用户名密码,默认root:root,可根据实际情况调整。
    conf.set("hbase.client.username", "root")
    conf.set("hbase.client.password", "root")
    // 如果您直接依赖了阿里云HBase客户端,则无需配置connection.impl参数,如果您依赖了alihbase-connector,则需要配置此参数。
    //conf.set("hbase.client.connection.impl", AliHBaseUEClusterConnection.class.getName());

Spark访问示例

添加HBaseue访问配置。
test(" test the spark sql count result") {
//添加HBaseue访问配置
var conf = HBaseConfiguration.create
conf.set("hbase.zookeeper.quorum", "ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020")
conf.set("hbase.client.username", "test_user")  
conf.set("hbase.client.password", "password")
//创建表
val hbaseTableName = "testTable"
val cf = "f"  
           val column1 = cf + ":a"  
val column2 = cf + ":b"
var rowsCount: Int = -1
var namespace = "spark_test"  
val admin = ConnectionFactory.createConnection(conf).getAdmin() 
val tableName = TableName.valueOf(namespace, hbaseTableName) 
val htd = new HTableDescriptor(tableName)  
htd.addFamily(new HColumnDescriptor(cf)) 
admin.createTable(htd)  
//插入测试数据  
val rng = new Random()  
val k: Array[Byte] = new Array[Byte](3)  
val famAndQf = KeyValue.parseColumn(Bytes.toBytes(column)) 
val puts = new util.ArrayList[Put]() 
var i = 0  
for (b1 <- ('a' to 'z')) { 
    for (b2 <- ('a' to 'z')) {  
      for (b3 <- ('a' to 'z')) {       
          if(i < 10) {           
             k(0) = b1.toByte          
             k(1) = b2.toByte          
             k(2) = b3.toByte       
             val put = new Put(k)        
             put.addColumn(famAndQf(0), famAndQf(1), ("value_" + b1 + b2 + b3).getBytes())        
             puts.add(put)         
             i = i + 1       
           }       
        }     
       }  
 }  
 val conn = ConnectionFactory.createConnection(conf) 
 val table = conn.getTable(tableName) 
 table.put(puts) 


 //创建spark表 
 val sparkTableName = "spark_hbase" 
 val createCmd = s"""CREATE TABLE ${sparkTableName} USING org.apache.hadoop.hbase.spark   
                        |    OPTIONS ('catalog'=               
                        |    '{"table":{"namespace":"$${hbaseTableName}",                   "name":"${hbaseTableName}"},"rowkey":"rowkey",            
                        |    "columns":{               
                        |    "col0":{"cf":"rowkey", "col":"rowkey", "type":"string"},               
                        |    "col1":{"cf":"cf1", "col":"a", "type":"string"},                  
                        |    "col2":{"cf":"cf1", "col":"b", "type":"String"}}}'            
                        |    )""".stripMargin  
 
 println(" createCmd: \n" + createCmd + " rows : " + rowsCount) 
 sparkSession.sql(createCmd) 

 //执行count sql 
 val result = sparkSession.sql("select count(*) from " + sparkTableName) 
 val sparkCounts = result.collect().apply(0).getLong(0) 
 println(" sparkCounts : " + sparkCounts)