通过表格存储的多元索引和地理位置查询能力,搭建支持多维检索的亿量级店铺搜索系统。
方案概述
店铺搜索平台需要对亿量级店铺数据执行多维组合检索。典型场景包括:按地理位置搜索附近指定类型的店铺、按名称模糊搜索并按评分排序、按店铺类型统计数量和评分等。核心挑战在于地理位置数据的高效检索和多条件组合查询。
表格存储单表支持 PB 级数据存储和千万级 TPS。多元索引支持 GEO_POINT 地理位置索引、分词文本索引、多条件组合查询和统计聚合。
方案设计
本方案使用一张数据表存储店铺信息,通过多元索引实现地理位置查询和多维检索。数据表和索引的字段设计如下。
| 字段类型 | 字段 | 数据类型 | 索引字段类型 | 说明 |
| --- | --- | --- | --- | --- |
| 主键列 | _id | STRING | — | 店铺编号 shop_id 的 MD5 值,作为分区键使数据散列分布 |
| 属性列 | shop_id | STRING | KEYWORD | 店铺编号 |
| 属性列 | type | STRING | KEYWORD | 店铺类型(如奶茶、咖啡、火锅) |
| 属性列 | name | STRING | TEXT | 店铺名称,TEXT 类型支持分词模糊查询 |
| 属性列 | pos | STRING | GEO_POINT | 店铺地理位置,格式为"纬度,经度"(如 "30.270000,120.150000") |
| 属性列 | score | DOUBLE | DOUBLE | 店铺评分(1.0~5.0) |
| 属性列 | avg_price | DOUBLE | DOUBLE | 人均消费(元) |
方案实现
开始前,开通表格存储服务并创建实例(如已有可用实例,跳过此步骤)。操作方法,参见开通表格存储服务并创建实例。
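以下示例使用 Java 和 Python SDK。开始前,请根据所用语言安装并配置 Java SDK 或 Python SDK。Java 示例均为类中的静态方法,参数 client 为已初始化的 SyncClient;Python 示例需先执行 import tablestore,参数 client 为已初始化的 tablestore.OTSClient。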
步骤一:创建数据表和多元索引
创建数据表时只需指定主键列,属性列无需预定义。多元索引要求数据表的最大版本数为 1,因此示例将最大版本数设为 1,并将数据生命周期设为 -1(数据永不过期)。数据表创建完成后,为其创建多元索引,包含 GEO_POINT 地理位置索引和 TEXT 分词索引。
Java
/**
 * Creates the "shop" data table.
 *
 * Only the primary key column "_id" (STRING) is declared; attribute columns
 * need no predefinition. Each cell keeps a single version and data never
 * expires (TTL = -1).
 */
private static void createTable(SyncClient client) {
    TableOptions options = new TableOptions();
    options.setTimeToLive(-1);
    options.setMaxVersions(1);

    TableMeta meta = new TableMeta("shop");
    meta.addPrimaryKeyColumn(new PrimaryKeySchema("_id", PrimaryKeyType.STRING));

    CreateTableRequest request = new CreateTableRequest(meta, options);
    client.createTable(request);
}
/**
 * Creates the search index "shop_index" on the "shop" table.
 *
 * Index schema:
 *   shop_id, type  -> KEYWORD (sortable / aggregatable)
 *   name           -> TEXT, tokenized with the max_word analyzer
 *   pos            -> GEO_POINT (sortable / aggregatable)
 *   score, avg_price -> DOUBLE (sortable / aggregatable)
 */
private static void createSearchIndex(SyncClient client) {
    FieldSchema shopIdField = new FieldSchema("shop_id", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true);
    FieldSchema typeField = new FieldSchema("type", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true);
    // TEXT field: tokenized for fuzzy name search, so no sort/agg.
    FieldSchema nameField = new FieldSchema("name", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord);
    FieldSchema posField = new FieldSchema("pos", FieldType.GEO_POINT).setIndex(true).setEnableSortAndAgg(true);
    FieldSchema scoreField = new FieldSchema("score", FieldType.DOUBLE).setIndex(true).setEnableSortAndAgg(true);
    FieldSchema avgPriceField = new FieldSchema("avg_price", FieldType.DOUBLE).setIndex(true).setEnableSortAndAgg(true);

    IndexSchema indexSchema = new IndexSchema();
    indexSchema.setFieldSchemas(Arrays.asList(shopIdField, typeField, nameField, posField, scoreField, avgPriceField));

    CreateSearchIndexRequest request = new CreateSearchIndexRequest();
    request.setTableName("shop");
    request.setIndexName("shop_index");
    request.setIndexSchema(indexSchema);
    client.createSearchIndex(request);
}
Python
def create_table(client):
    """Create the ``shop`` data table.

    Only the STRING primary key ``_id`` is declared; attribute columns need
    no predefinition. Cells keep one version, data never expires
    (``time_to_live=-1``), and reserved read/write capacity is 0.
    """
    reserved_throughput = tablestore.ReservedThroughput(tablestore.CapacityUnit(0, 0))
    options = tablestore.TableOptions(time_to_live=-1, max_version=1)
    meta = tablestore.TableMeta("shop", [("_id", "STRING")])
    client.create_table(meta, options, reserved_throughput)
def create_search_index(client):
    """Create the ``shop_index`` search index on the ``shop`` table.

    ``shop_id``/``type`` are KEYWORD, ``name`` is TEXT tokenized with the
    ``max_word`` analyzer, ``pos`` is a geo point, and ``score``/``avg_price``
    are DOUBLE. Every field except ``name`` is sortable and aggregatable.
    """
    field_type = tablestore.FieldType
    # Shared options for fields that support sorting and aggregation.
    sortable = dict(index=True, enable_sort_and_agg=True)
    fields = [
        tablestore.FieldSchema("shop_id", field_type.KEYWORD, **sortable),
        tablestore.FieldSchema("type", field_type.KEYWORD, **sortable),
        tablestore.FieldSchema("name", field_type.TEXT, index=True, analyzer="max_word"),
        tablestore.FieldSchema("pos", field_type.GEOPOINT, **sortable),
        tablestore.FieldSchema("score", field_type.DOUBLE, **sortable),
        tablestore.FieldSchema("avg_price", field_type.DOUBLE, **sortable),
    ]
    index_meta = tablestore.SearchIndexMeta(fields)
    client.create_search_index("shop", "shop_index", index_meta)
步骤二:写入测试数据
以下代码通过 BatchWriteRow 接口分批写入 10000 条店铺数据(每批最多 200 行,即 BatchWriteRow 单次请求的行数上限),覆盖 5 种店铺类型。地理位置随机分布在杭州市区范围内(纬度 30.1°~30.4°,经度 120.0°~120.3°)。随机数种子固定为 42,因此同一示例每次运行生成的数据相同。修改 TOTAL_ROWS 常量可调整数据量。
Java
private static final int TOTAL_ROWS = 10000;
// BatchWriteRow accepts at most 200 rows per request.
private static final int BATCH_SIZE = 200;
private static final String[] TYPES = {"奶茶", "咖啡", "火锅", "日料", "面包"};
private static final String[] PREFIXES = {"沈家", "张记", "老王", "李氏", "新源"};
private static final String[] SUFFIXES = {"奶茶店", "咖啡馆", "火锅店", "料理店", "面包坊"};

/**
 * Writes TOTAL_ROWS synthetic shops into the "shop" table, at most BATCH_SIZE
 * rows per BatchWriteRow request. The generator is seeded with 42, so repeated
 * runs write identical data.
 *
 * @param client initialized Tablestore client
 * @throws IllegalStateException if a batch response reports that not every row was written
 * @throws Exception propagated from the SDK or from the MD5 helper
 */
private static void writeTestData(SyncClient client) throws Exception {
    Random rand = new Random(42);
    for (int start = 0; start < TOTAL_ROWS; start += BATCH_SIZE) {
        int end = Math.min(start + BATCH_SIZE, TOTAL_ROWS);
        BatchWriteRowRequest batch = new BatchWriteRowRequest();
        for (int idx = start; idx < end; idx++) {
            // Locale.ROOT keeps the ID format independent of the JVM default locale.
            String shopId = String.format(java.util.Locale.ROOT, "shop-%06d", idx);
            // The MD5 of the shop ID is the partition key, so rows spread evenly.
            String id = md5Hex(shopId);
            String type = TYPES[rand.nextInt(TYPES.length)];
            String name = PREFIXES[rand.nextInt(PREFIXES.length)] + SUFFIXES[rand.nextInt(SUFFIXES.length)];
            // Random location inside downtown Hangzhou.
            double lat = 30.1 + rand.nextDouble() * 0.3;
            double lon = 120.0 + rand.nextDouble() * 0.3;
            // GEO_POINT format is "latitude,longitude". Without Locale.ROOT, a
            // comma-decimal default locale would emit "30,270000,120,150000".
            String pos = String.format(java.util.Locale.ROOT, "%.6f,%.6f", lat, lon);
            double score = Math.round((3.0 + rand.nextDouble() * 2.0) * 10.0) / 10.0;
            double avgPrice = 10 + rand.nextInt(191);

            PrimaryKeyBuilder pkb = PrimaryKeyBuilder.createPrimaryKeyBuilder();
            pkb.addPrimaryKeyColumn("_id", PrimaryKeyValue.fromString(id));
            RowPutChange row = new RowPutChange("shop", pkb.build());
            row.addColumn("shop_id", ColumnValue.fromString(shopId));
            row.addColumn("type", ColumnValue.fromString(type));
            row.addColumn("name", ColumnValue.fromString(name));
            row.addColumn("pos", ColumnValue.fromString(pos));
            row.addColumn("score", ColumnValue.fromDouble(score));
            row.addColumn("avg_price", ColumnValue.fromDouble(avgPrice));
            batch.addRowChange(row);
        }
        // BatchWriteRow reports per-row failures in its response instead of
        // throwing, so an unchecked call could silently drop rows.
        if (!client.batchWriteRow(batch).isAllSucceed()) {
            throw new IllegalStateException("BatchWriteRow 部分行写入失败,批次起始行: " + start);
        }
    }
    System.out.println("写入 " + TOTAL_ROWS + " 行数据完成。");
}

/** Returns the MD5 digest of the UTF-8 bytes of text as 32 lowercase hex characters. */
private static String md5Hex(String text) throws java.security.NoSuchAlgorithmException {
    byte[] digest = java.security.MessageDigest.getInstance("MD5")
            .digest(text.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    StringBuilder hex = new StringBuilder(digest.length * 2);
    for (byte b : digest) {
        hex.append(Character.forDigit((b >> 4) & 0xF, 16))
           .append(Character.forDigit(b & 0xF, 16));
    }
    return hex.toString();
}
Python
import hashlib, random
TOTAL_ROWS = 10000
TYPES = ["奶茶", "咖啡", "火锅", "日料", "面包"]
PREFIXES = ["沈家", "张记", "老王", "李氏", "新源"]
SUFFIXES = ["奶茶店", "咖啡馆", "火锅店", "料理店", "面包坊"]
def write_test_data(client):
rand = random.Random(42)
written = 0
while written < TOTAL_ROWS:
batch_size = min(200, TOTAL_ROWS - written)
put_rows = []
for i in range(batch_size):
idx = written + i
shop_id = f"shop-{idx:06d}"
_id = hashlib.md5(shop_id.encode()).hexdigest()
shop_type = rand.choice(TYPES)
name = rand.choice(PREFIXES) + rand.choice(SUFFIXES)
lat = 30.1 + rand.random() * 0.3
lon = 120.0 + rand.random() * 0.3
pos = f"{lat:.6f},{lon:.6f}"
score = round(3.0 + rand.random() * 2.0, 1)
avg_price = float(10 + rand.randint(0, 190))
pk = [("_id", _id)]
cols = [
("shop_id", shop_id), ("type", shop_type), ("name", name),
("pos", pos), ("score", score), ("avg_price", avg_price),
]
row = tablestore.Row(pk, cols)
put_rows.append(tablestore.PutRowItem(row,
tablestore.Condition(tablestore.RowExistenceExpectation.IGNORE)))
request = tablestore.BatchWriteRowRequest()
request.add(tablestore.TableInBatchWriteRowItem("shop", put_rows))
client.batch_write_row(request)
written += batch_size
print(f"写入 {TOTAL_ROWS} 行数据完成。")
步骤三:检索数据
数据写入后,多元索引自动同步数据(通常秒级延迟)。以下示例展示 3 个典型的店铺搜索场景。
场景 1:搜索附近指定类型的店铺
通过 BoolQuery 的 must 条件组合 GeoDistanceQuery(地理距离查询)、TermQuery(精确匹配店铺类型)和 RangeQuery(人均消费上限),搜索以 (30.27, 120.15) 为中心 5 公里内、人均消费不超过 50 元的奶茶店,按评分降序排序,返回前 5 条结果及匹配总数。
Java
/**
 * Scenario 1: finds "奶茶" shops within 5 km of (30.27, 120.15) whose avg_price
 * is at most 50, sorted by score descending. Prints the total match count and
 * the top 5 rows with all columns.
 */
public static void queryNearbyShops(SyncClient client) {
    // Condition 1: within 5000 m of the center point ("lat,lon").
    GeoDistanceQuery withinRadius = new GeoDistanceQuery();
    withinRadius.setFieldName("pos");
    withinRadius.setCenterPoint("30.270000,120.150000");
    withinRadius.setDistanceInMeter(5000);

    // Condition 2: shop type is exactly "奶茶".
    TermQuery isMilkTea = new TermQuery();
    isMilkTea.setFieldName("type");
    isMilkTea.setTerm(ColumnValue.fromString("奶茶"));

    // Condition 3: avg_price <= 50.
    RangeQuery cheapEnough = new RangeQuery();
    cheapEnough.setFieldName("avg_price");
    cheapEnough.lessThanOrEqual(ColumnValue.fromDouble(50));

    // All three conditions must hold.
    BoolQuery allConditions = new BoolQuery();
    allConditions.setMustQueries(Arrays.asList(withinRadius, isMilkTea, cheapEnough));

    SearchQuery query = new SearchQuery();
    query.setQuery(allConditions);
    query.setSort(new Sort(Arrays.asList(new FieldSort("score", SortOrder.DESC))));
    query.setLimit(5);
    query.setGetTotalCount(true);

    SearchRequest.ColumnsToGet allColumns = new SearchRequest.ColumnsToGet();
    allColumns.setReturnAll(true);
    SearchRequest request = new SearchRequest("shop", "shop_index", query);
    request.setColumnsToGet(allColumns);

    SearchResponse response = client.search(request);
    System.out.println("匹配总数: " + response.getTotalCount());
    response.getRows().forEach(System.out::println);
}
Python
def query_nearby_shops(client):
    """Scenario 1: nearby cheap milk-tea shops.

    Matches rows whose ``pos`` is within 5000 m of (30.27, 120.15), whose
    ``type`` is exactly "奶茶" and whose ``avg_price`` is at most 50. Results
    are sorted by ``score`` descending. Prints the total match count and the
    top 5 rows with all columns.
    """
    conditions = [
        tablestore.GeoDistanceQuery("pos", "30.270000,120.150000", 5000),
        tablestore.TermQuery("type", "奶茶"),
        tablestore.RangeQuery("avg_price", range_to=50, include_upper=True),
    ]
    by_score_desc = tablestore.Sort(
        sorters=[tablestore.FieldSort("score", tablestore.SortOrder.DESC)])
    search_query = tablestore.SearchQuery(
        tablestore.BoolQuery(must_queries=conditions),
        get_total_count=True, limit=5, sort=by_score_desc)
    all_columns = tablestore.ColumnsToGet(return_type=tablestore.ColumnReturnType.ALL)
    result = client.search("shop", "shop_index", search_query, all_columns)
    print(f"匹配总数: {result.total_count}")
    for shop_row in result.rows:
        print(shop_row)
场景 2:按名称搜索店铺
使用 MatchQuery 对 TEXT 类型的 name 字段执行分词模糊搜索(示例关键词为"沈家"),按评分降序排序,返回前 5 条结果及匹配总数。
Java
/**
 * Scenario 2: full-text search for "沈家" on the tokenized "name" field,
 * sorted by score descending. Prints the total match count and the top 5
 * rows with all columns.
 */
public static void queryByName(SyncClient client) {
    MatchQuery keyword = new MatchQuery();
    keyword.setFieldName("name");
    keyword.setText("沈家");

    SearchQuery query = new SearchQuery();
    query.setQuery(keyword);
    query.setSort(new Sort(Arrays.asList(new FieldSort("score", SortOrder.DESC))));
    query.setLimit(5);
    query.setGetTotalCount(true);

    SearchRequest.ColumnsToGet allColumns = new SearchRequest.ColumnsToGet();
    allColumns.setReturnAll(true);
    SearchRequest request = new SearchRequest("shop", "shop_index", query);
    request.setColumnsToGet(allColumns);

    SearchResponse response = client.search(request);
    System.out.println("匹配总数: " + response.getTotalCount());
    response.getRows().forEach(System.out::println);
}
Python
def query_by_name(client):
    """Scenario 2: full-text search for "沈家" on the tokenized ``name`` field.

    Results are sorted by ``score`` descending. Prints the total match count
    and the top 5 rows with all columns.
    """
    keyword_query = tablestore.MatchQuery("name", "沈家")
    by_score_desc = tablestore.Sort(
        sorters=[tablestore.FieldSort("score", tablestore.SortOrder.DESC)])
    search_query = tablestore.SearchQuery(
        keyword_query, sort=by_score_desc, limit=5, get_total_count=True)
    all_columns = tablestore.ColumnsToGet(return_type=tablestore.ColumnReturnType.ALL)
    result = client.search("shop", "shop_index", search_query, all_columns)
    print(f"匹配总数: {result.total_count}")
    for shop_row in result.rows:
        print(shop_row)
场景 3:按店铺类型分组统计
使用 GroupByField 按店铺类型分组,通过 Count 和 Avg 子聚合统计各类型的店铺数量和平均评分,并按店铺数量降序排列分组。查询的 limit 设为 0,只返回统计结果,不返回行数据。
Java
/**
 * Scenario 3: buckets all shops by "type" (up to 20 buckets). For each bucket
 * it computes the shop count ("shopCount") and the average score ("avgScore").
 * Buckets are ordered by shop count descending, and one line is printed per
 * bucket.
 */
public static void groupByType(SyncClient client) {
    SearchQuery statsQuery = SearchQuery.newBuilder()
            .query(QueryBuilders.matchAll())
            // limit 0: only aggregation results are needed, no rows.
            .limit(0)
            .addGroupBy(GroupByBuilders.groupByField("shopType", "type")
                    .size(20)
                    .addSubAggregation(AggregationBuilders.count("shopCount", "shop_id"))
                    .addSubAggregation(AggregationBuilders.avg("avgScore", "score"))
                    .addGroupBySorter(GroupBySorter.subAggSortInDesc("shopCount"))
                    .build())
            .build();
    SearchRequest request = SearchRequest.newBuilder()
            .tableName("shop")
            .indexName("shop_index")
            .searchQuery(statsQuery)
            .build();

    GroupByFieldResult byType = client.search(request)
            .getGroupByResults()
            .getAsGroupByFieldResult("shopType");
    for (GroupByFieldResultItem bucket : byType.getGroupByFieldResultItems()) {
        long shopCount = bucket.getSubAggregationResults()
                .getAsCountAggregationResult("shopCount").getValue();
        double avgScore = bucket.getSubAggregationResults()
                .getAsAvgAggregationResult("avgScore").getValue();
        System.out.println("类型: " + bucket.getKey()
                + "\t店铺数: " + shopCount
                + "\t平均评分: " + String.format("%.1f", avgScore));
    }
}
Python
def group_by_type(client):
    """Scenario 3: shop count and average score per shop type.

    Buckets every row by ``type`` (up to 20 buckets). A Count sub-aggregation
    on ``shop_id`` and an Avg sub-aggregation on ``score`` are computed per
    bucket. Buckets are ordered by the count descending, and one line is
    printed per bucket.
    """
    sub_aggregations = [
        tablestore.Count("shop_id", name="shopCount"),
        tablestore.Avg("score", name="avgScore"),
    ]
    # Order buckets by the shopCount sub-aggregation, largest first.
    bucket_order = [tablestore.SubAggSort(tablestore.SortOrder.DESC, "shopCount")]
    by_type = tablestore.GroupByField(
        "type", size=20, sub_aggs=sub_aggregations,
        group_by_sort=bucket_order, name="shopType")
    # limit=0 with no columns: only the aggregation results are needed.
    search_query = tablestore.SearchQuery(
        tablestore.MatchAllQuery(), limit=0, group_bys=[by_type])
    no_columns = tablestore.ColumnsToGet(return_type=tablestore.ColumnReturnType.NONE)
    result = client.search("shop", "shop_index", search_query, no_columns)
    for bucket in result.group_by_results[0].items:
        values = {agg.name: agg.value for agg in bucket.sub_aggs}
        print(f"类型: {bucket.key}\t店铺数: {int(values['shopCount'])}\t平均评分: {values['avgScore']:.1f}")
资源清理
不再需要本方案中创建的资源时,按以下步骤清理,避免产生不必要的费用。
清理顺序为:先删除多元索引,再删除数据表。如果实例是为本方案新建的,最后释放实例。
Java
/**
 * Removes the resources created by this solution: first the "shop_index"
 * search index, then the "shop" data table.
 */
private static void cleanup(SyncClient client) {
    // Step 1: drop the search index.
    DeleteSearchIndexRequest dropIndex = new DeleteSearchIndexRequest();
    dropIndex.setTableName("shop");
    dropIndex.setIndexName("shop_index");
    client.deleteSearchIndex(dropIndex);

    // Step 2: drop the data table.
    DeleteTableRequest dropTable = new DeleteTableRequest("shop");
    client.deleteTable(dropTable);
}
Python
def cleanup(client):
    """Delete the ``shop_index`` search index, then the ``shop`` data table."""
    table_name, index_name = "shop", "shop_index"
    client.delete_search_index(table_name, index_name)  # 1. search index first
    client.delete_table(table_name)  # 2. then the data table