Partial update (building wide tables)

This topic describes how to use the partial update feature of Fluss to build wide tables without multi-stream joins.

Why use partial update to build wide tables

In traditional stream data pipelines, building a wide table often requires joining multiple streams or tables on a primary key. For example, a real-time recommendation system needs to consolidate updates from various sources, such as user preferences, purchases, clicks, and shopping carts, to create a 360-degree customer view.

Although Apache Flink can perform a multi-stream join, this approach has several pain points in large-scale scenarios:

  • Large state size: A regular streaming join must keep events from every input in state so that later events can still be matched, which can cause the state to grow indefinitely.

  • Performance bottlenecks: Large checkpoint overhead and frequent backpressure can reduce overall throughput and stability.

  • Difficult debugging: Complex and opaque state makes troubleshooting time-consuming and costly.

  • Poor data consistency: Setting a state Time-to-Live (TTL) to limit state size can cause events to be discarded before their matching events arrive, leading to incorrect results.

These challenges highlight the need for a new method to build wide tables that avoids complex joins and significantly reduces resource consumption and maintenance overhead.
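
For comparison, the join-based approach might look like the following Flink SQL. This is only a sketch: it reuses the three source tables from the example later in this topic, the sink table user_rec_wide_join is hypothetical, and the one-hour TTL is an arbitrary value chosen for illustration.

```sql
-- Optional: bound the join state. Rows that stay idle longer than the TTL are
-- dropped from state, so late events may no longer find their match.
SET 'table.exec.state.ttl' = '1 h';

-- Hypothetical sink: user_rec_wide_join is assumed to have the columns
-- (user_id, item_id, rec_score, imp_cnt, click_cnt).
INSERT INTO `fluss-demo`.fluss.user_rec_wide_join
SELECT
    r.user_id,
    r.item_id,
    r.rec_score,
    i.imp_cnt,
    c.click_cnt
FROM `fluss-demo`.fluss.recommendations AS r
LEFT JOIN `fluss-demo`.fluss.impressions AS i
    ON r.user_id = i.user_id AND r.item_id = i.item_id
LEFT JOIN `fluss-demo`.fluss.clicks AS c
    ON r.user_id = c.user_id AND r.item_id = c.item_id;
```

Each LEFT JOIN here is a stateful operator that retains rows from both of its inputs, which is the source of the state growth and checkpoint overhead listed above.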

Building wide tables with Fluss partial update

Fluss introduces a method for building wide tables that uses a partial update mechanism. This allows you to enrich data with dimensional information directly within a stream processing job, eliminating the need for a complex multi-stream join.

Unlike traditional methods, Fluss allows each data source to independently update its own set of columns in a single wide table based on a primary key. For example, you can define a user_profile table with user_id as the primary key, containing all possible columns. Each data stream only needs to write the partial data it has, and the Fluss storage engine automatically merges these updates by primary key.

The underlying mechanism works as follows: when a partial update arrives, Fluss finds the existing record for that primary key and updates only the columns provided in the current event, leaving other columns unchanged. The merged new version is written to disk in real time, ensuring that each record is always in its latest, complete state. This process is similar to an incremental update in a database, where only changed fields are modified instead of the entire record. This approach dramatically reduces the state burden and computational complexity of the Flink job, making the data pipeline more lightweight, efficient, and easier to maintain.
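
The merge behavior can be illustrated with two column-scoped writes to the same key. This is a minimal sketch that assumes the user_rec_wide table defined in the example below. The literal values are illustrative, and the commented row states follow from the merge rule described above rather than from captured output.

```sql
-- First writer supplies only the recommendation score.
INSERT INTO `fluss-demo`.fluss.user_rec_wide (user_id, item_id, rec_score)
VALUES ('user_101', 'prod_501', 0.92);
-- Row for (user_101, prod_501): rec_score = 0.92, imp_cnt = NULL, click_cnt = NULL

-- Second writer supplies only the impression count for the same key.
INSERT INTO `fluss-demo`.fluss.user_rec_wide (user_id, item_id, imp_cnt)
VALUES ('user_101', 'prod_501', 3);
-- Row for (user_101, prod_501): rec_score = 0.92, imp_cnt = 3, click_cnt = NULL
```

The column list in each INSERT statement determines which columns are updated. The primary key columns must always be included so that the existing record can be located.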

Example

Prerequisites

A Fluss Catalog named fluss-demo has been created.

Step 1: Create the tables

  1. Log in to the Realtime Compute for Apache Flink console.

  2. In the Actions column of the target workspace, click Console.

  3. In the left-side navigation pane, choose Data Development > SQL Editor > New Temporary Query Script.

In the script, enter the following SQL statements. In the default fluss database, they create three source tables that represent the different data sources and one wide table that serves as the result table.

-- Recommendations – model scores
CREATE TABLE IF NOT EXISTS `fluss-demo`.fluss.recommendations (
    user_id  STRING,
    item_id  STRING,
    rec_score DOUBLE,
    rec_ts   TIMESTAMP(3),
    PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3');
-- Impressions – how often we showed something
CREATE TABLE IF NOT EXISTS `fluss-demo`.fluss.impressions (
    user_id STRING,
    item_id STRING,
    imp_cnt INT,
    imp_ts  TIMESTAMP(3),
    PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3');
-- Clicks – user engagement
CREATE TABLE IF NOT EXISTS `fluss-demo`.fluss.clicks (
    user_id  STRING,
    item_id  STRING,
    click_cnt INT,
    clk_ts    TIMESTAMP(3),
    PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3');
-- Result wide table
CREATE TABLE IF NOT EXISTS `fluss-demo`.fluss.user_rec_wide (
    user_id   STRING,
    item_id   STRING,
    rec_score DOUBLE,   -- updated by recs stream
    imp_cnt   INT,      -- updated by impressions stream
    click_cnt INT,      -- updated by clicks stream
    PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3');

Click Run to create the tables.

Step 2: Create the stream job

  1. In the left-side navigation pane, choose Data Development > ETL > New > Blank Streaming Job Draft.

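  2. Enter the following statements. They write data from the recommendations, impressions, and clicks tables, which share the composite primary key (user_id, item_id), into the wide table user_rec_wide, which uses the same primary key. Each INSERT statement lists only the primary key columns and the columns that its source provides.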

    BEGIN STATEMENT SET
    ;
    -- Apply recommendation scores
    INSERT INTO `fluss-demo`.fluss.user_rec_wide (user_id, item_id, rec_score)
    SELECT
        user_id
        ,item_id
        ,rec_score
    FROM `fluss-demo`.fluss.recommendations
    ;
    -- Apply impression counts
    INSERT INTO `fluss-demo`.fluss.user_rec_wide (user_id, item_id, imp_cnt)
    SELECT
        user_id
        ,item_id
        ,imp_cnt
    FROM `fluss-demo`.fluss.impressions
    ;
    -- Apply click counts
    INSERT INTO `fluss-demo`.fluss.user_rec_wide (user_id, item_id, click_cnt)
    SELECT
        user_id
        ,item_id
        ,click_cnt
    FROM `fluss-demo`.fluss.clicks
    ;
    END
    ;

  3. In the upper-right corner, click Deploy > Go to O&M. Click Start to run the job.

    After the job starts, the Status page appears. The Health Score is 100, and the Historical Restart Count is 0. The DAG topology shows that the source tables are fluss-demo.fluss.recommendations, fluss-demo.fluss.impressions, and fluss-demo.fluss.clicks, and the target wide table is user_rec_wide.

    Note that the computation topology does not contain a multi-stream join node. Each source table independently writes to the wide table.

Step 3: Insert source data

On the SQL Editor page, create another blank query script and run the following SQL statements to insert test data into the three source tables:

-- Recommendations – model scores
INSERT INTO `fluss-demo`.fluss.recommendations VALUES
    ('user_101','prod_501',0.92 , TIMESTAMP '2025-05-16 09:15:02'),
    ('user_101','prod_502',0.78 , TIMESTAMP '2025-05-16 09:15:05'),
    ('user_102','prod_503',0.83 , TIMESTAMP '2025-05-16 09:16:00'),
    ('user_103','prod_501',0.67 , TIMESTAMP '2025-05-16 09:16:20'),
    ('user_104','prod_504',0.88 , TIMESTAMP '2025-05-16 09:16:45');
-- Impressions – how often each (user,item) was shown
INSERT INTO `fluss-demo`.fluss.impressions VALUES
    ('user_101','prod_501', 3, TIMESTAMP '2025-05-16 09:17:10'),
    ('user_101','prod_502', 1, TIMESTAMP '2025-05-16 09:17:15'),
    ('user_102','prod_503', 7, TIMESTAMP '2025-05-16 09:18:22'),
    ('user_103','prod_501', 4, TIMESTAMP '2025-05-16 09:18:30'),
    ('user_104','prod_504', 2, TIMESTAMP '2025-05-16 09:18:55');
-- Clicks – user engagement
INSERT INTO `fluss-demo`.fluss.clicks VALUES
    ('user_101','prod_501', 1, TIMESTAMP '2025-05-16 09:19:00'),
    ('user_101','prod_502', 2, TIMESTAMP '2025-05-16 09:19:07'),
    ('user_102','prod_503', 1, TIMESTAMP '2025-05-16 09:19:12'),
    ('user_103','prod_501', 1, TIMESTAMP '2025-05-16 09:19:20'),
    ('user_104','prod_504', 1, TIMESTAMP '2025-05-16 09:19:25');

Step 4: Verify the results

  1. Choose Data Development > ETL > New > Blank Streaming Job Draft, and enter the following query:

    SELECT * FROM `fluss-demo`.fluss.user_rec_wide;

  2. Click Debug. The result shows that although the three source tables were written to separately, their rows have been merged into the wide table by the composite primary key (user_id, item_id).

    Querying the user_rec_wide wide table returns five records with the user_id, item_id, rec_score, imp_cnt, and click_cnt columns, which confirms that the data from the three source tables has been correctly merged.
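
Assuming all three INSERT statements from Step 3 have been applied, the merged rows can be derived by hand from the test data. The listing below is worked out from that data under the merge rule, not captured from a run, and the order in which the query returns rows is not guaranteed.

```sql
SELECT user_id, item_id, rec_score, imp_cnt, click_cnt
FROM `fluss-demo`.fluss.user_rec_wide;
-- Expected rows (in any order):
-- user_101, prod_501, 0.92, 3, 1
-- user_101, prod_502, 0.78, 1, 2
-- user_102, prod_503, 0.83, 7, 1
-- user_103, prod_501, 0.67, 4, 1
-- user_104, prod_504, 0.88, 2, 1
```

If a column still shows NULL for a key, the corresponding source row has most likely not yet been written to the wide table by the streaming job.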