This document explains how to use the aggregation merge engine.
Aggregation merge engine
Merge Policy
The engine groups rows that have the same primary key and applies the specified aggregation function to each non-primary-key field.
Configuration
To enable this feature, set the table property 'table.merge-engine' = 'aggregation' when you create a table.
Usage
- When you create a table in the console, select the merge policy and use the advanced configuration 'fields.<field_name>.agg' = '<function_name>' to specify an aggregation function for each non-primary-key field.
- For aggregation functions that require additional parameters, such as a custom delimiter for listagg, use the following format:
  'fields.<field_name>.agg' = '<function_name>',
  'fields.<field_name>.<function_name>.<parameter_name>' = '<parameter_value>'
If you do not specify an aggregation function for a field, the system uses last_value_ignore_nulls as the default aggregation behavior.
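As an illustration of the property format, the following sketch configures one field with a parameterized function and leaves another field unconfigured. The table name event_tags and its columns are hypothetical; only the property keys follow the format described above.

```sql
-- Hypothetical table used only to illustrate the property format.
CREATE TABLE event_tags (
    event_id BIGINT,
    tags STRING,
    note STRING,
    PRIMARY KEY (event_id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    -- Function for the tags field, plus one function-specific parameter.
    'fields.tags.agg' = 'listagg',
    'fields.tags.listagg.delimiter' = '|'
    -- note has no 'fields.note.agg' entry, so it falls back to last_value_ignore_nulls.
);
```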
Use Cases
- Calculating cumulative totals and statistical metrics
- Maintaining counters and measures
- Tracking changes in maximum or minimum values
- Building real-time dashboards and analytics
Limitations
- Retraction semantics are not supported. The engine uses one-way aggregation logic and cannot reverse an aggregation. For example, it cannot subtract a deleted value from a sum result or undo a max operation to restore the second-largest value.
- The engine provides limited support for delete operations. By default, the engine silently ignores delete operations ('table.delete.behavior' = 'ignore'). You can configure the engine to either reject delete operations and return an error (disable) or allow the physical removal of the entire row (allow).
  'table.delete.behavior' = 'ignore'   -- Default value. Silently ignores delete operations.
  'table.delete.behavior' = 'disable'  -- Rejects delete operations and returns an error.
  'table.delete.behavior' = 'allow'    -- Allows delete operations, which physically remove the entire row.
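A minimal sketch of how the delete behavior might be set when the table is created, assuming 'table.delete.behavior' is passed in the same WITH clause as the merge-engine property. The table order_totals and its columns are hypothetical.

```sql
-- Hypothetical table; only the property keys and values come from this document.
CREATE TABLE order_totals (
    customer_id BIGINT,
    total_amount DECIMAL(18, 2),
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.total_amount.agg' = 'sum',
    -- Reject delete operations with an error instead of silently ignoring them.
    'table.delete.behavior' = 'disable'
);
```

Choosing disable over the default makes an upstream delete visible as an error, which may be preferable when a sum that still includes the deleted value would be misleading.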
Example
CREATE TABLE product_stats (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT,
    last_update_time TIMESTAMP(3),
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.price.agg' = 'max',
    'fields.sales.agg' = 'sum'
    -- No aggregation function is specified for last_update_time, so last_value_ignore_nulls is used by default.
);
-- Insert data. Records with the same primary key are automatically aggregated.
INSERT INTO product_stats VALUES
    (1, 23.0, 15, TIMESTAMP '2024-01-01 10:00:00'),
    (1, 30.2, 20, TIMESTAMP '2024-01-01 11:00:00');
SELECT * FROM product_stats WHERE product_id = 1;
-- Output:
-- +------------+-------+-------+---------------------+
-- | product_id | price | sales | last_update_time    |
-- +------------+-------+-------+---------------------+
-- | 1          | 30.2  | 35    | 2024-01-01 11:00:00 |
-- +------------+-------+-------+---------------------+
-- price: 30.2 (the maximum of 23.0 and 30.2)
-- sales: 35 (the sum of 15 and 20)
-- last_update_time: 2024-01-01 11:00:00 (the last non-null value)
Supported aggregation functions
sum
Calculates the sum of values across multiple rows.
- Supported Types: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL.
- Null Handling: Ignores null values.
CREATE TABLE test_sum (
    id BIGINT,
    amount DECIMAL(10, 2),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.amount.agg' = 'sum'
);
INSERT INTO test_sum VALUES (1, 100.50), (1, 200.75);
SELECT * FROM test_sum WHERE id = 1;
-- Output:
-- +----+--------+
-- | id | amount |
-- +----+--------+
-- | 1  | 301.25 |
-- +----+--------+
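The null handling for sum can be illustrated with a short sketch that reuses the test_sum table with a different key. The result comment follows from the stated rule that null values are ignored; it is not taken from a recorded run.

```sql
-- Three rows for the same key; the middle one carries a null amount.
INSERT INTO test_sum VALUES
    (2, 10.00),
    (2, CAST(NULL AS DECIMAL(10, 2))),
    (2, 5.25);
SELECT * FROM test_sum WHERE id = 2;
-- Expected under the stated rule: amount = 15.25 (the null row contributes nothing).
```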
product
Calculates the product of values across multiple rows.
- Supported Types: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL.
- Null Handling: Ignores null values.
CREATE TABLE test_product (
    id BIGINT,
    discount_factor DOUBLE,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.discount_factor.agg' = 'product'
);
INSERT INTO test_product VALUES (1, 0.9), (1, 0.8);
SELECT * FROM test_product WHERE id = 1;
-- Output:
-- +----+--------------------+
-- | id | discount_factor    |
-- +----+--------------------+
-- | 1  | 0.7200000000000001 |
-- +----+--------------------+
-- Note: Due to IEEE 754 double-precision floating-point arithmetic, the result is 0.7200000000000001 instead of 0.72.
-- For precise calculations, we recommend using the DECIMAL data type instead of DOUBLE.
max
Retains the maximum value.
- Supported Types: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ.
- Null Handling: Ignores null values.
CREATE TABLE test_max (
    id BIGINT,
    temperature DOUBLE,
    reading_time TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.temperature.agg' = 'max',
    'fields.reading_time.agg' = 'max'
);
INSERT INTO test_max VALUES
    (1, 25.5, TIMESTAMP '2024-01-01 10:00:00'),
    (1, 28.3, TIMESTAMP '2024-01-01 11:00:00');
SELECT * FROM test_max WHERE id = 1;
-- Output:
-- +----+-------------+---------------------+
-- | id | temperature | reading_time        |
-- +----+-------------+---------------------+
-- | 1  | 28.3        | 2024-01-01 11:00:00 |
-- +----+-------------+---------------------+
min
Retains the minimum value.
- Supported Types: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ.
- Null Handling: Ignores null values.
CREATE TABLE test_min (
    id BIGINT,
    lowest_price DECIMAL(10, 2),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.lowest_price.agg' = 'min'
);
INSERT INTO test_min VALUES (1, 99.99), (1, 79.99), (1, 89.99);
SELECT * FROM test_min WHERE id = 1;
-- Output:
-- +----+--------------+
-- | id | lowest_price |
-- +----+--------------+
-- | 1  | 79.99        |
-- +----+--------------+
last_value
Overwrites the previous value with the most recent value.
- Supported Types: All data types.
- Null Handling: A null value overwrites the previous value.
CREATE TABLE test_last_value (
    id BIGINT,
    status STRING,
    last_login TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.status.agg' = 'last_value',
    'fields.last_login.agg' = 'last_value'
);
INSERT INTO test_last_value VALUES (1, 'online', TIMESTAMP '2024-01-01 10:00:00');
INSERT INTO test_last_value VALUES (1, 'offline', TIMESTAMP '2024-01-01 11:00:00');
INSERT INTO test_last_value VALUES (1, CAST(NULL AS STRING), TIMESTAMP '2024-01-01 12:00:00');
-- A null value overwrites the previous 'offline' value.
SELECT * FROM test_last_value WHERE id = 1;
-- Output:
-- +----+--------+---------------------+
-- | id | status | last_login          |
-- +----+--------+---------------------+
-- | 1  | null   | 2024-01-01 12:00:00 |
-- +----+--------+---------------------+
last_value_ignore_nulls
Replaces the previous value with the most recent non-null value. This is the default aggregation behavior when no aggregation function is specified.
- Supported Types: All data types.
- Null Handling: Ignores null values and retains the previous value.
CREATE TABLE test_last_value_ignore_nulls (
    id BIGINT,
    email STRING,
    phone STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.email.agg' = 'last_value_ignore_nulls',
    'fields.phone.agg' = 'last_value_ignore_nulls'
);
INSERT INTO test_last_value_ignore_nulls VALUES (1, 'user@example.com', '123-456');
INSERT INTO test_last_value_ignore_nulls VALUES (1, CAST(NULL AS STRING), '789-012');
-- The null value for email is ignored, and the previous value is retained.
INSERT INTO test_last_value_ignore_nulls VALUES (1, 'new@example.com', CAST(NULL AS STRING));
-- The null value for phone is ignored, and the previous value is retained.
SELECT * FROM test_last_value_ignore_nulls WHERE id = 1;
-- Output:
-- +----+-----------------+---------+
-- | id | email           | phone   |
-- +----+-----------------+---------+
-- | 1  | new@example.com | 789-012 |
-- +----+-----------------+---------+
first_value
Retains the first value of a field and ignores all subsequent values.
- Supported Types: All data types.
- Null Handling: If the first value is null, the null value is retained.
CREATE TABLE test_first_value (
    id BIGINT,
    first_purchase_date DATE,
    first_product STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.first_purchase_date.agg' = 'first_value',
    'fields.first_product.agg' = 'first_value'
);
-- DATE literals are used so that the values match the DATE column type.
INSERT INTO test_first_value VALUES
    (1, DATE '2024-01-01', 'ProductA'),
    (1, DATE '2024-02-01', 'ProductB');
-- The second record is ignored, and the first value is retained.
SELECT * FROM test_first_value WHERE id = 1;
-- Output:
-- +----+---------------------+---------------+
-- | id | first_purchase_date | first_product |
-- +----+---------------------+---------------+
-- | 1  | 2024-01-01          | ProductA      |
-- +----+---------------------+---------------+
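The null-handling rule for first_value can be sketched with a second key in the test_first_value table. The result comment is derived from the stated rule that a null first value is retained; it is not taken from a recorded run.

```sql
-- The first row for key 2 carries a null date; the later row has a real date.
INSERT INTO test_first_value VALUES
    (2, CAST(NULL AS DATE), 'ProductA'),
    (2, DATE '2024-03-01', 'ProductB');
SELECT * FROM test_first_value WHERE id = 2;
-- Expected under the stated rule: first_purchase_date stays null and
-- first_product stays 'ProductA', because only the first row's values are kept.
```

If a later non-null value should win instead, last_value_ignore_nulls is the closer fit, although it keeps the most recent value rather than the earliest one.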
listagg
Concatenates multiple string values into a single string, separated by a delimiter.
- Supported Types: STRING, CHAR.
- Null Handling: Skips null values.
- Delimiter: The default delimiter is a comma (,), but you can specify a custom delimiter as a parameter.
CREATE TABLE test_listagg (
    id BIGINT,
    tags1 STRING,
    tags2 STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.tags1.agg' = 'listagg',
    'fields.tags2.agg' = 'listagg',
    'fields.tags2.listagg.delimiter' = ';' -- Specify a custom delimiter.
);
INSERT INTO test_listagg VALUES (1, 'developer', 'developer'), (1, 'java', 'java'), (1, 'flink', 'flink');
SELECT * FROM test_listagg WHERE id = 1;
-- Output:
-- +----+----------------------+----------------------+
-- | id | tags1                | tags2                |
-- +----+----------------------+----------------------+
-- | 1  | developer,java,flink | developer;java;flink |
-- +----+----------------------+----------------------+
string_agg
An alias for listagg. It concatenates multiple string values into a single string, separated by a delimiter.
- Supported Types: STRING, CHAR.
- Null Handling: Skips null values.
- Delimiter: The default delimiter is a comma (,), but you can specify a custom delimiter as a parameter.
CREATE TABLE test_string_agg (
    id BIGINT,
    tags STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.tags.agg' = 'string_agg',
    'fields.tags.string_agg.delimiter' = ';' -- Specify a custom delimiter.
);
INSERT INTO test_string_agg VALUES (1, 'developer'), (1, 'java'), (1, 'flink');
SELECT * FROM test_string_agg WHERE id = 1;
-- Output:
-- +----+----------------------+
-- | id | tags                 |
-- +----+----------------------+
-- | 1  | developer;java;flink |
-- +----+----------------------+
rbm32
Performs union aggregation on serialized 32-bit RoaringBitmap values.
- Supported Types: BYTES.
- Null Handling: Ignores null values.
- Note: RoaringBitmap values must be serialized on the client in the standard format of the RoaringBitmap library before being ingested.
CREATE TABLE user_visits (
    user_id BIGINT,
    visit_bitmap BYTES,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.visit_bitmap.agg' = 'rbm32'
);
-- Insert serialized RoaringBitmap values (hexadecimal literals).
-- Bitmap {1,2}
INSERT INTO user_visits VALUES (1, x'3A30000001000000000001001000000001000200');
-- Bitmap {2,3}
INSERT INTO user_visits VALUES (1, x'3A30000001000000000001001000000002000300');
SELECT * FROM user_visits WHERE user_id = 1;
-- Result: visit_bitmap contains the union {1,2,3}.
rbm64
Performs union aggregation on serialized 64-bit RoaringBitmap values.
- Supported Types: BYTES.
- Null Handling: Ignores null values.
- Note: Roaring64Bitmap values must be serialized on the client in the 64-bit format of the RoaringBitmap library before being ingested.
CREATE TABLE session_interactions (
    session_id BIGINT,
    interaction_bitmap BYTES,
    PRIMARY KEY (session_id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.interaction_bitmap.agg' = 'rbm64'
);
-- Insert serialized Roaring64Bitmap values.
-- Result: interaction_bitmap contains the union of all inserted bitmaps.
bool_and
Performs logical AND aggregation on boolean values. It returns true only if all values are true.
- Supported Types: BOOLEAN.
- Null Handling: Ignores null values.
CREATE TABLE test_bool_and (
    id BIGINT,
    has_all_permissions BOOLEAN,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.has_all_permissions.agg' = 'bool_and'
);
INSERT INTO test_bool_and VALUES (1, true), (1, true), (1, false);
SELECT * FROM test_bool_and WHERE id = 1;
-- Output:
-- +----+---------------------+
-- | id | has_all_permissions |
-- +----+---------------------+
-- | 1  | false               |
-- +----+---------------------+
bool_or
Performs logical OR aggregation on boolean values. It returns true if at least one value is true.
- Supported Types: BOOLEAN.
- Null Handling: Ignores null values.
CREATE TABLE test_bool_or (
    id BIGINT,
    has_any_alert BOOLEAN,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.has_any_alert.agg' = 'bool_or'
);
INSERT INTO test_bool_or VALUES (1, false), (1, false), (1, true);
SELECT * FROM test_bool_or WHERE id = 1;
-- Output:
-- +----+---------------+
-- | id | has_any_alert |
-- +----+---------------+
-- | 1  | true          |
-- +----+---------------+