本文以商品品类停留时长统计为例,演示如何在PolarDB PostgreSQL版 DynamoDB兼容模式中,利用列存索引(IMCI)将分析查询下推到数据库引擎层面,避免在业务侧拉取全量数据后再做聚合。
示例场景
假设业务中有一个名为 usertable 的 DynamoDB 表,记录用户在不同商品品类页面的停留信息。每条记录的业务文档(polardb_document)结构如下:
{
"userid": {"S": "user1"},
"event_time": {"S": "2026-01-01T10:00:00Z"},
"product_category": {"S": "Clothing"},
"duration": {"N": "10"}
}
字段说明:
-
userid:用户 ID(DynamoDB 分区键)。 -
event_time:访问开始时间。 -
product_category:商品品类。 -
duration:停留时长(分钟)。
业务需求
统计最近一年内,每个商品品类所有用户的停留总时长。
在传统 DynamoDB 中,服务端无法完成聚合运算,需要通过过滤表达式获取数据后在业务代码中进行聚合;数据量大时延迟和读容量消耗显著上升。
PolarDB PostgreSQL版 DynamoDB兼容模式结合,可以在 PostgreSQL 引擎层面直接完成高性能分析查询,无需将数据拉取到业务侧。
步骤一:创建表并预置数据
以下 Python 代码通过 DynamoDB API 创建 usertable 表并插入 5 条测试数据。连接配置中的 ENDPOINT_URL 需替换为 DynamoDB 兼容访问地址,具体获取方式参见。
#!/usr/bin/env python3
"""
创建 usertable 并插入测试数据
"""
import boto3
from botocore.exceptions import ClientError
# DynamoDB 连接配置
ENDPOINT_URL = "<your-endpoint_url>"
REGION = "<your-region>"
ACCESS_KEY = "<your-access-key>"
SECRET_KEY = "<your-secret-key>"
TABLE_NAME = "usertable"
def create_dynamodb_client():
return boto3.client(
'dynamodb',
endpoint_url=ENDPOINT_URL,
region_name=REGION,
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY
)
def create_table(client):
try:
client.create_table(
TableName=TABLE_NAME,
KeySchema=[{'AttributeName': 'userid', 'KeyType': 'HASH'}],
AttributeDefinitions=[{'AttributeName': 'userid', 'AttributeType': 'S'}],
BillingMode='PAY_PER_REQUEST'
)
waiter = client.get_waiter('table_exists')
waiter.wait(TableName=TABLE_NAME, WaiterConfig={'Delay': 2, 'MaxAttempts': 60})
print(f"表 {TABLE_NAME} 创建成功")
except ClientError as e:
if e.response['Error']['Code'] == 'ResourceInUseException':
print(f"表 {TABLE_NAME} 已存在,跳过创建")
else:
raise
def insert_test_data(client):
items = [
{'userid': 'user1', 'event_time': '2025-01-01T10:00:00Z', 'product_category': 'Clothing', 'duration': 10},
{'userid': 'user2', 'event_time': '2025-01-02T11:30:00Z', 'product_category': 'Electronics', 'duration': 5},
{'userid': 'user3', 'event_time': '2025-01-03T09:15:00Z', 'product_category': 'Clothing', 'duration': 20},
{'userid': 'user4', 'event_time': '2025-01-04T20:00:00Z', 'product_category': 'Pet Products', 'duration': 8},
{'userid': 'user5', 'event_time': '2025-01-05T08:45:00Z', 'product_category': 'Electronics', 'duration': 12},
]
for idx, d in enumerate(items, 1):
client.put_item(TableName=TABLE_NAME, Item={
'userid': {'S': d['userid']},
'event_time': {'S': d['event_time']},
'product_category': {'S': d['product_category']},
'duration': {'N': str(d['duration'])}
})
print(f"插入数据 {idx}: {d['userid']}, {d['product_category']}, {d['duration']} 分钟")
def main():
client = create_dynamodb_client()
create_table(client)
insert_test_data(client)
resp = client.scan(TableName=TABLE_NAME)
print(f"验证完成,共 {len(resp.get('Items', []))} 条数据")
if __name__ == "__main__":
main()
数据插入完成后,通过 psql 或其他 PostgreSQL 客户端连接 PolarDB,继续后续步骤。
步骤二:在 PolarDB 中创建列存索引
开启列存索引有版本和内核参数要求,详见。性能调优可参考。
查看底层表结构
所有 DynamoDB 表存储在 polardb_internal_dynamodb 数据库中,Schema 名称与 DynamoDB 账号名一致。以账号名 dynamodb 为例,连接 polardb_internal_dynamodb 数据库后执行以下 SQL 查看表数据:
SELECT * FROM dynamodb.usertable LIMIT 5;
执行结果:
polardb_internal_dynamodb=# select * from dynamodb.usertable limit 5;
userid | polardb_document
--------+------------------------------------------------------------------------------------------------------------------------------
user1 | {"userid": {"S": "user1"}, "duration": {"N": "10"}, "event_time": {"S": "2026-01-01T10:00:00Z"}, "product_category": {"S": "Clothing"}}
user2 | {"userid": {"S": "user2"}, "duration": {"N": "5"}, "event_time": {"S": "2026-01-02T11:30:00Z"}, "product_category": {"S": "Electronics"}}
user3 | {"userid": {"S": "user3"}, "duration": {"N": "20"}, "event_time": {"S": "2026-01-03T09:15:00Z"}, "product_category": {"S": "Clothing"}}
user4 | {"userid": {"S": "user4"}, "duration": {"N": "8"}, "event_time": {"S": "2026-01-04T20:00:00Z"}, "product_category": {"S": "Pet Products"}}
user5 | {"userid": {"S": "user5"}, "duration": {"N": "12"}, "event_time": {"S": "2026-01-05T08:45:00Z"}, "product_category": {"S": "Electronics"}}
(5 rows)
其中 polardb_document 列是一个 JSONB 类型字段,存储了每条 DynamoDB 记录的完整业务数据。
创建生成列
为了在列存索引中加速按时间过滤和品类分组的聚合查询,需要从 polardb_document 这个 JSONB 列中提取查询所需的属性,创建为持久化生成列(Generated Column):
ALTER TABLE dynamodb.usertable
ADD COLUMN col_event_time TEXT GENERATED ALWAYS AS
(polardb_document -> 'event_time' ->> 'S') STORED,
ADD COLUMN col_product_category TEXT GENERATED ALWAYS AS
(polardb_document -> 'product_category' ->> 'S') STORED,
ADD COLUMN col_duration BIGINT GENERATED ALWAYS AS
((polardb_document -> 'duration' ->> 'N')::bigint) STORED;
创建完成后再次查看表信息:
polardb_internal_dynamodb=# select * from dynamodb.usertable limit 5;
userid | polardb_document | col_event_time | col_product_category | col_duration
--------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+----------------------+--------------
user1 | {"userid": {"S": "user1"}, "duration": {"N": "10"}, "event_time": {"S": "2026-01-01T10:00:00Z"}, "product_category": {"S": "Clothing"}} | 2026-01-01T10:00:00Z | Clothing | 10
user2 | {"userid": {"S": "user2"}, "duration": {"N": "5"}, "event_time": {"S": "2026-01-02T11:30:00Z"}, "product_category": {"S": "Electronics"}} | 2026-01-02T11:30:00Z | Electronics | 5
user3 | {"userid": {"S": "user3"}, "duration": {"N": "20"}, "event_time": {"S": "2026-01-03T09:15:00Z"}, "product_category": {"S": "Clothing"}} | 2026-01-03T09:15:00Z | Clothing | 20
user4 | {"userid": {"S": "user4"}, "duration": {"N": "8"}, "event_time": {"S": "2026-01-04T20:00:00Z"}, "product_category": {"S": "Pet Products"}} | 2026-01-04T20:00:00Z | Pet Products | 8
user5 | {"userid": {"S": "user5"}, "duration": {"N": "12"}, "event_time": {"S": "2026-01-05T08:45:00Z"}, "product_category": {"S": "Electronics"}} | 2026-01-05T08:45:00Z | Electronics | 12
(5 rows)
生成列会在数据写入时自动从 JSONB 中提取对应值并持久化存储,后续查询可直接使用这些列。
创建列存索引
基于分区键和新增的生成列创建 CSI(Column Store Index):
-- 也可以使用 CREATE INDEX CONCURRENTLY 并发创建索引,避免锁表
CREATE INDEX csi_usertable
ON dynamodb.usertable
USING CSI (
userid,
col_event_time,
col_product_category,
col_duration
);
可通过 pg_indexes 视图确认索引已创建:
SELECT indexname, indexdef
FROM pg_indexes
WHERE tablename = 'usertable' AND schemaname = 'dynamodb';
执行结果:
indexname | indexdef
----------------+-----------------------------------------------------------------------------------------------
usertable_pkey | CREATE UNIQUE INDEX usertable_pkey ON dynamodb.usertable USING btree (userid)
csi_usertable | CREATE INDEX csi_usertable ON dynamodb.usertable USING csi (userid, col_event_time, col_product_category, col_duration)
(2 rows)
步骤三:使用列存索引统计数据
开启列存索引参数
执行分析查询前,确保列存索引相关参数已开启:
-- 允许查询使用列存索引
SET polar_csi.enable_query = on;
SHOW polar_csi.enable_query;
-- 实验和分析场景下,可将代价阈值设为 0,更容易触发 CSI
SET polar_csi.cost_threshold = 0;
SHOW polar_csi.cost_threshold;
执行统计查询
统计最近一年内各商品品类的总停留时长(分钟)。先通过 EXPLAIN 查看执行计划,确认查询是否已由 IMCI 加速:
EXPLAIN
SELECT
col_product_category AS product_category,
SUM(col_duration) AS total_duration_minutes
FROM dynamodb.usertable
WHERE col_event_time::timestamptz >= now() - interval '1 year'
GROUP BY product_category
ORDER BY product_category;
QUERY PLAN
──────────────────────────────────
CSI Executor:
┌─────────────────────────────────┐
│ PROJECTION │
│ ────────────────── │
│ __internal_decompress_strin │
│ g(#0) │
│ #1 │
│ │
│ ~0 Rows │
└─────────────┬───────────────────┘
│
┌─────────────────────────────────┐
│ ORDER_BY │
│ ────────────────── │
│ usertable │
│ .col_product_category ASC │
└─────────────┬───────────────────┘
│
┌─────────────────────────────────┐
│ PROJECTION │
│ ────────────────── │
│ __internal_compress_string_ │
│ uhugeint(#0) │
└─────────────┬───────────────────┘
│
...
在 EXPLAIN 输出中,如果出现 CSI Executor,表示查询已由列存索引加速。去掉 EXPLAIN 后执行,获得实际统计结果:
SELECT
col_product_category AS product_category,
SUM(col_duration) AS total_duration_minutes
FROM dynamodb.usertable
WHERE col_event_time::timestamptz >= now() - interval '1 year'
GROUP BY product_category
ORDER BY product_category;
以本文的测试数据为例,预期统计结果如下:
|
product_category |
total_duration_minutes |
|
Clothing |
30 |
|
Electronics |
17 |
|
Pet Products |
8 |
总结
PolarDB PostgreSQL版 DynamoDB兼容模式结合列存索引,既保留了 DynamoDB API 的易用性,又可以使用 PostgreSQL SQL 进行复杂分析和聚合操作,利用 IMCI 大幅提升分析查询性能。这种架构适合需要同时满足 OLTP 和 OLAP 场景的业务需求。