批计算

本文介绍在使用Spark计算引擎访问表格存储时,如何通过DataFrame编程方式对表格存储中的数据进行批计算,并分别在本地和集群环境中进行运行调试。

准备工作

  • 在表格存储中创建数据表,并写入数据。详情请参见宽表模型快速入门

    说明

    数据表search_view的结构及示例数据,请参见附录:示例数据表

  • 为阿里云账号或具有表格存储访问权限的RAM用户创建AccessKey

  • 搭建Java开发环境。

    本文以Windows环境、JDK 1.8IntelliJ IDEA 2024.1.2 (Community Edition)Apache Maven为例介绍。

操作步骤

步骤一:下载项目源码

通过Git下载样例项目。

git clone https://github.com/aliyun/tablestore-examples.git

如果因为网络问题无法下载,您也可以直接下载tablestore-examples-master.zip

步骤二:更新Maven依赖

  1. 进入tablestore-spark-demo根目录。

    说明

    推荐您阅读tablestore-spark-demo根目录下的README.md文档,全面了解项目信息。

  2. 执行以下命令,安装emr-tablestore-2.2.0-SNAPSHOT.jar到本地Maven仓库。

    mvn install:install-file -Dfile="libs/emr-tablestore-2.2.0-SNAPSHOT.jar" -DartifactId=emr-tablestore -DgroupId="com.aliyun.emr" -Dversion="2.2.0-SNAPSHOT" -Dpackaging=jar -DgeneratePom=true

步骤三:(可选)修改示例代码

核心代码修改说明

此处以TableStoreBatchSample为例,对该示例代码的核心部分进行如下说明。

代码块

描述

val df = sparkSession.read
  .format("tablestore")
  .option("instance.name", instanceName)
  .option("table.name", tableName)
  .option("endpoint", endpoint)
  .option("access.key.id", accessKeyId)
  .option("access.key.secret", accessKeySecret)
  .option("split.size.mbs", 100)
  // .option("catalog", dataCatalog)
  // 最新版本支持使用Schema方式替换catalog的配置。
  .schema("salt LONG, UserId STRING, OrderId STRING, price DOUBLE, timestamp LONG")
  .load()

通过SparkDataFrameReader接口,读取表格存储中的数据,并将其加载为一个DataFrame对象。

  • format("tablestore")表示使用ServiceLoader方式加载Spark Tablestore connector。具体配置,请参见META-INF.services目录。

  • instanceNametableNameendpointaccessKeyIdaccessKeySecret分别表示表格存储的实例名称、数据表名称、实例Endpoint、阿里云账号或RAM用户的AccessKey IDAccessKey Secret。

  • split.size.mbs表示每个Split的切分大小,默认值为100,单位为MB。此值越小产生的Split会越多,对应SparkTask也会越多。

  • .schema("salt LONG, UserId STRING, OrderId STRING, price DOUBLE, timestamp LONG")表示指定数据表的Schema,定义字段名称及其数据类型。示例中的数据表有salt(Long类型)、UserId(String类型)、OrderId(String类型)、price(Double类型)和timestamp(Long类型)五个字段。

    说明

    最新版本支持使用Schema方式替换catalog的配置,请根据实际情况选择。

val dataCatalog: String =
  s"""
     |{"columns": {
     |    "salt": {"type":"long"},
     |    "UserId": {"type":"string"},
     |    "OrderId": {"type":"string"},
     |    "price": {"type":"double"},
     |    "timestamp": {"type":"long"}
     | }
     |}""".stripMargin

定义了一个JSON字符串dataCatalog,用于描述表格存储中的Schema信息。每个字段的名称及其数据类型均以键值对的形式呈现。

df.filter("salt = 1 AND UserId = 'user_A'").show(20, truncate = false)

DataFrame执行过滤操作,筛选出满足条件salt = 1 AND UserId = 'user_A'的数据,并显示前20条记录。

df.createTempView("search_view")
val searchDF = sparkSession.sql("SELECT COUNT(*) FROM search_view WHERE salt = 1 AND UserId = 'user_A'")
searchDF.show()
val searchDF2 = sparkSession.sql("SELECT COUNT(*) FROM search_view WHERE salt = 1 AND UserId = 'user_A'" +
  " AND OrderId = '00002664-9d8b-441b-bad7-845202f3b142'")
searchDF2.show()
val searchDF3 = sparkSession.sql("SELECT COUNT(*) FROM search_view WHERE salt = 1 AND UserId >= 'user_A' AND UserId < 'user_B'")
searchDF3.show()

DataFrame注册为临时视图search_view,并通过Spark SQL执行多个聚合查询,统计满足不同条件的记录数。

  • SQL查询1:统计符合条件salt = 1 AND UserId = 'user_A'的记录总数。

  • SQL查询2:统计符合条件salt = 1 AND UserId = 'user_A' AND OrderId = '00002664-9d8b-441b-bad7-845202f3b142'的记录总数。

  • SQL查询3:统计符合条件salt = 1 AND UserId >= 'user_A' AND UserId < 'user_B''的记录总数。

步骤四:运行调试

您可以在本地或者通过Spark集群进行运行调试。此处以TableStoreBatchSample为例介绍调试过程。

本地开发环境

以在Windows操作系统上使用IntelliJ IDEA为例,为您介绍如何调试。

  1. 安装Scala插件。

    IntelliJ IDEA默认不支持Scala,需要您手动安装Scala插件。

  2. 安装winutils.exe(本文使用winutils 3.3.6)。

    仅在Windows环境下运行Spark时,还需安装 winutils.exe以解决兼容性问题。您可以通过github项目主页进行下载。

  3. Scala程序TableStoreBatchSample上右键单击,选择Modify Run Configuration,打开Edit Run Configuration弹窗。

    说明

    由于操作系统及IntelliJ IDEA的版本不同,实际操作可能存在一些细微差异。

    1. Program arguments栏中,依次配置实例名称、数据表名称、AccessKey ID、AccessKey Secret和实例Endpoint参数。

      myinstance search_view LTAI********************** DByT************************** https://myinstance.cn-hangzhou.ots.aliyuncs.com
    2. 单击Modify options,勾选Add dependencies with "provided" scope to classpath选项,并单击OK。

      2025-05-13_145645

  4. 运行Scala程序。

    运行结束后,将打印结果到控制台。

    With DataFrame
    +----+------+------------------------------------+-----+-------------+
    |salt|UserId|OrderId                             |price|timestamp    |
    +----+------+------------------------------------+-----+-------------+
    |1   |user_A|00002664-9d8b-441b-bad7-845202f3b142|29.6 |1744773183629|
    |1   |user_A|9d8b7a6c-5e4f-4321-8765-0a9b8c7d6e5f|785.3|1744773190240|
    +----+------+------------------------------------+-----+-------------+
    
    With Spark SQL
    +--------+
    |count(1)|
    +--------+
    |       2|
    +--------+
    
    +--------+
    |count(1)|
    +--------+
    |       1|
    +--------+
    
    +--------+
    |count(1)|
    +--------+
    |       2|
    +--------+

Spark集群环境

重要

在进行调试之前,请确保已构建Spark集群,并且该集群环境中的Spark版本与样例项目的Spark版本一致。否则,可能会因版本不兼容而导致运行错误。

spark-submit方式为例说明。示例代码中的master默认为local[*],在Spark集群上运行时可以去掉,使用spark-submit参数传入。

  1. 执行命令mvn -U clean package对项目进行打包,JAR包的路径为target/tablestore-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

  2. JAR包上传到Spark集群的Driver节点,并使用spark-submit提交任务。

    spark-submit --class com.aliyun.tablestore.spark.demo.batch.TableStoreBatchSample --master yarn tablestore-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar myinstance search_view LTAI********************** DByT************************** https://myinstance.cn-hangzhou.ots.aliyuncs.com 

    fig_batch_dataframe001

附录:示例数据表

以下是search_view表的结构及示例数据。

示例表结构

字段名称

类型

描述

pk

long

主键。

salt

long

随机盐值。

UserId

string

用户ID。

OrderId

string

订单ID。

price

double

订单金额。

timestamp

long

时间戳。

示例数据

pk(主键)

salt

UserId

OrderId

price

timestamp

1

1

user_A

00002664-9d8b-441b-bad7-845202f3b142

29.6

1744773183629

2

1

user_A

9d8b7a6c-5e4f-4321-8765-0a9b8c7d6e5f

785.3

1744773190240

3

2

user_A

c3d4e5f6-7a8b-4901-8c9d-0a1b2c3d4e5f

187

1744773195579

4

3

user_B

f1e2d3c4-b5a6-4789-90ab-123cdef45678

11.9

1744773203345

5

4

user_B

e2f3a4b5-c6d7-4890-9abc-def012345678

2547

1744773207789