在EMR Serverless Spark中实现HBase读写操作

更新时间:2025-04-01 03:30:45

基于HBase官方提供的Spark Connector,EMR Serverless Spark可以在开发时添加对应的配置来连接HBase。本文为您介绍在EMR Serverless Spark环境中实现HBase的数据读取和写入操作。

前提条件

  • 已创建Serverless Spark工作空间,详情请参见创建工作空间

  • 已创建HBase集群。

    本文以在EMR on ECS创建包含HBase服务的自定义集群为例,后续简称EMR HBase集群。创建集群详情请参见创建集群

使用限制

Serverless Spark以下引擎版本支持本文操作:

  • esr-4.x:esr-4.1.0及之后版本。

  • esr-3.x:esr-3.1.0及之后版本。

  • esr-2.x:esr-2.5.0及之后版本。

操作流程

步骤一:获取HBase Spark Connector JAR并上传至OSS

根据HBase Spark Connector官方文档,结合Spark、Scala、Hadoop以及HBase的版本兼容性要求,完成以下步骤以获取所需的依赖包:

  1. 编译与打包。

    根据目标环境的版本信息(包括Spark、Scala、HadoopHBase的版本),您可以对HBase Spark Connector进行编译,生成以下两个核心JAR包:

    • hbase-spark-1.1.0-SNAPSHOT.jar

    • hbase-spark-protocol-shaded-1.1.0-SNAPSHOT.jar

      例如,使用Maven命令,基于以下版本信息进行编译和打包。

      mvn -Dspark.version=3.4.2 -Dscala.version=2.12.10 -Dhadoop-three.version=3.2.0 -Dscala.binary.version=2.12 -Dhbase.version=2.4.9 clean package -DskipTests

      如果您的环境版本与上述版本一致(即Spark 3.4.2、Scala 2.12.10、Hadoop 3.2.0、HBase 2.4.9),可以直接使用已编译好的JAR包:

  2. 获取HBase相关依赖。 从HBase安装目录的lib/shaded-clients以及lib/client-facing-thirdparty文件夹提取以下依赖包,其中2.4.9HBase版本号。

    • hbase-shaded-client-2.4.9.jar

    • hbase-shaded-mapreduce-2.4.9.jar

    • slf4j-log4j12-1.7.30.jar

  3. 将上述五个JAR上传至阿里云OSS中,上传操作可以参见简单上传

步骤二:创建网络连接

Serverless Spark需要能够打通与HBase之间的网络才可以正常访问HBase服务。更多网络连接信息,请参见EMR Serverless Spark与其他VPC间网络互通

重要

配置安全组规则时,端口范围请根据实际需求选择性开放必要的端口。端口范围的取值为1-65535。本文示例需开启ZooKeeper服务端口(2181)、HBase Master的端口(16000)以及HBase RegionServer的端口(16020)。

步骤三:在EMR HBase集群中创建表

  1. 通过SSH方式连接集群,详情请参见登录集群

  2. 执行以下命令,连接HBase。

    hbase shell
  3. 执行以下命令,创建测试表。

    create 'hbase_table', 'c1', 'c2'
  4. 执行以下命令,写入测试数据。

    put 'hbase_table', 'r1', 'c1:name', 'Alice'
    put 'hbase_table', 'r1', 'c1:age', '25'
    put 'hbase_table', 'r1', 'c2:city', 'New York'
    
    put 'hbase_table', 'r2', 'c1:name', 'Bob'
    put 'hbase_table', 'r2', 'c1:age', '30'
    put 'hbase_table', 'r2', 'c2:city', 'San Francisco'

步骤四:Serverless Spark读取HBase

  1. 创建Notebook会话,详情请参见管理Notebook会话

    创建会话时,在引擎版本下拉列表中选择与HBase Spark Connector版本对应的引擎版本,在网络连接中选择步骤二中创建好的网络连接,并在Spark配置中添加以下参数来加载HBase Spark Connector。

    spark.jars                                        oss://<bucketname>/path/to/hbase-shaded-client-2.4.9.jar,oss://<bucketname>/path/to/hbase-shaded-mapreduce-2.4.9.jar,oss://<bucketname>/path/to/hbase-spark-1.1.0-SNAPSHOT.jar,oss://<bucketname>/path/to/hbase-spark-protocol-shaded-1.1.0-SNAPSHOT.jar,oss://<bucketname>/path/to/slf4j-log4j12-1.7.30.jar
    spark.hadoop.hbase.zookeeper.quorum               Zookeeper内网IP地址
    spark.hadoop.hbase.zookeeper.property.clientPort  Zookeeper服务端口

    其中,涉及参数信息说明如下。

    参数

    描述

    示例

    参数

    描述

    示例

    spark.jars

    外部依赖JAR包的路径。

    上传至OSS的五个文件,例如,oss://<yourBucketname>/spark/hbase/hbase-shaded-client-2.4.9.jar

    spark.hadoop.hbase.zookeeper.quorum

    Zookeeper内网IP地址。

    • 如果您使用的是其他HBase集群,请根据实际情况填写相应的配置。

    • 如果您使用的是阿里云的EMR HBase集群,则可以在EMR HBase集群的节点管理页面,查看Master节点的内网IP

    spark.hadoop.hbase.zookeeper.property.clientPort

    Zookeeper服务端口。

    • 如果您使用的是其他HBase集群,请根据实际情况填写相应的配置。

    • 如果您使用的是阿里云的EMR HBase集群,则端口为 2181

  2. 数据开发页面,选择创建一个Python > Notebook类型的任务,然后在右上角选择创建的Notebook会话。

    更多操作,请参见管理Notebook会话

  3. 拷贝如下代码到新增的Notebook页签中,并根据需要修改相应的参数信息,然后单击运行

    # 读取HBase表。
    df = spark.read.format("org.apache.hadoop.hbase.spark") \
        .option("hbase.columns.mapping", "id STRING :key, name STRING c1:name, age STRING c1:age, city STRING c2:city") \
        .option("hbase.table", "hbase_table") \
        .option("hbase.spark.pushdown.columnfilter", False) \
        .load()
    
    # 注册临时视图。
    df.createOrReplaceTempView("hbase_table_view")
    
    # 使用SQL查询数据。
    results = spark.sql("SELECT * FROM hbase_table_view")
    results.show()
    

    如果能够正常返回数据,则表明配置正确。

    image

步骤五:Serverless Spark写入HBase

拷贝如下代码到前一个步骤中新增的Notebook页签中,并根据需要修改相应的参数信息,然后单击运行

from pyspark.sql.types import StructType, StructField, StringType

data = [
    ("r3", "sam", "26", "New York")
]

schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", StringType(), True),
    StructField("city", StringType(), True)
])
 
testDS = spark.createDataFrame(data=data,schema=schema)

testDS.write.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", "id STRING :key, name STRING c1:name, age STRING c1:age, city STRING c2:city").option("hbase.table", "hbase_table").save()

数据写入完成后,您可以查表确认数据是否成功写入。

image

  • 本页导读
  • 前提条件
  • 使用限制
  • 操作流程
  • 步骤一:获取HBase Spark Connector JAR并上传至OSS
  • 步骤二:创建网络连接
  • 步骤三:在EMR HBase集群中创建表
  • 步骤四:Serverless Spark读取HBase表
  • 步骤五:Serverless Spark写入HBase表