列存索引(IMCI)实战:商品品类停留时长分析

更新时间:
复制为 MD 格式

本文以商品品类停留时长统计为例,演示如何在PolarDB PostgreSQL DynamoDB兼容模式中,利用列存索引(IMCI)将分析查询下推到数据库引擎层面,避免在业务侧拉取全量数据后再做聚合。

示例场景

假设业务中有一个名为 usertable 的 DynamoDB 表,记录用户在不同商品品类页面的停留信息。每条记录的业务文档(polardb_document)结构如下:

{
  "userid": {"S": "user1"},
  "event_time": {"S": "2026-01-01T10:00:00Z"},
  "product_category": {"S": "Clothing"},
  "duration": {"N": "10"}
}

字段说明:

  • userid:用户 ID(DynamoDB 分区键)。

  • event_time:访问开始时间。

  • product_category:商品品类。

  • duration:停留时长(分钟)。

业务需求

统计最近一年内,每个商品品类所有用户的停留总时长

在传统 DynamoDB 中,服务端无法完成聚合运算,需要通过过滤表达式获取数据后在业务代码中进行聚合;数据量大时延迟和读容量消耗显著上升。

PolarDB PostgreSQL DynamoDB兼容模式结合,可以在 PostgreSQL 引擎层面直接完成高性能分析查询,无需将数据拉取到业务侧。

步骤一:创建表并预置数据

以下 Python 代码通过 DynamoDB API 创建 usertable 表并插入 5 条测试数据。连接配置中的 ENDPOINT_URL 需替换为 DynamoDB 兼容访问地址,具体获取方式参见。

#!/usr/bin/env python3
"""
创建 usertable 并插入测试数据
"""

import boto3
from botocore.exceptions import ClientError

# DynamoDB 连接配置
ENDPOINT_URL = "<your-endpoint_url>"
REGION = "<your-region>"
ACCESS_KEY = "<your-access-key>"
SECRET_KEY = "<your-secret-key>"
TABLE_NAME = "usertable"

def create_dynamodb_client():
    return boto3.client(
        'dynamodb',
        endpoint_url=ENDPOINT_URL,
        region_name=REGION,
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_KEY
    )

def create_table(client):
    try:
        client.create_table(
            TableName=TABLE_NAME,
            KeySchema=[{'AttributeName': 'userid', 'KeyType': 'HASH'}],
            AttributeDefinitions=[{'AttributeName': 'userid', 'AttributeType': 'S'}],
            BillingMode='PAY_PER_REQUEST'
        )
        waiter = client.get_waiter('table_exists')
        waiter.wait(TableName=TABLE_NAME, WaiterConfig={'Delay': 2, 'MaxAttempts': 60})
        print(f"表 {TABLE_NAME} 创建成功")
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceInUseException':
            print(f"表 {TABLE_NAME} 已存在,跳过创建")
        else:
            raise

def insert_test_data(client):
    items = [
        {'userid': 'user1', 'event_time': '2025-01-01T10:00:00Z', 'product_category': 'Clothing', 'duration': 10},
        {'userid': 'user2', 'event_time': '2025-01-02T11:30:00Z', 'product_category': 'Electronics', 'duration': 5},
        {'userid': 'user3', 'event_time': '2025-01-03T09:15:00Z', 'product_category': 'Clothing', 'duration': 20},
        {'userid': 'user4', 'event_time': '2025-01-04T20:00:00Z', 'product_category': 'Pet Products', 'duration': 8},
        {'userid': 'user5', 'event_time': '2025-01-05T08:45:00Z', 'product_category': 'Electronics', 'duration': 12},
    ]
    for idx, d in enumerate(items, 1):
        client.put_item(TableName=TABLE_NAME, Item={
            'userid': {'S': d['userid']},
            'event_time': {'S': d['event_time']},
            'product_category': {'S': d['product_category']},
            'duration': {'N': str(d['duration'])}
        })
        print(f"插入数据 {idx}: {d['userid']}, {d['product_category']}, {d['duration']} 分钟")

def main():
    client = create_dynamodb_client()
    create_table(client)
    insert_test_data(client)
    resp = client.scan(TableName=TABLE_NAME)
    print(f"验证完成,共 {len(resp.get('Items', []))} 条数据")

if __name__ == "__main__":
    main()

数据插入完成后,通过 psql 或其他 PostgreSQL 客户端连接 PolarDB,继续后续步骤。

步骤二:在 PolarDB 中创建列存索引

说明

开启列存索引有版本和内核参数要求,详见。性能调优可参考。

查看底层表结构

所有 DynamoDB 表存储在 polardb_internal_dynamodb 数据库中,Schema 名称与 DynamoDB 账号名一致。以账号名 dynamodb 为例,连接 polardb_internal_dynamodb 数据库后执行以下 SQL 查看表数据:

SELECT * FROM dynamodb.usertable LIMIT 5;

执行结果:

查看表数据

其中 polardb_document 列是一个 JSONB 类型字段,存储了每条 DynamoDB 记录的完整业务数据。

创建生成列

为了在列存索引中加速按时间过滤和品类分组的聚合查询,需要从 polardb_document 这个 JSONB 列中提取查询所需的属性,创建为持久化生成列(Generated Column):

ALTER TABLE dynamodb.usertable
    ADD COLUMN col_event_time TEXT GENERATED ALWAYS AS
        (polardb_document -> 'event_time' ->> 'S') STORED,
    ADD COLUMN col_product_category TEXT GENERATED ALWAYS AS
        (polardb_document -> 'product_category' ->> 'S') STORED,
    ADD COLUMN col_duration BIGINT GENERATED ALWAYS AS
        ((polardb_document -> 'duration' ->> 'N')::bigint) STORED;

创建完成后再次查看表信息:

生成列创建后

生成列会在数据写入时自动从 JSONB 中提取对应值并持久化存储,后续查询可直接使用这些列。

创建列存索引

基于分区键和新增的生成列创建 CSI(Column Store Index):

-- 也可以使用 CREATE INDEX CONCURRENTLY 并发创建索引,避免锁表
CREATE INDEX csi_usertable
ON dynamodb.usertable
USING CSI (
    userid,
    col_event_time,
    col_product_category,
    col_duration
);

可通过 pg_indexes 视图确认索引已创建:

SELECT indexname, indexdef
FROM pg_indexes
WHERE tablename = 'usertable' AND schemaname = 'dynamodb';

执行结果:

索引列表

步骤三:使用列存索引统计数据

开启列存索引参数

执行分析查询前,确保列存索引相关参数已开启:

-- 允许查询使用列存索引
SET polar_csi.enable_query = on;
SHOW polar_csi.enable_query;

-- 实验和分析场景下,可将代价阈值设为 0,更容易触发 CSI
SET polar_csi.cost_threshold = 0;
SHOW polar_csi.cost_threshold;

执行统计查询

统计最近一年内各商品品类的总停留时长(分钟)。先通过 EXPLAIN 查看执行计划,确认查询是否已由 IMCI 加速:

EXPLAIN
SELECT
    col_product_category AS product_category,
    SUM(col_duration)    AS total_duration_minutes
FROM dynamodb.usertable
WHERE col_event_time::timestamptz >= now() - interval '1 year'
GROUP BY product_category
ORDER BY product_category;

执行计划

EXPLAIN 输出中,如果出现 CSI Executor,表示查询已由列存索引加速。去掉 EXPLAIN 后执行,获得实际统计结果:

SELECT
    col_product_category AS product_category,
    SUM(col_duration)    AS total_duration_minutes
FROM dynamodb.usertable
WHERE col_event_time::timestamptz >= now() - interval '1 year'
GROUP BY product_category
ORDER BY product_category;

统计结果

以本文的测试数据为例,预期统计结果如下:

product_category

total_duration_minutes

Clothing

30

Electronics

17

Pet Products

8

总结

PolarDB PostgreSQL DynamoDB兼容模式结合列存索引,既保留了 DynamoDB API 的易用性,又可以使用 PostgreSQL SQL 进行复杂分析和聚合操作,利用 IMCI 大幅提升分析查询性能。这种架构适合需要同时满足 OLTP 和 OLAP 场景的业务需求。