本文将为您介绍Partial Update的使用。
为什么需要Partial Update实现大宽表拼接
在传统流式数据管道中,为构建宽表常需在主键上 Join 多个流或表。例如,在实时推荐系统中,为了获得用户 360° 视图,往往需要整合来自偏好、购买、点击、购物车等多个不同数据源的更新。
虽然 Apache Flink 可实现多流 Join,但在大规模场景下存在多个痛点:
状态规模庞大 :需缓存所有事件直到完成 Join,状态可能长期甚至无限保留;
性能瓶颈 :检查点开销大、易引发反压,影响整体吞吐和稳定性;
调试困难 :状态复杂且不可见,问题排查成本高;
数据一致性差 :TTL 设置可能导致事件提前被丢弃,造成结果错误。
因此,我们迫切的需要一种新的数据打宽方法,避免复杂 Join,并显著降低资源消耗和维护复杂度。
基于Fluss Partial Update的数据打宽新模式
Fluss 引入基于Partial Update(部分更新)机制的数据打宽方法,支持在流处理作业中直接扩展维度信息,无需依赖复杂的多流 Join。
与传统方式不同,Fluss允许各数据源根据主键独立更新自己相关的字段到同一张宽表中。例如,你可以定义一张以user_id
为主键的user_profile
表,包含所有可能字段。每个数据流只需写入自己知道的部分字段,Fluss 存储引擎会自动按主键合并这些更新。
其底层机制是:当某条部分更新到达时,Fluss 会查找该主键已有的记录,并仅更新本次提供的字段,其余字段保持不变。合并后的新版本实时落盘,确保每条记录始终是最新的完整状态,其底层机制类似于数据库中的“增量更新”,每次只更新有变化的字段,而非整条记录。这种方式大幅降低了 Flink 作业的状态压力和计算复杂度,使数据管道更轻量、高效、易维护。
实践示例
前提条件
步骤一:创建表
已创建名为fluss-demo
的 Fluss Catalog。
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
在左侧导航栏,单击
在默认的fluss
数据库中创建三张表代表构建宽表的不同数据源,及一张宽表作为结果表
-- Recommendations – model scores
CREATE TABLE IF NOT EXISTS `fluss-demo`.fluss.recommendations (
user_id STRING,
item_id STRING,
rec_score DOUBLE,
rec_ts TIMESTAMP(3),
PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3');
-- Impressions – how often we showed something
CREATE TABLE IF NOT EXISTS `fluss-demo`.fluss.impressions (
user_id STRING,
item_id STRING,
imp_cnt INT,
imp_ts TIMESTAMP(3),
PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3');
-- Clicks – user engagement
CREATE TABLE IF NOT EXISTS `fluss-demo`.fluss.clicks (
user_id STRING,
item_id STRING,
click_cnt INT,
clk_ts TIMESTAMP(3),
PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3');
-- Result wide table
CREATE TABLE IF NOT EXISTS `fluss-demo`.fluss.user_rec_wide (
user_id STRING,
item_id STRING,
rec_score DOUBLE, -- updated by recs stream
imp_cnt INT, -- updated by impressions stream
click_cnt INT, -- updated by clicks stream
PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3');
点击运行完成示例表创建:
步骤二:创建示例流作业
在左侧导航栏,单击
将
recommendations
、impressions
及clicks
三张以user_id
为共享主键的表数据分别插入以user_id
为主键的大宽表user_rec_wide
中。BEGIN STATEMENT SET ; INSERT INTO `fluss-demo`.fluss.user_rec_wide (user_id, item_id, rec_score) SELECT user_id ,item_id ,rec_score FROM `fluss-demo`.fluss.recommendations ; INSERT INTO `fluss-demo`.fluss.user_rec_wide (user_id, item_id, imp_cnt) SELECT user_id ,item_id ,imp_cnt FROM `fluss-demo`.fluss.impressions ; -- Apply click counts INSERT INTO `fluss-demo`.fluss.user_rec_wide (user_id, item_id, click_cnt) SELECT user_id ,item_id ,click_cnt FROM `fluss-demo`.fluss.clicks ; END ;
单击右上角单击
。单击启动对应作业。可以注意到计算拓扑中并不包含多流 Join节点,多张明细表均独立写入宽表。
步骤三:插入明细表数据
在数据查询页面,再次新建一个空白查询脚本,输入以下SQL,向三张明细表分别插入测试数据:
-- Recommendations – model scores
INSERT INTO `fluss-demo`.fluss.recommendations VALUES
('user_101','prod_501',0.92 , TIMESTAMP '2025-05-16 09:15:02'),
('user_101','prod_502',0.78 , TIMESTAMP '2025-05-16 09:15:05'),
('user_102','prod_503',0.83 , TIMESTAMP '2025-05-16 09:16:00'),
('user_103','prod_501',0.67 , TIMESTAMP '2025-05-16 09:16:20'),
('user_104','prod_504',0.88 , TIMESTAMP '2025-05-16 09:16:45');
-- Impressions – how often each (user,item) was shown
INSERT INTO `fluss-demo`.fluss.impressions VALUES
('user_101','prod_501', 3, TIMESTAMP '2025-05-16 09:17:10'),
('user_101','prod_502', 1, TIMESTAMP '2025-05-16 09:17:15'),
('user_102','prod_503', 7, TIMESTAMP '2025-05-16 09:18:22'),
('user_103','prod_501', 4, TIMESTAMP '2025-05-16 09:18:30'),
('user_104','prod_504', 2, TIMESTAMP '2025-05-16 09:18:55');
-- Clicks – user engagement
INSERT INTO `fluss-demo`.fluss.clicks VALUES
('user_101','prod_501', 1, TIMESTAMP '2025-05-16 09:19:00'),
('user_101','prod_502', 2, TIMESTAMP '2025-05-16 09:19:07'),
('user_102','prod_503', 1, TIMESTAMP '2025-05-16 09:19:12'),
('user_103','prod_501', 1, TIMESTAMP '2025-05-16 09:19:20'),
('user_104','prod_504', 1, TIMESTAMP '2025-05-16 09:19:25');
步骤四:验证大宽表数据写入结果
在
。SELECT * FROM `fluss-demo`.fluss.user_rec_wide;
单击调试按钮。我们将观察到三张明细表分写写入后,大宽表中汇总的流式计算结果:以
user_id
为主键计算生成了正确的大宽表数据。