本文以商品品类停留时长统计为例,演示如何在PolarDB PostgreSQL版 DynamoDB兼容模式中,利用列存索引(IMCI)将分析查询下推到数据库引擎层面,避免在业务侧拉取全量数据后再做聚合。
示例场景
假设业务中有一个名为 usertable 的 DynamoDB 表,记录用户在不同商品品类页面的停留信息。每条记录的业务文档(polardb_document)结构如下:
{
"userid": {"S": "user1"},
"event_time": {"S": "2026-01-01T10:00:00Z"},
"product_category": {"S": "Clothing"},
"duration": {"N": "10"}
}字段说明:
userid:用户 ID(DynamoDB 分区键)。event_time:访问开始时间。product_category:商品品类。duration:停留时长(分钟)。
业务需求
统计最近一年内,每个商品品类所有用户的停留总时长。
在传统 DynamoDB 中,服务端无法完成聚合运算,需要通过过滤表达式获取数据后在业务代码中进行聚合;数据量大时延迟和读容量消耗显著上升。
PolarDB PostgreSQL版 DynamoDB兼容模式结合,可以在 PostgreSQL 引擎层面直接完成高性能分析查询,无需将数据拉取到业务侧。
步骤一:创建表并预置数据
以下 Python 代码通过 DynamoDB API 创建 usertable 表并插入 5 条测试数据。连接配置中的 ENDPOINT_URL 需替换为 DynamoDB 兼容访问地址,具体获取方式参见。
#!/usr/bin/env python3
"""
创建 usertable 并插入测试数据
"""
import boto3
from botocore.exceptions import ClientError
# DynamoDB 连接配置
ENDPOINT_URL = "<your-endpoint_url>"
REGION = "<your-region>"
ACCESS_KEY = "<your-access-key>"
SECRET_KEY = "<your-secret-key>"
TABLE_NAME = "usertable"
def create_dynamodb_client():
return boto3.client(
'dynamodb',
endpoint_url=ENDPOINT_URL,
region_name=REGION,
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY
)
def create_table(client):
try:
client.create_table(
TableName=TABLE_NAME,
KeySchema=[{'AttributeName': 'userid', 'KeyType': 'HASH'}],
AttributeDefinitions=[{'AttributeName': 'userid', 'AttributeType': 'S'}],
BillingMode='PAY_PER_REQUEST'
)
waiter = client.get_waiter('table_exists')
waiter.wait(TableName=TABLE_NAME, WaiterConfig={'Delay': 2, 'MaxAttempts': 60})
print(f"表 {TABLE_NAME} 创建成功")
except ClientError as e:
if e.response['Error']['Code'] == 'ResourceInUseException':
print(f"表 {TABLE_NAME} 已存在,跳过创建")
else:
raise
def insert_test_data(client):
items = [
{'userid': 'user1', 'event_time': '2025-01-01T10:00:00Z', 'product_category': 'Clothing', 'duration': 10},
{'userid': 'user2', 'event_time': '2025-01-02T11:30:00Z', 'product_category': 'Electronics', 'duration': 5},
{'userid': 'user3', 'event_time': '2025-01-03T09:15:00Z', 'product_category': 'Clothing', 'duration': 20},
{'userid': 'user4', 'event_time': '2025-01-04T20:00:00Z', 'product_category': 'Pet Products', 'duration': 8},
{'userid': 'user5', 'event_time': '2025-01-05T08:45:00Z', 'product_category': 'Electronics', 'duration': 12},
]
for idx, d in enumerate(items, 1):
client.put_item(TableName=TABLE_NAME, Item={
'userid': {'S': d['userid']},
'event_time': {'S': d['event_time']},
'product_category': {'S': d['product_category']},
'duration': {'N': str(d['duration'])}
})
print(f"插入数据 {idx}: {d['userid']}, {d['product_category']}, {d['duration']} 分钟")
def main():
client = create_dynamodb_client()
create_table(client)
insert_test_data(client)
resp = client.scan(TableName=TABLE_NAME)
print(f"验证完成,共 {len(resp.get('Items', []))} 条数据")
if __name__ == "__main__":
main()数据插入完成后,通过 psql 或其他 PostgreSQL 客户端连接 PolarDB,继续后续步骤。
步骤二:在 PolarDB 中创建列存索引
开启列存索引有版本和内核参数要求,详见。性能调优可参考。
查看底层表结构
所有 DynamoDB 表存储在 polardb_internal_dynamodb 数据库中,Schema 名称与 DynamoDB 账号名一致。以账号名 dynamodb 为例,连接 polardb_internal_dynamodb 数据库后执行以下 SQL 查看表数据:
SELECT * FROM dynamodb.usertable LIMIT 5;执行结果:

其中 polardb_document 列是一个 JSONB 类型字段,存储了每条 DynamoDB 记录的完整业务数据。
创建生成列
为了在列存索引中加速按时间过滤和品类分组的聚合查询,需要从 polardb_document 这个 JSONB 列中提取查询所需的属性,创建为持久化生成列(Generated Column):
ALTER TABLE dynamodb.usertable
ADD COLUMN col_event_time TEXT GENERATED ALWAYS AS
(polardb_document -> 'event_time' ->> 'S') STORED,
ADD COLUMN col_product_category TEXT GENERATED ALWAYS AS
(polardb_document -> 'product_category' ->> 'S') STORED,
ADD COLUMN col_duration BIGINT GENERATED ALWAYS AS
((polardb_document -> 'duration' ->> 'N')::bigint) STORED;创建完成后再次查看表信息:

生成列会在数据写入时自动从 JSONB 中提取对应值并持久化存储,后续查询可直接使用这些列。
创建列存索引
基于分区键和新增的生成列创建 CSI(Column Store Index):
-- 也可以使用 CREATE INDEX CONCURRENTLY 并发创建索引,避免锁表
CREATE INDEX csi_usertable
ON dynamodb.usertable
USING CSI (
userid,
col_event_time,
col_product_category,
col_duration
);可通过 pg_indexes 视图确认索引已创建:
SELECT indexname, indexdef
FROM pg_indexes
WHERE tablename = 'usertable' AND schemaname = 'dynamodb';执行结果:

步骤三:使用列存索引统计数据
开启列存索引参数
执行分析查询前,确保列存索引相关参数已开启:
-- 允许查询使用列存索引
SET polar_csi.enable_query = on;
SHOW polar_csi.enable_query;
-- 实验和分析场景下,可将代价阈值设为 0,更容易触发 CSI
SET polar_csi.cost_threshold = 0;
SHOW polar_csi.cost_threshold;执行统计查询
统计最近一年内各商品品类的总停留时长(分钟)。先通过 EXPLAIN 查看执行计划,确认查询是否已由 IMCI 加速:
EXPLAIN
SELECT
col_product_category AS product_category,
SUM(col_duration) AS total_duration_minutes
FROM dynamodb.usertable
WHERE col_event_time::timestamptz >= now() - interval '1 year'
GROUP BY product_category
ORDER BY product_category;
在 EXPLAIN 输出中,如果出现 CSI Executor,表示查询已由列存索引加速。去掉 EXPLAIN 后执行,获得实际统计结果:
SELECT
col_product_category AS product_category,
SUM(col_duration) AS total_duration_minutes
FROM dynamodb.usertable
WHERE col_event_time::timestamptz >= now() - interval '1 year'
GROUP BY product_category
ORDER BY product_category;
以本文的测试数据为例,预期统计结果如下:
product_category | total_duration_minutes |
Clothing | 30 |
Electronics | 17 |
Pet Products | 8 |
总结
PolarDB PostgreSQL版 DynamoDB兼容模式结合列存索引,既保留了 DynamoDB API 的易用性,又可以使用 PostgreSQL SQL 进行复杂分析和聚合操作,利用 IMCI 大幅提升分析查询性能。这种架构适合需要同时满足 OLTP 和 OLAP 场景的业务需求。