Versioned Merge Engine

This topic describes the Versioned Merge Engine.

Versioned Merge Engine overview

  • Merge policy

    Merges records based on version numbers or event timestamps. For each primary key, this engine retains the record with the highest version number or the latest timestamp.

  • Configuration

    To enable this engine, set the 'table.merge-engine' = 'versioned' table property when you create the table, and name the version column with the 'table.merge-engine.versioned.ver-column' property.

  • Update mechanism

    • An update occurs only if the version column value in the new record (a version number or event timestamp) is greater than or equal to the value stored for the same primary key.

    • If the new value is less than the stored value or is null, the engine keeps the existing record and does not apply the update.

  • Scenarios

    • Deduplicating data and merging out-of-order data. This engine is an effective alternative to the deduplicate transform in stream processing.

    • Ensuring eventual consistency with input data. This means the system automatically resolves data discrepancies within a time window to reach a consistent state.

  • Limits

    • UPDATE and DELETE operations are not supported.

    • Partial updates are not supported.

    • UPDATE_BEFORE and DELETE events in the changelog are automatically ignored.

Examples

Version column

The version column stores the version number or event timestamp of a data record. This column is used to select the latest version of a record and is required to enable the Versioned Merge Engine.

'table.merge-engine' = 'versioned',
'table.merge-engine.versioned.ver-column' = '<column_name>'

The version column supports the following data types:

  • INT

  • BIGINT

  • TIMESTAMP

  • TIMESTAMP(p) (with precision)

  • TIMESTAMP_LTZ (local time zone timestamp)

  • TIMESTAMP_LTZ(p) (local time zone timestamp with precision)
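
As a minimal sketch of a timestamp-typed version column, the following statements use an event timestamp instead of a numeric version. The table name device_status and its columns are hypothetical and chosen only for illustration; the table properties are the ones described above.

```sql
-- Hypothetical table: device_status, status, and event_time are illustrative names.
CREATE TABLE device_status (
    device_id INT NOT NULL PRIMARY KEY NOT ENFORCED,
    status STRING,
    event_time TIMESTAMP(3)
) WITH (
    'table.merge-engine' = 'versioned',
    'table.merge-engine.versioned.ver-column' = 'event_time'
);

-- First event for device 1.
INSERT INTO device_status (device_id, status, event_time)
VALUES (1, 'online', TIMESTAMP '2024-01-01 10:00:00.000');

-- Late-arriving event with an earlier timestamp.
-- Under the merge policy described above, this record should not replace 'online'.
INSERT INTO device_status (device_id, status, event_time)
VALUES (1, 'booting', TIMESTAMP '2024-01-01 09:59:00.000');
```

Because the second event carries an earlier event_time, the stated merge policy implies that the row for device_id = 1 keeps the 'online' status.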

SQL example

CREATE TABLE VERSIONED (
    a INT NOT NULL PRIMARY KEY NOT ENFORCED,
    b STRING,
    ts BIGINT
) WITH (
    'table.merge-engine' = 'versioned',
    'table.merge-engine.versioned.ver-column' = 'ts'
);

-- Insert the initial record with ts = 1000.
INSERT INTO VERSIONED (a, b, ts) VALUES (1, 'v1', 1000);

-- Insert data with ts < 1000. The record is not updated.
INSERT INTO VERSIONED (a, b, ts) VALUES (1, 'v2', 999);
SELECT * FROM VERSIONED WHERE a = 1;
-- Output
-- +---+-----+------+
-- | a | b   | ts   |
-- +---+-----+------+
-- | 1 | v1  | 1000 |
-- +---+-----+------+

-- Insert data with ts > 1000. The record is updated.
INSERT INTO VERSIONED (a, b, ts) VALUES (1, 'v3', 2000);
SELECT * FROM VERSIONED WHERE a = 1;
-- Output
-- +---+-----+------+
-- | a | b   | ts   |
-- +---+-----+------+
-- | 1 | v3  | 2000 |
-- +---+-----+------+

-- Insert data with a null ts. The record is not updated.
INSERT INTO VERSIONED (a, b, ts) VALUES (1, 'v4', CAST(NULL AS BIGINT));
SELECT * FROM VERSIONED WHERE a = 1;
-- Output
-- +---+-----+------+
-- | a | b   | ts   |
-- +---+-----+------+
-- | 1 | v3  | 2000 |
-- +---+-----+------+
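
The statements above do not cover the boundary case in which the new version equals the stored one. The following sketch continues with the same VERSIONED table. The value 'v5' is illustrative, and the expected result is inferred from the "greater than or equal to" rule in the update mechanism rather than taken from a recorded run.

```sql
-- Insert data with ts = 2000, equal to the stored version.
-- Per the "greater than or equal to" rule, the record is expected to be updated.
INSERT INTO VERSIONED (a, b, ts) VALUES (1, 'v5', 2000);
SELECT * FROM VERSIONED WHERE a = 1;
-- Expected output, assuming the rule applies to equal versions as stated
-- +---+-----+------+
-- | a | b   | ts   |
-- +---+-----+------+
-- | 1 | v5  | 2000 |
-- +---+-----+------+
```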