本文介绍合并引擎(Merge Engine )中的Aggregation Merge Engine用法。
Aggregation Merge Engine说明
-
合并策略
对于相同主键进行分组,并根据指定的聚合函数,对每个非主键字段逐一进行计算
-
配置方式
在创建表时,通过设置表属性
'table.merge-engine' = 'aggregation'来启用。 -
使用方式
-
使用控制台创建表时,选择对应的合并策略后添加高级配置
'fields.<字段名>.agg' = '<函数名>'为各非主键字段指定具体的聚合函数。 -
对于需要附加参数的聚合函数(如 listagg 的自定义分隔符)。
'fields.<字段名>.agg' = '<函数名>', 'fields.<字段名>.<函数名>.<参数名>' = '<参数值>'若未为某字段指定聚合函数,系统将默认使用
last_value_ignore_nulls作为聚合行为。
-
-
适用场景
-
计算累计总和与统计指标
-
维护计数器与度量值
-
跟踪最大/最小值变化
-
构建实时仪表盘与分析
-
-
使用限制
-
不支持回撤(Retraction)语义。引擎采用单向聚合逻辑,无法对已聚合的结果执行反向操作(例如,无法从 sum 结果中减去被删除的值,也无法撤销 max 以恢复次大值)。
-
对 DELETE 操作支持受限。引擎默认静默忽略删除操作('table.delete.behavior' = 'ignore')。
-
若需更改删除行为,可通过配置将其设置为拒绝并报错(disable),或允许直接物理移除整行记录(allow)。
'table.delete.behavior' = 'ignore' -- 默认值,静默忽略删除操作 'table.delete.behavior' = 'disable' -- 拒绝删除操作并返回错误 'table.delete.behavior' = 'allow' -- 允许删除操作
-
使用示例
CREATE TABLE product_stats (
product_id BIGINT,
price DOUBLE,
sales BIGINT,
last_update_time TIMESTAMP(3),
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'table.merge-engine' = 'aggregation',
'fields.price.agg' = 'max',
'fields.sales.agg' = 'sum'
-- last_update_time 未指定聚合函数,默认使用 last_value_ignore_nulls
);
-- 插入数据,相同主键的记录会自动聚合
INSERT INTO product_stats VALUES
(1, 23.0, 15, TIMESTAMP '2024-01-01 10:00:00'),
(1, 30.2, 20, TIMESTAMP '2024-01-01 11:00:00');
SELECT * FROM product_stats WHERE product_id = 1;
-- 输出:
-- +------------+-------+-------+---------------------+
-- | product_id | price | sales | last_update_time |
-- +------------+-------+-------+---------------------+
-- | 1 | 30.2 | 35 | 2024-01-01 11:00:00 |
-- +------------+-------+-------+---------------------+
-- price: 30.2(23.0 和 30.2 的最大值)
-- sales: 35(15 和 20 的累加和)
-- last_update_time: 2024-01-01 11:00:00(最后一个非空值)
支持的聚合函数
sum
计算多行数据的总和。
-
支持类型:
TINYINT、SMALLINT、INT、BIGINT、FLOAT、DOUBLE、DECIMAL。 -
空值处理:忽略 null 值。
CREATE TABLE test_sum (
id BIGINT,
amount DECIMAL(10, 2),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'table.merge-engine' = 'aggregation',
'fields.amount.agg' = 'sum'
);
INSERT INTO test_sum VALUES (1, 100.50), (1, 200.75);
SELECT * FROM test_sum WHERE id = 1;
-- 输出:
-- +----+---------+
-- | id | amount |
-- +----+---------+
-- | 1 | 301.25 |
-- +----+---------+
product
计算多行数据的乘积。
-
支持类型:
TINYINT、SMALLINT、INT、BIGINT、FLOAT、DOUBLE、DECIMAL。 -
空值处理:忽略 null 值。
CREATE TABLE test_product (
id BIGINT,
discount_factor DOUBLE,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'table.merge-engine' = 'aggregation',
'fields.discount_factor.agg' = 'product'
);
INSERT INTO test_product VALUES (1, 0.9), (1, 0.8);
SELECT * FROM test_product WHERE id = 1;
-- 输出:
-- +----+-----------------------+
-- | id | discount_factor |
-- +----+-----------------------+
-- | 1 | 0.7200000000000001 |
-- +----+-----------------------+
-- 注意:由于 IEEE 754 双精度浮点运算,结果为 0.7200000000000001 而非 0.72
-- 如需精确计算,建议使用 DECIMAL 类型替代 DOUBLE
max
保留最大值。
-
支持类型:
CHAR、STRING、TINYINT、SMALLINT、INT、BIGINT、FLOAT、DOUBLE、DECIMAL、DATE、TIME、TIMESTAMP、TIMESTAMP_LTZ。 -
空值处理:忽略 null 值。
CREATE TABLE test_max (
id BIGINT,
temperature DOUBLE,
reading_time TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'table.merge-engine' = 'aggregation',
'fields.temperature.agg' = 'max',
'fields.reading_time.agg' = 'max'
);
INSERT INTO test_max VALUES
(1, 25.5, TIMESTAMP '2024-01-01 10:00:00'),
(1, 28.3, TIMESTAMP '2024-01-01 11:00:00');
SELECT * FROM test_max WHERE id = 1;
-- 输出:
-- +----+-------------+---------------------+
-- | id | temperature | reading_time |
-- +----+-------------+---------------------+
-- | 1 | 28.3 | 2024-01-01 11:00:00 |
-- +----+-------------+---------------------+
min
保留最小值。
-
支持类型:
CHAR、STRING、TINYINT、SMALLINT、INT、BIGINT、FLOAT、DOUBLE、DECIMAL、DATE、TIME、TIMESTAMP、TIMESTAMP_LTZ。 -
空值处理:忽略 null 值。
CREATE TABLE test_min (
id BIGINT,
lowest_price DECIMAL(10, 2),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'table.merge-engine' = 'aggregation',
'fields.lowest_price.agg' = 'min'
);
INSERT INTO test_min VALUES (1, 99.99), (1, 79.99), (1, 89.99);
SELECT * FROM test_min WHERE id = 1;
-- 输出:
-- +----+--------------+
-- | id | lowest_price |
-- +----+--------------+
-- | 1 | 79.99 |
-- +----+--------------+
last_value
用最新接收到的值覆盖旧值。
-
支持类型:所有数据类型。
-
空值处理:null 值会直接覆盖旧值。
CREATE TABLE test_last_value (
id BIGINT,
status STRING,
last_login TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'table.merge-engine' = 'aggregation',
'fields.status.agg' = 'last_value',
'fields.last_login.agg' = 'last_value'
);
INSERT INTO test_last_value VALUES (1, 'online', TIMESTAMP '2024-01-01 10:00:00');
INSERT INTO test_last_value VALUES (1, 'offline', TIMESTAMP '2024-01-01 11:00:00');
INSERT INTO test_last_value VALUES (1, CAST(NULL AS STRING), TIMESTAMP '2024-01-01 12:00:00');
-- null 会覆盖之前的 'offline' 值
SELECT * FROM test_last_value WHERE id = 1;
-- 输出:
-- +----+--------+---------------------+
-- | id | status | last_login |
-- +----+--------+---------------------+
-- | 1 | null | 2024-01-01 12:00:00 |
-- +----+--------+---------------------+
last_value_ignore_nulls
用最新的非空值替换之前的值。这是未指定聚合函数时的默认聚合行为。
-
支持类型:所有数据类型。
-
空值处理:忽略 null 值,保留之前的值。
CREATE TABLE test_last_value_ignore_nulls (
id BIGINT,
email STRING,
phone STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'table.merge-engine' = 'aggregation',
'fields.email.agg' = 'last_value_ignore_nulls',
'fields.phone.agg' = 'last_value_ignore_nulls'
);
INSERT INTO test_last_value_ignore_nulls VALUES (1, 'user@example.com', '123-456');
INSERT INTO test_last_value_ignore_nulls VALUES (1, CAST(NULL AS STRING), '789-012');
-- email 为 null,被忽略,保留之前的值
INSERT INTO test_last_value_ignore_nulls VALUES (1, 'new@example.com', CAST(NULL AS STRING));
-- phone 为 null,被忽略,保留之前的值
SELECT * FROM test_last_value_ignore_nulls WHERE id = 1;
-- 输出:
-- +----+-----------------+---------+
-- | id | email | phone |
-- +----+-----------------+---------+
-- | 1 | new@example.com | 789-012 |
-- +----+-----------------+---------+
first_value
保留字段的第一个值,忽略后续所有值。
-
支持类型:所有数据类型。
-
空值处理:如果第一个值为 null,则保留 null。
CREATE TABLE test_first_value (
id BIGINT,
first_purchase_date DATE,
first_product STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'table.merge-engine' = 'aggregation',
'fields.first_purchase_date.agg' = 'first_value',
'fields.first_product.agg' = 'first_value'
);
INSERT INTO test_first_value VALUES (1, '2024-01-01', 'ProductA'), (1, '2024-02-01', 'ProductB');
-- 第二条记录被忽略,保留第一个值
SELECT * FROM test_first_value WHERE id = 1;
-- 输出:
-- +----+---------------------+---------------+
-- | id | first_purchase_date | first_product |
-- +----+---------------------+---------------+
-- | 1 | 2024-01-01 | ProductA |
-- +----+---------------------+---------------+
listagg
将多个字符串值用分隔符拼接为一个字符串。
-
支持类型:
STRING、CHAR。 -
空值处理:跳过 null 值。
-
分隔符:默认为逗号
,,可通过参数自定义。
CREATE TABLE test_listagg (
id BIGINT,
tags1 STRING,
tags2 STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'table.merge-engine' = 'aggregation',
'fields.tags1.agg' = 'listagg',
'fields.tags2.agg' = 'listagg',
'fields.tags2.listagg.delimiter' = ';' -- 指定分隔符
);
INSERT INTO test_listagg VALUES (1, 'developer', 'developer'), (1, 'java', 'java'), (1, 'flink', 'flink');
SELECT * FROM test_listagg WHERE id = 1;
-- 输出:
-- +----+----------------------+----------------------+
-- | id | tags1 | tags2 |
-- +----+----------------------+----------------------+
-- | 1 | developer,java,flink | developer;java;flink |
-- +----+----------------------+----------------------+
string_agg
listagg 的别名,功能完全相同。将多个字符串值用分隔符拼接为一个字符串。
-
支持的类型:
STRING、CHAR。 -
空值处理:跳过 null 值。
-
分隔符:默认为逗号
,,可通过参数自定义。
CREATE TABLE test_string_agg (
id BIGINT,
tags STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'table.merge-engine' = 'aggregation',
'fields.tags.agg' = 'string_agg',
'fields.tags.string_agg.delimiter' = ';' -- 指定分隔符
);
INSERT INTO test_string_agg VALUES (1, 'developer'), (1, 'java'), (1, 'flink');
SELECT * FROM test_string_agg WHERE id = 1;
-- 输出:
-- +----+----------------------+
-- | id | tags |
-- +----+----------------------+
-- | 1 | developer;java;flink |
-- +----+----------------------+
rbm32
对序列化的 32 位 RoaringBitmap 值进行并集聚合。
-
支持类型:
BYTES。 -
空值处理:忽略 null 值。
-
注意:RoaringBitmap 值需在客户端预先序列化,使用 RoaringBitmap 库的标准格式。
CREATE TABLE user_visits (
user_id BIGINT,
visit_bitmap BYTES,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'table.merge-engine' = 'aggregation',
'fields.visit_bitmap.agg' = 'rbm32'
);
-- 插入序列化的 RoaringBitmap 值(十六进制字面量)
-- Bitmap {1,2}
INSERT INTO user_visits VALUES (1, x'3A30000001000000000001001000000001000200');
-- Bitmap {2,3}
INSERT INTO user_visits VALUES (1, x'3A30000001000000000001001000000002000300');
SELECT * FROM user_visits WHERE user_id = 1;
-- 结果:visit_bitmap 包含并集 {1,2,3}
rbm64
对序列化的 64 位 RoaringBitmap 值进行并集聚合。
-
支持类型:
BYTES。 -
空值处理:忽略 null 值。
-
注意:Roaring64Bitmap 值需在客户端预先序列化,使用 RoaringBitmap 库的 64 位格式。
CREATE TABLE session_interactions (
session_id BIGINT,
interaction_bitmap BYTES,
PRIMARY KEY (session_id) NOT ENFORCED
) WITH (
'table.merge-engine' = 'aggregation',
'fields.interaction_bitmap.agg' = 'rbm64'
);
-- 插入序列化的 Roaring64Bitmap 值
-- 结果:interaction_bitmap 包含所有插入 bitmap 的并集
bool_and
对布尔值进行逻辑与(AND)聚合,仅当所有值均为 true 时返回 true。
-
支持类型:
BOOLEAN。 -
空值处理:忽略 null 值。
CREATE TABLE test_bool_and (
id BIGINT,
has_all_permissions BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'table.merge-engine' = 'aggregation',
'fields.has_all_permissions.agg' = 'bool_and'
);
INSERT INTO test_bool_and VALUES (1, true), (1, true), (1, false);
SELECT * FROM test_bool_and WHERE id = 1;
-- 输出:
-- +----+---------------------+
-- | id | has_all_permissions |
-- +----+---------------------+
-- | 1 | false |
-- +----+---------------------+
bool_or
对布尔值进行逻辑或(OR)聚合,只要有一个值为 true 即返回 true。
-
支持类型:
BOOLEAN。 -
空值处理:忽略 null 值。
CREATE TABLE test_bool_or (
id BIGINT,
has_any_alert BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'table.merge-engine' = 'aggregation',
'fields.has_any_alert.agg' = 'bool_or'
);
INSERT INTO test_bool_or VALUES (1, false), (1, false), (1, true);
SELECT * FROM test_bool_or WHERE id = 1;
-- 输出:
-- +----+---------------+
-- | id | has_any_alert |
-- +----+---------------+
-- | 1 | true |
-- +----+---------------+