使用列存索引加速时序数据分析

在金融、物流与物联网等业务场景中,系统会产生海量的时序数据,例如交易流水、轨迹数据和监控日志。对这些TB级别的数据进行实时分析,往往面临性能挑战。PolarDB PostgreSQL凭借分区表、冷热分层等特性,为存储海量时序数据提供了高性价比的解决方案。在此基础上,您可以通过列存索引(IMCI)功能,无需进行复杂的数据预处理,即可实现对海量时序数据的实时、高性能分析,有效挖掘数据价值。

方案介绍

操作流程

  1. 数据写入:业务应用将时序数据(例如交易流水)写入PolarDB PostgreSQL集群。

  2. 列存索引:在基表上创建列存索引。PolarDB PostgreSQL会自动为维护表中列存数据。相比行存,列存以列为单位组织数据,具备较高的压缩比,在执行聚合查询时仅需读取相关列,从而减少I/O消耗。

  3. 查询加速:分析查询(如K线聚合)通过优化器或Hint指定,优先使用列存索引。查询引擎利用列式存储和并行处理能力,完成对数据的扫描和聚合计算,最终返回结果。

方案优势

  • 使用简单:无需改造业务或进行复杂的ETL,仅需为基表创建列存索引,即可透明加速分析查询。

  • 功能丰富:原生支持分区表,并内置time_bucketfirstlast等丰富的时序分析函数,简化您的SQL开发。

效果展示

  • 数据量:1 亿条数据,时间跨度2天,每天约5000万条。

  • K线聚合查询:含指定时间周期内最高值、最低值、开盘价、收盘价、交易总量等5个指标。

  • 列存索引并行度:8。

  • 耗时如下(单位:秒):

    场景

    秒级K线聚合

    分钟级K线聚合

    小时级K线聚合

    天级K线聚合

    全量数据聚合(1亿条)

    3.41

    0.95

    0.93

    0.91

    1天数据聚合(约5000万条)

    1.88

    0.82

    0.81

    0.76

    12小时数据聚合(约2500万条)

    0.89

    0.55

    0.53

    1小时数据聚合(约600万条)

    0.41

    0.39

    0.37

实施步骤

步骤一:环境准备

  1. 请确认您的集群版本与配置是否满足以下条件:

    • 集群版本:

      • PostgreSQL 16(内核小版本为2.0.16.8.3.0及以上)

      • PostgreSQL 14(内核小版本为2.0.14.10.20.0及以上)

      说明

      您可在控制台查看内核小版本号,也可以通过SHOW polardb_version;语句查看。如未满足内核小版本要求,请升级内核小版本

    • 原表必须包含主键,且在创建列存索引时需要将主键列加入列存索引中。

    • wal_level参数的值需设置为logical,即在预写式日志WAL(Write-Ahead Logging)中增加支持逻辑编码所需的信息。

      说明

      您可以通过控制台设置wal_level参数。修改该参数后集群将会重启,请在修改参数前做好业务安排,谨慎操作。

  2. 开启列存索引功能。

    对于不同的PolarDB PostgreSQL内核版本,开启列存索引的方式不同:

    PostgreSQL 16(2.0.16.9.8.0及以上)或PostgreSQL 14(2.0.14.17.35.0及以上)

    当前版本下的PolarDB PostgreSQL集群,支持两种开启方式,具体差异如下,请按需选择:

    对比项

    【推荐】添加列存索引只读节点

    直接使用预安装的列存索引插件

    操作方式

    通过控制台实现可视化操作,手动添加列存索引节点。

    无需任何操作,即可直接使用。

    资源分配

    列存引擎独占所有资源,能够充分利用所有内存。

    列存引擎只能使用25%的内存,其余内存则分配给行存引擎使用。

    业务影响

    TP(事务)与AP(分析)业务在不同节点上相互隔离,互不影响。

    TP(事务)与AP(分析)业务在同一节点,会互相影响。

    费用

    需额外收取列存索引只读节点的费用,按照普通计算节点收费。

    无费用。

    添加列存索引只读节点

    您可选择以下两种方式中任意一种方式添加列存索引只读节点:

    说明

    集群中应包含一个只读节点,即单节点集群不支持添加列存索引只读节点。

    控制台添加
    1. 登录PolarDB控制台,选择集群所在地域。您可以按照如下两种方式中的任意一种进入增删节点向导页面:

      • 集群列表页面,单击操作栏的增删节点

        image

      • 在目标集群的基本信息页面,数据库节点区域,单击增删节点

        image

    2. 选择增加列存索引只读节点选项,并单击确定

    3. 在集群变配页面,添加列存索引只读节点并支付。

      1. 单击+增加一个列存索引只读节点,选择节点规格。

      2. 选择切换时间。

      3. (可选)查看产品服务协议、服务等级协议。

      4. 单击立即购买

      image

    4. 支付完成后,返回集群详情页等待列存索引只读节点添加成功,即节点状态为运行中image

    购买时添加

    PolarDB购买页节点个数配置项中自行选择列存索引只读节点数量。

    image

    PostgreSQL 16(2.0.16.8.3.0~2.0.16.9.8.0)或PostgreSQL 14(2.0.14.10.20.0~2.0.14.17.35.0)

    当前版本下的PolarDB PostgreSQL集群,列存索引作为插件polar_csi部署在数据库集群中,在使用之前需要在指定的数据库中创建插件。

    说明
    • polar_csi插件的作用域是Database级别,如果需要在集群的多个Database中使用列存索引,需要为每个Database分别创建polar_csi插件。

    • 安装插件使用的数据库账号必须为高权限账号

    您可以选择以下两种方式中的任意一种安装polar_csi插件。

    控制台安装

    1. 登录PolarDB控制台,在左侧导航栏单击集群列表,选择集群所在地域,并单击目标集群ID进入集群详情页。

    2. 在左侧导航栏选择配置与管理 > 插件管理,在管理插件页签,选中未安装插件

    3. 在页面右上角选择目标数据库,单击polar_csi插件操作列安装,在弹出的安装插件对话框,选择目标数据库账号,单击确定,即将插件安装到目标数据库中。

      image.png

    命令行安装

    连接数据库集群,并在具有相应权限的目标数据库中执行以下语句,创建polar_csi插件。

    CREATE EXTENSION polar_csi;

步骤二:数据准备

  1. 本方案将准备一张交易流水表,模拟生成1亿条交易数据,时间范围约为2天。在此期间,交易时间为每天的8:0016:00,预计每日数据量约为4000万条。

    -- 交易流水表
    CREATE TABLE market_trades (
        trade_id   BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,  -- 自增主键
        trade_ts   TIMESTAMP,                                        -- 交易时间
        market_id  VARCHAR,                                          -- 市场编号
        price      DECIMAL,                                          -- 交易价格
        amount     DECIMAL,                                          -- 交易数量  
        insert_ts  TIMESTAMP                                         -- 系统的写入时间
    );
    
    INSERT INTO market_trades(trade_ts, market_id, price, amount, insert_ts)
    SELECT
        trade_ts,
        market_id,
        price,
        amount,
        trade_ts + (random() * 500)::INT * INTERVAL '1 millisecond' AS insert_ts
    FROM (
        -- ========================
        -- 1. 第一天高峰:2025-06-01 8:00 - 16:00,4000万条
        -- ========================
        SELECT
            '2025-06-01 08:00:00'::TIMESTAMP +
            (random() * 28800)::INT * INTERVAL '1 second' +  -- 28800秒 = 8小时
            (random() * 1000)::INT * INTERVAL '1 millisecond' AS trade_ts,
            CASE WHEN random() < 0.6 THEN 'BTC-USDT' ELSE 'ETH-USDT' END AS market_id,
            CASE WHEN random() < 0.6 THEN 30000 + (random() * 1000) ELSE 2000 + (random() * 100) END AS price,
            random() * 10 + 0.1 AS amount
        FROM generate_series(1, 40000000)
    
        UNION ALL
    
        -- ========================
        -- 2. 第一天非高峰:2025-06-01 16:00 - 2025-06-02 08:00,1000万条
        -- ========================
        SELECT
            CASE 
                WHEN random() < 0.5 THEN
                    -- 16:00 - 24:00
                    '2025-06-01 16:00:00'::TIMESTAMP + (random() * 28800)::INT * INTERVAL '1 second'
                ELSE
                    -- 00:00 - 08:00(第二天凌晨)
                    '2025-06-02 00:00:00'::TIMESTAMP + (random() * 28800)::INT * INTERVAL '1 second'
            END +
            (random() * 1000)::INT * INTERVAL '1 millisecond' AS trade_ts,
            CASE WHEN random() < 0.6 THEN 'BTC-USDT' ELSE 'ETH-USDT' END AS market_id,
            CASE WHEN random() < 0.6 THEN 30000 + (random() * 1000) ELSE 2000 + (random() * 100) END AS price,
            random() * 10 + 0.1 AS amount
        FROM generate_series(1, 10000000)
    
        UNION ALL
    
        -- ========================
        -- 3. 第二天高峰:2025-06-02 8:00 - 16:00,4000万条
        -- ========================
        SELECT
            '2025-06-02 08:00:00'::TIMESTAMP +
            (random() * 28800)::INT * INTERVAL '1 second' +
            (random() * 1000)::INT * INTERVAL '1 millisecond' AS trade_ts,
            CASE WHEN random() < 0.6 THEN 'BTC-USDT' ELSE 'ETH-USDT' END AS market_id,
            CASE WHEN random() < 0.6 THEN 30000 + (random() * 1000) ELSE 2000 + (random() * 100) END AS price,
            random() * 10 + 0.1 AS amount
        FROM generate_series(1, 40000000)
    
        UNION ALL
    
        -- ========================
        -- 4. 第二天非高峰:2025-06-02 16:00 - 2025-06-03 08:00,1000万条
        -- ========================
        SELECT
            CASE 
                WHEN random() < 0.5 THEN
                    -- 16:00 - 24:00
                    '2025-06-02 16:00:00'::TIMESTAMP + (random() * 28800)::INT * INTERVAL '1 second'
                ELSE
                    -- 00:00 - 08:00(第三天凌晨)
                    '2025-06-03 00:00:00'::TIMESTAMP + (random() * 28800)::INT * INTERVAL '1 second'
            END +
            (random() * 1000)::INT * INTERVAL '1 millisecond' AS trade_ts,
            CASE WHEN random() < 0.6 THEN 'BTC-USDT' ELSE 'ETH-USDT' END AS market_id,
            CASE WHEN random() < 0.6 THEN 30000 + (random() * 1000) ELSE 2000 + (random() * 100) END AS price,
            random() * 10 + 0.1 AS amount
        FROM generate_series(1, 10000000)
    ) AS data;
  2. 为交易流水表创建列存索引。

    CREATE INDEX idx_csi_market_trades ON market_trades USING CSI;

步骤三:执行K线聚合查询

使用场景:计算每个固定时间窗口的K线。

应用举例:计算每秒钟的股价最高值、最低值、开盘价、收盘价、交易总量。

以下示例分别计算每秒、每分钟、每小时、每天的K线数据。

秒级K线聚合

-- 秒级K线聚合
/*+ SET (polar_csi.enable_query on) */
SELECT
    time_bucket('1 second', trade_ts) AS candle_ts,   -- 1秒内的数据
    market_id,
    MIN(price) AS low,                                -- 1秒内最低价
    MAX(price) AS high,                               -- 1秒内最高价
    FIRST(price ORDER BY trade_ts) AS open,           -- 1秒内开盘价
    LAST(price ORDER BY trade_ts) AS close,           -- 1秒内收盘价
    SUM(amount) AS vol                                -- 1秒内交易总量
FROM market_trades
WHERE trade_ts >= '2025-06-01 00:00:00' AND trade_ts <= '2025-06-02 00:00:00'
GROUP BY candle_ts, market_id
ORDER BY candle_ts, market_id;

分钟级K线聚合

-- 分钟级K线聚合
/*+ SET (polar_csi.enable_query on) */
SELECT
    time_bucket('1 minute', trade_ts) AS candle_ts,   -- 1分钟内的数据
    market_id,
    MIN(price) AS low,                                -- 1分钟内最低价
    MAX(price) AS high,                               -- 1分钟内最高价
    FIRST(price ORDER BY trade_ts) AS open,           -- 1分钟内开盘价
    LAST(price ORDER BY trade_ts) AS close,           -- 1分钟内收盘价
    SUM(amount) AS vol                                -- 1分钟内交易总量
FROM market_trades
WHERE trade_ts >= '2025-06-01 00:00:00' AND trade_ts <= '2025-06-02 00:00:00'
GROUP BY candle_ts, market_id
ORDER BY candle_ts, market_id;

小时级K线聚合

-- 小时级K线聚合
/*+ SET (polar_csi.enable_query on) */ 
SELECT
    time_bucket('1 hour', trade_ts) AS candle_ts,     -- 1小时内的数据
    market_id,
    MIN(price) AS low,                                -- 1小时内最低价
    MAX(price) AS high,                               -- 1小时内最高价
    FIRST(price ORDER BY trade_ts) AS open,           -- 1小时内开盘价
    LAST(price ORDER BY trade_ts) AS close,           -- 1小时内收盘价
    SUM(amount) AS vol                                -- 1小时内交易总量
FROM market_trades
WHERE trade_ts >= '2025-06-01 00:00:00' AND trade_ts <= '2025-06-02 00:00:00'
GROUP BY candle_ts, market_id
ORDER BY candle_ts, market_id;

天级K线聚合

-- 天级K线聚合
/*+ SET (polar_csi.enable_query on) */ 
SELECT
    time_bucket('1 day', trade_ts) AS candle_ts,     -- 1天内的数据
    market_id,
    MIN(price) AS low,                                -- 1天内最低价
    MAX(price) AS high,                               -- 1天内最高价
    FIRST(price ORDER BY trade_ts) AS open,           -- 1天内开盘价
    LAST(price ORDER BY trade_ts) AS close,           -- 1天内收盘价
    SUM(amount) AS vol                                -- 1天内交易总量
FROM market_trades
WHERE trade_ts >= '2025-06-01 00:00:00' AND trade_ts <= '2025-06-02 00:00:00'
GROUP BY candle_ts, market_id
ORDER BY candle_ts, market_id;

SQL说明

  • /*+ SET (polar_csi.enable_query on) */:用于强制查询使用列存索引执行计划。在某些场景下,优化器可能误判行存更优,此时可使用此Hint确保查询走列存路径。

  • time_bucket(bucket_width, ts)时序数据库提供的函数,用于将时间戳ts按指定的时间间隔bucket_width进行分组。