Build an order system at the hundred-million scale


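Use the search index and multi-condition combined query capabilities of Tablestore to build an e-commerce order management system that supports hundreds of millions of records.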

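Solution overview

In industries such as e-commerce, finance, and logistics, order data grows with the business from millions of records to hundreds of millions, and traditional relational databases run into storage bottlenecks and degraded query performance. The core challenge is twofold: massive order data must be searchable by combinations of dimensions such as user, time range, and amount range, and sales data must also be aggregated by product category.

A single Tablestore table supports PB-scale storage and tens of millions of TPS. Search indexes support exact-match queries on KEYWORD fields, range queries on DOUBLE and LONG fields, and tokenized fuzzy queries on TEXT fields. BoolQuery combines these into multi-condition retrieval, and GroupBy provides statistical aggregation.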

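Solution comparison

Traditional order systems are built on sharded MySQL (database and table sharding) plus Elasticsearch (ES), and face the following challenges at the hundred-million scale. Tablestore offers a simpler alternative.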

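| Dimension | Traditional approach (sharded MySQL + ES) | Tablestore approach |
| --- | --- | --- |
| Storage scale | A single database at the hundred-million-row scale requires database and table sharding, which complicates operations | PB-scale single table with automatic partitioning |
| Multi-dimensional queries | Requires a separately maintained ES cluster; consistency of dual writes is hard to guarantee | Built-in search index; data is synchronized automatically |
| Elastic scaling | Shard migration and connection-count bottlenecks | Serverless automatic scaling |
| O&M cost | MySQL, ES, and the synchronization pipeline are three systems operated independently | A single pay-as-you-go service with no O&M |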

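For a new order system, using Tablestore directly is recommended. For an existing MySQL-based order system, you can synchronize MySQL data to Tablestore in real time (see the documentation on synchronizing MySQL data), so that Tablestore handles multi-dimensional retrieval and statistical analysis while MySQL focuses on online transaction processing.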

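Solution design

This solution stores order data in a single wide table (schema-free: attribute columns do not need to be predefined) and uses a search index for multi-dimensional retrieval and statistics. The fields of the data table and the index are designed as follows.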

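| Column type | Field | Data type | Index field type | Description |
| --- | --- | --- | --- | --- |
| Primary key column | _id | String | - | MD5 hash of the order number; spreads data to avoid write hotspots. |
| Attribute column | orderId | String | KEYWORD | Order number, used for exact-match queries. |
| Attribute column | userId | String | KEYWORD | User ID, used to filter orders by user. |
| Attribute column | productName | String | TEXT | Product name; uses max-word (maximum semantic) tokenization to support fuzzy queries. |
| Attribute column | brand | String | KEYWORD | Product brand. |
| Attribute column | category | String | KEYWORD | Product category, such as 手机 (phone), 电脑 (computer), 耳机 (earphones), and 平板 (tablet); used for grouped statistics. |
| Attribute column | price | Double | DOUBLE | Unit price of the product (yuan). |
| Attribute column | quantity | Integer | LONG | Purchase quantity. |
| Attribute column | totalPrice | Double | DOUBLE | Total order amount (yuan), used for amount range queries and statistics. |
| Attribute column | orderTime | Integer | LONG | Order timestamp (milliseconds), used for time range queries and sorting. |
| Attribute column | status | String | KEYWORD | Order status: 已付款 (paid), 已发货 (shipped), 已完成 (completed), or 已取消 (canceled). |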
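The effect of hashing the order number can be illustrated with a small sketch. It assumes order numbers follow the sequential `ORD%08d` pattern used by the test data in this document; the helper name `make_row_key` is illustrative and not part of any SDK.

```python
import hashlib


def make_row_key(order_id: str) -> str:
    """Return the hex MD5 digest of an order number, for use as the _id primary key."""
    return hashlib.md5(order_id.encode("utf-8")).hexdigest()


# Sequential order numbers share a long common prefix and differ only at the end.
order_ids = [f"ORD{i:08d}" for i in range(5)]

# Their MD5 digests are fixed-length hex strings spread over the key space.
row_keys = [make_row_key(order_id) for order_id in order_ids]

for order_id, row_key in zip(order_ids, row_keys):
    print(order_id, "->", row_key)
```

Because the hash is deterministic, a lookup by order number can recompute `_id` and read the row directly by primary key.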

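Implementation

Before you begin, activate Tablestore and create an instance (skip this step if you already have a usable instance).

Note

The following examples use the Java and Python SDKs. Before you begin, install and configure the Java SDK or the Python SDK as needed.

Step 1: Create the data table and the search index

When you create the data table, you only need to specify the primary key column; attribute columns do not need to be predefined. After the table is created, create a search index for it. The index uses the field types KEYWORD (exact-match queries), TEXT (tokenized fuzzy queries), DOUBLE (range queries), and LONG (range queries and sorting).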

Java

// Create the data table
private static void createTable(SyncClient client) {
    TableMeta tableMeta = new TableMeta("order_contract");
    tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("_id", PrimaryKeyType.STRING));

    TableOptions tableOptions = new TableOptions();
    tableOptions.setMaxVersions(1);
    tableOptions.setTimeToLive(-1);

    client.createTable(new CreateTableRequest(tableMeta, tableOptions));
}

// Create the search index
private static void createSearchIndex(SyncClient client) {
    CreateSearchIndexRequest request = new CreateSearchIndexRequest();
    request.setTableName("order_contract");
    request.setIndexName("order_index");

    IndexSchema indexSchema = new IndexSchema();
    indexSchema.setFieldSchemas(Arrays.asList(
        new FieldSchema("orderId", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("userId", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("productName", FieldType.TEXT).setIndex(true)
            .setAnalyzer(FieldSchema.Analyzer.MaxWord),
        new FieldSchema("brand", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("category", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("price", FieldType.DOUBLE).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("quantity", FieldType.LONG).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("totalPrice", FieldType.DOUBLE).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("orderTime", FieldType.LONG).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("status", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true)
    ));
    request.setIndexSchema(indexSchema);
    client.createSearchIndex(request);
}

Python

import tablestore

def create_table_and_index(client):
    # Create the data table
    table_meta = tablestore.TableMeta("order_contract", [("_id", "STRING")])
    table_options = tablestore.TableOptions(time_to_live=-1, max_version=1)
    reserved = tablestore.ReservedThroughput(tablestore.CapacityUnit(0, 0))
    client.create_table(table_meta, table_options, reserved)

    # Create the search index
    fields = [
        tablestore.FieldSchema("orderId", tablestore.FieldType.KEYWORD,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("userId", tablestore.FieldType.KEYWORD,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("productName", tablestore.FieldType.TEXT,
                               index=True, analyzer="max_word"),
        tablestore.FieldSchema("brand", tablestore.FieldType.KEYWORD,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("price", tablestore.FieldType.DOUBLE,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("quantity", tablestore.FieldType.LONG,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("totalPrice", tablestore.FieldType.DOUBLE,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("orderTime", tablestore.FieldType.LONG,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("status", tablestore.FieldType.KEYWORD,
                               index=True, enable_sort_and_agg=True),
    ]
    client.create_search_index("order_contract", "order_index",
                               tablestore.SearchIndexMeta(fields))

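Step 2: Write test data

Use the BatchWriteRow operation to write 10,000 orders in batches. Each order contains a randomly generated user ID, product information (name, brand, category, and unit price), purchase quantity, total amount, order time, and order status. The primary key _id is the MD5 hash of the order number, which spreads rows out to avoid write hotspots. A single BatchWriteRow request can write at most 200 rows.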

Java

private static void writeTestData(SyncClient client) throws Exception {
    Random rand = new Random(42);
    long start = System.currentTimeMillis();
    int written = 0;
    long baseTime = 1704067200000L; // 2024-01-01 UTC
    long timeSpan = 2L * 365 * 24 * 3600 * 1000; // 2 years

    String[] PRODUCTS = {"iPhone 15", "MacBook Pro", "AirPods", "iPad Air",
                         "Galaxy S24", "Pixel 8", "ThinkPad X1", "Surface Pro",
                         "小米14", "华为Mate60"};
    String[] BRANDS = {"Apple", "Apple", "Apple", "Apple",
                       "Samsung", "Google", "Lenovo", "Microsoft",
                       "Xiaomi", "Huawei"};
    String[] CATEGORIES = {"手机", "电脑", "耳机", "平板",
                           "手机", "手机", "电脑", "电脑", "手机", "手机"};
    double[] PRICES = {7999, 14999, 1799, 4599, 6999, 4999, 9999, 8999, 3999, 6999};
    String[] USERS = {"u00001", "u00002", "u00003", "u00004", "u00005",
                      "u00006", "u00007", "u00008", "u00009", "u00010"};
    String[] STATUSES = {"已付款", "已发货", "已完成", "已取消"};

    while (written < 10000) {
        BatchWriteRowRequest batch = new BatchWriteRowRequest();
        int batchSize = Math.min(200, 10000 - written);

        for (int i = 0; i < batchSize; i++) {
            int idx = written + i;
            String orderId = String.format("ORD%08d", idx);
            String id = md5(orderId);
            String userId = USERS[rand.nextInt(USERS.length)];
            int prodIdx = rand.nextInt(PRODUCTS.length);
            int quantity = 1 + rand.nextInt(5);
            double totalPrice = Math.round(PRICES[prodIdx] * quantity * 100.0) / 100.0;
            long orderTime = baseTime + (long)(rand.nextDouble() * timeSpan);

            PrimaryKeyBuilder pkb = PrimaryKeyBuilder.createPrimaryKeyBuilder();
            pkb.addPrimaryKeyColumn("_id", PrimaryKeyValue.fromString(id));
            RowPutChange row = new RowPutChange("order_contract", pkb.build());
            row.addColumn("orderId", ColumnValue.fromString(orderId));
            row.addColumn("userId", ColumnValue.fromString(userId));
            row.addColumn("productName", ColumnValue.fromString(PRODUCTS[prodIdx]));
            row.addColumn("brand", ColumnValue.fromString(BRANDS[prodIdx]));
            row.addColumn("category", ColumnValue.fromString(CATEGORIES[prodIdx]));
            row.addColumn("price", ColumnValue.fromDouble(PRICES[prodIdx]));
            row.addColumn("quantity", ColumnValue.fromLong(quantity));
            row.addColumn("totalPrice", ColumnValue.fromDouble(totalPrice));
            row.addColumn("orderTime", ColumnValue.fromLong(orderTime));
            row.addColumn("status", ColumnValue.fromString(
                STATUSES[rand.nextInt(STATUSES.length)]));
            batch.addRowChange(row);
        }
        client.batchWriteRow(batch);
        written += batchSize;
    }
    System.out.println("Written 10000 rows in " + (System.currentTimeMillis() - start) + "ms");
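}

// md5() is not shown in the original sample; a minimal helper could look like this.
// It returns the lowercase hex MD5 digest of the input string.
private static String md5(String input) throws Exception {
    java.security.MessageDigest md = java.security.MessageDigest.getInstance("MD5");
    byte[] digest = md.digest(input.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    StringBuilder sb = new StringBuilder();
    for (byte b : digest) {
        sb.append(String.format("%02x", b));
    }
    return sb.toString();
}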

Python

import hashlib, random

def write_test_data(client):
    rand = random.Random(42)
    base_time = 1704067200000  # 2024-01-01 UTC
    time_span = 2 * 365 * 24 * 3600 * 1000  # 2 years

    PRODUCTS = ["iPhone 15", "MacBook Pro", "AirPods", "iPad Air",
                "Galaxy S24", "Pixel 8", "ThinkPad X1", "Surface Pro",
                "小米14", "华为Mate60"]
    BRANDS = ["Apple", "Apple", "Apple", "Apple",
              "Samsung", "Google", "Lenovo", "Microsoft", "Xiaomi", "Huawei"]
    CATEGORIES = ["手机", "电脑", "耳机", "平板",
                  "手机", "手机", "电脑", "电脑", "手机", "手机"]
    PRICES = [7999, 14999, 1799, 4599, 6999, 4999, 9999, 8999, 3999, 6999]
    USERS = [f"u{i:05d}" for i in range(1, 11)]
    STATUSES = ["已付款", "已发货", "已完成", "已取消"]
    written = 0

    while written < 10000:
        batch_size = min(200, 10000 - written)
        put_rows = []

        for i in range(batch_size):
            idx = written + i
            order_id = f"ORD{idx:08d}"
            _id = hashlib.md5(order_id.encode()).hexdigest()
            user_id = rand.choice(USERS)
            prod_idx = rand.randint(0, len(PRODUCTS) - 1)
            quantity = 1 + rand.randint(0, 4)
            total_price = round(PRICES[prod_idx] * quantity, 2)
            order_time = base_time + int(rand.random() * time_span)

            pk = [("_id", _id)]
            cols = [
                ("orderId", order_id), ("userId", user_id),
                ("productName", PRODUCTS[prod_idx]),
                ("brand", BRANDS[prod_idx]),
                ("category", CATEGORIES[prod_idx]),
                ("price", float(PRICES[prod_idx])),
                ("quantity", quantity),
                ("totalPrice", float(total_price)),
                ("orderTime", order_time),
                ("status", rand.choice(STATUSES)),
            ]
            row = tablestore.Row(pk, cols)
            put_rows.append(tablestore.PutRowItem(
                row, tablestore.Condition(tablestore.RowExistenceExpectation.IGNORE)))

        request = tablestore.BatchWriteRowRequest()
        request.add(tablestore.TableInBatchWriteRowItem("order_contract", put_rows))
        client.batch_write_row(request)
        written += batch_size
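The batching rule used in the samples above (at most 200 rows per BatchWriteRow request) can also be factored into a small reusable helper. The following is a minimal sketch in plain Python; `chunked` and `MAX_ROWS_PER_BATCH` are illustrative names, not SDK identifiers, and the rows here are just order numbers standing in for real row objects.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")

# Per-request row limit stated in this step.
MAX_ROWS_PER_BATCH = 200


def chunked(rows: Sequence[T], size: int = MAX_ROWS_PER_BATCH) -> Iterator[List[T]]:
    """Yield consecutive slices of rows, each containing at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(rows), size):
        yield list(rows[start:start + size])


# Stand-in rows: 10,000 order numbers, as in the test data.
order_ids = [f"ORD{i:08d}" for i in range(10000)]
batches = list(chunked(order_ids))

print(len(batches), len(batches[0]), len(batches[-1]))
```

Each yielded batch could then be turned into one write request, which keeps the size limit in one place instead of repeating the arithmetic in the write loop.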

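Step 3: Query data

After the data is written, wait for the search index to finish synchronizing (typically a few seconds to a few minutes, depending on the data volume). The following examples demonstrate three typical query scenarios.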
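Rather than sleeping for a fixed time, a script can poll until the index reports the expected number of rows. The sketch below is one possible approach, not an SDK feature: `count_rows` is a hypothetical callable supplied by the caller (for example, it might wrap a match-all search with `get_total_count=True` and `limit=0` and return the total count), and the timeout and interval defaults are arbitrary.

```python
import time
from typing import Callable


def wait_until_indexed(count_rows: Callable[[], int], expected: int,
                       timeout_s: float = 300.0, interval_s: float = 2.0,
                       sleep: Callable[[float], None] = time.sleep) -> int:
    """Poll count_rows() until it returns at least `expected`, or raise TimeoutError."""
    deadline = time.monotonic() + timeout_s
    while True:
        current = count_rows()
        if current >= expected:
            return current
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"index has {current} of {expected} rows after {timeout_s}s")
        sleep(interval_s)


# Usage with a stand-in counter that simulates the index catching up.
_simulated_counts = iter([0, 4000, 10000])
indexed = wait_until_indexed(lambda: next(_simulated_counts), 10000,
                             sleep=lambda seconds: None)
print(indexed)
```

The `sleep` parameter exists only so that the wait can be replaced in tests; in a real script the default `time.sleep` would be used.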

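Scenario 1: Query orders by user

Use TermQuery to match the user ID exactly, query all orders of that user, and sort them by order time in descending order. The sample returns the first 5 rows together with the total hit count.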

Java

// Query all orders of user u00001, sorted by order time in descending order
TermQuery userQuery = new TermQuery();
userQuery.setFieldName("userId");
userQuery.setTerm(ColumnValue.fromString("u00001"));

SearchQuery searchQuery = new SearchQuery();
searchQuery.setQuery(userQuery);
searchQuery.setGetTotalCount(true);
searchQuery.setLimit(5);
searchQuery.setSort(new Sort(Arrays.asList(
    new FieldSort("orderTime", SortOrder.DESC))));

SearchRequest request = new SearchRequest("order_contract", "order_index", searchQuery);
SearchRequest.ColumnsToGet columnsToGet = new SearchRequest.ColumnsToGet();
columnsToGet.setReturnAll(true);
request.setColumnsToGet(columnsToGet);

SearchResponse response = client.search(request);
System.out.println("Total hits: " + response.getTotalCount());
for (Row row : response.getRows()) {
    System.out.println(row);
}

Python

# Query all orders of user u00001, sorted by order time in descending order
result = client.search("order_contract", "order_index",
    tablestore.SearchQuery(
        tablestore.TermQuery("userId", "u00001"),
        get_total_count=True, limit=5,
        sort=tablestore.Sort(sorters=[
            tablestore.FieldSort("orderTime", tablestore.SortOrder.DESC)])),
    tablestore.ColumnsToGet(return_type=tablestore.ColumnReturnType.ALL))

print(f"Total hits: {result.total_count}")
for row in result.rows:
    cols = {c[0]: c[1] for c in row[1]}
    print(f"orderId={cols['orderId']} | product={cols['productName']}"
          f" | totalPrice={cols['totalPrice']} | status={cols['status']}")

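Scenario 2: Multi-condition combined query

Use BoolQuery to combine a TermQuery (user filter), a RangeQuery (amount range), and a RangeQuery (time range) to find the orders of a given user within a specified time period whose total amount is at least 5,000 yuan.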

Java

// User filter: userId = "u00001"
TermQuery userQuery = new TermQuery();
userQuery.setFieldName("userId");
userQuery.setTerm(ColumnValue.fromString("u00001"));

// Amount range: totalPrice >= 5000
RangeQuery priceQuery = new RangeQuery();
priceQuery.setFieldName("totalPrice");
priceQuery.greaterThanOrEqual(ColumnValue.fromDouble(5000));

// Time range: all of 2024 (UTC)
RangeQuery timeQuery = new RangeQuery();
timeQuery.setFieldName("orderTime");
timeQuery.greaterThanOrEqual(ColumnValue.fromLong(1704067200000L)); // 2024-01-01
timeQuery.lessThan(ColumnValue.fromLong(1735689600000L));           // 2025-01-01

// Combine the three conditions with BoolQuery
BoolQuery boolQuery = new BoolQuery();
boolQuery.setMustQueries(Arrays.asList(userQuery, priceQuery, timeQuery));

SearchQuery searchQuery = new SearchQuery();
searchQuery.setQuery(boolQuery);
searchQuery.setGetTotalCount(true);
searchQuery.setLimit(5);
searchQuery.setSort(new Sort(Arrays.asList(
    new FieldSort("totalPrice", SortOrder.DESC))));

SearchRequest request = new SearchRequest("order_contract", "order_index", searchQuery);
SearchRequest.ColumnsToGet columnsToGet = new SearchRequest.ColumnsToGet();
columnsToGet.setReturnAll(true);
request.setColumnsToGet(columnsToGet);

SearchResponse response = client.search(request);
System.out.println("Total hits: " + response.getTotalCount());
for (Row row : response.getRows()) {
    System.out.println(row);
}

Python

# User filter
user_query = tablestore.TermQuery("userId", "u00001")
# Amount range: >= 5000
price_query = tablestore.RangeQuery("totalPrice",
    range_from=5000, include_lower=True)
# Time range: all of 2024 (UTC)
time_query = tablestore.RangeQuery("orderTime",
    range_from=1704067200000, range_to=1735689600000,
    include_lower=True, include_upper=False)

# Combine the three conditions with BoolQuery
bool_query = tablestore.BoolQuery(
    must_queries=[user_query, price_query, time_query])

result = client.search("order_contract", "order_index",
    tablestore.SearchQuery(bool_query, get_total_count=True, limit=5,
        sort=tablestore.Sort(sorters=[
            tablestore.FieldSort("totalPrice", tablestore.SortOrder.DESC)])),
    tablestore.ColumnsToGet(return_type=tablestore.ColumnReturnType.ALL))

print(f"Total hits: {result.total_count}")
for row in result.rows:
    cols = {c[0]: c[1] for c in row[1]}
    print(f"orderId={cols['orderId']} | product={cols['productName']}"
          f" | totalPrice={cols['totalPrice']}")
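The two timestamps in the time range are the UTC millisecond boundaries of 2024, used as a half-open interval (start inclusive, end exclusive). As a minimal sketch, they can be derived with the standard library instead of being hard-coded; `utc_millis` is an illustrative helper name.

```python
from datetime import datetime, timezone


def utc_millis(year: int, month: int = 1, day: int = 1) -> int:
    """Return the Unix timestamp in milliseconds for midnight UTC on the given date."""
    moment = datetime(year, month, day, tzinfo=timezone.utc)
    return int(moment.timestamp() * 1000)


# Half-open interval [start_2024, end_2024) covering all of 2024 in UTC.
start_2024 = utc_millis(2024)
end_2024 = utc_millis(2025)

print(start_2024, end_2024)
```

Using an exclusive upper bound at the start of the next year avoids having to pick a "last millisecond" of December 31.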

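Scenario 3: Statistics by product category

Use GroupByField to group by product category, compute the order count and total sales for each category, and sort the groups by total sales in descending order.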

Java

SearchRequest request = SearchRequest.newBuilder()
    .tableName("order_contract").indexName("order_index")
    .searchQuery(SearchQuery.newBuilder()
        .query(QueryBuilders.matchAll())
        .limit(0)
        .addGroupBy(GroupByBuilders.groupByField("orderCategory", "category")
            .size(20)
            .addSubAggregation(AggregationBuilders.count("orderCount", "orderId"))
            .addSubAggregation(AggregationBuilders.sum("totalRevenue", "totalPrice"))
            .addGroupBySorter(GroupBySorter.subAggSortInDesc("totalRevenue"))
            .build())
        .build())
    .build();

SearchResponse response = client.search(request);
GroupByFieldResult results = response.getGroupByResults()
    .getAsGroupByFieldResult("orderCategory");
for (GroupByFieldResultItem item : results.getGroupByFieldResultItems()) {
    long count = item.getSubAggregationResults()
        .getAsCountAggregationResult("orderCount").getValue();
    double revenue = item.getSubAggregationResults()
        .getAsSumAggregationResult("totalRevenue").getValue();
    System.out.println("Category: " + item.getKey()
        + " | Orders: " + count
        + " | Total sales: " + String.format("%.2f", revenue));
}

Python

group_by = tablestore.GroupByField("category", size=20,
    sub_aggs=[
        tablestore.Count("orderId", name="orderCount"),
        tablestore.Sum("totalPrice", name="totalRevenue"),
    ],
    group_by_sort=[tablestore.SubAggSort(tablestore.SortOrder.DESC, "totalRevenue")],
    name="orderCategory")

result = client.search("order_contract", "order_index",
    tablestore.SearchQuery(tablestore.MatchAllQuery(), limit=0,
                           group_bys=[group_by]),
    tablestore.ColumnsToGet(return_type=tablestore.ColumnReturnType.NONE))

for item in result.group_by_results[0].items:
    aggs = {a.name: a.value for a in item.sub_aggs}
    print(f"Category: {item.key} | Orders: {int(aggs['orderCount'])}"
          f" | Total sales: {aggs['totalRevenue']:.2f}")
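To make the shape of the result concrete, the following sketch computes the same kind of summary in plain Python over a handful of in-memory rows. It only mirrors what the grouping, count, sum, and descending sort produce; it says nothing about how Tablestore executes the aggregation. The sample rows and the function name `summarize_by_category` are made up for illustration, and the sketch assumes every row has an `orderId`.

```python
from collections import defaultdict


def summarize_by_category(orders):
    """Group orders by category, then count orders and sum totalPrice per group.

    Returns (category, stats) pairs sorted by totalRevenue in descending order.
    """
    stats = defaultdict(lambda: {"orderCount": 0, "totalRevenue": 0.0})
    for order in orders:
        group = stats[order["category"]]
        group["orderCount"] += 1
        group["totalRevenue"] += order["totalPrice"]
    return sorted(stats.items(),
                  key=lambda pair: pair[1]["totalRevenue"], reverse=True)


# Illustrative rows using the same field names as the order table.
sample_orders = [
    {"orderId": "ORD00000000", "category": "手机", "totalPrice": 7999.0},
    {"orderId": "ORD00000001", "category": "电脑", "totalPrice": 14999.0},
    {"orderId": "ORD00000002", "category": "手机", "totalPrice": 3999.0},
    {"orderId": "ORD00000003", "category": "耳机", "totalPrice": 1799.0},
]

summary = summarize_by_category(sample_orders)
for category, group in summary:
    print(category, group["orderCount"], f"{group['totalRevenue']:.2f}")
```

A local reference like this can be handy for spot-checking aggregation results against a small, known data set.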

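Resource cleanup

Note

When you no longer need the resources created in this solution, clean them up promptly to avoid unnecessary charges.

Clean up resources in the following order: delete the search index first, then delete the data table. If the instance was created for this solution, release it last.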

Java

private static void cleanup(SyncClient client) {
    // 1. Delete the search index
    DeleteSearchIndexRequest deleteIndexReq = new DeleteSearchIndexRequest();
    deleteIndexReq.setTableName("order_contract");
    deleteIndexReq.setIndexName("order_index");
    client.deleteSearchIndex(deleteIndexReq);

    // 2. Delete the data table
    client.deleteTable(new DeleteTableRequest("order_contract"));
}

Python

def cleanup(client):
    # 1. Delete the search index
    client.delete_search_index("order_contract", "order_index")
    # 2. Delete the data table
    client.delete_table("order_contract")