本文介绍如何使用表格存储统一存储电商订单数据,通过 E-MapReduce(EMR)的 Spark 流批处理能力实现实时和离线数据聚合,并通过 DataV 大屏展示聚合结果。
方案概述
电商大屏需要同时展示全量订单的历史聚合结果和实时订单的动态统计指标。本方案使用表格存储作为统一存储层,存储原始订单数据和聚合结果数据,通过 EMR Spark 的流批处理能力完成数据聚合计算。
类别 | 说明 |
适用场景 |
|
技术选型 |
|
涉及产品 |
|
方案设计
数据流转过程如下:
SQL 客户端将原始订单数据实时写入表格存储的数据表(OrderSource)。
EMR 的 Spark 引擎对订单数据进行流批计算,并将聚合结果写回表格存储的数据表。
实时流计算:通过 Spark Structured Streaming 实时统计每个时间窗口内的订单数量和订单金额。
离线批计算:通过 Spark SQL 离线聚合原始订单数据的总金额和用户维度总金额。
DataV 接入表格存储的聚合结果表,在大屏上展示实时和离线计算结果。
方案涉及的表格存储数据表如下。
表名 | 主键 / 属性列 | 说明 |
OrderSource | PK: UserId (String), OrderId (String) | 原始订单数据表,存储所有订单记录。每个订单包含用户 ID、订单 ID、订单金额和下单时间戳。 |
OrderStreamSink | PK: begin (String), end (String) | 实时流计算结果表。Spark Structured Streaming 按 30 秒窗口聚合订单数据后写入此表,每行对应一个时间窗口内的订单总数和订单总金额。 |
OrderBatchSink | PK: UserId (String) | 用户维度批计算结果表。通过离线批计算将每个用户的累计订单数和累计消费金额写入此表。 |
OrderTotalSink | PK: count (Long) | 全局维度批计算结果表。通过离线批计算将全局的订单总数和订单总金额写入此表。 |
前提条件
开始前,开通服务和创建实例(如已有可用实例,跳过此步骤)。
已创建 E-MapReduce on ECS 集群。具体操作,请参见创建集群。创建时请注意:
配置项
说明
业务场景
选择数据湖。
产品版本
如果需要使用实时流计算功能(
streaming-sql的CREATE STREAM/CREATE SCAN语法),请选择 EMR-5.17.4 及以下版本。说明EMR-5.21.0 的 SPARK-EXTENSION 组件与 SPARK3 (3.5.3) 存在兼容性问题。如果仅使用离线批计算功能,可选择最新产品版本。
可选服务
选择 SPARK3(默认选择)。
EMR 集群所在 VPC 已绑定表格存储实例。具体操作,请参见为实例绑定 VPC。
重要绑定后,需使用绑定 VPC 对应的VPC访问地址访问表格存储。
方案实现
以下步骤使用 Java SDK 创建表格存储资源,以 EMR 5.17.4(Spark 3.4.2)的 streaming-sql 客户端执行 Spark SQL 操作为例。
步骤一:创建数据表和增量通道
通过 Java SDK 创建四张数据表(OrderSource、OrderStreamSink、OrderBatchSink、OrderTotalSink),并为 OrderSource 表创建增量类型的通道,用于后续实时流计算。
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.core.ResourceManager;
import com.alicloud.openservices.tablestore.core.auth.*;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.model.tunnel.*;
import java.util.*;
public class CreateTablesAndTunnel {
public static void main(String[] args) {
// Initialize V4 signature client.
String endpoint = System.getenv("OTS_ENDPOINT");
String accessKeyId = System.getenv("OTS_AK_ENV");
String accessKeySecret = System.getenv("OTS_SK_ENV");
String instanceName = System.getenv("OTS_INSTANCE");
String region = System.getenv("OTS_REGION");
DefaultCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
V4Credentials v4Credentials = V4Credentials.createByServiceCredentials(credentials, region);
CredentialsProvider provider = new DefaultCredentialProvider(v4Credentials);
SyncClient client = new SyncClient(endpoint, provider, instanceName, null, new ResourceManager(null, null));
// 1. Create OrderSource table (PK: UserId + OrderId).
createTable(client, "OrderSource", Arrays.asList(
new PrimaryKeySchema("UserId", PrimaryKeyType.STRING),
new PrimaryKeySchema("OrderId", PrimaryKeyType.STRING)));
// 2. Create OrderStreamSink table (PK: begin + end).
createTable(client, "OrderStreamSink", Arrays.asList(
new PrimaryKeySchema("begin", PrimaryKeyType.STRING),
new PrimaryKeySchema("end", PrimaryKeyType.STRING)));
// 3. Create OrderBatchSink table (PK: UserId).
createTable(client, "OrderBatchSink", Collections.singletonList(
new PrimaryKeySchema("UserId", PrimaryKeyType.STRING)));
// 4. Create OrderTotalSink table (PK: count).
createTable(client, "OrderTotalSink", Collections.singletonList(
new PrimaryKeySchema("count", PrimaryKeyType.INTEGER)));
// 5. Create stream tunnel for OrderSource (required for real-time streaming).
TunnelClient tunnelClient = new TunnelClient(endpoint, accessKeyId, accessKeySecret, instanceName);
CreateTunnelRequest tunnelRequest = new CreateTunnelRequest(
"OrderSource", "OrderSourceTunnel", TunnelType.Stream);
CreateTunnelResponse tunnelResponse = tunnelClient.createTunnel(tunnelRequest);
System.out.println("Tunnel ID: " + tunnelResponse.getTunnelId());
System.out.println("Use this tunnel ID in Spark SQL: tunnel.id=\"" + tunnelResponse.getTunnelId() + "\"");
tunnelClient.shutdown();
client.shutdown();
}
private static void createTable(SyncClient client, String tableName, List<PrimaryKeySchema> pkSchema) {
TableMeta meta = new TableMeta(tableName);
for (PrimaryKeySchema pk : pkSchema) {
meta.addPrimaryKeyColumn(pk);
}
TableOptions options = new TableOptions(-1, 1);
CreateTableRequest request = new CreateTableRequest(meta, options);
try {
client.createTable(request);
System.out.println("Table " + tableName + " created.");
} catch (Exception e) {
if (e.getMessage() != null && e.getMessage().contains("already exist")) {
System.out.println("Table " + tableName + " already exists.");
} else {
throw new RuntimeException(e);
}
}
}
}运行成功后,控制台会输出 Tunnel ID,请记录该 ID,后续创建 Spark 外表时需要填入 tunnel.id 参数。
步骤二:写入测试数据
通过 BatchWriteRow 接口向 OrderSource 表批量写入模拟的电商订单数据。以下示例生成 500 条订单,包含 5 个用户,每条订单包含随机金额和时间戳。
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.core.ResourceManager;
import com.alicloud.openservices.tablestore.core.auth.*;
import com.alicloud.openservices.tablestore.model.*;
import java.util.*;
public class InsertTestOrders {
public static void main(String[] args) {
String endpoint = System.getenv("OTS_ENDPOINT");
String accessKeyId = System.getenv("OTS_AK_ENV");
String accessKeySecret = System.getenv("OTS_SK_ENV");
String instanceName = System.getenv("OTS_INSTANCE");
String region = System.getenv("OTS_REGION");
DefaultCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
V4Credentials v4Credentials = V4Credentials.createByServiceCredentials(credentials, region);
CredentialsProvider provider = new DefaultCredentialProvider(v4Credentials);
SyncClient client = new SyncClient(endpoint, provider, instanceName, null, new ResourceManager(null, null));
Random random = new Random(42);
String[] users = {"user_001", "user_002", "user_003", "user_004", "user_005"};
long baseTime = System.currentTimeMillis();
int totalRows = 0;
// Insert 500 orders in batches of 200.
List<RowPutChange> batch = new ArrayList<>();
for (int i = 0; i < 500; i++) {
String userId = users[random.nextInt(users.length)];
String orderId = String.format("order_%06d", i + 1);
double price = Math.round((10.0 + random.nextDouble() * 990.0) * 100.0) / 100.0;
long timestamp = baseTime - (500 - i) * 1000L;
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("UserId", PrimaryKeyValue.fromString(userId))
.addPrimaryKeyColumn("OrderId", PrimaryKeyValue.fromString(orderId))
.build();
RowPutChange row = new RowPutChange("OrderSource", pk);
row.addColumn("price", ColumnValue.fromDouble(price));
row.addColumn("timestamp", ColumnValue.fromLong(timestamp));
batch.add(row);
if (batch.size() >= 200) {
writeBatch(client, batch);
totalRows += batch.size();
batch.clear();
}
}
if (!batch.isEmpty()) {
writeBatch(client, batch);
totalRows += batch.size();
}
System.out.println("Inserted " + totalRows + " orders into OrderSource.");
client.shutdown();
}
private static void writeBatch(SyncClient client, List<RowPutChange> rows) {
BatchWriteRowRequest request = new BatchWriteRowRequest();
for (RowPutChange row : rows) {
request.addRowChange(row);
}
client.batchWriteRow(request);
}
}步骤三:启动 Spark SQL 客户端
登录 EMR 集群的 Master 节点,执行以下命令启动 streaming-sql 客户端。
EMR 集群自带 streaming-sql 客户端和 Tablestore connector(emr-datasources_shaded jar),无需手动下载。启动时需手动加载 Hadoop 的 commons-net 依赖。以下示例以 EMR 5.17.4(Spark 3.4.2)为例。
COMMONS_NET=$(find /opt/apps/HADOOP-COMMON -name "commons-net*.jar" | head -1)
streaming-sql --master yarn --deploy-mode client \
--driver-class-path $COMMONS_NET \
--jars $COMMONS_NET启动后进入交互式 SQL 命令行,后续步骤的 SQL 语句均在此客户端中执行。
步骤四:创建 Spark 外表
通过 CREATE TABLE ... USING tablestore 语法创建 Spark 外表(External Table),将 Spark 表映射到表格存储的数据表。对外表的读写操作通过 Tablestore connector 直接读写表格存储中的数据。
创建原始订单数据的外表 order_source。如果仅使用离线批计算,tunnel.id 可不填。
DROP TABLE IF EXISTS order_source;
CREATE TABLE order_source
USING tablestore
OPTIONS(
endpoint="http://<instance-name>.<region>.vpc.ots.aliyuncs.com",
access.key.id="<AccessKey ID>",
access.key.secret="<AccessKey Secret>",
instance.name="<instance-name>",
table.name="OrderSource",
tunnel.id="<tunnel-id>",
catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"},
"OrderId": {"col": "OrderId", "type": "string"},
"price": {"col": "price", "type": "double"},
"timestamp": {"col": "timestamp", "type": "long"}}}'
);参数 | 说明 |
endpoint | 表格存储实例的 VPC访问地址。 |
access.key.id | 阿里云账号的 AccessKey ID。 |
access.key.secret | 阿里云账号的 AccessKey Secret。 |
instance.name | 表格存储的实例名称。 |
table.name | 表格存储的数据表名称。 |
tunnel.id | 表格存储的增量通道 ID。仅实时流计算时必填,离线批计算时可不填。 |
catalog | 表的字段 Schema 定义(JSON 格式)。定义列名、列映射和数据类型。 |
参照以上方式,分别为三张结果表创建外表。
-- Create external table for OrderStreamSink.
DROP TABLE IF EXISTS order_stream_sink;
CREATE TABLE order_stream_sink
USING tablestore
OPTIONS(
endpoint="http://<instance-name>.<region>.vpc.ots.aliyuncs.com",
access.key.id="<AccessKey ID>",
access.key.secret="<AccessKey Secret>",
instance.name="<instance-name>",
table.name="OrderStreamSink",
catalog='{"columns": {"begin": {"col": "begin", "type": "string"},
"end": {"col": "end", "type": "string"},
"count": {"col": "count", "type": "long"},
"totalPrice": {"col": "totalPrice", "type": "double"}}}'
);
-- Create external table for OrderBatchSink.
DROP TABLE IF EXISTS order_batch_sink;
CREATE TABLE order_batch_sink
USING tablestore
OPTIONS(
endpoint="http://<instance-name>.<region>.vpc.ots.aliyuncs.com",
access.key.id="<AccessKey ID>",
access.key.secret="<AccessKey Secret>",
instance.name="<instance-name>",
table.name="OrderBatchSink",
catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"},
"count": {"col": "count", "type": "long"},
"totalPrice": {"col": "totalPrice", "type": "double"}}}'
);
-- Create external table for OrderTotalSink.
DROP TABLE IF EXISTS order_total_sink;
CREATE TABLE order_total_sink
USING tablestore
OPTIONS(
endpoint="http://<instance-name>.<region>.vpc.ots.aliyuncs.com",
access.key.id="<AccessKey ID>",
access.key.secret="<AccessKey Secret>",
instance.name="<instance-name>",
table.name="OrderTotalSink",
catalog='{"columns": {"count": {"col": "count", "type": "long"},
"totalPrice": {"col": "totalPrice", "type": "double"}}}'
);步骤五:离线批计算
通过 Spark SQL 对原始订单数据进行全量聚合,将结果写入对应的 Sink 表。
执行以下 SQL 将用户维度的聚合结果写入 order_batch_sink 表。
INSERT INTO order_batch_sink
SELECT UserId, count(*) AS count, sum(price) AS totalPrice
FROM order_source
GROUP BY UserId;执行以下 SQL 将全局维度的聚合结果写入 order_total_sink 表。
INSERT INTO order_total_sink
SELECT count(*) AS count, sum(price) AS totalPrice
FROM order_source;执行以下 SQL 验证聚合结果。
-- 验证:查询订单总数。
SELECT count(*) AS total_orders FROM order_source;
-- 验证:用户维度聚合结果。
SELECT * FROM order_batch_sink;
-- 验证:全局聚合结果。
SELECT * FROM order_total_sink;步骤六:实时流计算
通过 Spark Structured Streaming 实时统计每个时间窗口内的订单数量和订单金额,并将聚合结果写入 order_stream_sink 表。
在 order_source 表上创建流式视图。
CREATE SCAN order_source_stream_view ON order_source USING STREAM
OPTIONS("maxoffsetsperchannel"="10000");创建流计算作业,按 30 秒窗口对订单数据进行聚合。
CREATE STREAM job1
OPTIONS(
checkpointLocation='/tmp/spark/cp/job1',
outputMode='update'
)
INSERT INTO order_stream_sink
SELECT CAST(window.start AS String) AS begin,
CAST(window.end AS String) AS end,
count(*) AS count,
CAST(sum(price) AS Double) AS totalPrice
FROM order_source_stream_view
GROUP BY window(to_timestamp(timestamp / 1000), "30 seconds");流计算作业启动后持续运行。新的订单数据写入 OrderSource 表后,Spark 通过表格存储的增量通道实时获取新增数据,按 30 秒时间窗口聚合,将每个窗口内的订单数(count)和订单总金额(totalPrice)写入 OrderStreamSink 表。
步骤七:(可选)DataV 可视化展示
如果需要将聚合结果在大屏上展示,可以使用 DataV 数据可视化服务接入表格存储数据源。
开通 DataV 服务,请参见开通DataV-Board服务。
添加表格存储数据源,请参见添加TableStore数据源。
各结果表推荐的展示方式如下。
结果表 | 展示数据 | 推荐组件 |
OrderTotalSink | 全局订单总数(count)和订单总金额(totalPrice) | 数字翻牌器或 KPI 数字卡片 |
OrderStreamSink | 每 30 秒窗口的实时订单数和金额趋势 | 实时折线图或动态柱状图 |
OrderBatchSink | 用户维度的累计订单数和消费金额排行 | 排行榜或水平条形图 |
资源清理
如果不再需要本方案中创建的资源,按以下步骤清理以避免产生费用。