本文为您介绍Spark on MaxCompute访问云数据库HBase的配置方法。
背景信息
Spark on MaxCompute可以访问位于阿里云VPC内的实例(ECS、HBase、RDS等)。MaxCompute底层网络和外网默认是隔离的,Spark on MaxCompute提供了一种方案通过配置spark.hadoop.odps.cupid.eni.info=regionid:vpc id
来访问阿里云的VPC网络环境的HBase。HBase标准版和增强版(Lindorm)的配置不同,详情如下。
前提条件
在实践之前,您需要提前做好以下准备工作:
已开通MaxCompute服务并创建MaxCompute项目。详情请参见开通MaxCompute服务和创建MaxCompute项目。
已开通DataWorks服务。详情请参见DataWorks购买指导。
已开通HBase服务,详情请参见HBase购买指导。
已开通专有网络VPC,并配置了HBase集群安全组和白名单。详情请参见网络开通流程。
说明HBase标准版安全组开放端口为2181、10600、16020。
HBase增强版(Lindorm)版安全组开放端口为30020、10600、16020。
Spark on MaxCompute访问阿里云HBase标准版
在HBase客户端,执行如下语句创建HBase表。
create 'test','cf'
说明更多HBase使用命令,请参见HBase Shell使用介绍。
在IDEA编译工具编写Spark代码逻辑并打包。
使用Scala编程语言,按如下代码示例编写Spark代码逻辑。
object App { def main(args: Array[String]) { val spark = SparkSession .builder() .appName("HbaseTest") .config("spark.sql.catalogImplementation", "odps") .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api") .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api") .getOrCreate() val sc = spark.sparkContext val config = HBaseConfiguration.create() //HBase集群的ZooKeeper连接地址。 val zkAddress = "hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181" config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress); val jobConf = new JobConf(config) jobConf.setOutputFormat(classOf[TableOutputFormat]) //HBase表名。 jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test") try{ import spark._ //将MaxCompute表中数据写入HBase表。以下查询MaxCompute表语句以常量为例,实际开发环境需替换。 spark.sql("select '7', 88 ").rdd.map(row => { val name= row(0).asInstanceOf[String] val id = row(1).asInstanceOf[Integer] val put = new Put(Bytes.toBytes(id)) put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name)) (new ImmutableBytesWritable, put) }).saveAsHadoopDataset(jobConf) } finally { sc.stop() } } }
说明您可以通过登录HBase控制台,在HBase集群实例详情页的数据库连接页面获取ZooKeeper的连接地址。
对应的HBase依赖文件如下。
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-mapreduce</artifactId> <version>2.0.2</version> </dependency> <dependency> <groupId>com.aliyun.hbase</groupId> <artifactId>alihbase-client</artifactId> <version>2.0.5</version> </dependency>
在IDEA中将代码以及依赖文件打成JAR包,并通过MaxCompute客户端上传至MaxCompute项目环境中。详情请参见添加资源。
说明由于DatadWork界面方式上传JAR包有50 MB的限制,因此采用MaxCompute客户端上传JAR包。
在DataWorks上创建ODPS Spark节点并配置。
在DataWorks上,选择对应的MaxCompute项目环境,将上传的JAR包添加到数据开发环境中。详情请参见创建并使用MaxCompute资源。
新建ODPS Spark,并设置任务参数。详情请参见开发ODPS Spark任务。
提交Spark任务的配置参数如下。
spark.hadoop.odps.cupid.eni.enable = true spark.hadoop.odps.cupid.eni.info=cn-beijing:vpc-2zeaeq21mb1dmkqh0****
Spark on MaxCompute访问阿里云HBase增强版(Lindorm)
在HBase客户端,执行如下语句创建HBase表。
create 'test','cf'
说明更多HBase使用命令,请参见HBase Shell使用介绍。
在IDEA编译工具编写Spark代码逻辑并打包。
使用Scala编程语言,按照如下示例编写Spark代码逻辑。
object McToHbase { def main(args: Array[String]) { val spark = SparkSession .builder() .appName("spark_sql_ddl") .config("spark.sql.catalogImplementation", "odps") .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api") .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api") .getOrCreate() val sc = spark.sparkContext try{ //将MaxCompute表中数据写入HBase表。以下查询MaxCompute表语句以常量为例,实际开发环境需替换。 spark.sql("select '7', 'long'").rdd.foreachPartition { iter => val config = HBaseConfiguration.create() //ZooKeeper集群的连接地址(VPC内网地址) config.set("hbase.zookeeper.quorum", "<ZooKeeper连接地址>:30020"); import spark._ //HBase用户名和密码 config.set("hbase.client.username", "<用户名>"); config.set("hbase.client.password", "<密码>"); //HBase表名 val tableName = TableName.valueOf( "test") val conn = ConnectionFactory.createConnection(config) val table = conn.getTable(tableName); val puts = new util.ArrayList[Put]() iter.foreach( row => { val id = row(0).asInstanceOf[String] val name = row(1).asInstanceOf[String] val put = new Put(Bytes.toBytes(id)) put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name)) puts.add(put) table.put(puts) } ) } } finally { sc.stop() } } }
说明您可以通过登录HBase控制台,在HBase集群实例详情页的数据库连接页面获取ZooKeeper的连接地址以及HBase用户名和密码。
对应的HBase依赖文件如下。
<dependency> <groupId>com.aliyun.hbase</groupId> <artifactId>alihbase-client</artifactId> <version>2.0.8</version> </dependency>
在IDEA中将代码以及依赖文件打成JAR包,并通过MaxCompute客户端上传至MaxCompute项目环境中。详情请参见添加资源。
说明由于DatadWork界面方式上传JAR包有50 MB的限制,因此采用MaxCompute客户端上传JAR包。
在DataWorks上创建ODPS Spark节点并配置。
在DataWorks上,选择对应的MaxCompute项目环境,将上传的JAR包添加到数据开发环境中。详情请参见创建并使用MaxCompute资源。
新建ODPS Spark,并设置任务参数。详情请参见开发ODPS Spark任务。
提交Spark任务的配置参数如下。
spark.hadoop.odps.cupid.eni.enable = true spark.hadoop.odps.cupid.eni.info=cn-beijing:vpc-2zeaeq21mb1dmkqh0****