基于 Flink 实现交易数据的实时统计

更新时间:
复制 MD 格式

本文介绍如何将交易订单数据存储到表格存储,并通过实时计算 Flink 对增量数据进行窗口聚合统计。

方案概述

在电子商务和支付系统等场景中,交易数据的实时统计有助于及时掌握业务动态并优化运营策略。

本方案将表格存储作为交易数据的统一存储层,通过通道服务(Tunnel Service)将增量数据实时同步到 Flink 进行窗口聚合计算,并将聚合结果写回表格存储的结果表。

类别

说明

适用场景

电商交易实时统计、金融交易监控、实时运营大屏等需要对流式数据进行窗口聚合的场景。

方案优势

  • 全托管:Flink 全托管(VVP)无需自建集群,通过 SQL 定义流计算作业。

  • 实时性:通道服务提供毫秒级增量数据同步,Flink 支持亚秒级窗口聚合。

  • 统一存储:表格存储同时承担源表和结果表,无需额外引入消息队列或中间存储。

涉及产品

  • 表格存储:存储源交易数据和聚合结果数据。

  • 实时计算 Flink 版:提供流计算引擎,执行窗口聚合作业。

方案设计

数据流转过程如下:

  1. 客户端将实时交易数据写入表格存储的源表(source_order)。

  2. 通道服务(Tunnel Service)将源表的增量数据实时同步到 Flink。

  3. Flink 流计算作业按固定频率(如 1 分钟)的滚动窗口对交易数据进行聚合统计(订单数和订单总金额),并将结果写入表格存储的结果表(sink_order)。

  4. 客户端实时读取结果表展示聚合数据,也可通过 DataV 等大屏展示。

本方案涉及以下两张数据表。

表名

主键 / 属性列

说明

source_order

PK: metering (String), orderid (String)
属性列: price (Double), byerid (Long), sellerid (Long), productid (Long), ts (Long)

交易订单源表。每条记录包含计量类型、订单号、交易金额、买家 ID、卖家 ID、商品 ID 和时间戳(秒级)。

sink_order

PK: metering (String), ts (Long)
属性列: price (Double), ordercount (Long)

聚合结果表。Flink 按 1 分钟滚动窗口聚合后写入,每行对应一个时间窗口内的订单总金额和订单总数。

前提条件

方案实现

以下步骤使用 Java SDK 创建表格存储资源并写入测试数据,通过 Flink SQL 创建流计算作业。

步骤一:创建数据表和数据通道

通过 Java SDK 创建源表(source_order)和结果表(sink_order),并为源表创建增量类型的数据通道,用于将增量数据同步到 Flink。

import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.core.ResourceManager;
import com.alicloud.openservices.tablestore.core.auth.*;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.model.tunnel.*;

import java.util.*;

public class CreateTablesAndTunnel {
    public static void main(String[] args) {
        // Initialize V4 signature client.
        String endpoint = System.getenv("OTS_ENDPOINT");
        String accessKeyId = System.getenv("OTS_AK_ENV");
        String accessKeySecret = System.getenv("OTS_SK_ENV");
        String instanceName = System.getenv("OTS_INSTANCE");
        String region = System.getenv("OTS_REGION");

        DefaultCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
        V4Credentials v4Credentials = V4Credentials.createByServiceCredentials(credentials, region);
        CredentialsProvider provider = new DefaultCredentialProvider(v4Credentials);
        SyncClient client = new SyncClient(endpoint, provider, instanceName, null, new ResourceManager(null, null));

        // 1. Create source_order table (PK: metering + orderid).
        createTable(client, "source_order", Arrays.asList(
            new PrimaryKeySchema("metering", PrimaryKeyType.STRING),
            new PrimaryKeySchema("orderid", PrimaryKeyType.STRING)));

        // 2. Create sink_order table (PK: metering + ts).
        createTable(client, "sink_order", Arrays.asList(
            new PrimaryKeySchema("metering", PrimaryKeyType.STRING),
            new PrimaryKeySchema("ts", PrimaryKeyType.INTEGER)));

        // 3. Create stream tunnel on source_order for Flink consumption.
        TunnelClient tunnelClient = new TunnelClient(endpoint, accessKeyId, accessKeySecret, instanceName);
        CreateTunnelRequest tunnelRequest = new CreateTunnelRequest(
            "source_order", "flink_agg", TunnelType.Stream);
        CreateTunnelResponse tunnelResponse = tunnelClient.createTunnel(tunnelRequest);
        System.out.println("Tunnel flink_agg created.");

        tunnelClient.shutdown();
        client.shutdown();
    }

    private static void createTable(SyncClient client, String tableName, List<PrimaryKeySchema> pkSchema) {
        TableMeta meta = new TableMeta(tableName);
        for (PrimaryKeySchema pk : pkSchema) {
            meta.addPrimaryKeyColumn(pk);
        }
        TableOptions options = new TableOptions(-1, 1);
        CreateTableRequest request = new CreateTableRequest(meta, options);
        try {
            client.createTable(request);
            System.out.println("Table " + tableName + " created.");
        } catch (Exception e) {
            if (e.getMessage() != null && e.getMessage().contains("already exist")) {
                System.out.println("Table " + tableName + " already exists.");
            } else {
                throw new RuntimeException(e);
            }
        }
    }
}

步骤二:创建并启动 Flink 流计算作业

  1. 登录实时计算控制台,单击实例名称进入工作空间。

  2. 单击数据开发 > ETL,然后单击+ > 新建流作业

  3. 在对话框中输入文件名称,单击创建

  4. 在编辑器中粘贴以下 Flink SQL 代码。

    Flink SQL 通过表格存储的 ots connector 定义源表和结果表的临时表。源表通过 tunnelName 参数指定数据通道名称,Flink 从该通道读取增量数据;结果表通过 valueColumns 参数指定写入的属性列。INSERT INTO ... SELECT 语句启动流计算,按 1 分钟滚动窗口聚合订单数和订单总金额。

    -- Create source table: read incremental data from Tablestore via Tunnel.
    CREATE TEMPORARY TABLE tablestore_input (
        metering VARCHAR,
        orderid VARCHAR,
        price DOUBLE,
        byerid BIGINT,
        sellerid BIGINT,
        productid BIGINT,
        ts BIGINT,
        ptime AS TO_TIMESTAMP(ts * 1000),
        WATERMARK FOR ptime AS ptime - INTERVAL '2' SECOND
    ) WITH (
        'connector' = 'ots',
        'endPoint' = '<endpoint>',
        'instanceName' = '<instance-name>',
        'tableName' = 'source_order',
        'tunnelName' = 'flink_agg',
        'accessId' = '<AccessKey ID>',
        'accessKey' = '<AccessKey Secret>'
    );
    
    -- Create sink table: write aggregation results to Tablestore.
    CREATE TEMPORARY TABLE tablestore_output (
        metering VARCHAR,
        ts BIGINT,
        price DOUBLE,
        ordercount BIGINT,
        PRIMARY KEY(metering, ts) NOT ENFORCED
    ) WITH (
        'connector' = 'ots',
        'endPoint' = '<endpoint>',
        'instanceName' = '<instance-name>',
        'tableName' = 'sink_order',
        'accessId' = '<AccessKey ID>',
        'accessKey' = '<AccessKey Secret>',
        'valueColumns' = 'price,ordercount'
    );
    
    -- 1-minute tumbling window aggregation.
    INSERT INTO tablestore_output
    SELECT metering,
           UNIX_TIMESTAMP(CAST(TUMBLE_START(ptime, INTERVAL '1' MINUTE) AS STRING)) AS ts,
           SUM(price) AS price,
           COUNT(orderid) AS ordercount
    FROM tablestore_input
    GROUP BY TUMBLE(ptime, INTERVAL '1' MINUTE), metering;
  5. 单击右上角部署 > 确定,然后再次单击右上角前往运维 > 启动,启动作业。

步骤三:写入测试数据并验证聚合结果

以下代码模拟实时交易场景:持续向 source_order 表写入交易订单数据,同时轮询 sink_order 表获取 Flink 的实时聚合结果。每 5 秒写入一批 10 条订单,每 30 秒读取一次聚合结果,输出每个 1 分钟窗口内的订单数和订单总金额。

import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.core.ResourceManager;
import com.alicloud.openservices.tablestore.core.auth.*;
import com.alicloud.openservices.tablestore.model.*;

import java.util.*;

public class RealtimeTradeDemo {
    public static void main(String[] args) throws InterruptedException {
        String endpoint = System.getenv("OTS_ENDPOINT");
        String accessKeyId = System.getenv("OTS_AK_ENV");
        String accessKeySecret = System.getenv("OTS_SK_ENV");
        String instanceName = System.getenv("OTS_INSTANCE");
        String region = System.getenv("OTS_REGION");

        DefaultCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
        V4Credentials v4Credentials = V4Credentials.createByServiceCredentials(credentials, region);
        CredentialsProvider provider = new DefaultCredentialProvider(v4Credentials);
        SyncClient client = new SyncClient(endpoint, provider, instanceName, null, new ResourceManager(null, null));

        Random random = new Random();
        String metering = "web";
        int totalOrders = 0;

        // Write data and verify results for about 3 minutes.
        for (int round = 0; round < 36; round++) {
            // Write 10 orders per batch.
            long now = System.currentTimeMillis() / 1000;
            BatchWriteRowRequest writeRequest = new BatchWriteRowRequest();
            for (int i = 0; i < 10; i++) {
                String orderId = String.format("ord_%d_%04d", now, i);
                double price = Math.round((10.0 + random.nextDouble() * 990.0) * 100.0) / 100.0;
                PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    .addPrimaryKeyColumn("metering", PrimaryKeyValue.fromString(metering))
                    .addPrimaryKeyColumn("orderid", PrimaryKeyValue.fromString(orderId))
                    .build();
                RowPutChange row = new RowPutChange("source_order", pk);
                row.addColumn("price", ColumnValue.fromDouble(price));
                row.addColumn("byerid", ColumnValue.fromLong(1000 + random.nextInt(100)));
                row.addColumn("sellerid", ColumnValue.fromLong(2000 + random.nextInt(50)));
                row.addColumn("productid", ColumnValue.fromLong(3000 + random.nextInt(200)));
                row.addColumn("ts", ColumnValue.fromLong(now + i));
                writeRequest.addRowChange(row);
            }
            client.batchWriteRow(writeRequest);
            totalOrders += 10;

            // Every 30 seconds (6 rounds), read and display aggregation results.
            if ((round + 1) % 6 == 0) {
                System.out.println("\n--- Aggregation results (total " + totalOrders + " orders written) ---");
                RangeRowQueryCriteria criteria = new RangeRowQueryCriteria("sink_order");
                criteria.setInclusiveStartPrimaryKey(PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    .addPrimaryKeyColumn("metering", PrimaryKeyValue.INF_MIN)
                    .addPrimaryKeyColumn("ts", PrimaryKeyValue.INF_MIN).build());
                criteria.setExclusiveEndPrimaryKey(PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    .addPrimaryKeyColumn("metering", PrimaryKeyValue.INF_MAX)
                    .addPrimaryKeyColumn("ts", PrimaryKeyValue.INF_MAX).build());
                criteria.setMaxVersions(1);
                criteria.setLimit(20);
                GetRangeResponse response = client.getRange(new GetRangeRequest(criteria));
                for (Row row : response.getRows()) {
                    long ts = row.getPrimaryKey().getPrimaryKeyColumn("ts").getValue().asLong();
                    double totalPrice = row.getColumn("price").get(0).getValue().asDouble();
                    long orderCount = row.getColumn("ordercount").get(0).getValue().asLong();
                    System.out.println(String.format("  Window %s: orders=%d, totalPrice=%.2f",
                        new java.text.SimpleDateFormat("HH:mm:ss").format(new java.util.Date(ts * 1000)),
                        orderCount, totalPrice));
                }
            }
            Thread.sleep(5000);
        }
        client.shutdown();
    }
}

运行后,控制台每 30 秒输出一次聚合结果,类似以下输出:

--- Aggregation results (total 60 orders written) ---
  Window 14:17:00: orders=103, totalPrice=48723.75
  Window 14:18:00: orders=118, totalPrice=62093.70

每行表示一个 1 分钟窗口的聚合结果。随着持续写入新的交易数据,新的窗口不断出现,已有窗口的统计值随数据补全而更新。

步骤四:(可选)DataV 可视化展示

如果需要将聚合结果在大屏上展示,可以使用 DataV 数据可视化服务接入表格存储数据源。

资源清理

说明

如果不再需要本方案中创建的资源,按以下步骤清理以避免产生费用。

  1. 实时计算控制台停止并删除流计算作业。

  2. 表格存储控制台删除 source_order 表上的数据通道(flink_agg),然后删除 source_order 和 sink_order 两张数据表。