Build a shop search system for hundreds of millions of shops


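Use the search index and geolocation query capabilities of Tablestore to build a shop search system that handles hundreds of millions of shops and supports multi-dimensional retrieval.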

Solution overview

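A shop search platform must run multi-dimensional combined queries over hundreds of millions of shop records. Typical scenarios include searching for nearby shops of a given type, fuzzy-searching shops by name and sorting them by rating, and counting shops and computing ratings per shop type. The core challenges are retrieving geolocation data efficiently and combining multiple query conditions.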

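A single Tablestore table supports petabyte-scale storage and tens of millions of TPS. Search indexes support GEO_POINT geolocation indexing, tokenized text indexing, combined multi-condition queries, and statistical aggregation.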

Solution design

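This solution stores shop information in a single data table and uses a search index for geolocation queries and multi-dimensional retrieval. The following table describes the fields of the data table and the search index.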

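| Column category | Field | Data type | Index field type | Description |
|---|---|---|---|---|
| Primary key column | _id | STRING | - | MD5 hash of the shop ID, used as the partition key so that data is distributed evenly |
| Attribute column | shop_id | STRING | KEYWORD | Shop ID |
| Attribute column | type | STRING | KEYWORD | Shop type, for example 奶茶 (milk tea), 咖啡 (coffee), or 火锅 (hot pot) |
| Attribute column | name | STRING | TEXT | Shop name; the TEXT type supports tokenized fuzzy search |
| Attribute column | pos | STRING | GEO_POINT | Shop location in "latitude,longitude" format, for example "30.270000,120.150000" |
| Attribute column | score | DOUBLE | DOUBLE | Shop rating (1.0 to 5.0) |
| Attribute column | avg_price | DOUBLE | DOUBLE | Average spend per person (CNY) |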
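The following standard-library Python sketch shows why _id holds the MD5 hash of the shop ID instead of the raw ID. It hashes sequential IDs in the shop-%06d format used by the test data and counts the distribution of the first hex character of each hash. Sequential raw IDs share a common prefix and tend to concentrate writes in a narrow key range, whereas their MD5 digests are spread across the whole key space. The helper names partition_key and leading_char_histogram are illustrative only and are not part of the Tablestore SDK.

```python
import hashlib
from collections import Counter


def partition_key(shop_id: str) -> str:
    """Return the 32-character lowercase MD5 hex digest of a shop ID (the _id value)."""
    return hashlib.md5(shop_id.encode("utf-8")).hexdigest()


def leading_char_histogram(n: int) -> Counter:
    """Count the first hex character of _id for shop-000000 up to shop-{n-1:06d}."""
    return Counter(partition_key(f"shop-{i:06d}")[0] for i in range(n))


if __name__ == "__main__":
    # Adjacent shop IDs map to unrelated _id values.
    for shop_id in ("shop-000000", "shop-000001", "shop-000002"):
        print(shop_id, "->", partition_key(shop_id))
    # With 10000 IDs, each of the 16 leading characters should appear roughly 625 times.
    print(sorted(leading_char_histogram(10000).items()))
```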

Implementation

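Before you begin, activate Tablestore and create an instance. Skip this step if you already have an instance available. For instructions, see Activate Tablestore and create an instance.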

Note

The following examples use the Java and Python SDKs. Before you begin, install and configure the Java SDK or the Python SDK.

Step 1: Create the data table and search index

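When you create the data table, specify only the primary key column. Attribute columns do not need to be predefined. After the table is created, create a search index for it that includes a GEO_POINT geolocation field and a tokenized TEXT field.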

Java

// Create the data table. Only the primary key column is defined; attribute columns need no schema.
private static void createTable(SyncClient client) {
    TableMeta tableMeta = new TableMeta("shop");
    tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("_id", PrimaryKeyType.STRING));

    TableOptions tableOptions = new TableOptions();
    tableOptions.setMaxVersions(1);   // keep only the latest version of each column
    tableOptions.setTimeToLive(-1);   // -1: data never expires

    client.createTable(new CreateTableRequest(tableMeta, tableOptions));
}

// Create the search index with KEYWORD, TEXT, GEO_POINT, and DOUBLE fields
private static void createSearchIndex(SyncClient client) {
    CreateSearchIndexRequest request = new CreateSearchIndexRequest();
    request.setTableName("shop");
    request.setIndexName("shop_index");

    IndexSchema schema = new IndexSchema();
    schema.setFieldSchemas(Arrays.asList(
        new FieldSchema("shop_id", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("type", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("name", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord),
        new FieldSchema("pos", FieldType.GEO_POINT).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("score", FieldType.DOUBLE).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("avg_price", FieldType.DOUBLE).setIndex(true).setEnableSortAndAgg(true)
    ));

    request.setIndexSchema(schema);
    client.createSearchIndex(request);
}

Python

import tablestore

# Create the data table. Only the primary key column is defined; attribute columns need no schema.
def create_table(client):
    table_meta = tablestore.TableMeta("shop", [("_id", "STRING")])
    # Keep one version per column; time_to_live=-1 means data never expires.
    table_options = tablestore.TableOptions(time_to_live=-1, max_version=1)
    reserved = tablestore.ReservedThroughput(tablestore.CapacityUnit(0, 0))
    client.create_table(table_meta, table_options, reserved)

# Create the search index with KEYWORD, TEXT, GEO_POINT, and DOUBLE fields.
def create_search_index(client):
    fields = [
        tablestore.FieldSchema("shop_id", tablestore.FieldType.KEYWORD, index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("type", tablestore.FieldType.KEYWORD, index=True, enable_sort_and_agg=True),
        # name is tokenized with the max_word analyzer to support fuzzy matching
        tablestore.FieldSchema("name", tablestore.FieldType.TEXT, index=True, analyzer="max_word"),
        tablestore.FieldSchema("pos", tablestore.FieldType.GEOPOINT, index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("score", tablestore.FieldType.DOUBLE, index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("avg_price", tablestore.FieldType.DOUBLE, index=True, enable_sort_and_agg=True),
    ]
    client.create_search_index("shop", "shop_index", tablestore.SearchIndexMeta(fields))
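The pos column is written as a plain STRING, and only the search index interprets it as GEO_POINT. For that reason, it can help to validate the "latitude,longitude" order on the client before writing. A swapped pair such as "120.15,30.27" puts 120.15 in the latitude slot, which falls outside the valid range of -90 to 90, so the value may be rejected or indexed incorrectly. The following sketch uses two hypothetical helpers, to_geo_point and from_geo_point. They format and check values the same way the Step 2 test data does and do not call the Tablestore SDK.

```python
from typing import Tuple


def to_geo_point(lat: float, lon: float) -> str:
    """Format a coordinate pair as a GEO_POINT string in "latitude,longitude" order."""
    if not -90.0 <= lat <= 90.0:
        raise ValueError(f"latitude out of range: {lat}")
    if not -180.0 <= lon <= 180.0:
        raise ValueError(f"longitude out of range: {lon}")
    return f"{lat:.6f},{lon:.6f}"


def from_geo_point(value: str) -> Tuple[float, float]:
    """Parse a "latitude,longitude" string and check that both values are in range."""
    parts = value.split(",")
    if len(parts) != 2:
        raise ValueError(f"expected 'latitude,longitude', got {value!r}")
    lat, lon = float(parts[0]), float(parts[1])
    to_geo_point(lat, lon)  # reuse the range checks
    return lat, lon


if __name__ == "__main__":
    print(to_geo_point(30.27, 120.15))             # 30.270000,120.150000
    print(from_geo_point("30.270000,120.150000"))  # (30.27, 120.15)
```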

Step 2: Write test data

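The following code uses the BatchWriteRow operation to write 10,000 shop records in batches. The records cover 5 shop types and are located in downtown Hangzhou (latitude 30.1° to 30.4°, longitude 120.0° to 120.3°). To change the amount of data, modify the TOTAL_ROWS constant.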

Java

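private static final int TOTAL_ROWS = 10000;
private static final int BATCH_SIZE = 200;  // BatchWriteRow accepts at most 200 rows per request
private static final String[] TYPES = {"奶茶", "咖啡", "火锅", "日料", "面包"};
private static final String[] PREFIXES = {"沈家", "张记", "老王", "李氏", "新源"};
private static final String[] SUFFIXES = {"奶茶店", "咖啡馆", "火锅店", "料理店", "面包坊"};

// Return the lowercase hex MD5 digest of a string; used as the _id partition key.
private static String md5(String s) throws java.security.NoSuchAlgorithmException {
    java.security.MessageDigest md = java.security.MessageDigest.getInstance("MD5");
    StringBuilder sb = new StringBuilder();
    for (byte b : md.digest(s.getBytes(java.nio.charset.StandardCharsets.UTF_8))) {
        sb.append(String.format("%02x", b));
    }
    return sb.toString();
}

private static void writeTestData(SyncClient client) throws Exception {
    Random rand = new Random(42);  // fixed seed for reproducible data
    int written = 0;

    while (written < TOTAL_ROWS) {
        BatchWriteRowRequest batch = new BatchWriteRowRequest();
        int batchSize = Math.min(BATCH_SIZE, TOTAL_ROWS - written);

        for (int i = 0; i < batchSize; i++) {
            int idx = written + i;
            String shopId = String.format("shop-%06d", idx);
            String id = md5(shopId);
            String type = TYPES[rand.nextInt(TYPES.length)];
            String name = PREFIXES[rand.nextInt(PREFIXES.length)] + SUFFIXES[rand.nextInt(SUFFIXES.length)];

            // Random location within downtown Hangzhou
            double lat = 30.1 + rand.nextDouble() * 0.3;
            double lon = 120.0 + rand.nextDouble() * 0.3;
            // GEO_POINT format: "latitude,longitude". Locale.ROOT guarantees a '.' decimal separator.
            String pos = String.format(java.util.Locale.ROOT, "%.6f,%.6f", lat, lon);
            double score = Math.round((3.0 + rand.nextDouble() * 2.0) * 10.0) / 10.0;
            double avgPrice = 10 + rand.nextInt(191);

            PrimaryKeyBuilder pkb = PrimaryKeyBuilder.createPrimaryKeyBuilder();
            pkb.addPrimaryKeyColumn("_id", PrimaryKeyValue.fromString(id));
            RowPutChange row = new RowPutChange("shop", pkb.build());
            row.addColumn("shop_id", ColumnValue.fromString(shopId));
            row.addColumn("type", ColumnValue.fromString(type));
            row.addColumn("name", ColumnValue.fromString(name));
            row.addColumn("pos", ColumnValue.fromString(pos));
            row.addColumn("score", ColumnValue.fromDouble(score));
            row.addColumn("avg_price", ColumnValue.fromDouble(avgPrice));
            batch.addRowChange(row);
        }
        BatchWriteRowResponse response = client.batchWriteRow(batch);
        // BatchWriteRow can partially fail; stop instead of silently losing rows
        if (!response.isAllSucceed()) {
            throw new IllegalStateException(response.getFailedRows().size() + " rows failed to write");
        }
        written += batchSize;
    }
    System.out.println("Wrote " + TOTAL_ROWS + " rows.");
}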

Python

import hashlib
import random

import tablestore

TOTAL_ROWS = 10000
BATCH_SIZE = 200  # BatchWriteRow accepts at most 200 rows per request
TYPES = ["奶茶", "咖啡", "火锅", "日料", "面包"]
PREFIXES = ["沈家", "张记", "老王", "李氏", "新源"]
SUFFIXES = ["奶茶店", "咖啡馆", "火锅店", "料理店", "面包坊"]

def write_test_data(client):
    rand = random.Random(42)  # fixed seed for reproducible data
    written = 0

    while written < TOTAL_ROWS:
        batch_size = min(BATCH_SIZE, TOTAL_ROWS - written)
        put_rows = []

        for i in range(batch_size):
            idx = written + i
            shop_id = f"shop-{idx:06d}"
            _id = hashlib.md5(shop_id.encode()).hexdigest()  # MD5 of the shop ID as partition key
            shop_type = rand.choice(TYPES)
            name = rand.choice(PREFIXES) + rand.choice(SUFFIXES)

            # Random location within downtown Hangzhou; GEO_POINT format is "latitude,longitude"
            lat = 30.1 + rand.random() * 0.3
            lon = 120.0 + rand.random() * 0.3
            pos = f"{lat:.6f},{lon:.6f}"
            score = round(3.0 + rand.random() * 2.0, 1)
            avg_price = float(10 + rand.randint(0, 190))

            pk = [("_id", _id)]
            cols = [
                ("shop_id", shop_id), ("type", shop_type), ("name", name),
                ("pos", pos), ("score", score), ("avg_price", avg_price),
            ]
            row = tablestore.Row(pk, cols)
            # IGNORE: write the row whether or not it already exists
            put_rows.append(tablestore.PutRowItem(row,
                tablestore.Condition(tablestore.RowExistenceExpectation.IGNORE)))

        request = tablestore.BatchWriteRowRequest()
        request.add(tablestore.TableInBatchWriteRowItem("shop", put_rows))
        result = client.batch_write_row(request)
        # BatchWriteRow can partially fail; stop instead of silently losing rows
        if not result.is_all_succeed():
            raise RuntimeError("some rows in the batch failed to write")
        written += batch_size

    print(f"Wrote {TOTAL_ROWS} rows.")
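Both write examples cap each BatchWriteRow request at 200 rows, which is the per-request row limit of BatchWriteRow. Each request is also subject to a total data size limit of 4 MB. When rows come from another source, such as a file or a database cursor, a small generator can enforce the same cap without tracking offsets by hand. The chunked helper below is a hypothetical standard-library sketch, and each list it yields is intended to become one BatchWriteRowRequest.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int = 200) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items from `items`."""
    if size <= 0:
        raise ValueError("size must be positive")
    batch: List[T] = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # emit the final, possibly shorter, batch
        yield batch


if __name__ == "__main__":
    print([len(b) for b in chunked(range(450))])  # [200, 200, 50]
```

Because the helper accepts any iterable, the row source can be streamed, and at most one batch is held in memory at a time.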

Step 3: Query data

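After data is written, the search index synchronizes it automatically, usually within a few seconds. A query issued immediately after a write can therefore return incomplete results. The following examples show three typical shop search scenarios.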
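The search index lags slightly behind the data table, so a script that writes rows and queries right away can see a lower match count than expected. One workaround is to poll the total row count until it reaches the number of rows written. The following is a minimal sketch built around a hypothetical helper, wait_for_index_sync. Its count_fn callback is left abstract (an assumption of this sketch), so the helper does not depend on the SDK.

```python
import time
from typing import Callable


def wait_for_index_sync(count_fn: Callable[[], int], expected: int,
                        timeout_s: float = 60.0, interval_s: float = 1.0) -> int:
    """Poll count_fn until it reports at least `expected` rows or the timeout expires.

    Returns the last observed count; raises TimeoutError if the index is still behind.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        current = count_fn()
        if current >= expected:
            return current
        if time.monotonic() >= deadline:
            raise TimeoutError(f"index reports {current} of {expected} rows after {timeout_s}s")
        time.sleep(interval_s)


if __name__ == "__main__":
    # Simulated counts standing in for successive total_count values from the index.
    simulated = iter([0, 4000, 9000, 10000])
    print(wait_for_index_sync(lambda: next(simulated), 10000, interval_s=0.0))  # 10000
```

With the Python SDK, count_fn could wrap a match-all query that returns only the total count, for example `lambda: client.search("shop", "shop_index", tablestore.SearchQuery(tablestore.MatchAllQuery(), limit=0, get_total_count=True)).total_count`.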

Scenario 1: Search for nearby shops of a given type

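Combine GeoDistanceQuery (geo-distance query), TermQuery (exact match on shop type), and RangeQuery (upper limit on average spend) to find milk tea shops (type "奶茶") within 5 km of a given location with an average spend per person of no more than 50 CNY, sorted by rating in descending order.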

Java

public static void queryNearbyShops(SyncClient client) {
    // Geo-distance query: within 5 km of the center point
    GeoDistanceQuery geoQuery = new GeoDistanceQuery();
    geoQuery.setFieldName("pos");
    geoQuery.setCenterPoint("30.270000,120.150000");
    geoQuery.setDistanceInMeter(5000);

    // Exact match on shop type
    TermQuery typeQuery = new TermQuery();
    typeQuery.setFieldName("type");
    typeQuery.setTerm(ColumnValue.fromString("奶茶"));

    // Average spend per person of at most 50 CNY (inclusive)
    RangeQuery priceQuery = new RangeQuery();
    priceQuery.setFieldName("avg_price");
    priceQuery.lessThanOrEqual(ColumnValue.fromDouble(50));

    // Combine the conditions: all of them must match
    BoolQuery boolQuery = new BoolQuery();
    boolQuery.setMustQueries(Arrays.asList(geoQuery, typeQuery, priceQuery));

    SearchQuery searchQuery = new SearchQuery();
    searchQuery.setQuery(boolQuery);
    searchQuery.setGetTotalCount(true);
    searchQuery.setLimit(5);
    searchQuery.setSort(new Sort(Arrays.asList(new FieldSort("score", SortOrder.DESC))));

    SearchRequest searchRequest = new SearchRequest("shop", "shop_index", searchQuery);
    SearchRequest.ColumnsToGet columnsToGet = new SearchRequest.ColumnsToGet();
    columnsToGet.setReturnAll(true);
    searchRequest.setColumnsToGet(columnsToGet);

    SearchResponse response = client.search(searchRequest);
    System.out.println("Total matches: " + response.getTotalCount());
    for (Row row : response.getRows()) {
        System.out.println(row);
    }
}

Python

def query_nearby_shops(client):
    # All conditions must match: within 5 km of the center, type 奶茶 (milk tea), avg_price <= 50
    bool_query = tablestore.BoolQuery(must_queries=[
        tablestore.GeoDistanceQuery("pos", "30.270000,120.150000", 5000),
        tablestore.TermQuery("type", "奶茶"),
        tablestore.RangeQuery("avg_price", range_to=50, include_upper=True),
    ])

    result = client.search("shop", "shop_index",
        tablestore.SearchQuery(bool_query, get_total_count=True, limit=5,
            sort=tablestore.Sort(sorters=[tablestore.FieldSort("score", tablestore.SortOrder.DESC)])),
        tablestore.ColumnsToGet(return_type=tablestore.ColumnReturnType.ALL))

    print(f"Total matches: {result.total_count}")
    for row in result.rows:
        print(row)
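To sanity-check scenario 1 results on a small sample, the same filter can be reproduced in memory. The sketch below applies the three must conditions (distance, exact type, and an inclusive price upper bound) to a list of shop dicts and sorts the matches by score in descending order. Distance is computed with the haversine formula on a sphere with a radius of 6,371 km. That radius is an assumption, and Tablestore's own distance calculation may differ slightly for points near the 5 km boundary. The function nearby_shops_local is hypothetical, and the dict layout mirrors the test data.

```python
import math
from typing import Dict, List, Tuple

EARTH_RADIUS_M = 6371000.0  # mean Earth radius; an assumption for this sketch


def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in meters between two latitude/longitude points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = math.radians(lat2 - lat1)
    d_lambda = math.radians(lon2 - lon1)
    a = (math.sin(d_phi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def nearby_shops_local(shops: List[Dict], center: str, radius_m: float, shop_type: str,
                       max_price: float, limit: int = 5) -> Tuple[int, List[Dict]]:
    """Return (total matches, top `limit` matches by score), like the scenario 1 BoolQuery."""
    c_lat, c_lon = (float(v) for v in center.split(","))
    hits = []
    for shop in shops:
        lat, lon = (float(v) for v in shop["pos"].split(","))
        if (haversine_m(c_lat, c_lon, lat, lon) <= radius_m  # GeoDistanceQuery
                and shop["type"] == shop_type                # TermQuery
                and shop["avg_price"] <= max_price):         # RangeQuery, upper bound inclusive
            hits.append(shop)
    hits.sort(key=lambda s: s["score"], reverse=True)        # FieldSort on score, DESC
    return len(hits), hits[:limit]


if __name__ == "__main__":
    shops = [
        {"shop_id": "a", "type": "奶茶", "pos": "30.280000,120.150000", "score": 4.2, "avg_price": 25.0},
        {"shop_id": "b", "type": "奶茶", "pos": "30.300000,120.150000", "score": 4.8, "avg_price": 50.0},
        {"shop_id": "c", "type": "奶茶", "pos": "30.320000,120.150000", "score": 4.9, "avg_price": 20.0},
        {"shop_id": "d", "type": "咖啡", "pos": "30.270000,120.150000", "score": 5.0, "avg_price": 30.0},
        {"shop_id": "e", "type": "奶茶", "pos": "30.270000,120.160000", "score": 3.9, "avg_price": 60.0},
    ]
    total, top = nearby_shops_local(shops, "30.270000,120.150000", 5000, "奶茶", 50)
    print(total, [s["shop_id"] for s in top])  # 2 ['b', 'a']
```

Shop c is about 5.6 km north of the center, so it is excluded even though it has the highest rating. Shops d and e fail the type and price conditions, respectively.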

Scenario 2: Search shops by name

Use MatchQuery to run a tokenized fuzzy search on the TEXT-type name field and sort the results by rating in descending order.

Java

public static void queryByName(SyncClient client) {
    MatchQuery nameQuery = new MatchQuery();
    nameQuery.setFieldName("name");
    nameQuery.setText("沈家");

    SearchQuery searchQuery = new SearchQuery();
    searchQuery.setQuery(nameQuery);
    searchQuery.setGetTotalCount(true);
    searchQuery.setLimit(5);
    searchQuery.setSort(new Sort(Arrays.asList(new FieldSort("score", SortOrder.DESC))));

    SearchRequest searchRequest = new SearchRequest("shop", "shop_index", searchQuery);
    SearchRequest.ColumnsToGet columnsToGet = new SearchRequest.ColumnsToGet();
    columnsToGet.setReturnAll(true);
    searchRequest.setColumnsToGet(columnsToGet);

    SearchResponse response = client.search(searchRequest);
    System.out.println("Total matches: " + response.getTotalCount());
    for (Row row : response.getRows()) {
        System.out.println(row);
    }
}

Python

def query_by_name(client):
    result = client.search("shop", "shop_index",
        tablestore.SearchQuery(tablestore.MatchQuery("name", "沈家"),
            get_total_count=True, limit=5,
            sort=tablestore.Sort(sorters=[tablestore.FieldSort("score", tablestore.SortOrder.DESC)])),
        tablestore.ColumnsToGet(return_type=tablestore.ColumnReturnType.ALL))

    print(f"Total matches: {result.total_count}")
    for row in result.rows:
        print(row)

Scenario 3: Group and count shops by type

Use GroupByField to group shops by type, and use Count and Avg sub-aggregations to compute the number of shops and the average rating for each type.

Java

public static void groupByType(SyncClient client) {
    SearchRequest searchRequest = SearchRequest.newBuilder()
        .tableName("shop").indexName("shop_index")
        .searchQuery(SearchQuery.newBuilder()
            .query(QueryBuilders.matchAll())
            .limit(0)
            .addGroupBy(GroupByBuilders.groupByField("shopType", "type")
                .size(20)
                .addSubAggregation(AggregationBuilders.count("shopCount", "shop_id"))
                .addSubAggregation(AggregationBuilders.avg("avgScore", "score"))
                .addGroupBySorter(GroupBySorter.subAggSortInDesc("shopCount"))
                .build())
            .build())
        .build();

    SearchResponse resp = client.search(searchRequest);
    GroupByFieldResult results = resp.getGroupByResults().getAsGroupByFieldResult("shopType");
    for (GroupByFieldResultItem item : results.getGroupByFieldResultItems()) {
        System.out.println("Type: " + item.getKey()
            + "\tShops: " + item.getSubAggregationResults().getAsCountAggregationResult("shopCount").getValue()
            + "\tAverage score: " + String.format("%.1f",
                item.getSubAggregationResults().getAsAvgAggregationResult("avgScore").getValue()));
    }
}

Python

def group_by_type(client):
    group_by = tablestore.GroupByField("type", size=20,
        sub_aggs=[
            tablestore.Count("shop_id", name="shopCount"),
            tablestore.Avg("score", name="avgScore"),
        ],
        group_by_sort=[tablestore.SubAggSort(tablestore.SortOrder.DESC, "shopCount")],
        name="shopType")

    result = client.search("shop", "shop_index",
        tablestore.SearchQuery(tablestore.MatchAllQuery(), limit=0, group_bys=[group_by]),
        tablestore.ColumnsToGet(return_type=tablestore.ColumnReturnType.NONE))

    for item in result.group_by_results[0].items:
        aggs = {a.name: a.value for a in item.sub_aggs}
        print(f"Type: {item.key}\tShops: {int(aggs['shopCount'])}\tAverage score: {aggs['avgScore']:.1f}")
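As with scenario 1, the scenario 3 aggregation can be cross-checked in memory on a small sample. The sketch below groups shop dicts by type, counts the shops in each group, averages their scores, and orders the groups by count in descending order. This mirrors the GroupByField with Count and Avg sub-aggregations and the descending sub-aggregation sorter. The sketch assumes every row has a shop_id value, so the Count sub-aggregation equals the number of rows in the group. The function name group_by_type_local and the tie-breaking by type name are assumptions of the sketch, and the service's ordering of groups with equal counts is not specified here.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def group_by_type_local(shops: List[Dict], size: int = 20) -> List[Tuple[str, int, float]]:
    """Return (type, shop count, average score) per type, largest groups first."""
    counts: Dict[str, int] = defaultdict(int)
    score_sums: Dict[str, float] = defaultdict(float)
    for shop in shops:
        counts[shop["type"]] += 1
        score_sums[shop["type"]] += shop["score"]
    rows = [(t, counts[t], score_sums[t] / counts[t]) for t in counts]
    # Sort by count descending; break ties by type name so the output is deterministic.
    rows.sort(key=lambda r: (-r[1], r[0]))
    return rows[:size]  # like GroupByField, keep at most `size` groups


if __name__ == "__main__":
    sample = [
        {"type": "奶茶", "score": 4.0},
        {"type": "奶茶", "score": 5.0},
        {"type": "咖啡", "score": 3.5},
    ]
    for shop_type, count, avg in group_by_type_local(sample):
        print(f"Type: {shop_type}\tShops: {count}\tAverage score: {avg:.1f}")
```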

Resource cleanup

Note

When you no longer need the resources created in this solution, clean them up to avoid unnecessary charges.

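Delete the resources in this order: first the search index, then the data table. If you created the instance specifically for this solution, release the instance last.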

Java

private static void cleanup(SyncClient client) {
    // 1. Delete the search index
    DeleteSearchIndexRequest deleteIndexReq = new DeleteSearchIndexRequest();
    deleteIndexReq.setTableName("shop");
    deleteIndexReq.setIndexName("shop_index");
    client.deleteSearchIndex(deleteIndexReq);

    // 2. Delete the data table
    client.deleteTable(new DeleteTableRequest("shop"));
}

Python

def cleanup(client):
    # 1. Delete the search index
    client.delete_search_index("shop", "shop_index")
    # 2. Delete the data table
    client.delete_table("shop")