本文为您介绍Spark on MaxCompute访问云数据库HBase的配置方法。
背景信息
Spark on MaxCompute可以访问位于阿里云VPC内的实例(ECS、HBase、RDS等)。MaxCompute底层网络和外网默认是隔离的,Spark
on MaxCompute提供了一种方案通过配置
spark.hadoop.odps.cupid.vpc.domain.list
来访问阿里云的VPC网络环境的HBase。HBase标准版和增强版(Lindorm)的配置不同,详情如下。
前提条件
在实践之前,您需要提前做好以下准备工作:
- 已开通MaxCompute服务并创建MaxCompute项目。详情请参见开通MaxCompute服务和创建MaxCompute项目。
- 已开通DataWorks服务。详情请参见DataWorks购买指导。
- 已开通HBase服务,详情请参见HBase购买指导。
- 已开通专有网络VPC,并配置了HBase集群安全组和白名单。详情请参见专有网络连接方案。
说明
- HBase标准版安全组开放端口为2181、10600、16020,对应MaxCompute IP的白名单为
100.104.0.0/16
。
- HBase增强版(Lindorm)版安全组开放端口为30020、10600、16020,对应MaxCompute IP的白名单为
100.104.0.0/16
。
Spark on MaxCompute访问阿里云HBase标准版
- 在HBase客户端,执行如下语句创建HBase表。
- 在IDEA编译工具编写Spark代码逻辑并打包。
- 使用Scala编程语言,按如下代码示例编写Spark代码逻辑。
object App {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("HbaseTest")
.config("spark.sql.catalogImplementation", "odps")
.config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
.config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
.getOrCreate()
val sc = spark.sparkContext
val config = HBaseConfiguration.create()
//HBase集群的ZooKeeper连接地址。
val zkAddress = "hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181"
config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress);
val jobConf = new JobConf(config)
jobConf.setOutputFormat(classOf[TableOutputFormat])
//HBase表名。
jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test")
try{
import spark._
//将MaxCompute表中数据写入HBase表。以下查询MaxCompute表语句以常量为例,实际开发环境需替换。
spark.sql("select '7', 88 ").rdd.map(row => {
val name= row(0).asInstanceOf[String]
val id = row(1).asInstanceOf[Integer]
val put = new Put(Bytes.toBytes(id))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
(new ImmutableBytesWritable, put)
}).saveAsHadoopDataset(jobConf)
} finally {
sc.stop()
}
}
}
说明 您可以通过登录
HBase控制台,在
HBase
集群实例详情页的
数据库连接页面获取
ZooKeeper
的连接地址。
对应的
HBase
依赖文件如下。
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>com.aliyun.hbase</groupId>
<artifactId>alihbase-client</artifactId>
<version>2.0.5</version>
</dependency>
- 在IDEA中将代码以及依赖文件打成JAR包,并通过MaxCompute客户端上传至MaxCompute项目环境中。详情请参见添加资源。
说明 由于DatadWork界面方式上传JAR包有50 MB的限制,因此采用MaxCompute客户端上传JAR包。
- 在DataWorks上创建ODPS Spark节点并配置。
- 在DataWorks上,选择对应的MaxCompute项目环境,将上传的JAR包添加到数据开发环境中。详情请参见创建JAR资源。
- 新建ODPS Spark,并设置任务参数。详情请参见创建ODPS Spark节点。
提交
Spark
任务的配置参数如下图所示。
对应的
spark.hadoop.odps.cupid.vpc.domain.list参数如下所示,请您根据个人
HBase
集群节点进行配置。
{
"regionId":"cn-beijing",
"vpcs":[
{
"vpcId":"vpc-2zeaeq21mb1dmkqh0exox",
"zones":[
{
"urls":[
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":2181
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":2181
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":2181
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
"port":16020
}
]
}
]
}
]
}
Spark on MaxCompute访问阿里云HBase增强版(Lindorm)
- 在HBase客户端,执行如下语句创建HBase表。
- 在IDEA编译工具编写Spark代码逻辑并打包。
- 使用Scala编程语言,按照如下示例编写Spark代码逻辑。
object McToHbase {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("spark_sql_ddl")
.config("spark.sql.catalogImplementation", "odps")
.config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
.config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
.getOrCreate()
val sc = spark.sparkContext
try{
//将MaxCompute表中数据写入HBase表。以下查询MaxCompute表语句以常量为例,实际开发环境需替换。
spark.sql("select '7', 'long'").rdd.foreachPartition { iter =>
val config = HBaseConfiguration.create()
//ZooKeeper集群的连接地址(VPC内网地址)
config.set("hbase.zookeeper.quorum", "<ZooKeeper连接地址>:30020");
import spark._
//HBase用户名和密码
config.set("hbase.client.username", "<用户名>");
config.set("hbase.client.password", "<密码>");
//HBase表名
val tableName = TableName.valueOf( "test")
val conn = ConnectionFactory.createConnection(config)
val table = conn.getTable(tableName);
val puts = new util.ArrayList[Put]()
iter.foreach(
row => {
val id = row(0).asInstanceOf[String]
val name = row(1).asInstanceOf[String]
val put = new Put(Bytes.toBytes(id))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
puts.add(put)
table.put(puts)
}
)
}
} finally {
sc.stop()
}
}
}
说明 您可以通过登录
HBase控制台,在
HBase
集群实例详情页的
数据库连接页面获取
ZooKeeper
的连接地址以及
HBase
用户名和密码。
对应的
HBase
依赖文件如下。
<dependency>
<groupId>com.aliyun.hbase</groupId>
<artifactId>alihbase-client</artifactId>
<version>2.0.8</version>
</dependency>
- 在IDEA中将代码以及依赖文件打成JAR包,并通过MaxCompute客户端上传至MaxCompute项目环境中。详情请参见添加资源。
说明 由于DatadWork界面方式上传JAR包有50 MB的限制,因此采用MaxCompute客户端上传JAR包。
- 在DataWorks上创建ODPS Spark节点并配置。
- 在DataWorks上,选择对应的MaxCompute项目环境,将上传的JAR包添加到数据开发环境中。详情请参见创建JAR资源。
- 新建ODPS Spark,并设置任务参数。详情请参见创建ODPS Spark节点。
提交
Spark
任务的配置参数如下图所示。
对应的
spark.hadoop.odps.cupid.vpc.domain.list参数如下所示,请您根据个人
HBase
集群节点进行配置。
{
"regionId":"cn-beijing",
"vpcs":[
{
"vpcId":"vpc-2zeaeq21mb1dmkqh0exox",
"zones":[
{
"urls":[
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":30020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":30020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":30020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{"domain":"172.*.*.10","port":16000}
]
}
]
}
]
}
说明 172.*.*.10为HBase增强版Java API访问地址。必须采用IP的形式,您可以在所属服务器使用ping
命令获取。