在金融、物流与物联网等业务场景中,系统会产生海量的时序数据,例如交易流水、轨迹数据和监控日志。对这些TB级别的数据进行实时分析,往往面临性能挑战。PolarDB PostgreSQL版凭借分区表、冷热分层等特性,为存储海量时序数据提供了高性价比的解决方案。在此基础上,您可以通过列存索引(IMCI)功能,无需进行复杂的数据预处理,即可实现对海量时序数据的实时、高性能分析,有效挖掘数据价值。
方案介绍
操作流程
数据写入:业务应用将时序数据(例如交易流水)写入PolarDB PostgreSQL版集群。
列存索引:在基表上创建列存索引。PolarDB PostgreSQL版会自动为维护表中列存数据。相比行存,列存以列为单位组织数据,具备较高的压缩比,在执行聚合查询时仅需读取相关列,从而减少I/O消耗。
查询加速:分析查询(如K线聚合)通过优化器或
Hint指定,优先使用列存索引。查询引擎利用列式存储和并行处理能力,完成对数据的扫描和聚合计算,最终返回结果。
方案优势
使用简单:无需改造业务或进行复杂的ETL,仅需为基表创建列存索引,即可透明加速分析查询。
功能丰富:原生支持分区表,并内置
time_bucket、first、last等丰富的时序分析函数,简化您的SQL开发。
效果展示
数据量:1 亿条数据,时间跨度2天,每天约5000万条。
K线聚合查询:含指定时间周期内最高值、最低值、开盘价、收盘价、交易总量等5个指标。
列存索引并行度:8。
耗时如下(单位:秒):
场景
秒级K线聚合
分钟级K线聚合
小时级K线聚合
天级K线聚合
全量数据聚合(1亿条)
3.41
0.95
0.93
0.91
1天数据聚合(约5000万条)
1.88
0.82
0.81
0.76
12小时数据聚合(约2500万条)
0.89
0.55
0.53
无
1小时数据聚合(约600万条)
0.41
0.39
0.37
无
实施步骤
步骤一:环境准备
请确认您的集群版本与配置是否满足以下条件:
集群版本:
PostgreSQL 16(内核小版本为2.0.16.8.3.0及以上)
PostgreSQL 14(内核小版本为2.0.14.10.20.0及以上)
原表必须包含主键,且在创建列存索引时需要将主键列加入列存索引中。
wal_level参数的值需设置为logical,即在预写式日志WAL(Write-Ahead Logging)中增加支持逻辑编码所需的信息。说明您可以通过控制台设置wal_level参数。修改该参数后集群将会重启,请在修改参数前做好业务安排,谨慎操作。
开启列存索引功能。
对于不同的PolarDB PostgreSQL版内核版本,开启列存索引的方式不同:
步骤二:数据准备
本方案将准备一张交易流水表,模拟生成1亿条交易数据,时间范围约为2天。在此期间,交易时间为每天的8:00至16:00,预计每日数据量约为4000万条。
-- 交易流水表 CREATE TABLE market_trades ( trade_id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- 自增主键 trade_ts TIMESTAMP, -- 交易时间 market_id VARCHAR, -- 市场编号 price DECIMAL, -- 交易价格 amount DECIMAL, -- 交易数量 insert_ts TIMESTAMP -- 系统的写入时间 ); INSERT INTO market_trades(trade_ts, market_id, price, amount, insert_ts) SELECT trade_ts, market_id, price, amount, trade_ts + (random() * 500)::INT * INTERVAL '1 millisecond' AS insert_ts FROM ( -- ======================== -- 1. 第一天高峰:2025-06-01 8:00 - 16:00,4000万条 -- ======================== SELECT '2025-06-01 08:00:00'::TIMESTAMP + (random() * 28800)::INT * INTERVAL '1 second' + -- 28800秒 = 8小时 (random() * 1000)::INT * INTERVAL '1 millisecond' AS trade_ts, CASE WHEN random() < 0.6 THEN 'BTC-USDT' ELSE 'ETH-USDT' END AS market_id, CASE WHEN random() < 0.6 THEN 30000 + (random() * 1000) ELSE 2000 + (random() * 100) END AS price, random() * 10 + 0.1 AS amount FROM generate_series(1, 40000000) UNION ALL -- ======================== -- 2. 第一天非高峰:2025-06-01 16:00 - 2025-06-02 08:00,1000万条 -- ======================== SELECT CASE WHEN random() < 0.5 THEN -- 16:00 - 24:00 '2025-06-01 16:00:00'::TIMESTAMP + (random() * 28800)::INT * INTERVAL '1 second' ELSE -- 00:00 - 08:00(第二天凌晨) '2025-06-02 00:00:00'::TIMESTAMP + (random() * 28800)::INT * INTERVAL '1 second' END + (random() * 1000)::INT * INTERVAL '1 millisecond' AS trade_ts, CASE WHEN random() < 0.6 THEN 'BTC-USDT' ELSE 'ETH-USDT' END AS market_id, CASE WHEN random() < 0.6 THEN 30000 + (random() * 1000) ELSE 2000 + (random() * 100) END AS price, random() * 10 + 0.1 AS amount FROM generate_series(1, 10000000) UNION ALL -- ======================== -- 3. 第二天高峰:2025-06-02 8:00 - 16:00,4000万条 -- ======================== SELECT '2025-06-02 08:00:00'::TIMESTAMP + (random() * 28800)::INT * INTERVAL '1 second' + (random() * 1000)::INT * INTERVAL '1 millisecond' AS trade_ts, CASE WHEN random() < 0.6 THEN 'BTC-USDT' ELSE 'ETH-USDT' END AS market_id, CASE WHEN random() < 0.6 THEN 30000 + (random() * 1000) ELSE 2000 + (random() * 100) END AS price, random() * 10 + 0.1 AS amount FROM generate_series(1, 40000000) UNION ALL -- ======================== -- 4. 第二天非高峰:2025-06-02 16:00 - 2025-06-03 08:00,1000万条 -- ======================== SELECT CASE WHEN random() < 0.5 THEN -- 16:00 - 24:00 '2025-06-02 16:00:00'::TIMESTAMP + (random() * 28800)::INT * INTERVAL '1 second' ELSE -- 00:00 - 08:00(第三天凌晨) '2025-06-03 00:00:00'::TIMESTAMP + (random() * 28800)::INT * INTERVAL '1 second' END + (random() * 1000)::INT * INTERVAL '1 millisecond' AS trade_ts, CASE WHEN random() < 0.6 THEN 'BTC-USDT' ELSE 'ETH-USDT' END AS market_id, CASE WHEN random() < 0.6 THEN 30000 + (random() * 1000) ELSE 2000 + (random() * 100) END AS price, random() * 10 + 0.1 AS amount FROM generate_series(1, 10000000) ) AS data;为交易流水表创建列存索引。
CREATE INDEX idx_csi_market_trades ON market_trades USING CSI;
步骤三:执行K线聚合查询
使用场景:计算每个固定时间窗口的K线。
应用举例:计算每秒钟的股价最高值、最低值、开盘价、收盘价、交易总量。
以下示例分别计算每秒、每分钟、每小时、每天的K线数据。
秒级K线聚合
-- 秒级K线聚合
/*+ SET (polar_csi.enable_query on) */
SELECT
time_bucket('1 second', trade_ts) AS candle_ts, -- 1秒内的数据
market_id,
MIN(price) AS low, -- 1秒内最低价
MAX(price) AS high, -- 1秒内最高价
FIRST(price ORDER BY trade_ts) AS open, -- 1秒内开盘价
LAST(price ORDER BY trade_ts) AS close, -- 1秒内收盘价
SUM(amount) AS vol -- 1秒内交易总量
FROM market_trades
WHERE trade_ts >= '2025-06-01 00:00:00' AND trade_ts <= '2025-06-02 00:00:00'
GROUP BY candle_ts, market_id
ORDER BY candle_ts, market_id;分钟级K线聚合
-- 分钟级K线聚合
/*+ SET (polar_csi.enable_query on) */
SELECT
time_bucket('1 minute', trade_ts) AS candle_ts, -- 1分钟内的数据
market_id,
MIN(price) AS low, -- 1分钟内最低价
MAX(price) AS high, -- 1分钟内最高价
FIRST(price ORDER BY trade_ts) AS open, -- 1分钟内开盘价
LAST(price ORDER BY trade_ts) AS close, -- 1分钟内收盘价
SUM(amount) AS vol -- 1分钟内交易总量
FROM market_trades
WHERE trade_ts >= '2025-06-01 00:00:00' AND trade_ts <= '2025-06-02 00:00:00'
GROUP BY candle_ts, market_id
ORDER BY candle_ts, market_id;小时级K线聚合
-- 小时级K线聚合
/*+ SET (polar_csi.enable_query on) */
SELECT
time_bucket('1 hour', trade_ts) AS candle_ts, -- 1小时内的数据
market_id,
MIN(price) AS low, -- 1小时内最低价
MAX(price) AS high, -- 1小时内最高价
FIRST(price ORDER BY trade_ts) AS open, -- 1小时内开盘价
LAST(price ORDER BY trade_ts) AS close, -- 1小时内收盘价
SUM(amount) AS vol -- 1小时内交易总量
FROM market_trades
WHERE trade_ts >= '2025-06-01 00:00:00' AND trade_ts <= '2025-06-02 00:00:00'
GROUP BY candle_ts, market_id
ORDER BY candle_ts, market_id;天级K线聚合
-- 天级K线聚合
/*+ SET (polar_csi.enable_query on) */
SELECT
time_bucket('1 day', trade_ts) AS candle_ts, -- 1天内的数据
market_id,
MIN(price) AS low, -- 1天内最低价
MAX(price) AS high, -- 1天内最高价
FIRST(price ORDER BY trade_ts) AS open, -- 1天内开盘价
LAST(price ORDER BY trade_ts) AS close, -- 1天内收盘价
SUM(amount) AS vol -- 1天内交易总量
FROM market_trades
WHERE trade_ts >= '2025-06-01 00:00:00' AND trade_ts <= '2025-06-02 00:00:00'
GROUP BY candle_ts, market_id
ORDER BY candle_ts, market_id;SQL说明
/*+ SET (polar_csi.enable_query on) */:用于强制查询使用列存索引执行计划。在某些场景下,优化器可能误判行存更优,此时可使用此Hint确保查询走列存路径。time_bucket(bucket_width, ts):时序数据库提供的函数,用于将时间戳ts按指定的时间间隔bucket_width进行分组。





