This topic describes the Versioned Merge Engine.
Versioned Merge Engine overview
Merge policy
Merges records based on version numbers or event timestamps. For each primary key, this engine retains the record with the highest version number or the latest timestamp.
Configuration
To enable this engine, set the
'table.merge-engine' = 'versioned'table property when you create the table.Update mechanism
An update occurs only if the value of the specified field, such as the version number, in a new record is greater than or equal to the existing value.
If the new value is less than the existing value or is null, the engine does not trigger an update.
Scenarios
Deduplicating data and merging out-of-order data. This engine is an effective alternative to the deduplicate transform in stream processing.
Ensuring eventual consistency with input data. This means the system automatically resolves data discrepancies within a time window to reach a consistent state.
Limits
UPDATE and DELETE operations are not supported.
Partial updates are not supported.
UPDATE_BEFORE and DELETE events in the change log are automatically ignored.
Examples
Version column
The version column stores the version number or event timestamp of a data record. This column is used to select the latest version of a record and is required to enable the Versioned Merge Engine.
'table.merge-engine' = 'versioned',
'table.merge-engine.versioned.ver-column' = '<column_name>'The version column supports the following data types:
INTBIGINTTIMESTAMPTIMESTAMP(p)(with precision)TIMESTAMP_LTZ(local time zone timestamp)TIMESTAMP_LTZ(p)(local time zone timestamp with precision)
SQL example
CREATE TABLE VERSIONED (
a INT NOT NULL PRIMARY KEY NOT ENFORCED,
b STRING,
ts BIGINT
) WITH (
'table.merge-engine' = 'versioned',
'table.merge-engine.versioned.ver-column' = 'ts'
);
INSERT INTO VERSIONED (a, b, ts) VALUES (1, 'v1', 1000);
-- Insert data with ts < 1000. The record is not updated.
INSERT INTO VERSIONED (a, b, ts) VALUES (1, 'v2', 999);
SELECT * FROM VERSIONED WHERE a = 1;
-- Output
-- +---+-----+------+
-- | a | b | ts |
-- +---+-----+------+
-- | 1 | v1 | 1000 |
-- +---+-----+------+
-- Insert data with ts > 1000. The record is updated.
INSERT INTO VERSIONED (a, b, ts) VALUES (1, 'v3', 2000);
SELECT * FROM VERSIONED WHERE a = 1;
-- Output
-- +---+-----+------+
-- | a | b | ts |
-- +---+-----+------+
-- | 1 | v3 | 2000 |
-- +---+-----+------+
-- Insert data with a null ts. The record is not updated.
INSERT INTO VERSIONED (a, b, ts) VALUES (1, 'v4', CAST(null as BIGINT));
SELECT * FROM VERSIONED WHERE a = 1;
-- Output
-- +---+-----+------+
-- | a | b | ts |
-- +---+-----+------+
-- | 1 | v3 | 2000 |
-- +---+-----+------+