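Use the search index and nested-type (Nested) indexing capabilities of Tablestore to build a large-scale intelligent metadata management system that supports multi-dimensional retrieval.
Solution overview
An intelligent media management platform must efficiently manage metadata for massive numbers of files (images, videos, and so on). File metadata includes basic information (file size, creation time, owning user, and so on) and AI-extracted tags (tag name and confidence score). The core challenge is that tags form a multi-valued nested structure: each file has several tags, and each tag has a name and a score. The system must support exact-match queries by tag name, sorting by score, and multi-condition queries that combine tags with conditions such as a time range and file type.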
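The Tablestore search index supports the Nested field type, which builds a separate index for each sub-field of the objects in a JSON array. Combining NestedQuery, BoolQuery, and GroupBy provides exact-match queries on nested fields, multi-condition combined queries, and statistical aggregation.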
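To see why the tags field needs the Nested type rather than two independent multi-value fields, the following pure-Python model compares the two matching rules on one file's tags. It does not call Tablestore; it assumes, as is usual for nested indexing, that NestedQuery evaluates its subquery against each array element separately. The function names are hypothetical.

```python
import json

def flattened_match(tags, tag, min_score):
    """Model of a flattened index: tags.tag and tags.score are independent value sets."""
    all_tags = {t["tag"] for t in tags}
    all_scores = [t["score"] for t in tags]
    return tag in all_tags and any(s >= min_score for s in all_scores)

def nested_match(tags, tag, min_score):
    """Model of a nested query: both conditions must hold on the same array element."""
    return any(t["tag"] == tag and t["score"] >= min_score for t in tags)

tags = json.loads('[{"tag":"风景","score":60},{"tag":"建筑","score":95}]')
print(flattened_match(tags, "风景", 90))  # True: 风景 exists, and some score (95) is >= 90
print(nested_match(tags, "风景", 90))     # False: the score attached to 风景 is only 60
```

The flattened model produces a false match by pairing one element's tag with another element's score, which is exactly what per-element nested indexing avoids.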
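Solution design
This solution stores file metadata in a single data table and uses a search index for nested tag queries and multi-dimensional retrieval. The fields are designed as follows.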
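| Column category | Field | Data type | Index field type | Description |
|---|---|---|---|---|
| Primary key column | _id | String | N/A | MD5 hash of the file ID, used to scatter data and avoid write hotspots. |
| Attribute column | fId | String | KEYWORD | File ID, used for exact-match queries. |
| Attribute column | userId | String | KEYWORD | User ID, used to filter by user. |
| Attribute column | tags | String | NESTED | AI tags stored as a JSON array, with sub-fields tag (KEYWORD) and score (LONG). |
| Attribute column | type | String | KEYWORD | File type (image, video, document, or audio). |
| Attribute column | size | Integer | LONG | File size, in bytes. |
| Attribute column | createdAt | Integer | LONG | File creation timestamp, in milliseconds. |
| Attribute column | url | String | KEYWORD | File storage URL (for example, an OSS address). |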
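The tags field is stored as a JSON array string, for example [{"tag":"风景","score":95},{"tag":"建筑","score":80}] (风景 means scenery and 建筑 means architecture). The NESTED index type automatically parses the JSON array and builds a separate index for each sub-field.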
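A minimal sketch of producing the tags column value shown above, assuming integer scores as in the test data used later. The helper name build_tags_json is hypothetical; it validates each (tag, score) pair and serializes with ensure_ascii=False so that Chinese tag names are stored as-is rather than as escape sequences.

```python
import json

def build_tags_json(pairs):
    """Serialize (tag, score) pairs into the JSON-array string stored in the tags column."""
    items = []
    for tag, score in pairs:
        if not isinstance(tag, str) or not tag:
            raise ValueError(f"tag must be a non-empty string: {tag!r}")
        # score is indexed as LONG, so reject floats and booleans
        if not isinstance(score, int) or isinstance(score, bool):
            raise ValueError(f"score must be an integer: {score!r}")
        items.append({"tag": tag, "score": score})
    # Compact separators reproduce the example format exactly
    return json.dumps(items, ensure_ascii=False, separators=(",", ":"))

s = build_tags_json([("风景", 95), ("建筑", 80)])
print(s)  # [{"tag":"风景","score":95},{"tag":"建筑","score":80}]
```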
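Implementation
Before you begin, activate Tablestore and create an instance. Skip this step if you already have an available instance.
The following examples use the Java and Python SDKs. Before you begin, install and configure the Tablestore Java SDK or the Tablestore Python SDK as needed.
Step 1: Create a data table and a search index
When you create the data table, you only need to specify the primary key column; attribute columns do not need to be predefined. After the table is created, create a search index for it that includes a NESTED field with two sub-fields, tag and score.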
Java
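import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.CreateTableRequest;
import com.alicloud.openservices.tablestore.model.PrimaryKeySchema;
import com.alicloud.openservices.tablestore.model.PrimaryKeyType;
import com.alicloud.openservices.tablestore.model.TableMeta;
import com.alicloud.openservices.tablestore.model.TableOptions;
import com.alicloud.openservices.tablestore.model.search.CreateSearchIndexRequest;
import com.alicloud.openservices.tablestore.model.search.FieldSchema;
import com.alicloud.openservices.tablestore.model.search.FieldType;
import com.alicloud.openservices.tablestore.model.search.IndexSchema;
import java.util.Arrays;
import java.util.List;

// Create the data table. Only the primary key column is defined; attribute columns are schema-free.
private static void createTable(SyncClient client) {
    TableMeta tableMeta = new TableMeta("meta_file");
    tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("_id", PrimaryKeyType.STRING));
    TableOptions tableOptions = new TableOptions();
    tableOptions.setMaxVersions(1);   // keep only the latest version of each column
    tableOptions.setTimeToLive(-1);   // data never expires
    client.createTable(new CreateTableRequest(tableMeta, tableOptions));
}

// Create the search index, including the nested field tags
private static void createSearchIndex(SyncClient client) {
    CreateSearchIndexRequest request = new CreateSearchIndexRequest();
    request.setTableName("meta_file");
    request.setIndexName("meta_file_index");
    // Sub-field definitions of the nested field tags
    List<FieldSchema> tagSubFields = Arrays.asList(
        new FieldSchema("tag", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("score", FieldType.LONG).setIndex(true).setEnableSortAndAgg(true)
    );
    IndexSchema indexSchema = new IndexSchema();
    indexSchema.setFieldSchemas(Arrays.asList(
        new FieldSchema("fId", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("userId", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("tags", FieldType.NESTED).setIndex(true).setSubFieldSchemas(tagSubFields),
        new FieldSchema("type", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("size", FieldType.LONG).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("createdAt", FieldType.LONG).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("url", FieldType.KEYWORD).setIndex(true)
    ));
    request.setIndexSchema(indexSchema);
    client.createSearchIndex(request);
}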
Python
import tablestore

def create_table_and_index(client):
    # Create the data table (only the primary key column is defined)
    table_meta = tablestore.TableMeta("meta_file", [("_id", "STRING")])
    table_options = tablestore.TableOptions(time_to_live=-1, max_version=1)
    reserved = tablestore.ReservedThroughput(tablestore.CapacityUnit(0, 0))
    client.create_table(table_meta, table_options, reserved)
    # Sub-field definitions of the nested field tags
    tag_sub_fields = [
        tablestore.FieldSchema("tag", tablestore.FieldType.KEYWORD,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("score", tablestore.FieldType.LONG,
                               index=True, enable_sort_and_agg=True),
    ]
    # Create the search index, including the nested field tags
    fields = [
        tablestore.FieldSchema("fId", tablestore.FieldType.KEYWORD,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("userId", tablestore.FieldType.KEYWORD,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("tags", tablestore.FieldType.NESTED,
                               index=True, sub_field_schemas=tag_sub_fields),
        tablestore.FieldSchema("type", tablestore.FieldType.KEYWORD,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("size", tablestore.FieldType.LONG,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("createdAt", tablestore.FieldType.LONG,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("url", tablestore.FieldType.KEYWORD, index=True),
    ]
    client.create_search_index("meta_file", "meta_file_index",
                               tablestore.SearchIndexMeta(fields))
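Step 2: Write test data
Use the BatchWriteRow operation to write 10,000 rows of file metadata in batches. Each row contains a randomly generated file ID, user ID, file type, AI tags (2 to 4 tags, each with a tag name and a confidence score), file size, and creation time. The primary key _id is the MD5 hash of the file ID, which scatters the data and avoids write hotspots. A single BatchWriteRow request can write at most 200 rows.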
Java
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.BatchWriteRowRequest;
import com.alicloud.openservices.tablestore.model.BatchWriteRowResponse;
import com.alicloud.openservices.tablestore.model.ColumnValue;
import com.alicloud.openservices.tablestore.model.PrimaryKeyBuilder;
import com.alicloud.openservices.tablestore.model.PrimaryKeyValue;
import com.alicloud.openservices.tablestore.model.RowPutChange;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

private static void writeTestData(SyncClient client) throws Exception {
    Random rand = new Random(42);  // fixed seed so the test data is reproducible
    long start = System.currentTimeMillis();
    int written = 0;
    long baseTime = 1704067200000L;               // 2024-01-01 00:00:00 UTC
    long timeSpan = 2L * 365 * 24 * 3600 * 1000;  // 2 years, in milliseconds
    String[] TAGS = {"风景", "人物", "建筑", "美食", "动物", "植物", "交通", "运动", "表格存储", "科技"};
    String[] TYPES = {"image", "video", "document", "audio"};
    String[] USERS = {"u00001", "u00002", "u00003", "u00004", "u00005"};
    while (written < 10000) {
        BatchWriteRowRequest batch = new BatchWriteRowRequest();
        int batchSize = Math.min(200, 10000 - written);  // at most 200 rows per BatchWriteRow
        for (int i = 0; i < batchSize; i++) {
            int idx = written + i;
            String fId = String.format("f%08d", idx);
            String id = md5(fId);  // hashed primary key scatters writes
            String userId = USERS[rand.nextInt(USERS.length)];
            String type = TYPES[rand.nextInt(TYPES.length)];
            long size = 1024 + rand.nextInt(10 * 1024 * 1024);
            long createdAt = baseTime + (long) (rand.nextDouble() * timeSpan);
            String url = "oss://bucket/files/" + fId + "." + type;
            // Generate 2 to 4 distinct random tags, each with a score from 50 to 99
            int tagCount = 2 + rand.nextInt(3);
            Set<Integer> usedIndices = new HashSet<>();
            StringBuilder tagsJson = new StringBuilder("[");
            for (int t = 0; t < tagCount; t++) {
                int tagIdx;
                do { tagIdx = rand.nextInt(TAGS.length); } while (usedIndices.contains(tagIdx));
                usedIndices.add(tagIdx);
                long score = 50 + rand.nextInt(50);
                if (t > 0) tagsJson.append(",");
                tagsJson.append("{\"tag\":\"").append(TAGS[tagIdx])
                        .append("\",\"score\":").append(score).append("}");
            }
            tagsJson.append("]");
            PrimaryKeyBuilder pkb = PrimaryKeyBuilder.createPrimaryKeyBuilder();
            pkb.addPrimaryKeyColumn("_id", PrimaryKeyValue.fromString(id));
            RowPutChange row = new RowPutChange("meta_file", pkb.build());
            row.addColumn("fId", ColumnValue.fromString(fId));
            row.addColumn("userId", ColumnValue.fromString(userId));
            row.addColumn("tags", ColumnValue.fromString(tagsJson.toString()));
            row.addColumn("type", ColumnValue.fromString(type));
            row.addColumn("size", ColumnValue.fromLong(size));
            row.addColumn("createdAt", ColumnValue.fromLong(createdAt));
            row.addColumn("url", ColumnValue.fromString(url));
            batch.addRowChange(row);
        }
        BatchWriteRowResponse response = client.batchWriteRow(batch);
        // BatchWriteRow can partially fail; check per-row results instead of assuming success
        if (!response.isAllSucceed()) {
            throw new IllegalStateException(response.getFailedRows().size() + " rows failed in this batch");
        }
        written += batchSize;
    }
    System.out.println("Written 10000 rows in " + (System.currentTimeMillis() - start) + "ms");
}

// MD5 helper: returns the lowercase hex digest of the input
private static String md5(String input) throws Exception {
    MessageDigest md = MessageDigest.getInstance("MD5");
    byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
    StringBuilder sb = new StringBuilder();
    for (byte b : digest) sb.append(String.format("%02x", b));
    return sb.toString();
}
Python
import hashlib
import json
import random

import tablestore

def write_test_data(client):
    rand = random.Random(42)  # fixed seed so the test data is reproducible
    base_time = 1704067200000  # 2024-01-01 00:00:00 UTC
    time_span = 2 * 365 * 24 * 3600 * 1000  # 2 years, in milliseconds
    TAGS = ["风景", "人物", "建筑", "美食", "动物", "植物", "交通", "运动", "表格存储", "科技"]
    TYPES = ["image", "video", "document", "audio"]
    USERS = ["u00001", "u00002", "u00003", "u00004", "u00005"]
    written = 0
    while written < 10000:
        batch_size = min(200, 10000 - written)  # at most 200 rows per BatchWriteRow
        put_rows = []
        for i in range(batch_size):
            idx = written + i
            f_id = f"f{idx:08d}"
            _id = hashlib.md5(f_id.encode()).hexdigest()  # hashed primary key scatters writes
            user_id = rand.choice(USERS)
            file_type = rand.choice(TYPES)
            size = 1024 + rand.randint(0, 10 * 1024 * 1024)
            created_at = base_time + int(rand.random() * time_span)
            url = f"oss://bucket/files/{f_id}.{file_type}"
            # Generate 2 to 4 distinct random tags, each with a score from 50 to 99
            tag_count = 2 + rand.randint(0, 2)
            tag_indices = rand.sample(range(len(TAGS)), tag_count)
            tags_list = [{"tag": TAGS[ti], "score": 50 + rand.randint(0, 49)}
                         for ti in tag_indices]
            tags_json = json.dumps(tags_list, ensure_ascii=False)
            pk = [("_id", _id)]
            cols = [("fId", f_id), ("userId", user_id), ("tags", tags_json),
                    ("type", file_type), ("size", size),
                    ("createdAt", created_at), ("url", url)]
            row = tablestore.Row(pk, cols)
            put_rows.append(tablestore.PutRowItem(
                row, tablestore.Condition(tablestore.RowExistenceExpectation.IGNORE)))
        request = tablestore.BatchWriteRowRequest()
        request.add(tablestore.TableInBatchWriteRowItem("meta_file", put_rows))
        result = client.batch_write_row(request)
        # BatchWriteRow can partially fail; check the result instead of assuming success
        if not result.is_all_succeed():
            raise RuntimeError(f"batch starting at row {written} was not fully written")
        written += batch_size
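The batching and key scheme used in this step can be checked without a Tablestore instance. This sketch, with hypothetical helper names, splits 10,000 file IDs into batches of at most 200 and derives each _id as an MD5 hex digest. Consecutive IDs such as f00000000 and f00000001 share a long common prefix, while their MD5 digests do not, which is what spreads writes across the primary key range.

```python
import hashlib

MAX_ROWS_PER_BATCH = 200  # BatchWriteRow limit stated in this guide

def primary_key_for(f_id: str) -> str:
    """MD5 hex digest of the file ID, used as the _id primary key."""
    return hashlib.md5(f_id.encode("utf-8")).hexdigest()

def batches(items, size=MAX_ROWS_PER_BATCH):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

f_ids = [f"f{i:08d}" for i in range(10000)]
sizes = [len(b) for b in batches(f_ids)]
print(len(sizes), sizes[0], sizes[-1])  # 50 200 200

# Show that sequential file IDs map to unrelated key prefixes
for f_id in f_ids[:3]:
    print(f_id, primary_key_for(f_id)[:8])
```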
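Step 3: Query data
After the data is written, wait until the search index has finished synchronizing it; synchronization is asynchronous, so newly written rows are not searchable immediately. The following examples demonstrate three typical query scenarios.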
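One way to wait is to poll a count until it reaches the number of rows written. The sketch below is a generic polling helper with a hypothetical name; the caller supplies count_fn, for example a MatchAllQuery with get_total_count=True as shown in the comment. The sleep and clock parameters exist only so the helper can be tested without real waiting.

```python
import time

def wait_until_indexed(count_fn, expected, timeout_s=120.0, interval_s=2.0,
                       sleep=time.sleep, clock=time.monotonic):
    """Poll count_fn() until it returns at least `expected`, or raise TimeoutError."""
    deadline = clock() + timeout_s
    while True:
        current = count_fn()
        if current >= expected:
            return current
        if clock() >= deadline:
            raise TimeoutError(f"index has {current} rows, expected {expected}")
        sleep(interval_s)

# Example count_fn, assuming the Python SDK client used in the steps above:
# count_fn = lambda: client.search(
#     "meta_file", "meta_file_index",
#     tablestore.SearchQuery(tablestore.MatchAllQuery(), limit=0, get_total_count=True),
#     tablestore.ColumnsToGet(return_type=tablestore.ColumnReturnType.NONE)).total_count
# wait_until_indexed(count_fn, 10000)
```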
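Scenario 1: Query files by tag
Use NestedQuery to query all files that contain a specified tag, sorted by creation time in descending order. NestedQuery runs a subquery against a Nested field, so you must specify the path of the nested field (here, tags) and address sub-fields as tags.tag.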
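As a reference for what this query returns, the following in-memory sketch reproduces its semantics on a few hypothetical rows: select files whose tags contain the given tag, sort by createdAt descending, report the total hit count, and keep only the first limit rows. It does not model relevance scoring or the search index itself.

```python
import json

def search_by_tag(rows, tag, limit=5):
    """Return (total_hits, top_rows) for files whose tags contain `tag`."""
    hits = [r for r in rows
            if any(t["tag"] == tag for t in json.loads(r["tags"]))]
    hits.sort(key=lambda r: r["createdAt"], reverse=True)  # createdAt DESC
    return len(hits), hits[:limit]  # total count covers all hits; rows are capped

# Hypothetical sample rows
rows = [
    {"fId": "f00000001", "createdAt": 1704067200000, "tags": '[{"tag":"表格存储","score":70}]'},
    {"fId": "f00000002", "createdAt": 1706745600000, "tags": '[{"tag":"风景","score":88}]'},
    {"fId": "f00000003", "createdAt": 1709251200000,
     "tags": '[{"tag":"科技","score":61},{"tag":"表格存储","score":93}]'},
]
total, top = search_by_tag(rows, "表格存储")
print(total, [r["fId"] for r in top])  # 2 ['f00000003', 'f00000001']
```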
Java
// Query all files that contain the tag "表格存储" (Tablestore)
TermQuery tagQuery = new TermQuery();
tagQuery.setFieldName("tags.tag");
tagQuery.setTerm(ColumnValue.fromString("表格存储"));
NestedQuery nestedQuery = new NestedQuery();
nestedQuery.setPath("tags");
nestedQuery.setScoreMode(ScoreMode.Avg);
nestedQuery.setQuery(tagQuery);
SearchQuery searchQuery = new SearchQuery();
searchQuery.setQuery(nestedQuery);
searchQuery.setGetTotalCount(true);
searchQuery.setLimit(5);
searchQuery.setSort(new Sort(Arrays.asList(
new FieldSort("createdAt", SortOrder.DESC))));
SearchRequest request = new SearchRequest("meta_file", "meta_file_index", searchQuery);
SearchRequest.ColumnsToGet columnsToGet = new SearchRequest.ColumnsToGet();
columnsToGet.setReturnAll(true);
request.setColumnsToGet(columnsToGet);
SearchResponse response = client.search(request);
System.out.println("Total hits: " + response.getTotalCount());
for (Row row : response.getRows()) {
    System.out.println(row);
}
Python
# Query all files that contain the tag "表格存储" (Tablestore)
nested_query = tablestore.NestedQuery(
    "tags",                                        # path of the nested field
    tablestore.TermQuery("tags.tag", "表格存储"),  # subquery on the sub-field tags.tag
    score_mode=tablestore.ScoreMode.AVG,
)
result = client.search(
    "meta_file", "meta_file_index",
    tablestore.SearchQuery(
        nested_query, get_total_count=True, limit=5,
        sort=tablestore.Sort(sorters=[
            tablestore.FieldSort("createdAt", tablestore.SortOrder.DESC)])),
    tablestore.ColumnsToGet(return_type=tablestore.ColumnReturnType.ALL))
print(f"Total hits: {result.total_count}")
for row in result.rows:
    # row is (primary_key, attribute_columns); each attribute column starts with (name, value, ...)
    cols = {c[0]: c[1] for c in row[1]}
    print(f"fId={cols['fId']} | type={cols['type']} | tags={cols['tags']}")
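Scenario 2: Multi-condition combined query
Use BoolQuery to combine NestedQuery (exact tag match), RangeQuery (time range), and TermQuery (file type filter) for multi-dimensional retrieval. For example, query all image files created in 2024 (by createdAt, in UTC) that carry the tag "表格存储" (Tablestore).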
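The millisecond boundaries used in the RangeQuery below can be derived rather than hard-coded. This sketch, with hypothetical helper names, computes the half-open interval [start, end) of a calendar year; it defaults to UTC, and passing another zone such as UTC+8 shifts both boundaries accordingly.

```python
from datetime import datetime, timedelta, timezone

def year_range_ms(year, tz=timezone.utc):
    """Return (start, end) of a calendar year in `tz` as epoch milliseconds, end exclusive."""
    start = datetime(year, 1, 1, tzinfo=tz)
    end = datetime(year + 1, 1, 1, tzinfo=tz)
    return int(start.timestamp() * 1000), int(end.timestamp() * 1000)

def in_range(ts_ms, start_ms, end_ms):
    """Match include_lower=True, include_upper=False."""
    return start_ms <= ts_ms < end_ms

lo, hi = year_range_ms(2024)
print(lo, hi)  # 1704067200000 1735689600000
```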
Java
// Nested tag query: tags.tag = "表格存储"
TermQuery tagQuery = new TermQuery();
tagQuery.setFieldName("tags.tag");
tagQuery.setTerm(ColumnValue.fromString("表格存储"));
NestedQuery nestedQuery = new NestedQuery();
nestedQuery.setPath("tags");
nestedQuery.setScoreMode(ScoreMode.Avg);
nestedQuery.setQuery(tagQuery);
// Time range query: all of 2024 (UTC), as the half-open interval [2024-01-01, 2025-01-01)
RangeQuery timeQuery = new RangeQuery();
timeQuery.setFieldName("createdAt");
timeQuery.greaterThanOrEqual(ColumnValue.fromLong(1704067200000L)); // 2024-01-01 00:00:00 UTC, inclusive
timeQuery.lessThan(ColumnValue.fromLong(1735689600000L));           // 2025-01-01 00:00:00 UTC, exclusive
// File type query: type = "image"
TermQuery typeQuery = new TermQuery();
typeQuery.setFieldName("type");
typeQuery.setTerm(ColumnValue.fromString("image"));
// Combine the three conditions with BoolQuery; a row must match all of them
BoolQuery boolQuery = new BoolQuery();
boolQuery.setMustQueries(Arrays.asList(nestedQuery, timeQuery, typeQuery));
SearchQuery searchQuery = new SearchQuery();
searchQuery.setQuery(boolQuery);
searchQuery.setGetTotalCount(true);
searchQuery.setLimit(5);
searchQuery.setSort(new Sort(Arrays.asList(
new FieldSort("createdAt", SortOrder.DESC))));
SearchRequest request = new SearchRequest("meta_file", "meta_file_index", searchQuery);
SearchRequest.ColumnsToGet columnsToGet = new SearchRequest.ColumnsToGet();
columnsToGet.setReturnAll(true);
request.setColumnsToGet(columnsToGet);
SearchResponse response = client.search(request);
System.out.println("Total hits: " + response.getTotalCount());
for (Row row : response.getRows()) {
    System.out.println(row);
}
Python
# Nested tag query: tags.tag = "表格存储"
nested_query = tablestore.NestedQuery(
    "tags",
    tablestore.TermQuery("tags.tag", "表格存储"),
    score_mode=tablestore.ScoreMode.AVG,
)
# Time range query: all of 2024 (UTC), as the half-open interval [2024-01-01, 2025-01-01)
range_query = tablestore.RangeQuery(
    "createdAt",
    range_from=1704067200000,  # 2024-01-01 00:00:00 UTC
    range_to=1735689600000,    # 2025-01-01 00:00:00 UTC
    include_lower=True,
    include_upper=False,
)
# File type query: type = "image"
type_query = tablestore.TermQuery("type", "image")
# Combine the three conditions with BoolQuery; a row must match all of them
bool_query = tablestore.BoolQuery(
    must_queries=[nested_query, range_query, type_query])
result = client.search(
    "meta_file", "meta_file_index",
    tablestore.SearchQuery(
        bool_query, get_total_count=True, limit=5,
        sort=tablestore.Sort(sorters=[
            tablestore.FieldSort("createdAt", tablestore.SortOrder.DESC)])),
    tablestore.ColumnsToGet(return_type=tablestore.ColumnReturnType.ALL))
print(f"Total hits: {result.total_count}")
for row in result.rows:
    cols = {c[0]: c[1] for c in row[1]}
    print(f"fId={cols['fId']} | type={cols['type']} | createdAt={cols['createdAt']}")
Scenario 3: Statistics by file type
Use GroupByField to group files by type and, for each type, count the files and compute the average file size.
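The aggregation can be cross-checked on a small sample. This in-memory sketch, using hypothetical rows and a hypothetical helper name, groups by type, counts files and averages size per group, sorts groups by count in descending order, and keeps at most size groups, mirroring the GroupByField settings below. Because every row has an fId, counting fId equals counting the rows in each group.

```python
from collections import defaultdict

def stats_by_type(rows, size=20):
    """Return [(type, file_count, avg_size)] sorted by file_count descending."""
    counts = defaultdict(int)
    totals = defaultdict(int)
    for r in rows:
        counts[r["type"]] += 1
        totals[r["type"]] += r["size"]
    result = [(t, counts[t], totals[t] / counts[t]) for t in counts]
    result.sort(key=lambda item: item[1], reverse=True)
    return result[:size]  # at most `size` groups

rows = [
    {"type": "image", "size": 1000},
    {"type": "image", "size": 3000},
    {"type": "video", "size": 5000},
]
for t, n, avg in stats_by_type(rows):
    print(f"Type: {t} | Files: {n} | Avg size: {avg:.0f} bytes")
# Type: image | Files: 2 | Avg size: 2000 bytes
# Type: video | Files: 1 | Avg size: 5000 bytes
```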
Java
SearchRequest request = SearchRequest.newBuilder()
        .tableName("meta_file").indexName("meta_file_index")
        .searchQuery(SearchQuery.newBuilder()
                .query(QueryBuilders.matchAll())
                .limit(0)  // return statistics only, no rows
                .addGroupBy(GroupByBuilders.groupByField("fileType", "type")
                        .size(20)  // return at most 20 groups
                        .addSubAggregation(AggregationBuilders.count("fileCount", "fId"))
                        .addSubAggregation(AggregationBuilders.avg("avgSize", "size"))
                        .addGroupBySorter(GroupBySorter.subAggSortInDesc("fileCount"))
                        .build())
                .build())
        .build();
SearchResponse response = client.search(request);
GroupByFieldResult results = response.getGroupByResults()
        .getAsGroupByFieldResult("fileType");
for (GroupByFieldResultItem item : results.getGroupByFieldResultItems()) {
    long count = item.getSubAggregationResults()
            .getAsCountAggregationResult("fileCount").getValue();
    double avgSize = item.getSubAggregationResults()
            .getAsAvgAggregationResult("avgSize").getValue();
    System.out.println("Type: " + item.getKey()
            + " | Files: " + count
            + " | Avg size: " + String.format("%.0f", avgSize) + " bytes");
}
Python
group_by = tablestore.GroupByField(
    "type", size=20,  # group by file type, return at most 20 groups
    sub_aggs=[
        tablestore.Count("fId", name="fileCount"),  # number of files per type
        tablestore.Avg("size", name="avgSize"),     # average file size per type
    ],
    group_by_sort=[tablestore.SubAggSort(tablestore.SortOrder.DESC, "fileCount")],
    name="fileType")
result = client.search(
    "meta_file", "meta_file_index",
    tablestore.SearchQuery(tablestore.MatchAllQuery(), limit=0,  # statistics only, no rows
                           group_bys=[group_by]),
    tablestore.ColumnsToGet(return_type=tablestore.ColumnReturnType.NONE))
for item in result.group_by_results[0].items:
    aggs = {a.name: a.value for a in item.sub_aggs}
    print(f"Type: {item.key} | Files: {int(aggs['fileCount'])}"
          f" | Avg size: {aggs['avgSize']:.0f} bytes")
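Resource cleanup
When you no longer need the resources created in this solution, clean them up as follows to avoid unnecessary charges.
Cleanup order: delete the search index first, then the data table. If the instance was created only for this solution, release the instance last.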
Java
private static void cleanup(SyncClient client) {
    // 1. Delete the search index first
    DeleteSearchIndexRequest deleteIndexReq = new DeleteSearchIndexRequest();
    deleteIndexReq.setTableName("meta_file");
    deleteIndexReq.setIndexName("meta_file_index");
    client.deleteSearchIndex(deleteIndexReq);
    // 2. Then delete the data table
    client.deleteTable(new DeleteTableRequest("meta_file"));
}
Python
def cleanup(client):
    # 1. Delete the search index first
    client.delete_search_index("meta_file", "meta_file_index")
    # 2. Then delete the data table
    client.delete_table("meta_file")