基于HBase官方提供的Spark Connector,EMR Serverless Spark可以在开发时添加对应的配置来连接HBase。本文为您介绍在EMR Serverless Spark环境中实现HBase的数据读取和写入操作。
前提条件
使用限制
仅Serverless Spark以下引擎版本支持本文操作:
esr-4.x:esr-4.1.0及之后版本。
esr-3.x:esr-3.1.0及之后版本。
esr-2.x:esr-2.5.0及之后版本。
操作流程
步骤一:获取HBase Spark Connector JAR并上传至OSS
根据HBase Spark Connector官方文档,结合Spark、Scala、Hadoop以及HBase的版本兼容性要求,完成以下步骤以获取所需的依赖包:
编译与打包。
根据目标环境的版本信息(包括Spark、Scala、Hadoop和HBase的版本),您可以对HBase Spark Connector进行编译,生成以下两个核心JAR包:
hbase-spark-1.1.0-SNAPSHOT.jar
hbase-spark-protocol-shaded-1.1.0-SNAPSHOT.jar
例如,使用Maven命令,基于以下版本信息进行编译和打包。
mvn -Dspark.version=3.4.2 -Dscala.version=2.12.10 -Dhadoop-three.version=3.2.0 -Dscala.binary.version=2.12 -Dhbase.version=2.4.9 clean package -DskipTests
如果您的环境版本与上述版本一致(即Spark 3.4.2、Scala 2.12.10、Hadoop 3.2.0、HBase 2.4.9),可以直接使用已编译好的JAR包:
获取HBase相关依赖。 从HBase安装目录的
lib/shaded-clients
以及lib/client-facing-thirdparty
文件夹提取以下依赖包,其中2.4.9为HBase版本号。hbase-shaded-client-2.4.9.jar
hbase-shaded-mapreduce-2.4.9.jar
slf4j-log4j12-1.7.30.jar
将上述五个JAR上传至阿里云OSS中,上传操作可以参见简单上传。
步骤二:创建网络连接
Serverless Spark需要能够打通与HBase之间的网络才可以正常访问HBase服务。更多网络连接信息,请参见EMR Serverless Spark与其他VPC间网络互通。
配置安全组规则时,端口范围请根据实际需求选择性开放必要的端口。端口范围的取值为1-65535。本文示例需开启ZooKeeper服务端口(2181)、HBase Master的端口(16000)以及HBase RegionServer的端口(16020)。
步骤三:在EMR HBase集群中创建表
通过SSH方式连接集群,详情请参见登录集群。
执行以下命令,连接HBase。
hbase shell
执行以下命令,创建测试表。
create 'hbase_table', 'c1', 'c2'
执行以下命令,写入测试数据。
put 'hbase_table', 'r1', 'c1:name', 'Alice' put 'hbase_table', 'r1', 'c1:age', '25' put 'hbase_table', 'r1', 'c2:city', 'New York' put 'hbase_table', 'r2', 'c1:name', 'Bob' put 'hbase_table', 'r2', 'c1:age', '30' put 'hbase_table', 'r2', 'c2:city', 'San Francisco'
步骤四:Serverless Spark读取HBase表
创建Notebook会话,详情请参见管理Notebook会话。
创建会话时,在引擎版本下拉列表中选择与HBase Spark Connector版本对应的引擎版本,在网络连接中选择步骤二中创建好的网络连接,并在Spark配置中添加以下参数来加载HBase Spark Connector。
spark.jars oss://<bucketname>/path/to/hbase-shaded-client-2.4.9.jar,oss://<bucketname>/path/to/hbase-shaded-mapreduce-2.4.9.jar,oss://<bucketname>/path/to/hbase-spark-1.1.0-SNAPSHOT.jar,oss://<bucketname>/path/to/hbase-spark-protocol-shaded-1.1.0-SNAPSHOT.jar,oss://<bucketname>/path/to/slf4j-log4j12-1.7.30.jar spark.hadoop.hbase.zookeeper.quorum Zookeeper内网IP地址 spark.hadoop.hbase.zookeeper.property.clientPort Zookeeper服务端口
其中,涉及参数信息说明如下。
参数
描述
示例
参数
描述
示例
spark.jars
外部依赖JAR包的路径。
上传至OSS的五个文件,例如,
oss://<yourBucketname>/spark/hbase/hbase-shaded-client-2.4.9.jar
。spark.hadoop.hbase.zookeeper.quorum
Zookeeper内网IP地址。
如果您使用的是其他HBase集群,请根据实际情况填写相应的配置。
如果您使用的是阿里云的EMR HBase集群,则可以在EMR HBase集群的节点管理页面,查看Master节点的内网IP。
spark.hadoop.hbase.zookeeper.property.clientPort
Zookeeper服务端口。
如果您使用的是其他HBase集群,请根据实际情况填写相应的配置。
如果您使用的是阿里云的EMR HBase集群,则端口为
2181
。
在数据开发页面,选择创建一个
类型的任务,然后在右上角选择创建的Notebook会话。更多操作,请参见管理Notebook会话。
拷贝如下代码到新增的Notebook页签中,并根据需要修改相应的参数信息,然后单击运行。
# 读取HBase表。 df = spark.read.format("org.apache.hadoop.hbase.spark") \ .option("hbase.columns.mapping", "id STRING :key, name STRING c1:name, age STRING c1:age, city STRING c2:city") \ .option("hbase.table", "hbase_table") \ .option("hbase.spark.pushdown.columnfilter", False) \ .load() # 注册临时视图。 df.createOrReplaceTempView("hbase_table_view") # 使用SQL查询数据。 results = spark.sql("SELECT * FROM hbase_table_view") results.show()
如果能够正常返回数据,则表明配置正确。
步骤五:Serverless Spark写入HBase表
拷贝如下代码到前一个步骤中新增的Notebook页签中,并根据需要修改相应的参数信息,然后单击运行。
from pyspark.sql.types import StructType, StructField, StringType
data = [
("r3", "sam", "26", "New York")
]
schema = StructType([
StructField("id", StringType(), True),
StructField("name", StringType(), True),
StructField("age", StringType(), True),
StructField("city", StringType(), True)
])
testDS = spark.createDataFrame(data=data,schema=schema)
testDS.write.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", "id STRING :key, name STRING c1:name, age STRING c1:age, city STRING c2:city").option("hbase.table", "hbase_table").save()
数据写入完成后,您可以查表确认数据是否成功写入。
- 本页导读
- 前提条件
- 使用限制
- 操作流程
- 步骤一:获取HBase Spark Connector JAR并上传至OSS
- 步骤二:创建网络连接
- 步骤三:在EMR HBase集群中创建表
- 步骤四:Serverless Spark读取HBase表
- 步骤五:Serverless Spark写入HBase表