Partial Update(宽表拼接)

本文将为您介绍Partial Update的使用。

为什么需要Partial Update实现大宽表拼接

在传统流式数据管道中,为构建宽表常需在主键上 Join 多个流或表。例如,在实时推荐系统中,为了获得用户 360° 视图,往往需要整合来自偏好、购买、点击、购物车等多个不同数据源的更新。

虽然 Apache Flink 可实现多流 Join,但在大规模场景下存在多个痛点:

  • 状态规模庞大 :需缓存所有事件直到完成 Join,状态可能长期甚至无限保留;

  • 性能瓶颈 :检查点开销大、易引发反压,影响整体吞吐和稳定性;

  • 调试困难 :状态复杂且不可见,问题排查成本高;

  • 数据一致性差 :TTL 设置可能导致事件提前被丢弃,造成结果错误。

因此,我们迫切的需要一种新的数据打宽方法,避免复杂 Join,并显著降低资源消耗和维护复杂度。

基于Fluss Partial Update的数据打宽新模式

Fluss 引入基于Partial Update(部分更新)机制的数据打宽方法,支持在流处理作业中直接扩展维度信息,无需依赖复杂的多流 Join。

与传统方式不同,Fluss允许各数据源根据主键独立更新自己相关的字段到同一张宽表中。例如,你可以定义一张以user_id为主键的user_profile表,包含所有可能字段。每个数据流只需写入自己知道的部分字段,Fluss 存储引擎会自动按主键合并这些更新。

image

其底层机制是:当某条部分更新到达时,Fluss 会查找该主键已有的记录,并仅更新本次提供的字段,其余字段保持不变。合并后的新版本实时落盘,确保每条记录始终是最新的完整状态,其底层机制类似于数据库中的“增量更新”,每次只更新有变化的字段,而非整条记录。这种方式大幅降低了 Flink 作业的状态压力和计算复杂度,使数据管道更轻量、高效、易维护。

实践示例

前提条件

步骤一:创建表

已创建名为fluss-demo的 Fluss Catalog。

  1. 登录实时计算控制台

  2. 单击目标工作空间操作列下的控制台

  3. 在左侧导航栏,单击数据开发 > 数据查询 > 新建临时查询脚本

在默认的fluss数据库中创建三张表代表构建宽表的不同数据源,及一张宽表作为结果表

-- Recommendations – model scores
CREATE TABLE IF NOT EXISTS `fluss-demo`.fluss.recommendations (
    user_id  STRING,
    item_id  STRING,
    rec_score DOUBLE,
    rec_ts   TIMESTAMP(3),
    PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3');


-- Impressions – how often we showed something
CREATE TABLE IF NOT EXISTS `fluss-demo`.fluss.impressions (
    user_id STRING,
    item_id STRING,
    imp_cnt INT,
    imp_ts  TIMESTAMP(3),
    PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3');

-- Clicks – user engagement
CREATE TABLE IF NOT EXISTS `fluss-demo`.fluss.clicks (
    user_id  STRING,
    item_id  STRING,
    click_cnt INT,
    clk_ts    TIMESTAMP(3),
    PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3');

-- Result wide table
CREATE TABLE IF NOT EXISTS `fluss-demo`.fluss.user_rec_wide (
    user_id   STRING,
    item_id   STRING,
    rec_score DOUBLE,   -- updated by recs stream
    imp_cnt   INT,      -- updated by impressions stream
    click_cnt INT,      -- updated by clicks stream
    PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3');

点击运行完成示例表创建:

image

步骤二:创建示例流作业

  1. 在左侧导航栏,单击数据开发 > ETL > 新建 > 空白流作业草稿

  2. recommendationsimpressionsclicks三张以user_id为共享主键的表数据分别插入以user_id为主键的大宽表user_rec_wide中。

    BEGIN STATEMENT SET
    ;
    
    INSERT INTO `fluss-demo`.fluss.user_rec_wide (user_id, item_id, rec_score)
    SELECT
        user_id
        ,item_id
        ,rec_score
    FROM `fluss-demo`.fluss.recommendations
    ;
    
    INSERT INTO `fluss-demo`.fluss.user_rec_wide (user_id, item_id, imp_cnt)
    SELECT
        user_id
        ,item_id
        ,imp_cnt
    FROM `fluss-demo`.fluss.impressions
    ;
    
    -- Apply click counts
    
    INSERT INTO `fluss-demo`.fluss.user_rec_wide (user_id, item_id, click_cnt)
    SELECT
        user_id
        ,item_id
        ,click_cnt
    FROM `fluss-demo`.fluss.clicks
    ;
    
    
    END
    ;
  3. 单击右上角单击部署 > 前往运维。单击启动对应作业。

    image

    可以注意到计算拓扑中并不包含多流 Join节点,多张明细表均独立写入宽表。

步骤三:插入明细表数据

数据查询页面,再次新建一个空白查询脚本,输入以下SQL,向三张明细表分别插入测试数据:

-- Recommendations – model scores
INSERT INTO `fluss-demo`.fluss.recommendations VALUES
    ('user_101','prod_501',0.92 , TIMESTAMP '2025-05-16 09:15:02'),
    ('user_101','prod_502',0.78 , TIMESTAMP '2025-05-16 09:15:05'),
    ('user_102','prod_503',0.83 , TIMESTAMP '2025-05-16 09:16:00'),
    ('user_103','prod_501',0.67 , TIMESTAMP '2025-05-16 09:16:20'),
    ('user_104','prod_504',0.88 , TIMESTAMP '2025-05-16 09:16:45');
-- Impressions – how often each (user,item) was shown
INSERT INTO `fluss-demo`.fluss.impressions VALUES
    ('user_101','prod_501', 3, TIMESTAMP '2025-05-16 09:17:10'),
    ('user_101','prod_502', 1, TIMESTAMP '2025-05-16 09:17:15'),
    ('user_102','prod_503', 7, TIMESTAMP '2025-05-16 09:18:22'),
    ('user_103','prod_501', 4, TIMESTAMP '2025-05-16 09:18:30'),
    ('user_104','prod_504', 2, TIMESTAMP '2025-05-16 09:18:55');
-- Clicks – user engagement
INSERT INTO `fluss-demo`.fluss.clicks VALUES
    ('user_101','prod_501', 1, TIMESTAMP '2025-05-16 09:19:00'),
    ('user_101','prod_502', 2, TIMESTAMP '2025-05-16 09:19:07'),
    ('user_102','prod_503', 1, TIMESTAMP '2025-05-16 09:19:12'),
    ('user_103','prod_501', 1, TIMESTAMP '2025-05-16 09:19:20'),
    ('user_104','prod_504', 1, TIMESTAMP '2025-05-16 09:19:25');

步骤四:验证大宽表数据写入结果

  1. 数据开发 > ETL > 新建 > 空白流作业草稿

    SELECT * FROM `fluss-demo`.fluss.user_rec_wide;
  2. 单击调试按钮。我们将观察到三张明细表分写写入后,大宽表中汇总的流式计算结果:以user_id为主键计算生成了正确的大宽表数据。

    image