本文介绍如何将交易订单数据存储到表格存储,并通过实时计算 Flink 对增量数据进行窗口聚合统计。
方案概述
在电子商务和支付系统等场景中,交易数据的实时统计有助于及时掌握业务动态并优化运营策略。
本方案将表格存储作为交易数据的统一存储层,通过通道服务(Tunnel Service)将增量数据实时同步到 Flink 进行窗口聚合计算,并将聚合结果写回表格存储的结果表。
|
类别 |
说明 |
|
适用场景 |
电商交易实时统计、金融交易监控、实时运营大屏等需要对流式数据进行窗口聚合的场景。 |
|
方案优势 |
|
|
涉及产品 |
|
方案设计
数据流转过程如下:
客户端将实时交易数据写入表格存储的源表(source_order)。
通道服务(Tunnel Service)将源表的增量数据实时同步到 Flink。
Flink 流计算作业按固定频率(如 1 分钟)的滚动窗口对交易数据进行聚合统计(订单数和订单总金额),并将结果写入表格存储的结果表(sink_order)。
客户端实时读取结果表展示聚合数据,也可通过 DataV 等大屏展示。
本方案涉及以下两张数据表。
|
表名 |
主键 / 属性列 |
说明 |
|
source_order |
PK: metering (String), orderid (String) |
交易订单源表。每条记录包含计量类型、订单号、交易金额、买家 ID、卖家 ID、商品 ID 和时间戳(秒级)。 |
|
sink_order |
PK: metering (String), ts (Long) |
聚合结果表。Flink 按 1 分钟滚动窗口聚合后写入,每行对应一个时间窗口内的订单总金额和订单总数。 |
前提条件
开始前,开通服务和创建实例(如已有可用实例,跳过此步骤)。
已开通实时计算 Flink 版工作空间,工作空间必须与表格存储实例位于同一地域。
方案实现
以下步骤使用 Java SDK 创建表格存储资源并写入测试数据,通过 Flink SQL 创建流计算作业。
步骤一:创建数据表和数据通道
通过 Java SDK 创建源表(source_order)和结果表(sink_order),并为源表创建增量类型的数据通道,用于将增量数据同步到 Flink。
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.core.ResourceManager;
import com.alicloud.openservices.tablestore.core.auth.*;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.model.tunnel.*;
import java.util.*;
public class CreateTablesAndTunnel {
public static void main(String[] args) {
// Initialize V4 signature client.
String endpoint = System.getenv("OTS_ENDPOINT");
String accessKeyId = System.getenv("OTS_AK_ENV");
String accessKeySecret = System.getenv("OTS_SK_ENV");
String instanceName = System.getenv("OTS_INSTANCE");
String region = System.getenv("OTS_REGION");
DefaultCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
V4Credentials v4Credentials = V4Credentials.createByServiceCredentials(credentials, region);
CredentialsProvider provider = new DefaultCredentialProvider(v4Credentials);
SyncClient client = new SyncClient(endpoint, provider, instanceName, null, new ResourceManager(null, null));
// 1. Create source_order table (PK: metering + orderid).
createTable(client, "source_order", Arrays.asList(
new PrimaryKeySchema("metering", PrimaryKeyType.STRING),
new PrimaryKeySchema("orderid", PrimaryKeyType.STRING)));
// 2. Create sink_order table (PK: metering + ts).
createTable(client, "sink_order", Arrays.asList(
new PrimaryKeySchema("metering", PrimaryKeyType.STRING),
new PrimaryKeySchema("ts", PrimaryKeyType.INTEGER)));
// 3. Create stream tunnel on source_order for Flink consumption.
TunnelClient tunnelClient = new TunnelClient(endpoint, accessKeyId, accessKeySecret, instanceName);
CreateTunnelRequest tunnelRequest = new CreateTunnelRequest(
"source_order", "flink_agg", TunnelType.Stream);
CreateTunnelResponse tunnelResponse = tunnelClient.createTunnel(tunnelRequest);
System.out.println("Tunnel flink_agg created.");
tunnelClient.shutdown();
client.shutdown();
}
private static void createTable(SyncClient client, String tableName, List<PrimaryKeySchema> pkSchema) {
TableMeta meta = new TableMeta(tableName);
for (PrimaryKeySchema pk : pkSchema) {
meta.addPrimaryKeyColumn(pk);
}
TableOptions options = new TableOptions(-1, 1);
CreateTableRequest request = new CreateTableRequest(meta, options);
try {
client.createTable(request);
System.out.println("Table " + tableName + " created.");
} catch (Exception e) {
if (e.getMessage() != null && e.getMessage().contains("already exist")) {
System.out.println("Table " + tableName + " already exists.");
} else {
throw new RuntimeException(e);
}
}
}
}
步骤二:创建并启动 Flink 流计算作业
登录实时计算控制台,单击实例名称进入工作空间。
单击,然后单击。
在对话框中输入文件名称,单击创建。
-
在编辑器中粘贴以下 Flink SQL 代码。
Flink SQL 通过表格存储的 ots connector 定义源表和结果表的临时表。源表通过
tunnelName参数指定数据通道名称,Flink 从该通道读取增量数据;结果表通过valueColumns参数指定写入的属性列。INSERT INTO ... SELECT语句启动流计算,按 1 分钟滚动窗口聚合订单数和订单总金额。-- Create source table: read incremental data from Tablestore via Tunnel. CREATE TEMPORARY TABLE tablestore_input ( metering VARCHAR, orderid VARCHAR, price DOUBLE, byerid BIGINT, sellerid BIGINT, productid BIGINT, ts BIGINT, ptime AS TO_TIMESTAMP(ts * 1000), WATERMARK FOR ptime AS ptime - INTERVAL '2' SECOND ) WITH ( 'connector' = 'ots', 'endPoint' = '<endpoint>', 'instanceName' = '<instance-name>', 'tableName' = 'source_order', 'tunnelName' = 'flink_agg', 'accessId' = '<AccessKey ID>', 'accessKey' = '<AccessKey Secret>' ); -- Create sink table: write aggregation results to Tablestore. CREATE TEMPORARY TABLE tablestore_output ( metering VARCHAR, ts BIGINT, price DOUBLE, ordercount BIGINT, PRIMARY KEY(metering, ts) NOT ENFORCED ) WITH ( 'connector' = 'ots', 'endPoint' = '<endpoint>', 'instanceName' = '<instance-name>', 'tableName' = 'sink_order', 'accessId' = '<AccessKey ID>', 'accessKey' = '<AccessKey Secret>', 'valueColumns' = 'price,ordercount' ); -- 1-minute tumbling window aggregation. INSERT INTO tablestore_output SELECT metering, UNIX_TIMESTAMP(CAST(TUMBLE_START(ptime, INTERVAL '1' MINUTE) AS STRING)) AS ts, SUM(price) AS price, COUNT(orderid) AS ordercount FROM tablestore_input GROUP BY TUMBLE(ptime, INTERVAL '1' MINUTE), metering;参数
说明
connector
连接器类型。固定取值为
ots。详见Tablestore连接器。endPoint
表格存储实例的 VPC 访问地址。
instanceName
表格存储的实例名称。
tableName
表格存储的数据表名称。
tunnelName
源表的数据通道名称。仅源表需要配置,Flink 通过此通道读取增量数据。
accessId / accessKey
阿里云账号的 AccessKey ID 和 AccessKey Secret。
valueColumns
结果表需要写入的属性列名,多个列用逗号分隔。仅结果表需要配置。
单击右上角,然后再次单击右上角,启动作业。
步骤三:写入测试数据并验证聚合结果
以下代码模拟实时交易场景:持续向 source_order 表写入交易订单数据,同时轮询 sink_order 表获取 Flink 的实时聚合结果。每 5 秒写入一批 10 条订单,每 30 秒读取一次聚合结果,输出每个 1 分钟窗口内的订单数和订单总金额。
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.core.ResourceManager;
import com.alicloud.openservices.tablestore.core.auth.*;
import com.alicloud.openservices.tablestore.model.*;
import java.util.*;
public class RealtimeTradeDemo {
public static void main(String[] args) throws InterruptedException {
String endpoint = System.getenv("OTS_ENDPOINT");
String accessKeyId = System.getenv("OTS_AK_ENV");
String accessKeySecret = System.getenv("OTS_SK_ENV");
String instanceName = System.getenv("OTS_INSTANCE");
String region = System.getenv("OTS_REGION");
DefaultCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
V4Credentials v4Credentials = V4Credentials.createByServiceCredentials(credentials, region);
CredentialsProvider provider = new DefaultCredentialProvider(v4Credentials);
SyncClient client = new SyncClient(endpoint, provider, instanceName, null, new ResourceManager(null, null));
Random random = new Random();
String metering = "web";
int totalOrders = 0;
// Write data and verify results for about 3 minutes.
for (int round = 0; round < 36; round++) {
// Write 10 orders per batch.
long now = System.currentTimeMillis() / 1000;
BatchWriteRowRequest writeRequest = new BatchWriteRowRequest();
for (int i = 0; i < 10; i++) {
String orderId = String.format("ord_%d_%04d", now, i);
double price = Math.round((10.0 + random.nextDouble() * 990.0) * 100.0) / 100.0;
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("metering", PrimaryKeyValue.fromString(metering))
.addPrimaryKeyColumn("orderid", PrimaryKeyValue.fromString(orderId))
.build();
RowPutChange row = new RowPutChange("source_order", pk);
row.addColumn("price", ColumnValue.fromDouble(price));
row.addColumn("byerid", ColumnValue.fromLong(1000 + random.nextInt(100)));
row.addColumn("sellerid", ColumnValue.fromLong(2000 + random.nextInt(50)));
row.addColumn("productid", ColumnValue.fromLong(3000 + random.nextInt(200)));
row.addColumn("ts", ColumnValue.fromLong(now + i));
writeRequest.addRowChange(row);
}
client.batchWriteRow(writeRequest);
totalOrders += 10;
// Every 30 seconds (6 rounds), read and display aggregation results.
if ((round + 1) % 6 == 0) {
System.out.println("\n--- Aggregation results (total " + totalOrders + " orders written) ---");
RangeRowQueryCriteria criteria = new RangeRowQueryCriteria("sink_order");
criteria.setInclusiveStartPrimaryKey(PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("metering", PrimaryKeyValue.INF_MIN)
.addPrimaryKeyColumn("ts", PrimaryKeyValue.INF_MIN).build());
criteria.setExclusiveEndPrimaryKey(PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("metering", PrimaryKeyValue.INF_MAX)
.addPrimaryKeyColumn("ts", PrimaryKeyValue.INF_MAX).build());
criteria.setMaxVersions(1);
criteria.setLimit(20);
GetRangeResponse response = client.getRange(new GetRangeRequest(criteria));
for (Row row : response.getRows()) {
long ts = row.getPrimaryKey().getPrimaryKeyColumn("ts").getValue().asLong();
double totalPrice = row.getColumn("price").get(0).getValue().asDouble();
long orderCount = row.getColumn("ordercount").get(0).getValue().asLong();
System.out.println(String.format(" Window %s: orders=%d, totalPrice=%.2f",
new java.text.SimpleDateFormat("HH:mm:ss").format(new java.util.Date(ts * 1000)),
orderCount, totalPrice));
}
}
Thread.sleep(5000);
}
client.shutdown();
}
}
运行后,控制台每 30 秒输出一次聚合结果,类似以下输出:
--- Aggregation results (total 60 orders written) ---
Window 14:17:00: orders=103, totalPrice=48723.75
Window 14:18:00: orders=118, totalPrice=62093.70
每行表示一个 1 分钟窗口的聚合结果。随着持续写入新的交易数据,新的窗口不断出现,已有窗口的统计值随数据补全而更新。
步骤四:(可选)DataV 可视化展示
如果需要将聚合结果在大屏上展示,可以使用 DataV 数据可视化服务接入表格存储数据源。
开通 DataV 服务,请参见开通DataV-Board服务。
添加表格存储数据源,请参见添加TableStore数据源。
资源清理
如果不再需要本方案中创建的资源,按以下步骤清理以避免产生费用。