Dynamic Table支持全量刷新和增量刷新两种模式。当源表数据量较大、数据新鲜度要求较高且单位时间内数据变更比例较低时,推荐使用增量刷新模式。本文介绍增量刷新Dynamic Table的使用最佳实践、调优手段和常见问题排查。
增量刷新适用场景
Dynamic Table支持两种刷新模式:
全量刷新:每次刷新时重新计算全部数据。
增量刷新:每次仅计算新增或变更的数据,并与已有全量结果合并。
当目标SQL具备以下特征时,推荐使用增量刷新模式(以下数据仅为方向性参考,实际效果请以线上测试为准):
源表数据总量较大(通常在千万行及以上)。
对数据新鲜度要求较高(需达到分钟级)。
单位时间(即刷新间隔)内数据变更比例较低(通常在0.1%~5%区间)。
这里指整个计算流程中数据变更比例低。在某些场景中,源表少量数据变更可能导致大量中间或最终结果重算(常见于小维表的更新),反而导致计算性能下降。
建表与首次刷新
增量Dynamic Table在首次刷新时会一次性处理全部历史数据。由于需要持久化中间计算结果,其资源消耗(尤其是内存)显著高于单次执行原始SQL。因此,对于大数据量的表,建议在建表时采取以下措施:
暂不启用自动刷新,先手动执行首次全量刷新及第一次增量刷新,确认流程正常后再开启自动刷新。
优先使用Serverless资源执行刷新,避免影响本地其他任务。Serverless刷新成功后,可通过刷新历史查看实际资源消耗。若本地资源充足,后续可切换为本地资源执行自动刷新。
示例参考如下。
-- 1. 定义源表
CREATE TABLE orders (
order_id BIGINT NOT NULL,
user_id BIGINT,
amount FLOAT,
PRIMARY KEY (order_id)
);
-- 2. 生成初始数据
INSERT INTO orders
SELECT i, i % 100000, RANDOM() * 1000
FROM GENERATE_SERIES(1, 1000000) AS i;
-- 3. 创建增量Dynamic Table(初始关闭自动刷新)
CREATE DYNAMIC TABLE order_agg
WITH (
freshness = '1 minutes',
auto_refresh_mode = 'incremental',
auto_refresh_enable = 'false', -- 手动验证成功后再开启自动刷新
computing_resource = 'serverless' -- 优先使用Serverless资源
) AS
SELECT
user_id,
COUNT(1) AS order_count,
SUM(amount) AS amount_sum
FROM orders
GROUP BY user_id;
-- 4. 手动执行首次刷新并验证结果
REFRESH TABLE order_agg WITH (refresh_mode = 'incremental');
SELECT * FROM order_agg WHERE user_id = 1;
-- 5. 插入增量数据
INSERT INTO orders
SELECT i, i % 100000, RANDOM() * 1000
FROM GENERATE_SERIES(1000001, 1010001) AS i;
-- 6. 手动执行增量刷新并验证结果
REFRESH TABLE order_agg WITH (refresh_mode = 'incremental');
SELECT * FROM order_agg WHERE user_id = 1;
-- 7. 确认无误后,开启自动刷新
ALTER TABLE order_agg SET (auto_refresh_enable = 'true');动态表刷新配置GUC
增量Dynamic Table的刷新与普通的OLAP语句一样,也会受到各种GUC对语句执行的影响。但与普通OLAP语句不同,需要通过如下两种方式让GUC生效:
-- 配置/取消自动刷新时使用的GUC
ALTER TABLE order_agg SET (refresh_guc_hg_experimental_xxx = xxx);
ALTER TABLE order_agg RESET (refresh_guc_hg_experimental_xxx);
-- 手动刷新时设置GUC
REFRESH TABLE order_agg WITH (refresh_mode = 'incremental', refresh_guc_hg_experimental_xxx = xxx);常见的增量刷新调优手段
Append Only优化
如果基表仅有追加写入(即无UPDATE或DELETE操作),可为其设置appendonly属性。系统将在增量刷新过程中利用该特性进行优化,在多数场景下可提升性能;若查询中包含MIN/MAX聚合,性能提升尤为显著。
设置后,该表将无法执行更新或删除操作。需要在创建Dynamic Table之前设置此属性,才能起到优化效果。
-- 启用Append-Only优化(需在建Dynamic Table前执行)
CALL set_table_property('orders', 'mutate_type', 'appendonly');
-- 如需恢复默认行为(先删除所有关联的增量Dynamic Table)
CALL set_table_property('orders', 'mutate_type', 'none');Group By/Join Key顺序调优
增量Dynamic Table会维护聚合/Join的中间计算结果,每次增量刷新时会扫描这些中间结果与增量数据做合并算出最新结果。扫描中间结果时会用增量Group By/Join Key做文件级过滤,从而减少对中间结果的读取。
如果有多个Group By/Join Key,调整这些Key在SQL中的顺序有助于提高过滤度,从而提升整体性能。优先让以下两种字段排在前面:
连续性较强的字段,例如时间或递增的ID值。
区分度较高的字段。
示例如下:计算一天内不同销售平台每分钟不同种类商品的销量。
-- 分钟作为时间字段,连续性强,放在第一位
-- 商品类别相比销售平台,区分度更高,放在第二位
-- 销售平台只有几个较少的值,放在最后一位
CREATE TABLE order_details (
order_id BIGINT,
platform BIGINT,
product_id BIGINT,
category_id BIGINT,
amount FLOAT,
create_at TIMESTAMP
);
CREATE DYNAMIC TABLE order_detail_summary
WITH (
freshness = '1 minutes',
auto_refresh_mode = 'incremental',
auto_refresh_enable = 'false',
computing_resource = 'serverless'
) AS
SELECT
to_char(create_at, 'HH24:MI') AS minute,
category_id,
platform,
SUM(amount)
FROM order_details
GROUP BY minute, category_id, platform;维表Join使用场景
以下两种场景,应尽量使用维表Join,而不是多表JOIN:
某表数据更新频率较低,没有实时写入,其他表与之Join时应当使用维表Join。
某表数据更新虽然较为频繁,但其他表与之Join时使用到的字段通常是不变的,应当使用维表Join。
例如,订单表关联用户表时,若用户表变更频率较低,可采用维表Join:
CREATE TABLE orders (
order_id BIGINT NOT NULL,
user_id BIGINT,
amount FLOAT,
PRIMARY KEY (order_id)
);
INSERT INTO orders
SELECT i, i % 100000, RANDOM() * 1000
FROM GENERATE_SERIES(1, 1000000) AS i;
CREATE TABLE customers (
user_id BIGINT NOT NULL,
user_name TEXT,
PRIMARY KEY (user_id)
);
INSERT INTO customers
SELECT i, 'user_' || i
FROM GENERATE_SERIES(1, 100000) AS i;
CREATE DYNAMIC TABLE order_customer
WITH (
freshness = '1 minutes',
auto_refresh_mode = 'incremental',
auto_refresh_enable = 'false',
computing_resource = 'serverless'
) AS
SELECT
o.order_id,
o.user_id,
c.user_name,
o.amount
FROM orders o
-- FOR SYSTEM_TIME AS OF PROCTIME() 用于标识该Join为维表Join
LEFT JOIN customers FOR SYSTEM_TIME AS OF PROCTIME() c
ON o.user_id = c.user_id;维表Join中,维表的新增或变更仅对后续数据生效,不会回溯更新历史结果。更多关于维表Join的详细说明,请参见CREATE DYNAMIC TABLE。如需修正历史关联数据,可执行一次REFRESH OVERWRITE进行全量覆盖:
REFRESH OVERWRITE TABLE order_customer WITH (refresh_mode = 'incremental');在实际应用中,可结合维表Join与定期REFRESH OVERWRITE的策略,在可接受的时间窗口内容忍少量维表数据缺失。但需注意:REFRESH OVERWRITE开销较大,不宜频繁执行;若对维表时效性要求极高,仍建议使用多表Join。
双流Join(多表JOIN)调优
避免极小表参与双流Join
应尽量避免数据量很少且数据变化频繁的极小表参与双流Join。因为极小表的一行数据通常可以与大表中的多行数据匹配,增量刷新过程中,极小表中的数据变化几十行可能导致中间状态和最终结果中的大量数据需要更新,极大地削弱增量刷新本身的优势。
遇到这种情况,如果可以接受维表Join的语义(例如小表更新的字段在Join中没有被使用),可以用维表Join完成;或者在查询时再关联小表的字段。
示例:订单表-明细表-商品表三表Join。
CREATE TABLE orders (
order_id BIGINT NOT NULL,
user_id BIGINT,
PRIMARY KEY (order_id)
);
INSERT INTO orders SELECT i, i%100000 FROM GENERATE_SERIES(1, 1000000) AS i;
CREATE TABLE lineitems (
lineitem_id BIGINT NOT NULL,
order_id BIGINT,
product_id BIGINT,
category_id BIGINT,
PRIMARY KEY (lineitem_id)
);
INSERT INTO lineitems SELECT i, i%1000000, i%100000, i%1000 FROM GENERATE_SERIES(1, 10000000) AS i;
CREATE TABLE categories (
category_id BIGINT NOT NULL,
category_name TEXT,
PRIMARY KEY (category_id)
);
INSERT INTO categories SELECT i, 'category_' || i FROM GENERATE_SERIES(1, 1000) AS i;
CREATE DYNAMIC TABLE order_detail
WITH (
freshness = '1 minutes',
auto_refresh_mode = 'incremental',
auto_refresh_enable = 'false',
computing_resource = 'serverless'
) AS
SELECT
o.order_id,
o.user_id,
l.lineitem_id,
l.product_id,
l.category_id,
c.category_name
FROM orders o
LEFT JOIN lineitems l ON o.order_id = l.order_id
LEFT JOIN categories c ON l.category_id = c.category_id;在上述示例中,categories表的数据量很小只有1000行,但其对结果的影响却很大。假如categories表的500行数据发生了变化,那么结果表中将有500万行相关数据随之发生改变,导致增量刷新优势不再。
考虑到Join时只用到了category_name字段,该字段通常不会被更新,因此可以对categories使用维表Join来避免上述情况。
如果业务中categories(或其他小表)中用到的数据确实会被频繁更新,且希望能够从Dynamic Table中查询到这种更新,建议在查询时再Join这些小表(通常开销不大)。可以在Dynamic Table上再建一个View来实现,示例如下:
CREATE DYNAMIC TABLE order_detail_dt
WITH (
freshness = '1 minutes',
auto_refresh_mode = 'incremental',
auto_refresh_enable = 'false',
computing_resource = 'serverless'
) AS
SELECT
o.order_id,
o.user_id,
l.lineitem_id,
l.product_id,
l.category_id
FROM orders o
LEFT JOIN lineitems l ON o.order_id = l.order_id;
CREATE VIEW order_detail AS
SELECT
dt.*,
c.category_name
FROM order_detail_dt dt
LEFT JOIN categories c ON dt.category_id = c.category_id;执行计划调优
双流Join的执行顺序对性能影响显著。系统自动生成的执行计划未必最优,可通过调整SQL写法或使用Plan Hint优化Join顺序,目标如下:
多表Join时:优先让小表之间先Join,延迟大表参与,以减少中间状态数据量。
大表Join小表时:让小表作为Build Side,大表作为Probe Side,避免首次刷新OOM。
示例:优化订单明细-商品-类别的三表Join。
步骤一:查看并分析执行计划
-- 1. 准备数据
-- 订单表(大)
CREATE TABLE lineitems (
lineitem_id BIGINT NOT NULL,
order_id BIGINT,
product_id BIGINT,
PRIMARY KEY (lineitem_id)
);
-- 商品表(中)
CREATE TABLE products (
product_id BIGINT NOT NULL,
product_name TEXT,
category_id BIGINT,
PRIMARY KEY (product_id)
);
-- 类别表(小)
CREATE TABLE categories (
category_id BIGINT NOT NULL,
category_name TEXT,
PRIMARY KEY (category_id)
);
-- 2. 创建Dynamic Table
CREATE DYNAMIC TABLE lineitem_detail
WITH (
freshness = '1 minutes',
auto_refresh_mode = 'incremental',
auto_refresh_enable = 'false',
computing_resource = 'serverless'
) AS
SELECT
l.order_id,
l.lineitem_id,
p.product_id,
p.product_name,
c.category_id,
c.category_name
FROM lineitems l
LEFT JOIN products p ON l.product_id = p.product_id
LEFT JOIN categories c ON p.category_id = c.category_id;
-- 3. 查看执行计划
EXPLAIN REFRESH TABLE lineitem_detail WITH (REFRESH_MODE = 'incremental');执行计划输出如下:
QUERY PLAN
-> Gather (id=100005, fragmentId=8, dagId=8)
-> Hybrid DML (id=13, fragmentId=7, dagId=7)
-> Redistribution (id=100004, fragmentId=7, dagId=7)
Hash Key: lineitems.lineitem_id, products.product_id, categories.category_id, (row_key_pb_encoding(lineitems.product_id, products.category_id))
-> Project (id=11, fragmentId=5, dagId=5)
-> Incremental Hash Join Node (id=10, fragmentId=5, dagId=5)
Hash Cond: (products.category_id = categories.category_id)
-> Redistribution (id=100003, fragmentId=5, dagId=5)
Hash Key: products.category_id
-> Incremental Hash Join Node (id=6, fragmentId=3, dagId=3)
Hash Cond: (lineitems.product_id = products.product_id)
-> Redistribution (id=100002, fragmentId=3, dagId=3)
Hash Key: lineitems.product_id
-> Local Gather (id=2, fragmentId=1, dagId=1)
-> NiagaraScanNode on public.lineitems (id=1, fragmentId=2, dagId=1)
-> Local Gather (id=5, fragmentId=3, dagId=3)
-> NiagaraScanNode on public.products (id=4, fragmentId=4, dagId=3)
-> Local Gather (id=9, fragmentId=5, dagId=5)
-> NiagaraScanNode on public.categories (id=8, fragmentId=6, dagId=5)
RefreshMode: INCREMENTAL执行计划显示的Join顺序如下:
lineitems(Probe Side)与products(Build Side)做Join,计算出中间结果lineitem_product。
lineitem_product(Probe Side)与categories(Build Side)做Join,计算出最终结果。
步骤二:延迟大表参与Join
参考原表中数据量,lineitem_product这个中间结果数据量较大,增量刷新需要持久化维护中间结果,因此开销会相对较大,也会导致比较大的状态存储。此时可以让products和categories先完成Join,得到中间结果product_category,大幅减少中间结果数据量从而优化整体性能。
DROP TABLE lineitem_detail;
CREATE DYNAMIC TABLE lineitem_detail_opt
WITH (
freshness = '1 minutes',
auto_refresh_mode = 'incremental',
auto_refresh_enable = 'false',
computing_resource = 'serverless'
) AS
WITH pc AS (
SELECT
p.product_id,
p.product_name,
c.category_id,
c.category_name
FROM products p
LEFT JOIN categories c ON p.category_id = c.category_id
)
SELECT
l.order_id,
l.lineitem_id,
pc.product_id,
pc.product_name,
pc.category_id,
pc.category_name
FROM lineitems l
LEFT JOIN pc ON l.product_id = pc.product_id;再次查看执行计划,确认已改为products和categories先Join,再和lineitems做Join:
EXPLAIN REFRESH TABLE lineitem_detail_opt WITH (REFRESH_MODE = 'incremental');执行计划输出如下:
QUERY PLAN
-> Gather (id=100006, fragmentId=9, dagId=9)
-> Hybrid DML (id=16, fragmentId=8, dagId=8)
-> Redistribution (id=100005, fragmentId=8, dagId=8)
Hash Key: lineitems.lineitem_id, products.product_id, categories.category_id, (row_key_pb_encoding(lineitems.product_id, products.category_id))
-> Project (id=14, fragmentId=7, dagId=7)
-> Project (id=13, fragmentId=7, dagId=7)
-> Incremental Hash Join Node (id=12, fragmentId=7, dagId=7)
Hash Cond: (products.product_id = lineitems.product_id)
-> Redistribution (id=100003, fragmentId=7, dagId=7)
Hash Key: products.product_id
-> Project (id=7, fragmentId=3, dagId=3)
-> Incremental Hash Join Node (id=6, fragmentId=3, dagId=3)
Hash Cond: (products.category_id = categories.category_id)
-> Redistribution (id=100002, fragmentId=3, dagId=3)
Hash Key: products.category_id
-> Local Gather (id=2, fragmentId=1, dagId=1)
-> NiagaraScanNode on public.products (id=1, fragmentId=2, dagId=1)
-> Local Gather (id=5, fragmentId=3, dagId=3)
-> NiagaraScanNode on public.categories (id=4, fragmentId=4, dagId=3)
-> Redistribution (id=100004, fragmentId=7, dagId=7)
Hash Key: lineitems.product_id
-> Local Gather (id=9, fragmentId=5, dagId=5)
-> NiagaraScanNode on public.lineitems (id=8, fragmentId=6, dagId=5)
RefreshMode: INCREMENTAL步骤三:借助Plan Hint指定Build/Probe端
优化后lineitems作为Build端参与Join,Hash Join中大表作为Build端构建Hash Table会占用更多内存,容易导致OOM。因此推荐利用Plan Hint让数据量小的表做Build端,数据量大的表做Probe端。
DROP TABLE lineitem_detail_opt;
CREATE DYNAMIC TABLE lineitem_detail
WITH (
freshness = '1 minutes',
auto_refresh_mode = 'incremental',
auto_refresh_enable = 'false',
computing_resource = 'serverless'
) AS
WITH pc AS (
SELECT
p.product_id,
p.product_name,
c.category_id,
c.category_name
FROM products p
LEFT JOIN categories c ON p.category_id = c.category_id
)
SELECT /*+HINT Leading(l pc) */
l.order_id,
l.lineitem_id,
pc.product_id,
pc.product_name,
pc.category_id,
pc.category_name
FROM lineitems l
LEFT JOIN pc ON l.product_id = pc.product_id;查看最终执行计划:
EXPLAIN REFRESH TABLE lineitem_detail WITH (REFRESH_MODE = 'incremental');执行计划输出如下:
QUERY PLAN
Gather (cost=0.00..15.01 rows=4 width=56)
-> Sink Table (cost=0.00..15.01 rows=4 width=56)
-> Redistribution (cost=0.00..15.01 rows=4 width=56)
Hash Key: lineitems.lineitem_id, products.product_id, categories.category_id, (pg_catalog.row_key_pb_encoding(lineitems.product_id, products.category_id))
-> Project (cost=0.00..15.01 rows=4 width=56)
-> Hash Left Join (cost=0.00..15.01 rows=4 width=64)
Hash Cond: (lineitems.product_id = products.product_id)
-> Redistribution (cost=0.00..5.00 rows=1 width=24)
Hash Key: lineitems.product_id
-> Local Gather (cost=0.00..5.00 rows=1 width=24)
-> Seq Scan on lineitems (cost=0.00..5.00 rows=1 width=24)
-> Hash (cost=10.00..10.00 rows=2 width=40)
-> Redistribution (cost=0.00..10.00 rows=2 width=40)
Hash Key: products.product_id
-> Hash Left Join (cost=0.00..10.00 rows=2 width=40)
Hash Cond: (products.category_id = categories.category_id)
-> Redistribution (cost=0.00..5.00 rows=1 width=24)
Hash Key: products.category_id
-> Local Gather (cost=0.00..5.00 rows=1 width=24)
-> Seq Scan on products (cost=0.00..5.00 rows=1 width=24)
-> Hash (cost=5.00..5.00 rows=1 width=16)
-> Local Gather (cost=0.00..5.00 rows=1 width=16)
-> Seq Scan on categories (cost=0.00..5.00 rows=1 width=16)
Optimizer: HQO version 4.1.0最终执行计划较为理想:products和categories两个小的维表先完成Join,避免产生较大的中间状态,最后再和lineitems做Join,由lineitems做Probe端避免占用大量内存。
在32 CU规格的实例中测试,各数据规格如下:
lineitems:100,000,000行。
products:500,000行。
categories:10,000行。
测试结果如下:
首次刷新(增全量) | 增量刷新(1%增量) | 状态表大小 | |||||
E2E耗时 | 内存 | CPU Time | E2E耗时 | 内存 | CPU Time | ||
优化前 | 52.8s | 27 GB | 361s | 1254ms | 412 MB | 8404ms | 5247 MB |
优化后 | 49.9s | 22 GB | 255s | 911ms | 389 MB | 6886ms | 2003 MB |
Dynamic Table无损修改定义
Dynamic Table上线初期,业务上有时需要修改Dynamic Table的定义,但不希望业务受到影响(对Dynamic Table的查询不报错)。如果变更不大,可以使用ALTER DYNAMIC TABLE table_name AS SQL的方式修改定义,该方式会在下一次刷新时对表做一次OVERWRITE使数据变更为新的定义,但需要满足一些条件,限制较多。更多信息,请参见ALTER DYNAMIC TABLE。
如因不满足条件而无法使用ALTER DYNAMIC TABLE,且没有其他Dynamic Table依赖该Dynamic Table,则可以使用基于Dynamic Table的View来实现不影响查询的无损重建。假设需要重建Dynamic Table orders_agg,大致流程如下:
-- 1. 按照新的定义创建一个Dynamic Table(名字和旧的Dynamic Table不同)
CREATE DYNAMIC TABLE orders_agg_dt
WITH (
...
) AS ...;
-- 2. 等待第一次刷新完成,确认新的Dynamic Table中已经有数据了
SELECT * FROM orders_agg_dt LIMIT 1;
-- 3. 事务中新建View原子替换原来的Dynamic Table
BEGIN;
DROP TABLE orders_agg;
CREATE VIEW orders_agg AS SELECT * FROM orders_agg_dt;
END;常见问题排查
增量数据过多导致刷新失败
增量刷新每次触及的增量数据量通常是不稳定的,某次增量数据较多时可能会导致刷新失败。导致增量数据变多的原因通常包括:
对部分源表做了Overwrite操作,可能产生大量的数据变更。如果是这种情况,建议在DB级别设置如下GUC,使源表被Overwrite时自动触发Dynamic Table的Overwrite。
ALTER DATABASE xxx SET hg_experimental_enable_dynamic_table_consume_change_when_base_overwrite = off;手动暂停或因其他原因导致Dynamic Table一段时间没刷新,积累了较多的增量数据。
业务原因导致源表导入/变更数据短时间内突增。
这类问题导致的常见增量刷新报错信息如下:
Query OOM:ERRCODE_OUT_OF_MEMORY: Total memory used by all existing queries exceeded memory limitation。
Worker OOM:SERVER_INTERNAL_ERROR message: "ERPC_ERROR_CONNECTION_CLOSED, reason: Ping timeout"。
BinaryArray Exceed 2G:internal error: Capacity error: BinaryArray cannot contain more than 2147483646 bytes。
此时可以采用REFRESH OVERWRITE做一次手动增全量刷新,执行成功后观察后续的增量刷新是否恢复。
REFRESH OVERWRITE TABLE dynamic_table_xxx WITH (refresh_mode = 'incremental', refresh_guc_hg_computing_resource = 'serverless');报错:Multiple partitions / No partition is selected
报错信息示例:Multiple partitions of table XXX is selected或No partition of table XXX is selected。
问题原因:base表是物理分区表,Dynamic Table没有分区过滤,导致选取了多个物理分区子表。
解决方案:
建议将base表换成逻辑分区,Dynamic Table会自动适应分区选取。
如果base表是物理分区,且Dynamic Table也是物理分区,必须在Dynamic Table的定义里有分区过滤,且分区要一一对应。
增量刷新报错:Cannot find index full id: XXX
问题原因:增量Dynamic Table的base表允许rename,rename之后可以正常增量刷新;但rename之后的base表又被drop,导致增量刷新根据后端unique table id找不到对应表,无法消费数据。
解决方案:确保报错中的表名对应base表都存在的情况下,执行一次REFRESH OVERWRITE。
REFRESH OVERWRITE TABLE dynamic_table_xxx WITH (refresh_mode = 'incremental');增量刷新报错:GetLatestStreamOffset snapshots is empty或stream XXX is not existed
问题原因:增量Dynamic Table在某次成功刷新之后,太久没有刷新,导致后台记录的base表点位被自动清理。
解决方案:执行一次REFRESH OVERWRITE。
REFRESH OVERWRITE TABLE dynamic_table_xxx WITH (refresh_mode = 'incremental');