基于 Flink 实现消费数据的实时关联分析

更新时间:
复制 MD 格式

Flink CDC 实时捕获 RDS MySQL 数据变更,关联表格存储(Tablestore)中的商品信息,将分析结果写入表格存储的结果表,实现消费数据的实时关联分析。

方案概述

零售和电商场景中,消费记录通常存储在关系型数据库中,商品信息和分析结果则需要支撑高并发查询。Flink CDC 连接器实时捕获 MySQL 的数据变更,关联表格存储中的商品维表后,将结果写入表格存储的结果表。

类别

说明

适用场景

连锁超市消费记录关联商品信息,按品类实时统计 GMV(商品交易总额)

方案优势

  • 数据变更秒级捕获,无需轮询数据库

  • 商品信息存储在表格存储中,支持 PB 级数据量和毫秒级查询

  • 关联结果写入表格存储后,可通过多元索引按多个维度查询分析

涉及产品

  • 表格存储:商品信息存储和分析结果存储

  • RDS MySQL:消费记录数据源

  • 实时计算 Flink:数据变更捕获和跨数据源实时关联计算

方案设计

数据流如下:

  1. RDS MySQL 存储消费记录(consume_record 表),每产生一笔消费即写入一条记录。

  2. Tablestore存储商品信息表(product 表),包含商品编号、单价和品类。

  3. Flink 通过 MySQL CDC 连接器读取 MySQL 的全量数据和增量变更。

  4. Flink 根据商品编号关联表格存储中的商品信息,为每条消费记录补充商品单价和品类。

  5. 关联后的结果写入表格存储的结果表(consume_product 表)。

本方案涉及以下三张数据表。

MySQL 消费记录源表(consume_record)

字段

类型

说明

consume_id(主键)

VARCHAR(20)

消费记录编号

product_id

VARCHAR(20)

商品编号,用于关联商品信息表

consume_time

BIGINT

消费时间戳(秒)

consume_name

VARCHAR(20)

消费者名称

consume_phone

VARCHAR(20)

消费者联系方式

表格存储商品信息表(product)

字段

类型

说明

product_id(主键列)

STRING

商品编号

price

DOUBLE

商品单价

product_type

STRING

商品品类

表格存储关联结果表(consume_product)

字段

类型

说明

consume_id(主键列)

STRING

消费记录编号

product_id(主键列)

STRING

商品编号

price

DOUBLE

商品单价(关联自商品信息表)

consume_time

BIGINT

消费时间戳(秒)

consume_name

STRING

消费者名称

consume_phone

STRING

消费者联系方式

product_type

STRING

商品品类(关联自商品信息表)

前提条件

重要

表格存储实例、Flink 工作空间、RDS MySQL 实例需位于同一地域。

方案实现

以下步骤使用 Java SDK 创建表格存储数据表和管理数据。

步骤一:创建数据表并写入商品数据

在 RDS MySQL 中创建消费记录源表。

CREATE TABLE consume_record (
    consume_id varchar(20) NOT NULL,
    product_id varchar(20) NOT NULL,
    consume_time bigint(20) NOT NULL,
    consume_name varchar(20) NOT NULL,
    consume_phone varchar(20) NOT NULL,
    PRIMARY KEY (consume_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

通过 Java SDK 在表格存储中创建商品信息表和关联结果表,并向商品信息表写入商品数据。

import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.core.ResourceManager;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentials;
import com.alicloud.openservices.tablestore.core.auth.V4Credentials;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentialProvider;

public class FlinkBigdataSetup {
    public static void main(String[] args) {
        String endpoint = System.getenv("OTS_ENDPOINT");
        String accessKeyId = System.getenv("OTS_AK_ENV");
        String accessKeySecret = System.getenv("OTS_SK_ENV");
        String instanceName = System.getenv("OTS_INSTANCE");
        String region = System.getenv("OTS_REGION");

        DefaultCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
        V4Credentials v4 = V4Credentials.createByServiceCredentials(credentials, region);
        DefaultCredentialProvider provider = new DefaultCredentialProvider(v4);
        SyncClient client = new SyncClient(endpoint, provider, instanceName, null,
                new ResourceManager(null, null));

        // 创建商品表(主键:product_id)
        TableMeta productMeta = new TableMeta("product");
        productMeta.addPrimaryKeyColumn("product_id", PrimaryKeyType.STRING);
        TableOptions options = new TableOptions(-1, 1);
        CreateTableRequest createReq = new CreateTableRequest(productMeta, options);
        createReq.setReservedThroughput(new ReservedThroughput(0, 0));
        client.createTable(createReq);

        // 创建消费商品结果表(主键:consume_id + product_id)
        TableMeta resultMeta = new TableMeta("consume_product");
        resultMeta.addPrimaryKeyColumn("consume_id", PrimaryKeyType.STRING);
        resultMeta.addPrimaryKeyColumn("product_id", PrimaryKeyType.STRING);
        createReq = new CreateTableRequest(resultMeta, options);
        createReq.setReservedThroughput(new ReservedThroughput(0, 0));
        client.createTable(createReq);

        // 插入商品数据
        String[][] products = {
            {"P001", "15.5", "食品"},    {"P002", "89.0", "服装"},
            {"P003", "2999.0", "电子产品"}, {"P004", "45.0", "日用品"},
            {"P005", "128.0", "美妆"},    {"P006", "35.0", "饮料"},
            {"P007", "599.0", "运动"},    {"P008", "12.0", "文具"},
            {"P009", "268.0", "家居"},    {"P010", "1599.0", "家电"}
        };
        for (String[] p : products) {
            PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    .addPrimaryKeyColumn("product_id", PrimaryKeyValue.fromString(p[0]))
                    .build();
            RowPutChange change = new RowPutChange("product", pk);
            change.addColumn("price", ColumnValue.fromDouble(Double.parseDouble(p[1])));
            change.addColumn("product_type", ColumnValue.fromString(p[2]));
            client.putRow(new PutRowRequest(change));
        }
        System.out.println("已创建数据表并插入" + products.length + "条商品记录。");
        client.shutdown();
    }
}

步骤二:创建并启动 Flink 实时关联分析作业

在实时计算 Flink 控制台创建 SQL 流作业,通过 MySQL CDC 连接器实时读取消费记录变更,关联表格存储商品信息后写入结果表。

  1. 登录实时计算控制台,单击实例名称进入工作空间。

  2. 单击数据开发 > ETL,然后单击+ > 新建流作业

  3. 在对话框中输入文件名称,单击创建

  4. 在编辑器中粘贴以下 Flink SQL 代码,将代码中的参数替换为实际值。

    -- 数据源:RDS MySQL CDC
    CREATE TEMPORARY TABLE mysql_source (
        consume_id   VARCHAR,
        product_id   VARCHAR,
        consume_time BIGINT,
        consume_name VARCHAR,
        consume_phone VARCHAR,
        PRIMARY KEY (consume_id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '<RDS内网地址>',
        'port' = '3306',
        'database-name' = '<数据库名>',
        'table-name' = 'consume_record',
        'username' = '<用户名>',
        'password' = '<密码>'
    );
    
    -- 维表:表格存储商品表
    CREATE TEMPORARY TABLE ots_product (
        product_id   VARCHAR,
        price        DOUBLE,
        product_type VARCHAR,
        PRIMARY KEY (product_id) NOT ENFORCED
    ) WITH (
        'connector' = 'ots',
        'endPoint' = 'https://<实例名称>.<地域>.vpc.tablestore.aliyuncs.com',
        'instanceName' = '<实例名称>',
        'tableName' = 'product',
        'accessId' = '<AccessKey ID>',
        'accessKey' = '<AccessKey Secret>'
    );
    
    -- 结果表:表格存储结果表
    CREATE TEMPORARY TABLE ots_sink (
        consume_id    VARCHAR,
        product_id    VARCHAR,
        price         DOUBLE,
        consume_time  BIGINT,
        consume_name  VARCHAR,
        consume_phone VARCHAR,
        product_type  VARCHAR,
        PRIMARY KEY (consume_id, product_id) NOT ENFORCED
    ) WITH (
        'connector' = 'ots',
        'endPoint' = 'https://<实例名称>.<地域>.vpc.tablestore.aliyuncs.com',
        'instanceName' = '<实例名称>',
        'tableName' = 'consume_product',
        'accessId' = '<AccessKey ID>',
        'accessKey' = '<AccessKey Secret>',
        'valueColumns' = 'price,consume_time,consume_name,consume_phone,product_type'
    );
    
    -- 关联查询:将每条消费记录与商品信息进行关联
    INSERT INTO ots_sink
    SELECT
        s.consume_id,
        s.product_id,
        p.price,
        s.consume_time,
        s.consume_name,
        s.consume_phone,
        p.product_type
    FROM mysql_source AS s
    JOIN ots_product FOR SYSTEM_TIME AS OF PROCTIME() AS p
        ON s.product_id = p.product_id;

    上述 Flink SQL 中使用的连接器参数说明如下。

    mysql-cdc 连接器参数

    参数

    说明

    connector

    连接器类型。固定取值为 mysql-cdc

    hostname

    RDS MySQL 实例的内网地址。

    port

    RDS MySQL 实例的端口号,默认为 3306。

    database-name

    RDS MySQL 数据库名称。

    table-name

    MySQL 源表名称。

    username

    RDS MySQL 数据库账号。

    password

    RDS MySQL 数据库密码。

    ots 连接器参数

    参数

    说明

    connector

    连接器类型。固定取值为 ots。详见Tablestore连接器

    endPoint

    表格存储实例的 VPC 地址。

    instanceName

    表格存储实例名称。

    tableName

    表格存储数据表名称。

    accessId

    阿里云账号的 AccessKey ID。

    accessKey

    阿里云账号的 AccessKey Secret。

    valueColumns

    写入结果表时需要更新的属性列名称,多个列名之间用英文逗号(,)分隔。仅 Sink 表需配置。

    说明
    • MySQL CDC 连接器在作业启动时先读取源表的全量数据,之后自动切换为增量模式,持续捕获数据变更。

    • Flink 使用 FOR SYSTEM_TIME AS OF PROCTIME() 语法,在每条消费记录到达时实时查询表格存储中的最新商品信息。

  5. 单击右上角部署 > 确定部署作业。然后单击前往运维 > 启动启动作业。

步骤三:写入测试数据并验证分析结果

Flink 作业启动后,向 MySQL 源表写入消费记录。Flink 实时捕获数据变更,关联商品信息后将完整结果写入表格存储的 consume_product 表。以下代码向 MySQL 写入测试数据,并从表格存储读取关联结果进行验证。

import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.core.ResourceManager;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentials;
import com.alicloud.openservices.tablestore.core.auth.V4Credentials;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentialProvider;

import java.sql.*;
import java.util.Random;

public class FlinkBigdataVerify {
    public static void main(String[] args) throws Exception {
        // Write test data to MySQL
        String mysqlUrl = String.format("jdbc:mysql://%s:%s/%s?useSSL=false",
                System.getenv("MYSQL_HOST"), System.getenv("MYSQL_PORT"), System.getenv("MYSQL_DB"));
        Connection conn = DriverManager.getConnection(mysqlUrl,
                System.getenv("MYSQL_USER"), System.getenv("MYSQL_PASS"));

        String[] productIds = {"P001","P002","P003","P004","P005","P006","P007","P008","P009","P010"};
        String[] names = {"Alice","Bob","Charlie","Diana","Eve"};
        Random rand = new Random();
        long now = System.currentTimeMillis() / 1000;

        PreparedStatement ps = conn.prepareStatement(
                "INSERT INTO consume_record VALUES (?,?,?,?,?)");
        for (int i = 1; i <= 20; i++) {
            ps.setString(1, String.format("C%04d", i));
            ps.setString(2, productIds[rand.nextInt(productIds.length)]);
            ps.setLong(3, now + i);
            ps.setString(4, names[rand.nextInt(names.length)]);
            ps.setString(5, String.format("138%08d", rand.nextInt(100000000)));
            ps.executeUpdate();
        }
        System.out.println("Inserted 20 test records to MySQL.");
        conn.close();

        // Wait for Flink to process
        System.out.println("Waiting 60 seconds for Flink to process...");
        Thread.sleep(60000);

        // Verify result table in Tablestore
        String endpoint = System.getenv("OTS_ENDPOINT");
        String ak = System.getenv("OTS_AK_ENV");
        String sk = System.getenv("OTS_SK_ENV");
        String instance = System.getenv("OTS_INSTANCE");
        String region = System.getenv("OTS_REGION");

        DefaultCredentials cred = new DefaultCredentials(ak, sk);
        V4Credentials v4 = V4Credentials.createByServiceCredentials(cred, region);
        DefaultCredentialProvider provider = new DefaultCredentialProvider(v4);
        SyncClient client = new SyncClient(endpoint, provider, instance, null,
                new ResourceManager(null, null));

        RangeRowQueryCriteria criteria = new RangeRowQueryCriteria("consume_product");
        criteria.setInclusiveStartPrimaryKey(PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("consume_id", PrimaryKeyValue.INF_MIN)
                .addPrimaryKeyColumn("product_id", PrimaryKeyValue.INF_MIN).build());
        criteria.setExclusiveEndPrimaryKey(PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("consume_id", PrimaryKeyValue.INF_MAX)
                .addPrimaryKeyColumn("product_id", PrimaryKeyValue.INF_MAX).build());
        criteria.setMaxVersions(1);

        GetRangeResponse resp = client.getRange(new GetRangeRequest(criteria));
        int count = 0;
        for (Row row : resp.getRows()) {
            count++;
            System.out.printf("%s | %s | price=%.1f | type=%s | name=%s%n",
                    row.getPrimaryKey().getPrimaryKeyColumn("consume_id").getValue(),
                    row.getPrimaryKey().getPrimaryKeyColumn("product_id").getValue(),
                    row.getLatestColumn("price").getValue().asDouble(),
                    row.getLatestColumn("product_type").getValue().asString(),
                    row.getLatestColumn("consume_name").getValue().asString());
        }
        System.out.println("Total: " + count + " rows in result table.");
        client.shutdown();
    }
}

预期输出如下,每条消费记录都已关联上商品信息表中的单价和品类。

C0001 | P007 | price=599.0 | type=运动 | name=Alice
C0002 | P003 | price=2999.0 | type=电子产品 | name=Bob
C0003 | P001 | price=15.5 | type=食品 | name=Charlie
...
Total: 20 rows in result table.

步骤四:(可选)通过 DataV 可视化展示分析结果

如需可视化展示关联分析结果,通过 DataV 连接表格存储的结果表,按商品品类统计消费金额。

资源清理

说明

如果不再需要本方案中创建的资源,请及时释放以避免持续产生费用。