云数据库HBase是面向大数据领域的一站式NoSQL服务,适用于GB至PB级的大规模吞吐、检索、分析工作负载,是为淘宝推荐、支付宝账单、花呗风控等众多阿里巴巴核心服务提供支撑的数据库。本文主要介绍如何通过DLA
Serverless Spark 对接云数据库HBase标准版2.0版本的HBase SQL服务Phoenix。
前提条件
- 已开通HBase SQL服务。具体操作请参见HBase SQL(Phoenix) 5.x 使用说明。
- 已经开通对象存储OSS(Object Storage Service)服务。具体操作请参见开通OSS服务。
- 准备DLA Spark访问HBase集群SQL服务所需的安全组ID和交换机ID。具体操作请参见配置数据源网络。
- 前往HBase管理控制台,把要访问的HBase集群VPC网段加入到访问控制白名单中。具体操作请参见设置白名单和安全组。
- 在HBase SQL服务中已创建表并插入数据。假设本文档创建的表名为us_population。参考命令样例如下:
#建表语句:
CREATE TABLE IF NOT EXISTS us_population (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city));
#插入数据语句:
UPSERT INTO us_population VALUES('NY','New York',8143197);
UPSERT INTO us_population VALUES('CA','Los Angeles',3844829);
UPSERT INTO us_population VALUES('IL','Chicago',2842518);
UPSERT INTO us_population VALUES('TX','Houston',2016582);
UPSERT INTO us_population VALUES('PA','Philadelphia',1463281);
UPSERT INTO us_population VALUES('AZ','Phoenix',1461575);
UPSERT INTO us_population VALUES('TX','San Antonio',1256509);
UPSERT INTO us_population VALUES('CA','San Diego',1255540);
UPSERT INTO us_population VALUES('TX','Dallas',1213825);
UPSERT INTO us_population VALUES('CA','San Jose',912332);
操作步骤
- 准备以下测试代码和依赖包来访问HBase SQL,并将此测试代码和依赖包分别编译打包生成jar包上传至您的OSS。
测试代码示例:
package com.aliyun.spark
import org.apache.spark.sql.SparkSession
object SparkOnHBase2xForPhoenix {
def main(args: Array[String]): Unit = {
//queryServerAddress为HBase集群SQL服务访问地址,格式为:http://xxx:8765
val queryServerAddress = args(0)
//Phoenix侧的表名,需要在Phoenix侧提前创建。
val phoenixTableName = args(1)
//Spark侧的表名。
val sparkTableName = args(2)
val sparkSession = SparkSession
.builder()
.appName("scala spark on Phoenix5.x test")
.getOrCreate()
//如果存在的话就删除表
sparkSession.sql(s"drop table if exists $sparkTableName")
val driver = "org.apache.phoenix.queryserver.client.Driver"
val url = "jdbc:phoenix:thin:url=" + queryServerAddress + ";serialization=PROTOBUF"
val createCmd = "CREATE TABLE " +
sparkTableName +
" USING org.apache.spark.sql.jdbc\n" +
"OPTIONS (\n" +
" 'driver' '" + driver + "',\n" +
" 'url' '" + url + "',\n" +
" 'dbtable' '" + phoenixTableName + "',\n" +
" 'fetchsize' '" + 100 + "'\n" +
")"
println(" createCmd: \n" + createCmd)
sparkSession.sql(createCmd)
val querySql = "select * from " + sparkTableName + " limit 1"
sparkSession.sql(querySql).show
sparkSession.stop()
}
}
HBase SQL依赖的pom文件:
<dependency>
<groupId>com.aliyun.phoenix</groupId>
<artifactId>ali-phoenix-shaded-thin-client</artifactId>
<version>5.2.2-HBase-2.x-SNAPSHOT</version>
</dependency>
- 登录Data Lake Analytics管理控制台。
- 在页面左上角,选择HBase集群HBase SQL服务所在的地域。
- 单击左侧导航栏中的。
- 在作业编辑页面,单击创建作业。
- 在创建作业模板页面,按照页面提示进行参数配置后,单击确定创建Spark作业。
- 单击Spark作业名,在Spark作业编辑框中输入以下作业内容,并按照以下参数说明进行参数值替换。保存并提交Spark作业。
{
"args": [
"http://hb-xxx-proxy-phoenix.hbase.rds.aliyuncs.com:8765", #HBase SQL服务的客户端访问地址,可以是“负载均衡连接”或者“单点 QueryServer连”。
"us_population", #Phoenix的表名。
"spark_on_hbase2x_phoenix" #Spark中创建映射Phoenix表的表名。
],
"file": "oss://spark_test/jars/hbase2x/spark-examples-0.0.1-SNAPSHOT.jar", #测试代码的OSS路径。
"name": "hbase2x-for-phoenix-test",
"jars": [
"oss://spark_test/jars/hbase2x/ali-phoenix-shaded-thin-client-5.2.2-HBase-2.x-SNAPSHOT.jar" #测试代码依赖包的OSS路径。
],
"className": "com.aliyun.spark.SparkOnHBase2xForPhoenix",
"conf": {
"spark.driver.resourceSpec": "small", #表示driver的规格,有small、medium、large、xlarge之分。
"spark.executor.instances": 2, #表示executor的个数。
"spark.executor.resourceSpec": "small", #表示executor的规格,有small、medium、large、xlarge之分。
"spark.dla.eni.enable": "true", #开启访问用户VPC网络的权限。当您需要访问用户VPC网络内的数据时,需要开启此选项。
"spark.dla.eni.vswitch.id": "vsw-xxx", #可访问HBase 的交换机id。
"spark.dla.eni.security.group.id": "sg-xxx" #可访问HBase的安全组id。
}
}
执行结果
作业运行成功后,在任务列表中单击,查看作业日志。出现如下日志说明作业运行成功: