Lindorm宽表引擎是面向海量半结构化、结构化数据设计的分布式存储,适用于元数据、订单、账单、画像、社交、feed流、日志等场景,兼容HBase、Phoenix(SQL)。本文主要介绍如何通过DLA
Serverless Spark访问Lindorm的宽表SQL(Phoenix)。
前提条件
- 已开通宽表SQL服务。具体操作请参见开通宽表SQL。
- 已经开通对象存储OSS(Object Storage Service)服务。具体操作请参见开通OSS服务。
- 准备DLA Spark访问Lindorm实例宽表引擎所需的安全组ID和交换机ID。具体操作请参见配置数据源网络。
- 前往Lindorm控制台,把要访问的Lindorm实例VPC网段加入到访问控制白名单中。具体操作请参见设置白名单。
- 在宽表SQL服务中已创建表并插入数据。假设本文档创建的表名为us_population。参考命令样例如下:
#建表语句:
CREATE TABLE IF NOT EXISTS us_population (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city));
#插入数据语句:
UPSERT INTO us_population VALUES('NY','New York',8143197);
UPSERT INTO us_population VALUES('CA','Los Angeles',3844829);
UPSERT INTO us_population VALUES('IL','Chicago',2842518);
UPSERT INTO us_population VALUES('TX','Houston',2016582);
UPSERT INTO us_population VALUES('PA','Philadelphia',1463281);
UPSERT INTO us_population VALUES('AZ','Phoenix',1461575);
UPSERT INTO us_population VALUES('TX','San Antonio',1256509);
UPSERT INTO us_population VALUES('CA','San Diego',1255540);
UPSERT INTO us_population VALUES('TX','Dallas',1213825);
UPSERT INTO us_population VALUES('CA','San Jose',912332);
操作步骤
- 准备以下测试代码和依赖包来访问宽表SQL,并将此测试代码和依赖包分别编译打包生成jar包上传至您的OSS。
测试代码示例:
package com.aliyun.spark
import org.apache.spark.sql.SparkSession
object SparkOnLindormForPhoenix {
def main(args: Array[String]): Unit = {
//queryServerAddress为Lindorm集群宽表SQL服务访问地址,格式为:http://xxx:8765。
val queryServerAddress = args(0)
//宽表引擎的用户名和密码。
val user = args(1)
val password = args(2)
//Phoenix侧的表名,需要在Phoenix侧提前创建。
val phoenixTableName = args(3)
//Spark侧的表名。
val sparkTableName = args(4)
val sparkSession = SparkSession
.builder()
.appName("scala spark on Phoenix5.x test")
.getOrCreate()
//如果存在的话就删除表。
sparkSession.sql(s"drop table if exists $sparkTableName")
val driver = "org.apache.phoenix.queryserver.client.Driver"
val url = "jdbc:phoenix:thin:url=" + queryServerAddress + ";serialization=PROTOBUF"
val createCmd = "CREATE TABLE " +
sparkTableName +
" USING org.apache.spark.sql.jdbc\n" +
"OPTIONS (\n" +
" 'driver' '" + driver + "',\n" +
" 'url' '" + url + "',\n" +
" 'user' '" + user + "',\n" +
" 'password' '" + password + "',\n" +
" 'dbtable' '" + phoenixTableName + "',\n" +
" 'fetchsize' '" + 100 + "'\n" +
")"
println(" createCmd: \n" + createCmd)
sparkSession.sql(createCmd)
val querySql = "select * from " + sparkTableName + " limit 1"
sparkSession.sql(querySql).show
sparkSession.stop()
}
}
宽表SQL依赖的pom文件:
<dependency>
<groupId>com.aliyun.phoenix</groupId>
<artifactId>ali-phoenix-shaded-thin-client</artifactId>
<version>5.2.2-HBase-2.x-SNAPSHOT</version>
</dependency>
- 登录Data Lake Analytics管理控制台。
- 在页面左上角,选择Lindorm宽表SQL所在实例的相同地域。
- 单击左侧导航栏中的。
- 在作业编辑页面,单击创建作业。
- 在创建作业页面,按照页面提示进行参数配置后,单击确定创建Spark作业。
- 单击Spark作业名,在Spark作业编辑框中输入以下作业内容,并按照以下参数说明进行参数值替换。保存并提交Spark作业。
{
"args": [
"http://ld-xxx-proxy-phoenix.lindorm.rds.aliyuncs.com:8765", #宽表SQL的“客户端访问地址”-> “私网”。注意地址要以:“http://”开头。
"xxx1", #访问Lindorm的用户名。
"xxx2", #访问Lindorm的密码。
"us_population", #Phoenix的表名。
"spark_on_lindorm_phoenix" #Spark中创建映射Phoenix表的表名。
],
"file": "oss://spark_test/jars/lindorm/spark-examples-0.0.1-SNAPSHOT.jar", ##测试代码的OSS路径。
"name": "lindorm-for-phoenix-test",
"jars": [
"oss://spark_test/jars/lindorm/ali-phoenix-shaded-thin-client-5.2.2-HBase-2.x-SNAPSHOT.jar" ##测试代码依赖包的OSS路径。
],
"className": "com.aliyun.spark.SparkOnLindormForPhoenix",
"conf": {
"spark.driver.resourceSpec": "small", #表示driver的规格,有small、medium、large、xlarge之分。
"spark.executor.instances": 2, #表示executor的个数。
"spark.executor.resourceSpec": "small", ##表示executor的规格,有small、medium、large、xlarge之分。
"spark.dla.eni.enable": "true", #开启访问用户VPC网络的权限。当您需要访问用户VPC网络内的数据时,需要开启此选项。
"spark.dla.eni.vswitch.id": "vsw-xxx", #可访问Lindorm的交换机id。
"spark.dla.eni.security.group.id": "sg-xxx" #可访问Lindorm的安全组id。
}
}
执行结果
作业运行成功后,在任务列表中单击,查看作业日志。出现如下日志说明作业运行成功:
在文档使用中是否遇到以下问题
更多建议
匿名提交