Aggregation merge engine


This document explains how to use the aggregation merge engine.

Aggregation merge engine

  • Merge Policy

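    The engine merges all rows that share the same primary key into a single row. For each non-primary-key field, it combines the stored value with the incoming value by using the aggregation function configured for that field.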

  • Configuration

    To enable this feature, set the table property 'table.merge-engine' = 'aggregation' when you create a table.

  • Usage

    • When you create a table in the console, select the aggregation merge policy. Then, in the advanced configuration, set 'fields.<field_name>.agg' = '<function_name>' to specify the aggregation function for a non-primary-key field.

    • For aggregation functions that accept additional parameters, such as a custom delimiter for listagg, use the following format:

      'fields.<field_name>.agg' = '<function_name>',
      'fields.<field_name>.<function_name>.<parameter_name>' = '<parameter_value>'

    • If you do not specify an aggregation function for a field, the engine applies last_value_ignore_nulls to that field by default.

  • Use Cases

    • Calculating cumulative totals and statistical metrics

    • Maintaining counters and measures

    • Tracking changes in maximum or minimum values

    • Building real-time dashboards and analytics

  • Limitations

    • Retraction semantics are not supported. The engine uses one-way aggregation logic and cannot reverse an aggregation. For example, it cannot subtract a deleted value from a sum result or undo a max operation to restore the second-largest value.

    • Delete operations have limited support and are controlled by the 'table.delete.behavior' property. By default ('ignore'), the engine silently ignores delete operations. You can instead configure the engine to reject delete operations and return an error ('disable'), or to allow a delete to physically remove the entire row ('allow'):

      'table.delete.behavior' = 'ignore'   -- Default value. Silently ignores delete operations.
      'table.delete.behavior' = 'disable'  -- Rejects delete operations and returns an error.
      'table.delete.behavior' = 'allow'    -- Allows delete operations. A delete removes the entire row.
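
The option naming described above can be illustrated with a small Python sketch that turns a table's WITH options into per-field settings. The helper parse_agg_options is hypothetical and is not part of any product API; it only mirrors the key formats in this section. It assumes field names contain no dots, and, as a simplification of this sketch only, it keeps a parameter only when its <function_name> matches the function configured for that field. The field name note is a made-up field without an explicit function.

```python
from typing import Dict, List, Tuple

DEFAULT_AGG = "last_value_ignore_nulls"
DELETE_BEHAVIORS = ("ignore", "disable", "allow")


def parse_agg_options(
    options: Dict[str, str], non_pk_fields: List[str]
) -> Tuple[Dict[str, str], Dict[str, Dict[str, str]], str]:
    """Hypothetical sketch: derive per-field functions, parameters, and delete behavior."""
    # Every non-primary-key field starts with the documented default function.
    funcs: Dict[str, str] = {field: DEFAULT_AGG for field in non_pk_fields}
    params: Dict[str, Dict[str, str]] = {field: {} for field in non_pk_fields}

    # Pass 1: 'fields.<field_name>.agg' = '<function_name>'
    for key, value in options.items():
        parts = key.split(".")
        if len(parts) == 3 and parts[0] == "fields" and parts[2] == "agg":
            funcs[parts[1]] = value
            params.setdefault(parts[1], {})

    # Pass 2: 'fields.<field_name>.<function_name>.<parameter_name>' = '<parameter_value>'
    # Assumption of this sketch: parameters that name a different function are ignored.
    for key, value in options.items():
        parts = key.split(".")
        if len(parts) == 4 and parts[0] == "fields" and funcs.get(parts[1]) == parts[2]:
            params[parts[1]][parts[3]] = value

    behavior = options.get("table.delete.behavior", "ignore")
    if behavior not in DELETE_BEHAVIORS:
        raise ValueError(f"unsupported table.delete.behavior: {behavior!r}")
    return funcs, params, behavior


options = {
    "table.merge-engine": "aggregation",
    "fields.tags1.agg": "listagg",
    "fields.tags2.agg": "listagg",
    "fields.tags2.listagg.delimiter": ";",
}
funcs, params, behavior = parse_agg_options(options, ["tags1", "tags2", "note"])
print(funcs)     # {'tags1': 'listagg', 'tags2': 'listagg', 'note': 'last_value_ignore_nulls'}
print(params)    # {'tags1': {}, 'tags2': {'delimiter': ';'}, 'note': {}}
print(behavior)  # ignore
```

The field note shows the fallback to last_value_ignore_nulls, and the missing 'table.delete.behavior' key resolves to the default 'ignore'.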

Example

CREATE TABLE product_stats (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT,
    last_update_time TIMESTAMP(3),
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.price.agg' = 'max',
    'fields.sales.agg' = 'sum'
    -- No aggregation function is specified for last_update_time, so last_value_ignore_nulls is used by default.
);

-- Insert data. Records with the same primary key are automatically aggregated.
INSERT INTO product_stats VALUES
    (1, 23.0, 15, TIMESTAMP '2024-01-01 10:00:00'),
    (1, 30.2, 20, TIMESTAMP '2024-01-01 11:00:00');

SELECT * FROM product_stats WHERE product_id = 1;
-- Output:
-- +------------+-------+-------+---------------------+
-- | product_id | price | sales | last_update_time    |
-- +------------+-------+-------+---------------------+
-- |          1 |  30.2 |    35 | 2024-01-01 11:00:00 |
-- +------------+-------+-------+---------------------+
-- price: 30.2 (the maximum of 23.0 and 30.2)
-- sales: 35 (the sum of 15 and 20)
-- last_update_time: 2024-01-01 11:00:00 (the last non-null value)
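
To make the merge rule concrete, the following Python sketch replays the two rows above through a simplified model of the engine. It is an illustration under stated assumptions, not the engine's implementation: rows arrive in insertion order, the first row for a key is stored as-is, and each later row is folded in field by field with the configured function (max, sum, or the default last_value_ignore_nulls). The names ignore_nulls, AGGS, and merge_rows are hypothetical.

```python
from datetime import datetime
from typing import Any, Callable, Dict, Iterable

Agg = Callable[[Any, Any], Any]


def ignore_nulls(fn: Agg) -> Agg:
    """Wrap a binary function so that a null input never replaces the stored value."""
    def combine(stored: Any, incoming: Any) -> Any:
        if incoming is None:
            return stored
        if stored is None:
            return incoming
        return fn(stored, incoming)
    return combine


# Hypothetical Python stand-ins for a subset of the aggregation functions.
AGGS: Dict[str, Agg] = {
    "sum": ignore_nulls(lambda a, b: a + b),
    "max": ignore_nulls(max),
    "last_value_ignore_nulls": ignore_nulls(lambda a, b: b),
}


def merge_rows(
    rows: Iterable[Dict[str, Any]], primary_key: str, field_aggs: Dict[str, str]
) -> Dict[Any, Dict[str, Any]]:
    table: Dict[Any, Dict[str, Any]] = {}
    for row in rows:
        key = row[primary_key]
        stored = table.get(key)
        if stored is None:
            table[key] = dict(row)  # the first row for this key is stored as-is
            continue
        for field, value in row.items():
            if field == primary_key:
                continue
            # Fields without 'fields.<field_name>.agg' fall back to the default.
            agg = AGGS[field_aggs.get(field, "last_value_ignore_nulls")]
            stored[field] = agg(stored[field], value)
    return table


rows = [
    {"product_id": 1, "price": 23.0, "sales": 15, "last_update_time": datetime(2024, 1, 1, 10)},
    {"product_id": 1, "price": 30.2, "sales": 20, "last_update_time": datetime(2024, 1, 1, 11)},
]
result = merge_rows(rows, "product_id", {"price": "max", "sales": "sum"})
row = result[1]
print(row["price"], row["sales"], row["last_update_time"])  # 30.2 35 2024-01-01 11:00:00
```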

Supported aggregation functions

sum

Calculates the sum of values across multiple rows.

  • Supported Types: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL.

  • Null Handling: Ignores null values.

CREATE TABLE test_sum (
    id BIGINT,
    amount DECIMAL(10, 2),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.amount.agg' = 'sum'
);

INSERT INTO test_sum VALUES (1, 100.50), (1, 200.75);
SELECT * FROM test_sum WHERE id = 1;
-- Output:
-- +----+---------+
-- | id | amount  |
-- +----+---------+
-- |  1 | 301.25  |
-- +----+---------+

product

Calculates the product of values across multiple rows.

  • Supported Types: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL.

  • Null Handling: Ignores null values.

CREATE TABLE test_product (
    id BIGINT,
    discount_factor DOUBLE,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.discount_factor.agg' = 'product'
);

INSERT INTO test_product VALUES (1, 0.9), (1, 0.8);
SELECT * FROM test_product WHERE id = 1;
-- Output:
-- +----+-----------------------+
-- | id | discount_factor       |
-- +----+-----------------------+
-- |  1 | 0.7200000000000001    |
-- +----+-----------------------+
-- Note: Due to IEEE 754 double-precision floating-point arithmetic, the result is 0.7200000000000001 instead of 0.72.
-- For precise calculations, we recommend using the DECIMAL data type instead of DOUBLE.
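
The floating-point effect in the note can be reproduced outside the engine. Python floats are IEEE 754 double-precision values, like the DOUBLE type, so multiplying the same two factors shows the same rounding, while decimal.Decimal keeps the exact product. This only illustrates the arithmetic; it does not model how the engine stores DECIMAL values.

```python
import operator
from decimal import Decimal
from functools import reduce

# The two discount factors from the example, as binary doubles and as decimals.
double_product = reduce(operator.mul, [0.9, 0.8])
decimal_product = reduce(operator.mul, [Decimal("0.9"), Decimal("0.8")])

print(double_product)   # 0.7200000000000001
print(decimal_product)  # 0.72
```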

max

Retains the maximum value.

  • Supported Types: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ.

  • Null Handling: Ignores null values.

CREATE TABLE test_max (
    id BIGINT,
    temperature DOUBLE,
    reading_time TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.temperature.agg' = 'max',
    'fields.reading_time.agg' = 'max'
);

INSERT INTO test_max VALUES
    (1, 25.5, TIMESTAMP '2024-01-01 10:00:00'),
    (1, 28.3, TIMESTAMP '2024-01-01 11:00:00');
SELECT * FROM test_max WHERE id = 1;
-- Output:
-- +----+-------------+---------------------+
-- | id | temperature | reading_time        |
-- +----+-------------+---------------------+
-- |  1 |        28.3 | 2024-01-01 11:00:00 |
-- +----+-------------+---------------------+

min

Retains the minimum value.

  • Supported Types: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ.

  • Null Handling: Ignores null values.

CREATE TABLE test_min (
    id BIGINT,
    lowest_price DECIMAL(10, 2),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.lowest_price.agg' = 'min'
);

INSERT INTO test_min VALUES (1, 99.99), (1, 79.99), (1, 89.99);
SELECT * FROM test_min WHERE id = 1;
-- Output:
-- +----+--------------+
-- | id | lowest_price |
-- +----+--------------+
-- |  1 |        79.99 |
-- +----+--------------+
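
Both max and min keep a single extreme value and skip nulls. The following Python sketch models that fold, assuming values arrive in insertion order. Python's ordering of Decimal and datetime stands in for the DECIMAL and TIMESTAMP types, and the null in the price list is added here only to show that it is skipped. The helper name fold_extreme is hypothetical.

```python
from datetime import datetime
from decimal import Decimal
from typing import Any, Iterable, Optional


def fold_extreme(values: Iterable[Any], keep_max: bool) -> Optional[Any]:
    """Model of max/min: keep the largest (or smallest) non-null value seen so far."""
    result: Optional[Any] = None
    for value in values:
        if value is None:
            continue  # nulls are ignored and never replace the stored value
        if result is None or (value > result if keep_max else value < result):
            result = value
    return result


prices = [Decimal("99.99"), None, Decimal("79.99"), Decimal("89.99")]
times = [datetime(2024, 1, 1, 10), datetime(2024, 1, 1, 11), None]
print(fold_extreme(prices, keep_max=False))  # 79.99
print(fold_extreme(times, keep_max=True))    # 2024-01-01 11:00:00
```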

last_value

Overwrites the previous value with the most recent value.

  • Supported Types: All data types.

  • Null Handling: A null value overwrites the previous value.

CREATE TABLE test_last_value (
    id BIGINT,
    status STRING,
    last_login TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.status.agg' = 'last_value',
    'fields.last_login.agg' = 'last_value'
);

INSERT INTO test_last_value VALUES (1, 'online', TIMESTAMP '2024-01-01 10:00:00');
INSERT INTO test_last_value VALUES (1, 'offline', TIMESTAMP '2024-01-01 11:00:00');
INSERT INTO test_last_value VALUES (1, CAST(NULL AS STRING), TIMESTAMP '2024-01-01 12:00:00');
-- A null value overwrites the previous 'offline' value.
SELECT * FROM test_last_value WHERE id = 1;
-- Output:
-- +----+--------+---------------------+
-- | id | status | last_login          |
-- +----+--------+---------------------+
-- |  1 | null   | 2024-01-01 12:00:00 |
-- +----+--------+---------------------+

last_value_ignore_nulls

Replaces the previous value with the most recent non-null value. This is the default aggregation behavior when no aggregation function is specified.

  • Supported Types: All data types.

  • Null Handling: Ignores null values and retains the previous value.

CREATE TABLE test_last_value_ignore_nulls (
    id BIGINT,
    email STRING,
    phone STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.email.agg' = 'last_value_ignore_nulls',
    'fields.phone.agg' = 'last_value_ignore_nulls'
);

INSERT INTO test_last_value_ignore_nulls VALUES (1, 'user@example.com', '123-456');
INSERT INTO test_last_value_ignore_nulls VALUES (1, CAST(NULL AS STRING), '789-012');
-- The null value for email is ignored, and the previous value is retained.
INSERT INTO test_last_value_ignore_nulls VALUES (1, 'new@example.com', CAST(NULL AS STRING));
-- The null value for phone is ignored, and the previous value is retained.
SELECT * FROM test_last_value_ignore_nulls WHERE id = 1;
-- Output:
-- +----+-----------------+---------+
-- | id | email           | phone   |
-- +----+-----------------+---------+
-- |  1 | new@example.com | 789-012 |
-- +----+-----------------+---------+
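
The difference between last_value and last_value_ignore_nulls comes down to whether a null input may replace the stored value. The following Python sketch replays the column values from the two examples above through both rules. It models only the documented null handling, and the helper names are hypothetical.

```python
from typing import Any, Iterable, Optional


def last_value(values: Iterable[Any]) -> Optional[Any]:
    result: Optional[Any] = None
    for value in values:
        result = value  # every value, including null, overwrites the stored one
    return result


def last_value_ignore_nulls(values: Iterable[Any]) -> Optional[Any]:
    result: Optional[Any] = None
    for value in values:
        if value is not None:
            result = value  # only non-null values overwrite the stored one
    return result


statuses = ["online", "offline", None]                  # test_last_value.status
emails = ["user@example.com", None, "new@example.com"]  # test_last_value_ignore_nulls.email
phones = ["123-456", "789-012", None]                   # test_last_value_ignore_nulls.phone

print(last_value(statuses))               # None
print(last_value_ignore_nulls(statuses))  # offline
print(last_value_ignore_nulls(emails))    # new@example.com
print(last_value_ignore_nulls(phones))    # 789-012
```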

first_value

Retains the first value of a field and ignores all subsequent values.

  • Supported Types: All data types.

  • Null Handling: If the first value is null, the null value is retained.

CREATE TABLE test_first_value (
    id BIGINT,
    first_purchase_date DATE,
    first_product STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.first_purchase_date.agg' = 'first_value',
    'fields.first_product.agg' = 'first_value'
);

INSERT INTO test_first_value VALUES (1, DATE '2024-01-01', 'ProductA'), (1, DATE '2024-02-01', 'ProductB');
-- The second record is ignored, and the first value is retained.
SELECT * FROM test_first_value WHERE id = 1;
-- Output:
-- +----+---------------------+---------------+
-- | id | first_purchase_date | first_product |
-- +----+---------------------+---------------+
-- |  1 | 2024-01-01          | ProductA      |
-- +----+---------------------+---------------+
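
first_value keeps whatever arrives first, including a null, and never examines later values. The following Python sketch models that rule; the helper name first_value is hypothetical, and the second call uses a made-up input whose first value is null.

```python
from datetime import date
from typing import Any, Iterable, Optional


def first_value(values: Iterable[Any]) -> Optional[Any]:
    """Model of first_value: return the first value as-is, even if it is null."""
    for value in values:
        return value  # later values are never examined
    return None  # no rows at all


print(first_value([date(2024, 1, 1), date(2024, 2, 1)]))  # 2024-01-01
print(first_value([None, "ProductB"]))                    # None
```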

listagg

Concatenates multiple string values into a single string, separated by a delimiter.

  • Supported Types: STRING, CHAR.

  • Null Handling: Skips null values.

  • Delimiter: The default delimiter is a comma (,). To use a custom delimiter, set 'fields.<field_name>.listagg.delimiter'.

CREATE TABLE test_listagg (
    id BIGINT,
    tags1 STRING,
    tags2 STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.tags1.agg' = 'listagg',
    'fields.tags2.agg' = 'listagg',
    'fields.tags2.listagg.delimiter' = ';'   -- Specify a custom delimiter.
);

INSERT INTO test_listagg VALUES (1, 'developer', 'developer'), (1, 'java', 'java'), (1, 'flink', 'flink');
SELECT * FROM test_listagg WHERE id = 1;
-- Output:
-- +----+----------------------+----------------------+
-- | id | tags1                | tags2                |
-- +----+----------------------+----------------------+
-- |  1 | developer,java,flink | developer;java;flink |
-- +----+----------------------+----------------------+
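
The concatenation rule can be modeled in a few lines of Python: nulls are skipped, and the delimiter is inserted only between non-null values in arrival order. This is a sketch of the documented behavior (comma by default, custom delimiter optional), not the engine's code; the helper name listagg here is a hypothetical Python function.

```python
from typing import Iterable, Optional


def listagg(values: Iterable[Optional[str]], delimiter: str = ",") -> Optional[str]:
    """Model of listagg: join non-null values in arrival order."""
    result: Optional[str] = None
    for value in values:
        if value is None:
            continue  # nulls are skipped, so they never produce an empty element
        result = value if result is None else result + delimiter + value
    return result


tags = ["developer", "java", "flink"]
print(listagg(tags))                     # developer,java,flink
print(listagg(tags, delimiter=";"))      # developer;java;flink
print(listagg(["java", None, "flink"]))  # java,flink
```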

string_agg

An alias for listagg. It concatenates multiple string values into a single string, separated by a delimiter.

  • Supported Types: STRING, CHAR.

  • Null Handling: Skips null values.

  • Delimiter: The default delimiter is a comma (,). To use a custom delimiter, set 'fields.<field_name>.string_agg.delimiter'.

CREATE TABLE test_string_agg (
    id BIGINT,
    tags STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.tags.agg' = 'string_agg',
    'fields.tags.string_agg.delimiter' = ';'   -- Specify a custom delimiter.
);

INSERT INTO test_string_agg VALUES (1, 'developer'), (1, 'java'), (1, 'flink');
SELECT * FROM test_string_agg WHERE id = 1;
-- Output:
-- +----+----------------------+
-- | id | tags                 |
-- +----+----------------------+
-- |  1 | developer;java;flink |
-- +----+----------------------+

rbm32

Performs union aggregation on serialized 32-bit RoaringBitmap values.

  • Supported Types: BYTES.

  • Null Handling: Ignores null values.

  • Note: RoaringBitmap values must be serialized on the client in the standard format of the RoaringBitmap library before being ingested.

CREATE TABLE user_visits (
    user_id BIGINT,
    visit_bitmap BYTES,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.visit_bitmap.agg' = 'rbm32'
);

-- Insert serialized RoaringBitmap values (hexadecimal literals).
-- Bitmap {1,2}
INSERT INTO user_visits VALUES (1, x'3A30000001000000000001001000000001000200');
-- Bitmap {2,3}
INSERT INTO user_visits VALUES (1, x'3A30000001000000000001001000000002000300');

SELECT * FROM user_visits WHERE user_id = 1;
-- Result: visit_bitmap contains the union {1,2,3}.
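
The hexadecimal literals above decode cleanly under the RoaringBitmap portable serialization layout for bitmaps without run containers. The following Python sketch decodes and encodes that layout for the simple case (32-bit values, array and bitmap containers, no run containers), so you can check what a literal contains or build small test literals. It is a hedged illustration written from the public format specification, not a replacement for a RoaringBitmap library, and it makes no claim about the exact bytes the engine stores after merging. The function names are hypothetical.

```python
import struct
from typing import Dict, Iterable, List, Set

# Cookie for bitmaps serialized without run containers (RoaringBitmap format spec).
SERIAL_COOKIE_NO_RUNCONTAINER = 12346
ARRAY_CONTAINER_MAX = 4096  # larger containers are stored as 65536-bit bitmaps


def decode_rbm32(data: bytes) -> Set[int]:
    """Decode a 32-bit RoaringBitmap serialized without run containers."""
    cookie, size = struct.unpack_from("<II", data, 0)
    if cookie != SERIAL_COOKIE_NO_RUNCONTAINER:
        raise NotImplementedError("run containers are not handled by this sketch")
    values: Set[int] = set()
    offsets_start = 8 + 4 * size  # after cookie, size, and the key/cardinality header
    for i in range(size):
        key, card_minus_one = struct.unpack_from("<HH", data, 8 + 4 * i)
        cardinality = card_minus_one + 1
        (offset,) = struct.unpack_from("<I", data, offsets_start + 4 * i)
        if cardinality <= ARRAY_CONTAINER_MAX:
            lows = list(struct.unpack_from(f"<{cardinality}H", data, offset))
        else:
            words = struct.unpack_from("<1024Q", data, offset)
            lows = [w * 64 + bit for w, word in enumerate(words) for bit in range(64) if (word >> bit) & 1]
        values.update((key << 16) | low for low in lows)
    return values


def encode_rbm32(values: Iterable[int]) -> bytes:
    """Encode 32-bit unsigned integers in the same no-run-container layout."""
    containers: Dict[int, List[int]] = {}
    for value in sorted(set(values)):
        if not 0 <= value < 1 << 32:
            raise ValueError(f"not a 32-bit unsigned value: {value}")
        containers.setdefault(value >> 16, []).append(value & 0xFFFF)
    keys = sorted(containers)
    out = bytearray(struct.pack("<II", SERIAL_COOKIE_NO_RUNCONTAINER, len(keys)))
    payloads: List[bytes] = []
    for key in keys:
        lows = containers[key]
        out += struct.pack("<HH", key, len(lows) - 1)  # key and cardinality - 1
        if len(lows) <= ARRAY_CONTAINER_MAX:
            payloads.append(struct.pack(f"<{len(lows)}H", *lows))
        else:
            words = [0] * 1024
            for low in lows:
                words[low >> 6] |= 1 << (low & 63)
            payloads.append(struct.pack("<1024Q", *words))
    offset = len(out) + 4 * len(keys)  # container data starts after the offset header
    for payload in payloads:
        out += struct.pack("<I", offset)
        offset += len(payload)
    for payload in payloads:
        out += payload
    return bytes(out)


bitmap_a = bytes.fromhex("3A30000001000000000001001000000001000200")  # {1,2}
bitmap_b = bytes.fromhex("3A30000001000000000001001000000002000300")  # {2,3}
union = decode_rbm32(bitmap_a) | decode_rbm32(bitmap_b)
print(sorted(union))              # [1, 2, 3]
print(encode_rbm32(union).hex())  # 3a300000010000000000020010000000010002000300
```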

rbm64

Performs union aggregation on serialized 64-bit RoaringBitmap values.

  • Supported Types: BYTES.

  • Null Handling: Ignores null values.

  • Note: Roaring64Bitmap values must be serialized on the client in the 64-bit format of the RoaringBitmap library before being ingested.

CREATE TABLE session_interactions (
    session_id BIGINT,
    interaction_bitmap BYTES,
    PRIMARY KEY (session_id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.interaction_bitmap.agg' = 'rbm64'
);

-- Insert serialized Roaring64Bitmap values.
-- Result: interaction_bitmap contains the union of all inserted bitmaps.

bool_and

Performs logical AND aggregation on boolean values. It returns true only if all non-null values are true.

  • Supported Types: BOOLEAN.

  • Null Handling: Ignores null values.

CREATE TABLE test_bool_and (
    id BIGINT,
    has_all_permissions BOOLEAN,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.has_all_permissions.agg' = 'bool_and'
);

INSERT INTO test_bool_and VALUES (1, true), (1, true), (1, false);
SELECT * FROM test_bool_and WHERE id = 1;
-- Output:
-- +----+---------------------+
-- | id | has_all_permissions |
-- +----+---------------------+
-- |  1 | false               |
-- +----+---------------------+

bool_or

Performs logical OR aggregation on boolean values. It returns true if at least one value is true.

  • Supported Types: BOOLEAN.

  • Null Handling: Ignores null values.

CREATE TABLE test_bool_or (
    id BIGINT,
    has_any_alert BOOLEAN,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.has_any_alert.agg' = 'bool_or'
);

INSERT INTO test_bool_or VALUES (1, false), (1, false), (1, true);
SELECT * FROM test_bool_or WHERE id = 1;
-- Output:
-- +----+---------------+
-- | id | has_any_alert |
-- +----+---------------+
-- |  1 | true          |
-- +----+---------------+
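
Both boolean functions can be modeled as a fold that skips nulls. In the following Python sketch, an all-null input leaves the result as None; that edge case is an assumption of the model, not documented behavior. The helper names are hypothetical.

```python
from typing import Iterable, Optional


def bool_and(values: Iterable[Optional[bool]]) -> Optional[bool]:
    result: Optional[bool] = None
    for value in values:
        if value is None:
            continue  # nulls are ignored
        result = value if result is None else (result and value)
    return result


def bool_or(values: Iterable[Optional[bool]]) -> Optional[bool]:
    result: Optional[bool] = None
    for value in values:
        if value is None:
            continue  # nulls are ignored
        result = value if result is None else (result or value)
    return result


print(bool_and([True, True, False]))  # False
print(bool_or([False, False, True]))  # True
print(bool_and([True, None, True]))   # True
```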