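Build an e-commerce order management system that handles hundreds of millions of orders by using the search index and multi-condition compound query capabilities of Tablestore.
Solution overview
In e-commerce, finance, logistics, and similar industries, order data grows with the business from millions to hundreds of millions of records, and traditional relational databases run into storage bottlenecks and degrading query performance. The core challenges are twofold. Orders must be searchable by any combination of dimensions, such as user, time range, and amount range. Sales must also be aggregated by product category.
A single Tablestore table can store petabytes of data and sustain tens of millions of TPS. Search indexes support exact matching on KEYWORD fields, range queries on DOUBLE and LONG fields, and tokenized full-text matching on TEXT fields. BoolQuery combines these into multi-condition queries, and GroupBy performs statistical aggregation.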
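To make the query semantics concrete, here is a minimal in-memory sketch in plain Python (no SDK). It shows how an exact-match condition, a range condition, and a BoolQuery `must` clause combine: a row is returned only if every `must` sub-query matches it. The helpers `term`, `range_query`, and `bool_must` are hypothetical names that only mimic this behavior. They are not part of the Tablestore SDK, and tokenized TEXT matching is left out.

```python
from typing import Any, Callable, Dict, Optional

Doc = Dict[str, Any]
Predicate = Callable[[Doc], bool]

def term(field: str, value: Any) -> Predicate:
    """Exact match on a field, like a TermQuery on a KEYWORD field."""
    return lambda doc: doc.get(field) == value

def range_query(field: str, lower: Optional[float] = None, upper: Optional[float] = None,
                include_lower: bool = True, include_upper: bool = False) -> Predicate:
    """Bounded match on a numeric field, like a RangeQuery on a LONG or DOUBLE field."""
    def check(doc: Doc) -> bool:
        value = doc.get(field)
        if value is None:
            return False  # documents without the field never match a range
        if lower is not None and (value < lower or (value == lower and not include_lower)):
            return False
        if upper is not None and (value > upper or (value == upper and not include_upper)):
            return False
        return True
    return check

def bool_must(*queries: Predicate) -> Predicate:
    """Every sub-query must match, like BoolQuery must_queries (logical AND)."""
    return lambda doc: all(q(doc) for q in queries)

orders = [
    {"orderId": "ORD00000001", "userId": "u00001", "totalPrice": 7999.0},
    {"orderId": "ORD00000002", "userId": "u00001", "totalPrice": 1799.0},
    {"orderId": "ORD00000003", "userId": "u00002", "totalPrice": 14999.0},
]
query = bool_must(term("userId", "u00001"), range_query("totalPrice", lower=5000.0))
matches = [o["orderId"] for o in orders if query(o)]
print(matches)  # ['ORD00000001']
```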
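Solution comparison
Traditional order systems are built on sharded MySQL plus Elasticsearch, which runs into the following problems at the scale of hundreds of millions of orders. Tablestore offers a simpler alternative.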
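| Aspect | Traditional (sharded MySQL + ES) | Tablestore |
|---|---|---|
| Storage scale | Hundreds of millions of rows require sharding across databases and tables, which is complex to operate | PB-scale single table with automatic partitioning |
| Multi-dimensional queries | A separate ES cluster must be maintained, and dual writes make consistency hard to guarantee | Search index is built in, and data is synchronized to it automatically |
| Elastic scaling | Shard migration and connection-count bottlenecks | Serverless, scales automatically |
| Operating cost | MySQL, ES, and the sync pipeline are three systems operated independently | A single pay-as-you-go service with no servers to manage |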
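For a new order system, we recommend using Tablestore directly. An existing MySQL order system can keep MySQL for online transaction processing and replicate its data to Tablestore in real time (see Synchronize MySQL data). Tablestore then handles multi-dimensional search and statistical analysis.
Solution design
This solution stores order data in a single wide table (schema-free: attribute columns do not need to be predefined) and uses a search index for multi-dimensional search and statistics. The fields of the data table and the index are designed as follows.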
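| Column type | Field | Data type | Index field type | Description |
|---|---|---|---|---|
| Primary key | _id | String | N/A | MD5 hash of the order ID, used to spread data and avoid write hotspots. |
| Attribute | orderId | String | KEYWORD | Order ID, for exact lookups. |
| Attribute | userId | String | KEYWORD | User ID, for filtering orders by user. |
| Attribute | productName | String | TEXT | Product name, tokenized with the MaxWord analyzer to support full-text matching. |
| Attribute | brand | String | KEYWORD | Product brand. |
| Attribute | category | String | KEYWORD | Product category, such as 手机 (phone), 电脑 (computer), 耳机 (earphones), and 平板 (tablet); used for group-by statistics. |
| Attribute | price | Double | DOUBLE | Unit price (CNY). |
| Attribute | quantity | Integer | LONG | Quantity purchased. |
| Attribute | totalPrice | Double | DOUBLE | Order total (CNY), for amount range queries and statistics. |
| Attribute | orderTime | Integer | LONG | Order timestamp in milliseconds, for time range queries and sorting. |
| Attribute | status | String | KEYWORD | Order status: 已付款 (paid), 已发货 (shipped), 已完成 (completed), or 已取消 (canceled). |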
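Tablestore partitions a table by ranges of its first primary key column. Sequential order IDs such as ORD00000000 and ORD00000001 would therefore sit next to each other and concentrate new writes on one partition. The standard-library sketch below compares the leading characters of raw order IDs with those of their MD5 hex digests. Bucketing by the first hex character is only an illustrative stand-in for partition boundaries, which the service manages itself. `order_pk` is a hypothetical helper that reproduces what the sample write code does.

```python
import hashlib
from collections import Counter

def order_pk(order_id: str) -> str:
    """Primary key value: the hex MD5 digest of the order ID, as in the sample write code."""
    return hashlib.md5(order_id.encode("utf-8")).hexdigest()

order_ids = [f"ORD{i:08d}" for i in range(10000)]

# Sequential IDs share a long common prefix, so they sort next to each other.
raw_buckets = Counter(oid[:4] for oid in order_ids)
# Hashed keys spread across all 16 possible leading hex characters.
hashed_buckets = Counter(order_pk(oid)[0] for oid in order_ids)

print(raw_buckets)                      # Counter({'ORD0': 10000})
print(sorted(hashed_buckets.values()))  # roughly 625 per bucket
```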
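Implementation
Before you begin, activate Tablestore and create an instance (skip this step if you already have an instance to use).
The following examples use the Java and Python SDKs. Before you begin, install and configure the Java SDK or Python SDK as needed.
Step 1: Create the data table and search index
When you create the data table, you only specify the primary key column; attribute columns do not need to be predefined. After the table is created, create a search index on it. The index uses KEYWORD fields for exact matching, TEXT for tokenized full-text matching, and DOUBLE and LONG for range queries and sorting.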
Java
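```java
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.CreateTableRequest;
import com.alicloud.openservices.tablestore.model.PrimaryKeySchema;
import com.alicloud.openservices.tablestore.model.PrimaryKeyType;
import com.alicloud.openservices.tablestore.model.TableMeta;
import com.alicloud.openservices.tablestore.model.TableOptions;
import com.alicloud.openservices.tablestore.model.search.CreateSearchIndexRequest;
import com.alicloud.openservices.tablestore.model.search.FieldSchema;
import com.alicloud.openservices.tablestore.model.search.FieldType;
import com.alicloud.openservices.tablestore.model.search.IndexSchema;
import java.util.Arrays;

// Create the data table: only the primary key column is declared
private static void createTable(SyncClient client) {
    TableMeta tableMeta = new TableMeta("order_contract");
    tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("_id", PrimaryKeyType.STRING));
    TableOptions tableOptions = new TableOptions();
    tableOptions.setMaxVersions(1);  // keep only the latest version of each column
    tableOptions.setTimeToLive(-1);  // data never expires
    client.createTable(new CreateTableRequest(tableMeta, tableOptions));
}

// Create the search index on the data table
private static void createSearchIndex(SyncClient client) {
    CreateSearchIndexRequest request = new CreateSearchIndexRequest();
    request.setTableName("order_contract");
    request.setIndexName("order_index");
    IndexSchema indexSchema = new IndexSchema();
    indexSchema.setFieldSchemas(Arrays.asList(
            new FieldSchema("orderId", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
            new FieldSchema("userId", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
            // TEXT field tokenized with MaxWord for full-text matching
            new FieldSchema("productName", FieldType.TEXT).setIndex(true)
                    .setAnalyzer(FieldSchema.Analyzer.MaxWord),
            new FieldSchema("brand", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
            new FieldSchema("category", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
            new FieldSchema("price", FieldType.DOUBLE).setIndex(true).setEnableSortAndAgg(true),
            new FieldSchema("quantity", FieldType.LONG).setIndex(true).setEnableSortAndAgg(true),
            new FieldSchema("totalPrice", FieldType.DOUBLE).setIndex(true).setEnableSortAndAgg(true),
            new FieldSchema("orderTime", FieldType.LONG).setIndex(true).setEnableSortAndAgg(true),
            new FieldSchema("status", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true)
    ));
    request.setIndexSchema(indexSchema);
    client.createSearchIndex(request);
}
```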
Python
```python
import tablestore

def create_table_and_index(client):
    # Create the data table: only the primary key column is declared
    table_meta = tablestore.TableMeta("order_contract", [("_id", "STRING")])
    # Data never expires; keep only the latest version of each column
    table_options = tablestore.TableOptions(time_to_live=-1, max_version=1)
    reserved = tablestore.ReservedThroughput(tablestore.CapacityUnit(0, 0))
    client.create_table(table_meta, table_options, reserved)

    # Create the search index
    fields = [
        tablestore.FieldSchema("orderId", tablestore.FieldType.KEYWORD,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("userId", tablestore.FieldType.KEYWORD,
                               index=True, enable_sort_and_agg=True),
        # TEXT field tokenized with MaxWord for full-text matching
        tablestore.FieldSchema("productName", tablestore.FieldType.TEXT,
                               index=True, analyzer=tablestore.AnalyzerType.MAXWORD),
        tablestore.FieldSchema("brand", tablestore.FieldType.KEYWORD,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("price", tablestore.FieldType.DOUBLE,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("quantity", tablestore.FieldType.LONG,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("totalPrice", tablestore.FieldType.DOUBLE,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("orderTime", tablestore.FieldType.LONG,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("status", tablestore.FieldType.KEYWORD,
                               index=True, enable_sort_and_agg=True),
    ]
    client.create_search_index("order_contract", "order_index",
                               tablestore.SearchIndexMeta(fields))
```
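Attribute columns are schema-free, so the write path accepts any value type. Keeping column values consistent with the index field types is therefore up to the client, and a value whose type does not match its index field may not be indexed as intended. The sketch below checks a row against this solution's index schema before it is written. It assumes the Python SDK maps `str` to String, `int` to Integer, and `float` to Double, which is why the sample data code wraps prices in `float()`. `INDEX_SCHEMA`, `EXPECTED_PY_TYPE`, and `type_mismatches` are hypothetical names.

```python
from typing import Any, Dict, List

# Index field type declared for each attribute column in this solution's index.
INDEX_SCHEMA: Dict[str, str] = {
    "orderId": "KEYWORD", "userId": "KEYWORD", "productName": "TEXT",
    "brand": "KEYWORD", "category": "KEYWORD", "price": "DOUBLE",
    "quantity": "LONG", "totalPrice": "DOUBLE", "orderTime": "LONG",
    "status": "KEYWORD",
}

# Assumption: Python type the SDK maps to the column type each index field expects.
EXPECTED_PY_TYPE = {"KEYWORD": str, "TEXT": str, "LONG": int, "DOUBLE": float}

def type_mismatches(row: Dict[str, Any]) -> List[str]:
    """Return a description of every indexed column whose value has the wrong type."""
    problems = []
    for column, value in row.items():
        index_type = INDEX_SCHEMA.get(column)
        if index_type is None:
            continue  # schema-free: columns outside the index are allowed
        expected = EXPECTED_PY_TYPE[index_type]
        # Exact type check: bool is a subclass of int, but it is not a LONG value here.
        if type(value) is not expected:
            problems.append(f"{column}: expected {expected.__name__} for {index_type}, "
                            f"got {type(value).__name__}")
    return problems

row = {"orderId": "ORD00000001", "price": 7999, "quantity": 2, "note": "gift"}
print(type_mismatches(row))  # ['price: expected float for DOUBLE, got int']
```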
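Step 2: Write test data
Use the BatchWriteRow operation to write 10,000 orders in batches. Each order has a randomly chosen user ID, product information (name, brand, category, and unit price), quantity, total amount, order time, and order status. The primary key _id is the MD5 hash of the order ID, which spreads the data and avoids write hotspots. A single BatchWriteRow call writes at most 200 rows.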
Java
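```java
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.BatchWriteRowRequest;
import com.alicloud.openservices.tablestore.model.BatchWriteRowResponse;
import com.alicloud.openservices.tablestore.model.ColumnValue;
import com.alicloud.openservices.tablestore.model.PrimaryKeyBuilder;
import com.alicloud.openservices.tablestore.model.PrimaryKeyValue;
import com.alicloud.openservices.tablestore.model.RowPutChange;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Random;

// Hex-encoded MD5 digest, used as the primary key to spread writes across partitions
private static String md5(String s) throws Exception {
    byte[] digest = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
    StringBuilder hex = new StringBuilder();
    for (byte b : digest) {
        hex.append(String.format("%02x", b));
    }
    return hex.toString();
}

private static void writeTestData(SyncClient client) throws Exception {
    Random rand = new Random(42);
    long start = System.currentTimeMillis();
    int written = 0;
    long baseTime = 1704067200000L;               // 2024-01-01 00:00:00 UTC, in milliseconds
    long timeSpan = 2L * 365 * 24 * 3600 * 1000;  // 730 days, roughly 2 years
    String[] PRODUCTS = {"iPhone 15", "MacBook Pro", "AirPods", "iPad Air",
            "Galaxy S24", "Pixel 8", "ThinkPad X1", "Surface Pro",
            "小米14", "华为Mate60"};
    String[] BRANDS = {"Apple", "Apple", "Apple", "Apple",
            "Samsung", "Google", "Lenovo", "Microsoft",
            "Xiaomi", "Huawei"};
    String[] CATEGORIES = {"手机", "电脑", "耳机", "平板",
            "手机", "手机", "电脑", "电脑", "手机", "手机"};
    double[] PRICES = {7999, 14999, 1799, 4599, 6999, 4999, 9999, 8999, 3999, 6999};
    String[] USERS = {"u00001", "u00002", "u00003", "u00004", "u00005",
            "u00006", "u00007", "u00008", "u00009", "u00010"};
    String[] STATUSES = {"已付款", "已发货", "已完成", "已取消"};
    while (written < 10000) {
        BatchWriteRowRequest batch = new BatchWriteRowRequest();
        int batchSize = Math.min(200, 10000 - written);  // at most 200 rows per BatchWriteRow call
        for (int i = 0; i < batchSize; i++) {
            String orderId = String.format("ORD%08d", written + i);
            String id = md5(orderId);
            String userId = USERS[rand.nextInt(USERS.length)];
            int prodIdx = rand.nextInt(PRODUCTS.length);
            int quantity = 1 + rand.nextInt(5);
            double totalPrice = Math.round(PRICES[prodIdx] * quantity * 100.0) / 100.0;
            long orderTime = baseTime + (long) (rand.nextDouble() * timeSpan);
            PrimaryKeyBuilder pkb = PrimaryKeyBuilder.createPrimaryKeyBuilder();
            pkb.addPrimaryKeyColumn("_id", PrimaryKeyValue.fromString(id));
            RowPutChange row = new RowPutChange("order_contract", pkb.build());
            row.addColumn("orderId", ColumnValue.fromString(orderId));
            row.addColumn("userId", ColumnValue.fromString(userId));
            row.addColumn("productName", ColumnValue.fromString(PRODUCTS[prodIdx]));
            row.addColumn("brand", ColumnValue.fromString(BRANDS[prodIdx]));
            row.addColumn("category", ColumnValue.fromString(CATEGORIES[prodIdx]));
            row.addColumn("price", ColumnValue.fromDouble(PRICES[prodIdx]));
            row.addColumn("quantity", ColumnValue.fromLong(quantity));
            row.addColumn("totalPrice", ColumnValue.fromDouble(totalPrice));
            row.addColumn("orderTime", ColumnValue.fromLong(orderTime));
            row.addColumn("status", ColumnValue.fromString(
                    STATUSES[rand.nextInt(STATUSES.length)]));
            batch.addRowChange(row);
        }
        BatchWriteRowResponse response = client.batchWriteRow(batch);
        // The request can succeed while individual rows fail, so check every batch
        if (!response.isAllSucceed()) {
            throw new IllegalStateException("Batch starting at row " + written + " has "
                    + response.getFailedRows().size() + " failed rows");
        }
        written += batchSize;
    }
    System.out.println("Written 10000 rows in " + (System.currentTimeMillis() - start) + "ms");
}
```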
Python
```python
import hashlib
import random

import tablestore

def write_test_data(client):
    rand = random.Random(42)
    base_time = 1704067200000  # 2024-01-01 00:00:00 UTC, in milliseconds
    time_span = 2 * 365 * 24 * 3600 * 1000  # 730 days, roughly 2 years
    PRODUCTS = ["iPhone 15", "MacBook Pro", "AirPods", "iPad Air",
                "Galaxy S24", "Pixel 8", "ThinkPad X1", "Surface Pro",
                "小米14", "华为Mate60"]
    BRANDS = ["Apple", "Apple", "Apple", "Apple",
              "Samsung", "Google", "Lenovo", "Microsoft", "Xiaomi", "Huawei"]
    CATEGORIES = ["手机", "电脑", "耳机", "平板",
                  "手机", "手机", "电脑", "电脑", "手机", "手机"]
    PRICES = [7999, 14999, 1799, 4599, 6999, 4999, 9999, 8999, 3999, 6999]
    USERS = [f"u{i:05d}" for i in range(1, 11)]
    STATUSES = ["已付款", "已发货", "已完成", "已取消"]
    written = 0
    while written < 10000:
        batch_size = min(200, 10000 - written)  # at most 200 rows per BatchWriteRow call
        put_rows = []
        for i in range(batch_size):
            order_id = f"ORD{written + i:08d}"
            _id = hashlib.md5(order_id.encode()).hexdigest()  # hashed key avoids write hotspots
            user_id = rand.choice(USERS)
            prod_idx = rand.randint(0, len(PRODUCTS) - 1)
            quantity = rand.randint(1, 5)
            total_price = round(PRICES[prod_idx] * quantity, 2)
            order_time = base_time + int(rand.random() * time_span)
            pk = [("_id", _id)]
            cols = [
                ("orderId", order_id), ("userId", user_id),
                ("productName", PRODUCTS[prod_idx]),
                ("brand", BRANDS[prod_idx]),
                ("category", CATEGORIES[prod_idx]),
                ("price", float(PRICES[prod_idx])),  # float is stored as Double (DOUBLE field)
                ("quantity", quantity),               # int is stored as Integer (LONG field)
                ("totalPrice", float(total_price)),
                ("orderTime", order_time),
                ("status", rand.choice(STATUSES)),
            ]
            row = tablestore.Row(pk, cols)
            put_rows.append(tablestore.PutRowItem(
                row, tablestore.Condition(tablestore.RowExistenceExpectation.IGNORE)))
        request = tablestore.BatchWriteRowRequest()
        request.add(tablestore.TableInBatchWriteRowItem("order_contract", put_rows))
        result = client.batch_write_row(request)
        # The request can succeed while individual rows fail, so check every batch
        if not result.is_all_succeed():
            raise RuntimeError(f"batch starting at row {written} has failed rows")
        written += batch_size
```
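A BatchWriteRow call can succeed as a whole while individual rows in it fail. Production code therefore usually resends only the failed rows instead of aborting. The generic sketch below splits rows into batches of at most 200 and retries failed rows a bounded number of times. The `write_batch` callable is an assumption: with the Python SDK it would wrap `client.batch_write_row` and turn the per-row results into positions of failed rows, a mapping not shown here. `flaky_writer` is a hypothetical stand-in used only for the demonstration.

```python
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")

def chunks(items: Sequence[T], size: int = 200) -> List[Sequence[T]]:
    """Split items into consecutive chunks of at most `size` rows."""
    return [items[i:i + size] for i in range(0, len(items), size)]

def write_all(rows: Sequence[T],
              write_batch: Callable[[List[T]], List[int]],
              batch_size: int = 200,
              max_attempts: int = 3) -> int:
    """Write rows batch by batch, resending only the rows reported as failed.

    write_batch(batch) must return the positions (within `batch`) of rows that failed.
    Returns the number of rows written; raises if rows still fail after max_attempts.
    """
    written = 0
    for chunk in chunks(rows, batch_size):
        pending = list(chunk)
        for _ in range(max_attempts):
            failed = write_batch(pending)
            written += len(pending) - len(failed)
            pending = [pending[i] for i in failed]
            if not pending:
                break
        if pending:
            raise RuntimeError(f"{len(pending)} rows still failing after {max_attempts} attempts")
    return written

# Hypothetical flaky writer: every even-numbered row fails on its first attempt only.
attempts = {}

def flaky_writer(batch):
    failed = []
    for pos, row in enumerate(batch):
        attempts[row] = attempts.get(row, 0) + 1
        if row % 2 == 0 and attempts[row] == 1:
            failed.append(pos)
    return failed

total = write_all(list(range(450)), flaky_writer)
print(total)                                        # 450
print([len(c) for c in chunks(list(range(450)))])  # [200, 200, 50]
```

Retrying only the failed positions keeps successful rows from being written twice. The `max_attempts` cap keeps a persistent error, such as a malformed row, from looping forever.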
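Step 3: Search data
After the data is written, wait for the search index to finish synchronizing it (usually a few seconds to a few minutes, depending on data volume). The following examples cover three typical search scenarios.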
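Index synchronization is asynchronous, so a script that writes and then immediately searches may see partial results. Before running the scenarios, you can poll until the index reports the expected row count. In the minimal polling sketch below, `get_count` is assumed to return the current number of indexed rows, for example by wrapping a search with `MatchAllQuery`, `limit=0`, and `get_total_count=True` (parameters the scenarios also use). The function name `wait_for_index` and its default timeout are hypothetical. The `sleep` and `clock` parameters exist only so the loop can be exercised without real waiting.

```python
import time
from typing import Callable

def wait_for_index(get_count: Callable[[], int], expected: int,
                   timeout_s: float = 300.0, interval_s: float = 5.0,
                   sleep: Callable[[float], None] = time.sleep,
                   clock: Callable[[], float] = time.monotonic) -> int:
    """Poll get_count() until it reaches `expected`; raise TimeoutError after timeout_s."""
    deadline = clock() + timeout_s
    while True:
        count = get_count()
        if count >= expected:
            return count
        if clock() >= deadline:
            raise TimeoutError(f"index has {count}/{expected} rows after {timeout_s}s")
        sleep(interval_s)

# Simulated counts from successive polls; sleeping is skipped for the demo.
counts = iter([0, 4000, 9800, 10000])
print(wait_for_index(lambda: next(counts), 10000, sleep=lambda s: None))  # 10000
```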
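Scenario 1: Query orders by user
Use TermQuery to match the user ID exactly and sort that user's orders by order time in descending order. The example returns the 5 most recent orders. It also requests the total hit count, which reports how many orders the user has in all.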
Java
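```java
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ColumnValue;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.search.SearchQuery;
import com.alicloud.openservices.tablestore.model.search.SearchRequest;
import com.alicloud.openservices.tablestore.model.search.SearchResponse;
import com.alicloud.openservices.tablestore.model.search.query.TermQuery;
import com.alicloud.openservices.tablestore.model.search.sort.FieldSort;
import com.alicloud.openservices.tablestore.model.search.sort.Sort;
import com.alicloud.openservices.tablestore.model.search.sort.SortOrder;
import java.util.Arrays;

// Query the orders of user u00001, newest first
private static void searchByUser(SyncClient client) {
    TermQuery userQuery = new TermQuery();
    userQuery.setFieldName("userId");
    userQuery.setTerm(ColumnValue.fromString("u00001"));
    SearchQuery searchQuery = new SearchQuery();
    searchQuery.setQuery(userQuery);
    searchQuery.setGetTotalCount(true);  // report the total number of matching orders
    searchQuery.setLimit(5);             // return only the first 5 rows
    searchQuery.setSort(new Sort(Arrays.asList(
            new FieldSort("orderTime", SortOrder.DESC))));
    SearchRequest request = new SearchRequest("order_contract", "order_index", searchQuery);
    SearchRequest.ColumnsToGet columnsToGet = new SearchRequest.ColumnsToGet();
    columnsToGet.setReturnAll(true);
    request.setColumnsToGet(columnsToGet);
    SearchResponse response = client.search(request);
    System.out.println("Total hits: " + response.getTotalCount());
    for (Row row : response.getRows()) {
        System.out.println(row);
    }
}
```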
Python
```python
import tablestore

def search_by_user(client):
    # Orders of user u00001, newest first: first 5 rows plus the total hit count
    query = tablestore.SearchQuery(
        tablestore.TermQuery("userId", "u00001"),
        get_total_count=True, limit=5,
        sort=tablestore.Sort(sorters=[
            tablestore.FieldSort("orderTime", tablestore.SortOrder.DESC)]))
    result = client.search("order_contract", "order_index", query,
                           tablestore.ColumnsToGet(return_type=tablestore.ColumnReturnType.ALL))
    print(f"Total hits: {result.total_count}")
    for row in result.rows:
        # Each row is (primary_key, attribute_columns); a column is (name, value, timestamp)
        cols = {c[0]: c[1] for c in row[1]}
        print(f"orderId={cols['orderId']} | product={cols['productName']}"
              f" | totalPrice={cols['totalPrice']} | status={cols['status']}")
```
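The example above returns only the first page of 5 rows. To read every order of a user, keep calling search with the `next_token` from the previous response until it comes back empty. The sketch below shows that loop with an injected `fetch_page` function, so it runs without a Tablestore instance. With the Python SDK, `fetch_page` would be assumed to call `client.search` with `SearchQuery(..., next_token=token, limit=100)` and return `(result.rows, result.next_token)`. `iter_all_rows` and the in-memory `PAGES` are hypothetical.

```python
from typing import Any, Callable, Iterator, List, Optional, Tuple

# A page is (rows, next_token); an empty or None token means there are no more pages.
Page = Tuple[List[Any], Optional[bytes]]

def iter_all_rows(fetch_page: Callable[[Optional[bytes]], Page]) -> Iterator[Any]:
    """Yield every row across pages by following next_token."""
    token: Optional[bytes] = None
    while True:
        rows, token = fetch_page(token)
        yield from rows
        if not token:
            break

# Hypothetical in-memory pages standing in for successive search responses.
PAGES = {
    None: (["row1", "row2"], b"t1"),
    b"t1": (["row3", "row4"], b"t2"),
    b"t2": (["row5"], None),
}
all_rows = list(iter_all_rows(lambda token: PAGES[token]))
print(all_rows)  # ['row1', 'row2', 'row3', 'row4', 'row5']
```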
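Scenario 2: Multi-condition compound query
Use BoolQuery to combine a TermQuery (user filter) with two RangeQuery conditions (order amount and order time). Together they find one user's orders of at least 5,000 CNY placed during 2024 (UTC). All three are must conditions, so a row is returned only if it matches every one of them.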
Java
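```java
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ColumnValue;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.search.SearchQuery;
import com.alicloud.openservices.tablestore.model.search.SearchRequest;
import com.alicloud.openservices.tablestore.model.search.SearchResponse;
import com.alicloud.openservices.tablestore.model.search.query.BoolQuery;
import com.alicloud.openservices.tablestore.model.search.query.RangeQuery;
import com.alicloud.openservices.tablestore.model.search.query.TermQuery;
import com.alicloud.openservices.tablestore.model.search.sort.FieldSort;
import com.alicloud.openservices.tablestore.model.search.sort.Sort;
import com.alicloud.openservices.tablestore.model.search.sort.SortOrder;
import java.util.Arrays;

private static void searchByConditions(SyncClient client) {
    // User filter: userId = "u00001"
    TermQuery userQuery = new TermQuery();
    userQuery.setFieldName("userId");
    userQuery.setTerm(ColumnValue.fromString("u00001"));
    // Amount range: totalPrice >= 5000
    RangeQuery priceQuery = new RangeQuery();
    priceQuery.setFieldName("totalPrice");
    priceQuery.greaterThanOrEqual(ColumnValue.fromDouble(5000));
    // Time range: all of 2024 (UTC), as the half-open interval [start, end)
    RangeQuery timeQuery = new RangeQuery();
    timeQuery.setFieldName("orderTime");
    timeQuery.greaterThanOrEqual(ColumnValue.fromLong(1704067200000L));  // 2024-01-01 00:00 UTC
    timeQuery.lessThan(ColumnValue.fromLong(1735689600000L));            // 2025-01-01 00:00 UTC
    // BoolQuery: all three must conditions have to match
    BoolQuery boolQuery = new BoolQuery();
    boolQuery.setMustQueries(Arrays.asList(userQuery, priceQuery, timeQuery));
    SearchQuery searchQuery = new SearchQuery();
    searchQuery.setQuery(boolQuery);
    searchQuery.setGetTotalCount(true);
    searchQuery.setLimit(5);
    searchQuery.setSort(new Sort(Arrays.asList(
            new FieldSort("totalPrice", SortOrder.DESC))));
    SearchRequest request = new SearchRequest("order_contract", "order_index", searchQuery);
    SearchRequest.ColumnsToGet columnsToGet = new SearchRequest.ColumnsToGet();
    columnsToGet.setReturnAll(true);
    request.setColumnsToGet(columnsToGet);
    SearchResponse response = client.search(request);
    System.out.println("Total hits: " + response.getTotalCount());
    for (Row row : response.getRows()) {
        System.out.println(row);
    }
}
```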
Python
```python
import tablestore

def search_by_conditions(client):
    # User filter
    user_query = tablestore.TermQuery("userId", "u00001")
    # Amount range: >= 5000 (a float literal to match the DOUBLE index field)
    price_query = tablestore.RangeQuery("totalPrice",
                                        range_from=5000.0, include_lower=True)
    # Time range: all of 2024 (UTC), as the half-open interval [start, end)
    time_query = tablestore.RangeQuery("orderTime",
                                       range_from=1704067200000, range_to=1735689600000,
                                       include_lower=True, include_upper=False)
    # BoolQuery: all three must conditions have to match
    bool_query = tablestore.BoolQuery(
        must_queries=[user_query, price_query, time_query])
    result = client.search("order_contract", "order_index",
                           tablestore.SearchQuery(bool_query, get_total_count=True, limit=5,
                                                  sort=tablestore.Sort(sorters=[
                                                      tablestore.FieldSort("totalPrice",
                                                                           tablestore.SortOrder.DESC)])),
                           tablestore.ColumnsToGet(return_type=tablestore.ColumnReturnType.ALL))
    print(f"Total hits: {result.total_count}")
    for row in result.rows:
        cols = {c[0]: c[1] for c in row[1]}
        print(f"orderId={cols['orderId']} | product={cols['productName']}"
              f" | totalPrice={cols['totalPrice']}")
```
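The time boundaries in this scenario are the epoch milliseconds of midnight UTC on 2024-01-01 and 2025-01-01. The lower bound is inclusive and the upper bound exclusive, giving a half-open interval [start, end), so an order stamped exactly at midnight on 2025-01-01 is not counted in 2024. Rather than hard-coding the literals, you can derive them with the standard library, as sketched below. `year_range_ms` and `in_range` are hypothetical helpers. The UTC+8 call shows how the boundaries shift if the business year is defined in China Standard Time.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

def year_range_ms(year: int, tz: timezone = timezone.utc) -> Tuple[int, int]:
    """Return [start, end) epoch milliseconds covering calendar `year` in time zone `tz`."""
    start = datetime(year, 1, 1, tzinfo=tz)
    end = datetime(year + 1, 1, 1, tzinfo=tz)
    return int(start.timestamp() * 1000), int(end.timestamp() * 1000)

def in_range(ts_ms: int, bounds: Tuple[int, int]) -> bool:
    """Half-open check, matching include_lower=True and include_upper=False."""
    return bounds[0] <= ts_ms < bounds[1]

utc_range = year_range_ms(2024)
cst_range = year_range_ms(2024, timezone(timedelta(hours=8)))
print(utc_range)  # (1704067200000, 1735689600000)
print(cst_range)  # (1704038400000, 1735660800000)
```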
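Scenario 3: Statistics by product category
Use GroupByField to group orders by product category. For each category, compute the order count (Count on orderId) and the total revenue (Sum on totalPrice), and sort the groups by total revenue in descending order. Setting limit to 0 returns no rows, only the aggregation results.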
Java
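```java
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.search.SearchQuery;
import com.alicloud.openservices.tablestore.model.search.SearchRequest;
import com.alicloud.openservices.tablestore.model.search.SearchResponse;
import com.alicloud.openservices.tablestore.model.search.agg.AggregationBuilders;
import com.alicloud.openservices.tablestore.model.search.groupby.GroupByBuilders;
import com.alicloud.openservices.tablestore.model.search.groupby.GroupByFieldResult;
import com.alicloud.openservices.tablestore.model.search.groupby.GroupByFieldResultItem;
import com.alicloud.openservices.tablestore.model.search.groupby.GroupBySorter;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;

// Group orders by category, counting orders and summing revenue per category
private static void statsByCategory(SyncClient client) {
    SearchRequest request = SearchRequest.newBuilder()
            .tableName("order_contract").indexName("order_index")
            .searchQuery(SearchQuery.newBuilder()
                    .query(QueryBuilders.matchAll())
                    .limit(0)  // return no rows, only aggregation results
                    .addGroupBy(GroupByBuilders.groupByField("orderCategory", "category")
                            .size(20)  // at most 20 groups
                            .addSubAggregation(AggregationBuilders.count("orderCount", "orderId"))
                            .addSubAggregation(AggregationBuilders.sum("totalRevenue", "totalPrice"))
                            .addGroupBySorter(GroupBySorter.subAggSortInDesc("totalRevenue"))
                            .build())
                    .build())
            .build();
    SearchResponse response = client.search(request);
    GroupByFieldResult results = response.getGroupByResults()
            .getAsGroupByFieldResult("orderCategory");
    for (GroupByFieldResultItem item : results.getGroupByFieldResultItems()) {
        long count = item.getSubAggregationResults()
                .getAsCountAggregationResult("orderCount").getValue();
        double revenue = item.getSubAggregationResults()
                .getAsSumAggregationResult("totalRevenue").getValue();
        System.out.println("Category: " + item.getKey()
                + " | Orders: " + count
                + " | Total revenue: " + String.format("%.2f", revenue));
    }
}
```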
Python
```python
import tablestore

def stats_by_category(client):
    # Group by category; count orders, sum revenue, and sort groups by revenue descending
    group_by = tablestore.GroupByField(
        "category", size=20,
        sub_aggs=[
            tablestore.Count("orderId", name="orderCount"),
            tablestore.Sum("totalPrice", name="totalRevenue"),
        ],
        group_by_sort=[tablestore.SubAggSort(sub_agg_name="totalRevenue",
                                             sort_order=tablestore.SortOrder.DESC)],
        name="orderCategory")
    result = client.search("order_contract", "order_index",
                           tablestore.SearchQuery(tablestore.MatchAllQuery(), limit=0,
                                                  group_bys=[group_by]),
                           tablestore.ColumnsToGet(return_type=tablestore.ColumnReturnType.NONE))
    for item in result.group_by_results[0].items:
        aggs = {a.name: a.value for a in item.sub_aggs}
        print(f"Category: {item.key} | Orders: {int(aggs['orderCount'])}"
              f" | Total revenue: {aggs['totalRevenue']:.2f}")
```
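For a small data set, you can reproduce the same aggregation in memory to sanity-check the output of the group-by query. The sketch below mirrors `GroupByField` on `category`: it counts `orderId` values, sums `totalPrice`, orders the groups by that sum in descending order, and caps the number of groups with `size`. `sales_by_category` is a hypothetical helper, not an SDK call. The sample rows are made up for illustration and reuse the document's category values.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

def sales_by_category(orders: List[Dict], size: int = 20) -> List[Tuple[str, int, float]]:
    """Group orders by category; return (category, order count, revenue), revenue descending."""
    counts: Dict[str, int] = defaultdict(int)
    revenue: Dict[str, float] = defaultdict(float)
    for order in orders:
        category = order.get("category")
        if category is None:
            continue  # in this sketch, rows without the group-by field join no group
        if "orderId" in order:
            counts[category] += 1  # a count only includes rows where the field exists
        revenue[category] += order.get("totalPrice", 0.0)
    groups = [(c, counts[c], revenue[c]) for c in revenue]
    groups.sort(key=lambda g: g[2], reverse=True)  # like sorting by the Sum sub-aggregation
    return groups[:size]  # like GroupByField size: keep at most `size` groups

orders = [
    {"orderId": "A", "category": "手机", "totalPrice": 7999.0},
    {"orderId": "B", "category": "电脑", "totalPrice": 14999.0},
    {"orderId": "C", "category": "手机", "totalPrice": 13998.0},
    {"orderId": "D", "category": "耳机", "totalPrice": 1799.0},
]
for category, n, total in sales_by_category(orders):
    print(f"Category: {category} | Orders: {n} | Total revenue: {total:.2f}")
```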
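Resource cleanup
To avoid unnecessary charges, clean up the resources created in this solution once you no longer need them.
Delete the resources in this order: the search index first, then the data table. If the instance was created only for this solution, release it last.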
Java
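```java
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.DeleteTableRequest;
import com.alicloud.openservices.tablestore.model.search.DeleteSearchIndexRequest;

private static void cleanup(SyncClient client) {
    // 1. Delete the search index first
    DeleteSearchIndexRequest deleteIndexReq = new DeleteSearchIndexRequest();
    deleteIndexReq.setTableName("order_contract");
    deleteIndexReq.setIndexName("order_index");
    client.deleteSearchIndex(deleteIndexReq);
    // 2. Then delete the data table
    client.deleteTable(new DeleteTableRequest("order_contract"));
}
```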
Python
```python
def cleanup(client):
    # 1. Delete the search index first
    client.delete_search_index("order_contract", "order_index")
    # 2. Then delete the data table
    client.delete_table("order_contract")
```