本文为您介绍如何基于Flink + Fluss构建实时湖仓。
从“消息管道”到“流式分析存储”
随着业务对数据时效性的要求提高,企业数据处理模式正在从 T+1 离线批处理向秒级实时计算转变。在搜索推荐、广告归因、金融风控及 IoT 监控等核心场景中,业务决策窗口已压缩至毫秒级。这种即时性需求对底层数据基础设施提出了严格要求。
传统架构的结构性瓶颈
业界通用的 Flink 结合 Kafka 架构在构建实时数仓中间层时面临挑战。Kafka 本质是面向消息传递的行式日志系统,而非为分析型负载设计。由于“消息语义”与“分析语义”的错配,该架构在实际生产中暴露了不支持更新、不可直接查询、回溯成本高昂及网络资源浪费等核心痛点。
面向分析的流式存储
为解决上述问题,存储系统需向“面向分析的流式存储”演进。新型架构必须同时满足两类核心诉求:
-
流式特性:保持高吞吐、低延迟的数据写入能力。
-
分析特性:引入列式存储、索引机制和主键更新(Upsert)能力,支持高效的即席查询与数据探查。
这一演进旨在消除消息队列与数据库之间的技术鸿沟,为实时数仓提供真正具备分析能力的数据底座。
前提条件
-
已经创建Fluss集群,详情请参见开通流存储Fluss。
-
已经开通同一VPC下的Flink项目空间,详情请参见开通实时计算Flink版。
-
如果您使用RAM用户或RAM角色等身份访问,需要确认已具有Fluss控制台相关权限,详情请参见权限管理。
操作步骤
步骤一:创建Fluss Catalog
-
登录实时计算控制台。
-
单击目标工作空间操作列下的控制台。
-
在左侧导航栏,单击
在弹出的创建 Catalog对话框中,选择内置 Catalog页签,在 Catalog 列表中选择Fluss,然后单击下一步。
-
配置Catalog相关信息,各配置项说明如下表所示。
可在Fluss控制台页面右边的集群信息中获取相关信息。
配置项
说明
是否必填
备注
type
Catalog类型
是
固定值为fluss。
default-database
Catalog默认数据库名字
否
Catalog 默认的数据库, 默认值为fluss。
bootstrap.servers
连接Fluss集群的 bootstrapserver地址
是
服务地址。
client.security.protocol
鉴权框架
是
固定值为SASL
client.security.sasl.mechanism
鉴权机制
是
固定值为PLAIN。
client.security.sasl.username
内置用户名
是
Fluss实例内置用户名。
client.security.sasl.password
内置密码
是
Fluss实例内置密码。
步骤二:创建Fluss Sink表
-
单击刚刚创建的Fluss Catalog展开,单击默认数据库Fluss。
-
点击创建表。
创建名为
fluss_log_table的日志表。更多日志表详情请参见日志表。CREATE TABLE `my-catalog`.`fluss`.`fluss_log_table` ( event_id BIGINT, user_id BIGINT, amount DECIMAL(10,2), event_time TIMESTAMP(3), ip STRING ) WITH ( 'bucket.num' = '10' );创建名为
fluss_pk_table的主键表。更多主键表详情请参见主键表。CREATE TABLE `my-catalog`.`fluss`.`fluss_pk_table` ( user_id BIGINT, last_amount DECIMAL(10,2), last_event_time TIMESTAMP(3), PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'bucket.num' = '10' );
步骤三:创建Flink Source
利用 Flink Datagen 连接器构建高并发业务日志流,验证 Fluss 主键表的数据更新机制。
数据生成逻辑包含重复的user_id,用于模拟生产环境中针对同一用户的连续状态变更与更新事件。
此外,通过配置 rows-per-second 参数控制数据生成速率,可灵活调整负载强度,从而评估系统在不同吞吐压力下的性能指标与稳定性。
-
在左侧导航栏,单击。
-
单击
后,单击新建流作业。CREATE TEMPORARY TABLE fluss_datagen_source ( event_id BIGINT, user_id BIGINT, -- 会重复,用于测试主键更新 amount DECIMAL(10,2), event_time TIMESTAMP(3), -- 使用处理时间模拟事件时间 ip STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', -- event_id 使用序列生成器 'fields.event_id.kind' = 'random', 'fields.event_id.min' = '1', 'fields.event_id.max' = '9999', -- BIGINT max value -- user_id 随机(用于制造重复) 'fields.user_id.kind' = 'random', -- 引入重复 user_id,用于主键更新测试 'fields.user_id.min' = '1', 'fields.user_id.max' = '10000', -- amount 随机 'fields.amount.kind' = 'random', 'fields.amount.min' = '0.01', 'fields.amount.max' = '999.99', -- time 'fields.event_time.kind' = 'random', -- ip 长度 'fields.ip.kind' = 'random', 'fields.ip.length' = '15' ); EXECUTE STATEMENT SET BEGIN INSERT INTO `my-catalog`.`fluss`.`fluss_log_table` SELECT * FROM fluss_datagen_source; INSERT INTO `my-catalog`.`fluss`.`fluss_pk_table` SELECT user_id, amount AS last_amount, event_time AS last_event_time FROM fluss_datagen_source; END; -
单击部署后,在弹出的对话框中,单击确认。
-
单击前往运维,然后在作业运维页面启动该作业。
步骤四:查看数据写入情况
-
左侧导航栏单击。
-
单击
后,单击新建临时查询脚本。--查询日志表 SELECT * FROM `my-catalog`.`fluss`.`fluss_log_table` LIMIT 1000;执行查询后,返回
fluss_log_table的数据,包含event_id、user_id、amount、event_time、ip列。--查询主键表 SELECT * FROM `my-catalog`.`fluss`.`fluss_pk_table` LIMIT 1000;执行查询后,结果表格返回 user_id、last_amount、last_event_time 三列数据,展示用户最近一次交易金额与时间。
-
可以查看Flink作业的运行状态,对比Fluss日志表和主键表的不同。Merge Engines
作业运行图的监控数据显示,
fluss_pk_table与fluss_log_table虽然接收了相同数量的记录(2670 条),但在数据体量上存在显著差异:前者接收数据量仅为 107.43 KB,而后者高达 190.87 KB。这一差异验证了 Fluss 主键表在数据处理机制上的特性。基于主键的 Upsert 语义,系统对流入数据执行了合并与压缩操作。与日志表完整保留所有历史追加记录(Append-only)不同,主键表通过 Compaction 机制仅保留特定主键的最新状态值。在确保数据最终一致性的同时,显著降低了存储占用。