Aggregation Merge Engine

更新时间:
复制为 MD 格式

本文介绍合并引擎(Merge Engine )中的Aggregation Merge Engine用法。

Aggregation Merge Engine说明

  • 合并策略

    对于相同主键进行分组,并根据指定的聚合函数,对每个非主键字段逐一进行计算

  • 配置方式

    在创建表时,通过设置表属性'table.merge-engine' = 'aggregation'来启用。

  • 使用方式

    • 使用控制台创建表时,选择对应的合并策略后添加高级配置 'fields.<字段名>.agg' = '<函数名>' 为各非主键字段指定具体的聚合函数。

    • 对于需要附加参数的聚合函数(如 listagg 的自定义分隔符)。

      'fields.<字段名>.agg' = '<函数名>',
      'fields.<字段名>.<函数名>.<参数名>' = '<参数值>'
      若未为某字段指定聚合函数,系统将默认使用 last_value_ignore_nulls 作为聚合行为。
  • 适用场景

    • 计算累计总和与统计指标

    • 维护计数器与度量值

    • 跟踪最大/最小值变化

    • 构建实时仪表盘与分析

  • 使用限制

    • 不支持回撤(Retraction)语义。引擎采用单向聚合逻辑,无法对已聚合的结果执行反向操作(例如,无法从 sum 结果中减去被删除的值,也无法撤销 max 以恢复次大值)。

    • 对 DELETE 操作支持受限。引擎默认静默忽略删除操作('table.delete.behavior' = 'ignore')。

    • 若需更改删除行为,可通过配置将其设置为拒绝并报错(disable),或允许直接物理移除整行记录(allow)。

      'table.delete.behavior' = 'ignore'   -- 默认值,静默忽略删除操作
      'table.delete.behavior' = 'disable'  -- 拒绝删除操作并返回错误
      'table.delete.behavior' = 'allow'    -- 允许删除操作

使用示例

CREATE TABLE product_stats (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT,
    last_update_time TIMESTAMP(3),
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.price.agg' = 'max',
    'fields.sales.agg' = 'sum'
    -- last_update_time 未指定聚合函数,默认使用 last_value_ignore_nulls
);

-- 插入数据,相同主键的记录会自动聚合
INSERT INTO product_stats VALUES
    (1, 23.0, 15, TIMESTAMP '2024-01-01 10:00:00'),
    (1, 30.2, 20, TIMESTAMP '2024-01-01 11:00:00');

SELECT * FROM product_stats WHERE product_id = 1;
-- 输出:
-- +------------+-------+-------+---------------------+
-- | product_id | price | sales | last_update_time    |
-- +------------+-------+-------+---------------------+
-- |          1 |  30.2 |    35 | 2024-01-01 11:00:00 |
-- +------------+-------+-------+---------------------+
-- price: 30.2(23.0 和 30.2 的最大值)
-- sales: 35(15 和 20 的累加和)
-- last_update_time: 2024-01-01 11:00:00(最后一个非空值)

支持的聚合函数

sum

计算多行数据的总和。

  • 支持类型TINYINTSMALLINTINTBIGINTFLOATDOUBLEDECIMAL

  • 空值处理:忽略 null 值。

CREATE TABLE test_sum (
    id BIGINT,
    amount DECIMAL(10, 2),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.amount.agg' = 'sum'
);

INSERT INTO test_sum VALUES (1, 100.50), (1, 200.75);
SELECT * FROM test_sum WHERE id = 1;
-- 输出:
-- +----+---------+
-- | id | amount  |
-- +----+---------+
-- |  1 | 301.25  |
-- +----+---------+

product

计算多行数据的乘积。

  • 支持类型TINYINTSMALLINTINTBIGINTFLOATDOUBLEDECIMAL

  • 空值处理:忽略 null 值。

CREATE TABLE test_product (
    id BIGINT,
    discount_factor DOUBLE,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.discount_factor.agg' = 'product'
);

INSERT INTO test_product VALUES (1, 0.9), (1, 0.8);
SELECT * FROM test_product WHERE id = 1;
-- 输出:
-- +----+-----------------------+
-- | id | discount_factor       |
-- +----+-----------------------+
-- |  1 | 0.7200000000000001    |
-- +----+-----------------------+
-- 注意:由于 IEEE 754 双精度浮点运算,结果为 0.7200000000000001 而非 0.72
-- 如需精确计算,建议使用 DECIMAL 类型替代 DOUBLE

max

保留最大值。

  • 支持类型CHARSTRINGTINYINTSMALLINTINTBIGINTFLOATDOUBLEDECIMALDATETIMETIMESTAMPTIMESTAMP_LTZ

  • 空值处理:忽略 null 值。

CREATE TABLE test_max (
    id BIGINT,
    temperature DOUBLE,
    reading_time TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.temperature.agg' = 'max',
    'fields.reading_time.agg' = 'max'
);

INSERT INTO test_max VALUES
    (1, 25.5, TIMESTAMP '2024-01-01 10:00:00'),
    (1, 28.3, TIMESTAMP '2024-01-01 11:00:00');
SELECT * FROM test_max WHERE id = 1;
-- 输出:
-- +----+-------------+---------------------+
-- | id | temperature | reading_time        |
-- +----+-------------+---------------------+
-- |  1 |        28.3 | 2024-01-01 11:00:00 |
-- +----+-------------+---------------------+

min

保留最小值。

  • 支持类型CHARSTRINGTINYINTSMALLINTINTBIGINTFLOATDOUBLEDECIMALDATETIMETIMESTAMPTIMESTAMP_LTZ

  • 空值处理:忽略 null 值。

CREATE TABLE test_min (
    id BIGINT,
    lowest_price DECIMAL(10, 2),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.lowest_price.agg' = 'min'
);

INSERT INTO test_min VALUES (1, 99.99), (1, 79.99), (1, 89.99);
SELECT * FROM test_min WHERE id = 1;
-- 输出:
-- +----+--------------+
-- | id | lowest_price |
-- +----+--------------+
-- |  1 |        79.99 |
-- +----+--------------+

last_value

用最新接收到的值覆盖旧值。

  • 支持类型:所有数据类型。

  • 空值处理:null 值会直接覆盖旧值。

CREATE TABLE test_last_value (
    id BIGINT,
    status STRING,
    last_login TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.status.agg' = 'last_value',
    'fields.last_login.agg' = 'last_value'
);

INSERT INTO test_last_value VALUES (1, 'online', TIMESTAMP '2024-01-01 10:00:00');
INSERT INTO test_last_value VALUES (1, 'offline', TIMESTAMP '2024-01-01 11:00:00');
INSERT INTO test_last_value VALUES (1, CAST(NULL AS STRING), TIMESTAMP '2024-01-01 12:00:00');
-- null 会覆盖之前的 'offline' 值
SELECT * FROM test_last_value WHERE id = 1;
-- 输出:
-- +----+--------+---------------------+
-- | id | status | last_login          |
-- +----+--------+---------------------+
-- |  1 | null   | 2024-01-01 12:00:00 |
-- +----+--------+---------------------+

last_value_ignore_nulls

用最新的非空值替换之前的值。这是未指定聚合函数时的默认聚合行为

  • 支持类型:所有数据类型。

  • 空值处理:忽略 null 值,保留之前的值。

CREATE TABLE test_last_value_ignore_nulls (
    id BIGINT,
    email STRING,
    phone STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.email.agg' = 'last_value_ignore_nulls',
    'fields.phone.agg' = 'last_value_ignore_nulls'
);

INSERT INTO test_last_value_ignore_nulls VALUES (1, 'user@example.com', '123-456');
INSERT INTO test_last_value_ignore_nulls VALUES (1, CAST(NULL AS STRING), '789-012');
-- email 为 null,被忽略,保留之前的值
INSERT INTO test_last_value_ignore_nulls VALUES (1, 'new@example.com', CAST(NULL AS STRING));
-- phone 为 null,被忽略,保留之前的值
SELECT * FROM test_last_value_ignore_nulls WHERE id = 1;
-- 输出:
-- +----+-----------------+---------+
-- | id | email           | phone   |
-- +----+-----------------+---------+
-- |  1 | new@example.com | 789-012 |
-- +----+-----------------+---------+

first_value

保留字段的第一个值,忽略后续所有值。

  • 支持类型:所有数据类型。

  • 空值处理:如果第一个值为 null,则保留 null。

CREATE TABLE test_first_value (
    id BIGINT,
    first_purchase_date DATE,
    first_product STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.first_purchase_date.agg' = 'first_value',
    'fields.first_product.agg' = 'first_value'
);

INSERT INTO test_first_value VALUES (1, '2024-01-01', 'ProductA'), (1, '2024-02-01', 'ProductB');
-- 第二条记录被忽略,保留第一个值
SELECT * FROM test_first_value WHERE id = 1;
-- 输出:
-- +----+---------------------+---------------+
-- | id | first_purchase_date | first_product |
-- +----+---------------------+---------------+
-- |  1 | 2024-01-01          | ProductA      |
-- +----+---------------------+---------------+

listagg

将多个字符串值用分隔符拼接为一个字符串。

  • 支持类型STRINGCHAR

  • 空值处理:跳过 null 值。

  • 分隔符:默认为逗号 ,,可通过参数自定义。

CREATE TABLE test_listagg (
    id BIGINT,
    tags1 STRING,
    tags2 STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.tags1.agg' = 'listagg',
    'fields.tags2.agg' = 'listagg',
    'fields.tags2.listagg.delimiter' = ';'   -- 指定分隔符
);

INSERT INTO test_listagg VALUES (1, 'developer', 'developer'), (1, 'java', 'java'), (1, 'flink', 'flink');
SELECT * FROM test_listagg WHERE id = 1;
-- 输出:
-- +----+----------------------+----------------------+
-- | id | tags1                | tags2                |
-- +----+----------------------+----------------------+
-- |  1 | developer,java,flink | developer;java;flink |
-- +----+----------------------+----------------------+

string_agg

listagg 的别名,功能完全相同。将多个字符串值用分隔符拼接为一个字符串。

  • 支持的类型STRINGCHAR

  • 空值处理:跳过 null 值。

  • 分隔符:默认为逗号,,可通过参数自定义。

CREATE TABLE test_string_agg (
    id BIGINT,
    tags STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.tags.agg' = 'string_agg',
    'fields.tags.string_agg.delimiter' = ';'   -- 指定分隔符
);

INSERT INTO test_string_agg VALUES (1, 'developer'), (1, 'java'), (1, 'flink');
SELECT * FROM test_string_agg WHERE id = 1;
-- 输出:
-- +----+----------------------+
-- | id | tags                 |
-- +----+----------------------+
-- |  1 | developer;java;flink |
-- +----+----------------------+

rbm32

对序列化的 32 位 RoaringBitmap 值进行并集聚合。

  • 支持类型BYTES

  • 空值处理:忽略 null 值。

  • 注意:RoaringBitmap 值需在客户端预先序列化,使用 RoaringBitmap 库的标准格式。

CREATE TABLE user_visits (
    user_id BIGINT,
    visit_bitmap BYTES,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.visit_bitmap.agg' = 'rbm32'
);

-- 插入序列化的 RoaringBitmap 值(十六进制字面量)
-- Bitmap {1,2}
INSERT INTO user_visits VALUES (1, x'3A30000001000000000001001000000001000200');
-- Bitmap {2,3}
INSERT INTO user_visits VALUES (1, x'3A30000001000000000001001000000002000300');

SELECT * FROM user_visits WHERE user_id = 1;
-- 结果:visit_bitmap 包含并集 {1,2,3}

rbm64

对序列化的 64 位 RoaringBitmap 值进行并集聚合。

  • 支持类型BYTES

  • 空值处理:忽略 null 值。

  • 注意:Roaring64Bitmap 值需在客户端预先序列化,使用 RoaringBitmap 库的 64 位格式。

CREATE TABLE session_interactions (
    session_id BIGINT,
    interaction_bitmap BYTES,
    PRIMARY KEY (session_id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.interaction_bitmap.agg' = 'rbm64'
);

-- 插入序列化的 Roaring64Bitmap 值
-- 结果:interaction_bitmap 包含所有插入 bitmap 的并集

bool_and

对布尔值进行逻辑与(AND)聚合,仅当所有值均为 true 时返回 true。

  • 支持类型BOOLEAN

  • 空值处理:忽略 null 值。

CREATE TABLE test_bool_and (
    id BIGINT,
    has_all_permissions BOOLEAN,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.has_all_permissions.agg' = 'bool_and'
);

INSERT INTO test_bool_and VALUES (1, true), (1, true), (1, false);
SELECT * FROM test_bool_and WHERE id = 1;
-- 输出:
-- +----+---------------------+
-- | id | has_all_permissions |
-- +----+---------------------+
-- |  1 | false               |
-- +----+---------------------+

bool_or

对布尔值进行逻辑或(OR)聚合,只要有一个值为 true 即返回 true。

  • 支持类型BOOLEAN

  • 空值处理:忽略 null 值。

CREATE TABLE test_bool_or (
    id BIGINT,
    has_any_alert BOOLEAN,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.has_any_alert.agg' = 'bool_or'
);

INSERT INTO test_bool_or VALUES (1, false), (1, false), (1, true);
SELECT * FROM test_bool_or WHERE id = 1;
-- 输出:
-- +----+---------------+
-- | id | has_any_alert |
-- +----+---------------+
-- |  1 | true          |
-- +----+---------------+