流计算

本文介绍在使用Spark计算引擎访问表格存储时,如何通过DataFrame编程方式对表格存储中的数据进行流计算,并分别在本地和集群环境中进行运行调试。

准备工作

  • 在表格存储中创建数据表,并创建数据通道、写入数据。详情请参见宽表模型快速入门通道服务快速入门

    说明

    数据表order_source_stream_view的结构及示例数据,请参见附录:示例数据表

  • 为阿里云账号或具有表格存储访问权限的RAM用户创建AccessKey

  • 搭建Java开发环境。

    本文以Windows环境、JDK 1.8IntelliJ IDEA 2024.1.2 (Community Edition)Apache Maven为例介绍。

操作步骤

步骤一:下载项目源码

通过Git下载样例项目。

git clone https://github.com/aliyun/tablestore-examples.git

如果因为网络问题无法下载,您也可以直接下载tablestore-examples-master.zip

步骤二:更新Maven依赖

  1. 进入tablestore-spark-demo根目录。

    说明

    推荐您阅读tablestore-spark-demo根目录下的README.md文档,全面了解项目信息。

  2. 执行以下命令,安装emr-tablestore-2.2.0-SNAPSHOT.jar到本地Maven仓库。

    mvn install:install-file -Dfile="libs/emr-tablestore-2.2.0-SNAPSHOT.jar" -DartifactId=emr-tablestore -DgroupId="com.aliyun.emr" -Dversion="2.2.0-SNAPSHOT" -Dpackaging=jar -DgeneratePom=true

步骤三:(可选)修改示例代码

核心代码修改说明

此处以StructuredTableStoreAggSQLSample为例,对该示例代码的核心部分进行如下说明。

代码块

描述

val ordersDF = sparkSession.readStream
  .format("tablestore")
  .option("instance.name", instanceName)
  .option("table.name", tableName)
  .option("tunnel.id", tunnelId)
  .option("endpoint", endpoint)
  .option("access.key.id", accessKeyId)
  .option("access.key.secret", accessKeySecret)
  .option("maxoffsetsperchannel", maxOffsetsPerChannel) // default 10000
  .option("catalog", dataCatalog)
  .load()
  .createTempView("order_source_stream_view")

通过Spark Structured Streaming,从表格存储中读取流式数据,并将其加载为一个流式DataFrame,同时注册为临时视图order_source_stream_view

  • format("tablestore")表示使用ServiceLoader方式加载Spark Tablestore connector。具体配置,请参见META-INF.services目录。

  • instanceNametableNametunnelIdendpointaccessKeyIdaccessKeySecret分别表示表格存储的实例名称、数据表名称、通道ID、实例endpoint、阿里云账号或RAM用户的AccessKey IDAccessKey Secret。

  • catalog是一个JSON串,用于描述表格存储中的Schema信息。

  • maxoffsetsperchannel表示每一个mini-batch中每一个channel(分区)最多读取的数据量,默认值为10000。

val dataCatalog: String =
  s"""
     |{"columns": {
     |    "UserId": {"type":"string"},
     |    "OrderId": {"type":"string"},
     |    "price": {"type":"double"},
     |    "timestamp": {"type":"long"}
     | }
     |}""".stripMargin

定义了一个JSON字符串dataCatalog,用于描述表格存储中的Schema信息。每个字段的名称及其数据类型均以键值对的形式呈现。

val aggDF = sparkSession.sql(
  "SELECT CAST(window.start AS String) AS begin, CAST(window.end AS String) AS end, count(*) AS count, " +
    "CAST(sum(price) AS Double) AS totalPrice FROM order_source_stream_view " +
    "GROUP BY window(to_timestamp(timestamp / 1000), '30 seconds')")
    
val query = aggDF.writeStream
  .outputMode("complete")
  .format("console")
  .option("truncate", value = false)
  .option("checkpointLocation", checkpointLocation)
  .option("triggerInterval", 10000) // custom
  .start()

通过Spark SQL对流式数据进行时间窗口聚合分析,并将结果输出到控制台。

  • SQL查询部分

    • 时间窗口:基于timestamp字段,将数据划分为30秒的时间窗口。

    • 聚合操作:统计每个窗口内的记录总数,并计算每个窗口内price字段的总和。

  • 配置流式查询的输出方式

    • outputMode("complete")表示指定输出模式为complete,即每次触发时输出完整的聚合结果。

    • format("console"):表示将结果输出到控制台。

    • option("truncate", value = false)表示禁用字段截断,确保完整显示字段值。

    • option("checkpointLocation", checkpointLocation)表示设置检查点目录,用于容错和恢复。

    • option("triggerInterval", 10000)表示设置触发间隔为10秒,即每10秒执行一次聚合操作。

    • start()表示启动流式查询。

步骤四:运行调试

您可以在本地或者通过Spark集群进行运行调试。此处以StructuredTableStoreAggSQLSample为例介绍调试过程。

本地开发环境

以在Windows操作系统上使用IntelliJ IDEA为例,为您介绍如何调试。

  1. 安装Scala插件。

    IntelliJ IDEA默认不支持Scala,需要您手动安装Scala插件。

  2. 安装winutils.exe(本文使用winutils 3.3.6)。

    仅在Windows环境下运行Spark时,还需安装 winutils.exe以解决兼容性问题。您可以通过github项目主页进行下载。

  3. Scala程序TableStoreBatchSample上右键单击,选择Modify Run Configuration,打开Edit Run Configuration弹窗。

    说明

    由于操作系统及IntelliJ IDEA的版本不同,实际操作可能存在一些细微差异。

    1. Program arguments栏中,依次配置实例名称、数据表名称、数据通道ID、AccessKey ID、AccessKey Secret、实例Endpoint参数和MaxOffsetsPerChannel。

      myinstance order_source_stream_view 8f6a****-****-****-****-************ LTAI********************** DByT************************** https://myinstance.cn-hangzhou.ots.aliyuncs.com 10000
    2. 单击Modify options,勾选Add dependencies with "provided" scope to classpath选项,并单击OK。

      2025-05-13_150824

  4. 运行Scala程序。

    运行结束后,将打印结果到控制台。

    +-------------------+-------------------+-----+-----------------+
    |begin              |end                |count|totalPrice       |
    +-------------------+-------------------+-----+-----------------+
    |2025-04-16 11:13:30|2025-04-16 11:14:00|1    |2547.0           |
    |2025-04-16 11:13:00|2025-04-16 11:13:30|3    |984.1999999999999|
    |2025-04-16 11:12:30|2025-04-16 11:13:00|1    |29.6             |
    +-------------------+-------------------+-----+-----------------+

Spark集群环境

重要

在进行调试之前,请确保已构建Spark集群,并且该集群环境中的Spark版本与样例项目的Spark版本一致。否则,可能会因版本不兼容而导致运行错误。

spark-submit方式为例说明。示例代码中的master默认为local[*],在Spark集群上运行时可以去掉,使用spark-submit参数传入。

  1. 执行命令mvn -U clean package对项目进行打包,JAR包的路径为target/tablestore-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

  2. JAR包上传到Spark集群的Driver节点,并使用spark-submit提交任务。

    spark-submit --class com.aliyun.tablestore.spark.demo.streaming.StructuredTableStoreAggSQLSample --master yarn tablestore-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar myinstance order_source_stream_view 8f6a****-****-****-****-************ LTAI********************** DByT************************** https://myinstance.cn-hangzhou.ots.aliyuncs.com 10000

    fig_dataframe_streaming_001

附录:示例数据表

以下是order_source_stream_view表的结构及示例数据。

示例表结构

字段名称

类型

描述

pk

long

主键。

UserId

string

用户ID。

OrderId

string

订单ID。

price

double

订单金额。

timestamp

long

时间戳。

示例数据

pk(主键)

UserId

OrderId

price

timestamp

1

user_A

00002664-9d8b-441b-bad7-845202f3b142

29.6

1744773175362

2

user_A

9d8b7a6c-5e4f-4321-8765-0a9b8c7d6e5f

785.3

1744773190240

3

user_A

c3d4e5f6-7a8b-4901-8c9d-0a1b2c3d4e5f

187

1744773195579

4

user_B

f1e2d3c4-b5a6-4789-90ab-123cdef45678

11.9

1744773203345

5

user_B

e2f3a4b5-c6d7-4890-9abc-def012345678

2547

1744773227789