Fluss + Flink构建秒级实时湖仓

更新时间:
复制为 MD 格式

本文为您介绍如何基于Flink + Fluss构建实时湖仓。

从“消息管道”到“流式分析存储”

随着业务对数据时效性的要求提高,企业数据处理模式正在从 T+1 离线批处理向秒级实时计算转变。在搜索推荐、广告归因、金融风控及 IoT 监控等核心场景中,业务决策窗口已压缩至毫秒级。这种即时性需求对底层数据基础设施提出了严格要求。

传统架构的结构性瓶颈

业界通用的 Flink 结合 Kafka 架构在构建实时数仓中间层时面临挑战。Kafka 本质是面向消息传递的行式日志系统,而非为分析型负载设计。由于“消息语义”与“分析语义”的错配,该架构在实际生产中暴露了不支持更新不可直接查询回溯成本高昂网络资源浪费等核心痛点。

面向分析的流式存储

为解决上述问题,存储系统需向“面向分析的流式存储”演进。新型架构必须同时满足两类核心诉求:

  • 流式特性:保持高吞吐、低延迟的数据写入能力。

  • 分析特性:引入列式存储、索引机制和主键更新(Upsert)能力,支持高效的即席查询与数据探查。

这一演进旨在消除消息队列与数据库之间的技术鸿沟,为实时数仓提供真正具备分析能力的数据底座。

前提条件

  • 已经创建Fluss集群,详情请参见开通流存储Fluss

  • 已经开通同一VPC下的Flink项目空间,详情请参见开通实时计算Flink

  • 如果您使用RAM用户或RAM角色等身份访问,需要确认已具有Fluss控制台相关权限,详情请参见权限管理

操作步骤

步骤一:创建Fluss Catalog

  1. 登录实时计算控制台

  2. 单击目标工作空间操作列下的控制台

  3. 在左侧导航栏,单击数据管理 > 创建Catalog

    在弹出的创建 Catalog对话框中,选择内置 Catalog页签,在 Catalog 列表中选择Fluss,然后单击下一步

  4. 配置Catalog相关信息,各配置项说明如下表所示。

    可在Fluss控制台页面右边的集群信息中获取相关信息。

    配置项

    说明

    是否必填

    备注

    type

    Catalog类型

    固定值为fluss。

    default-database

    Catalog默认数据库名字

    Catalog 默认的数据库, 默认值为fluss。

    bootstrap.servers

    连接Fluss集群的 bootstrapserver地址

    服务地址。

    client.security.protocol

    鉴权框架

    固定值为SASL

    client.security.sasl.mechanism

    鉴权机制

    固定值为PLAIN。

    client.security.sasl.username

    内置用户名

    Fluss实例内置用户名。

    client.security.sasl.password

    内置密码

    Fluss实例内置密码。

步骤二:创建Fluss Sink

  1. 单击刚刚创建的Fluss Catalog展开,单击默认数据库Fluss。

  2. 点击创建表

    创建名为fluss_log_table的日志表。更多日志表详情请参见日志表

    CREATE TABLE `my-catalog`.`fluss`.`fluss_log_table` (
     event_id BIGINT,
      user_id BIGINT,
      amount DECIMAL(10,2),
      event_time TIMESTAMP(3),
      ip STRING
    ) WITH (
      'bucket.num' = '10'
    );

    创建名为fluss_pk_table的主键表。更多主键表详情请参见主键表

    CREATE TABLE `my-catalog`.`fluss`.`fluss_pk_table` (
      user_id BIGINT,
      last_amount DECIMAL(10,2),
      last_event_time TIMESTAMP(3),
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'bucket.num' = '10'
    );

步骤三:创建Flink Source

利用 Flink Datagen 连接器构建高并发业务日志流,验证 Fluss 主键表的数据更新机制。

数据生成逻辑包含重复的user_id,用于模拟生产环境中针对同一用户的连续状态变更与更新事件。

此外,通过配置 rows-per-second 参数控制数据生成速率,可灵活调整负载强度,从而评估系统在不同吞吐压力下的性能指标与稳定性。

  1. 在左侧导航栏,单击数据开发 > ETL

  2. 单击image后,单击新建流作业

    CREATE TEMPORARY TABLE fluss_datagen_source (
      event_id BIGINT,
      user_id BIGINT,          -- 会重复,用于测试主键更新
      amount DECIMAL(10,2),
      event_time TIMESTAMP(3), -- 使用处理时间模拟事件时间
      ip STRING
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1',
      -- event_id 使用序列生成器
      'fields.event_id.kind' = 'random',
      'fields.event_id.min' = '1',          
      'fields.event_id.max' = '9999',  -- BIGINT max value
      -- user_id 随机(用于制造重复)
      'fields.user_id.kind' = 'random', -- 引入重复 user_id,用于主键更新测试
      'fields.user_id.min' = '1',
      'fields.user_id.max' = '10000', 
      -- amount 随机
      'fields.amount.kind' = 'random',
      'fields.amount.min' = '0.01',
      'fields.amount.max' = '999.99',
      -- time
      'fields.event_time.kind' = 'random',
      -- ip 长度
      'fields.ip.kind' = 'random',
      'fields.ip.length' = '15'
    );
    EXECUTE STATEMENT SET
    BEGIN
    INSERT INTO `my-catalog`.`fluss`.`fluss_log_table`
      SELECT * FROM fluss_datagen_source;
      INSERT INTO `my-catalog`.`fluss`.`fluss_pk_table`
      SELECT 
        user_id,
        amount AS last_amount,
        event_time AS last_event_time
      FROM fluss_datagen_source;
    END;
  3. 单击部署后,在弹出的对话框中,单击确认

  4. 单击前往运维,然后在作业运维页面启动该作业。

步骤四:查看数据写入情况

  1. 左侧导航栏单击数据开发 > 数据查询

  2. 单击image后,单击新建临时查询脚本

    --查询日志表
    SELECT * FROM `my-catalog`.`fluss`.`fluss_log_table` LIMIT 1000;

    执行查询后,返回 fluss_log_table 的数据,包含 event_iduser_idamountevent_timeip 列。

    --查询主键表
    SELECT * FROM `my-catalog`.`fluss`.`fluss_pk_table` LIMIT 1000;

    执行查询后,结果表格返回 user_idlast_amountlast_event_time 三列数据,展示用户最近一次交易金额与时间。

  3. 可以查看Flink作业的运行状态,对比Fluss日志表和主键表的不同。Merge Engines

    作业运行图的监控数据显示,fluss_pk_table 与 fluss_log_table 虽然接收了相同数量的记录(2670 条),但在数据体量上存在显著差异:前者接收数据量仅为 107.43 KB,而后者高达 190.87 KB

    这一差异验证了 Fluss 主键表在数据处理机制上的特性。基于主键的 Upsert 语义,系统对流入数据执行了合并与压缩操作。与日志表完整保留所有历史追加记录(Append-only)不同,主键表通过 Compaction 机制仅保留特定主键的最新状态值。在确保数据最终一致性的同时,显著降低了存储占用。