Build an intelligent metadata management system for massive numbers of files


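Use Tablestore search indexes with nested-type (NESTED) fields to build a metadata management system for massive numbers of files that supports multi-dimensional queries.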

Solution overview

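An intelligent media management platform must efficiently manage metadata for massive numbers of files, such as images and videos. File metadata includes basic information (file size, creation time, owning user, and so on) and AI-extracted labels, each with a label name and a confidence score. The core challenge is that labels form a multi-valued nested structure: each file has several labels, and each label has both a name and a score. The system must support exact queries by label name, sorting by score, and multi-condition queries that combine label filters with time ranges, file types, and other criteria.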

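Tablestore search indexes support the nested field type, which indexes each object in a JSON array as a separate nested document with its own sub-field indexes. Combined with NestedQuery, BoolQuery, and GroupBy, this enables exact queries on nested fields, multi-condition queries, and statistical aggregation.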
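To see why the labels need a nested index, consider a query for files labeled 风景 (landscape) with a confidence score of at least 90. The following Python sketch models two matching semantics in memory: a flattened model, where all label names and all scores of a file are pooled separately, and a nested model, where both conditions must hold within the same array element. It is an illustration under simplified assumptions, not a description of how Tablestore evaluates queries internally; `flattened_match` and `nested_match` are hypothetical names.

```python
# Illustration only: models matching semantics in memory, not Tablestore internals.

# One file whose labels are stored as an array of {tag, score} objects.
file_tags = [{"tag": "风景", "score": 60}, {"tag": "建筑", "score": 95}]


def flattened_match(tags, tag, min_score):
    # Flattened model: tag names and scores are pooled separately,
    # so the matching tag and the matching score may come from different elements.
    all_tags = {t["tag"] for t in tags}
    all_scores = [t["score"] for t in tags]
    return tag in all_tags and any(s >= min_score for s in all_scores)


def nested_match(tags, tag, min_score):
    # Nested model: both conditions must hold within the same array element.
    return any(t["tag"] == tag and t["score"] >= min_score for t in tags)


# "Labeled 风景 with a score of at least 90":
print(flattened_match(file_tags, "风景", 90))  # True  (false positive)
print(nested_match(file_tags, "风景", 90))     # False (correct)
```

In the flattened model, the 风景 label and the score 95 come from different elements, so the file matches by mistake. Per-element matching is the behavior that a nested field and NestedQuery provide for `tags`.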

Solution design

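This solution stores file metadata in a single data table and uses a search index for nested label queries and multi-dimensional retrieval. The fields are designed as follows.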

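| Column kind | Field | Data type | Search index field type | Description |
|---|---|---|---|---|
| Primary key column | _id | String | - | MD5 hash of the file number, used to distribute data evenly and avoid write hotspots. |
| Attribute column | fId | String | KEYWORD | File number, used for exact-match queries. |
| Attribute column | userId | String | KEYWORD | User ID, used to filter files by user. |
| Attribute column | tags | String (JSON array) | NESTED | Array of AI-detected labels. Each element contains the sub-fields tag (KEYWORD, label name) and score (LONG, confidence score). The nested index type enables NestedQuery on this field. |
| Attribute column | type | String | KEYWORD | File type: image, video, document, or audio. |
| Attribute column | size | Integer | LONG | File size in bytes. |
| Attribute column | createdAt | Integer | LONG | File creation time as a Unix timestamp in milliseconds. |
| Attribute column | url | String | KEYWORD | File storage URL, such as an OSS address. |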
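The following Python sketch shows the effect of the `_id` design: sequential file numbers such as f00000000 and f00000001 share a long common prefix, while their MD5 hex digests do not. Tablestore partitions a table by ranges of the first primary key column, so hashed keys let consecutive writes fall into different key ranges instead of piling onto one partition. `primary_key_for` is a hypothetical helper that mirrors the hashing in the Step 2 write examples.

```python
import hashlib


def primary_key_for(f_id: str) -> str:
    # Same scheme as the write examples: lowercase hex MD5 digest of the file number.
    return hashlib.md5(f_id.encode("utf-8")).hexdigest()


# Sequential file numbers share a long prefix...
f_ids = [f"f{i:08d}" for i in range(5)]

# ...but their hashes start with unrelated characters, spreading them across key ranges.
for f_id in f_ids:
    key = primary_key_for(f_id)
    print(f_id, key[:8], len(key))
```

The trade-off is that `_id` no longer supports range scans by file number; lookups by file number go through the `fId` KEYWORD field in the search index instead.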

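The tags field is stored as a JSON array string, for example [{"tag":"风景","score":95},{"tag":"建筑","score":80}] (风景 = landscape, 建筑 = architecture). The NESTED index type parses the JSON array automatically and indexes the tag and score sub-fields of each element.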
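Because `tags` is written as a plain string, a malformed value is easy to produce on the client side. The following minimal Python sketch builds the JSON array from (tag, score) pairs and performs a rough type check against the nested sub-field definitions (tag as a string for KEYWORD, score as an integer for LONG). `build_tags_json` and `check_tags_json` are hypothetical helpers, not SDK functions, and the check is an assumption about what a sensible client-side guard looks like.

```python
import json


def build_tags_json(labels):
    """Serialize (tag, score) pairs into the JSON array stored in the tags column."""
    items = [{"tag": tag, "score": int(score)} for tag, score in labels]
    # ensure_ascii=False keeps non-ASCII label names (such as 风景) readable.
    return json.dumps(items, ensure_ascii=False, separators=(",", ":"))


def check_tags_json(value):
    """Rough client-side check that a tags value matches the nested sub-field types."""
    items = json.loads(value)
    if not isinstance(items, list):
        raise ValueError("tags must be a JSON array")
    for item in items:
        if not isinstance(item, dict) or set(item) != {"tag", "score"}:
            raise ValueError(f"unexpected element: {item!r}")
        score = item["score"]
        # bool is a subclass of int in Python, so reject it explicitly.
        if not isinstance(item["tag"], str) or isinstance(score, bool) or not isinstance(score, int):
            raise ValueError("tag must be a string and score an integer")
    return items


tags = build_tags_json([("风景", 95), ("建筑", 80)])
print(tags)  # [{"tag":"风景","score":95},{"tag":"建筑","score":80}]
check_tags_json(tags)
```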

Implementation

Before you begin, activate Tablestore and create an instance. Skip this step if you already have an instance you can use.

Note

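The following examples use the Java and Python SDKs. Before you begin, install and configure the Tablestore Java SDK or the Tablestore Python SDK as needed. The snippets assume an initialized client (a SyncClient in Java, an OTSClient in Python).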

Step 1: Create the data table and search index

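When you create the data table, you only need to specify the primary key column; attribute columns do not need to be predefined. After the table is created, create a search index for it that includes the NESTED field tags with two sub-fields, tag and score.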

Java

// Create the data table
private static void createTable(SyncClient client) {
    TableMeta tableMeta = new TableMeta("meta_file");
    tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("_id", PrimaryKeyType.STRING));

    TableOptions tableOptions = new TableOptions();
    tableOptions.setMaxVersions(1);
    tableOptions.setTimeToLive(-1);

    client.createTable(new CreateTableRequest(tableMeta, tableOptions));
}

// Create the search index (including a nested field)
private static void createSearchIndex(SyncClient client) {
    CreateSearchIndexRequest request = new CreateSearchIndexRequest();
    request.setTableName("meta_file");
    request.setIndexName("meta_file_index");

    // Sub-field definitions for the nested field tags
    List<FieldSchema> tagSubFields = Arrays.asList(
        new FieldSchema("tag", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("score", FieldType.LONG).setIndex(true).setEnableSortAndAgg(true)
    );

    IndexSchema indexSchema = new IndexSchema();
    indexSchema.setFieldSchemas(Arrays.asList(
        new FieldSchema("fId", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("userId", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("tags", FieldType.NESTED).setIndex(true).setSubFieldSchemas(tagSubFields),
        new FieldSchema("type", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("size", FieldType.LONG).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("createdAt", FieldType.LONG).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("url", FieldType.KEYWORD).setIndex(true)
    ));
    request.setIndexSchema(indexSchema);
    client.createSearchIndex(request);
}

Python

import tablestore

def create_table_and_index(client):
    # Create the data table
    table_meta = tablestore.TableMeta("meta_file", [("_id", "STRING")])
    table_options = tablestore.TableOptions(time_to_live=-1, max_version=1)
    reserved = tablestore.ReservedThroughput(tablestore.CapacityUnit(0, 0))
    client.create_table(table_meta, table_options, reserved)

    # Sub-field definitions for the nested field tags
    tag_sub_fields = [
        tablestore.FieldSchema("tag", tablestore.FieldType.KEYWORD,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("score", tablestore.FieldType.LONG,
                               index=True, enable_sort_and_agg=True),
    ]

    # Create the search index (including a nested field)
    fields = [
        tablestore.FieldSchema("fId", tablestore.FieldType.KEYWORD,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("userId", tablestore.FieldType.KEYWORD,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("tags", tablestore.FieldType.NESTED,
                               index=True, sub_field_schemas=tag_sub_fields),
        tablestore.FieldSchema("type", tablestore.FieldType.KEYWORD,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("size", tablestore.FieldType.LONG,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("createdAt", tablestore.FieldType.LONG,
                               index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("url", tablestore.FieldType.KEYWORD, index=True),
    ]
    client.create_search_index("meta_file", "meta_file_index",
                               tablestore.SearchIndexMeta(fields))

Step 2: Write test data

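Use the BatchWriteRow operation to write 10,000 file metadata rows in batches. Each row contains a randomly generated file number, user ID, file type, AI labels (2 to 4 per file, each with a label name and confidence score), file size, and creation time. The primary key _id is the MD5 hash of the file number, which distributes data evenly to avoid write hotspots. A single BatchWriteRow request can write at most 200 rows, so the data is written in 50 batches of 200 rows.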
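The batching arithmetic shared by both write examples can be isolated into a small generator. The following is a minimal Python sketch; `batches` is a hypothetical helper name, and only the 200-row cap comes from the text above.

```python
def batches(total, max_rows_per_batch=200):
    """Yield (start_index, batch_size) pairs covering `total` rows,
    with at most `max_rows_per_batch` rows per BatchWriteRow request."""
    if max_rows_per_batch <= 0:
        raise ValueError("max_rows_per_batch must be positive")
    written = 0
    while written < total:
        size = min(max_rows_per_batch, total - written)
        yield written, size
        written += size


plan = list(batches(10000))
print(len(plan), plan[0], plan[-1])  # 50 (0, 200) (9800, 200)
```

When the total is not a multiple of 200, the last batch is simply shorter; for example, 450 rows become batches of 200, 200, and 50.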

Java

private static void writeTestData(SyncClient client) throws Exception {
    Random rand = new Random(42);
    long start = System.currentTimeMillis();
    int written = 0;
    long baseTime = 1704067200000L; // 2024-01-01 UTC
    long timeSpan = 2L * 365 * 24 * 3600 * 1000; // 2 years

    String[] TAGS = {"风景", "人物", "建筑", "美食", "动物", "植物", "交通", "运动", "表格存储", "科技"};
    String[] TYPES = {"image", "video", "document", "audio"};
    String[] USERS = {"u00001", "u00002", "u00003", "u00004", "u00005"};

    while (written < 10000) {
        BatchWriteRowRequest batch = new BatchWriteRowRequest();
        int batchSize = Math.min(200, 10000 - written);

        for (int i = 0; i < batchSize; i++) {
            int idx = written + i;
            String fId = String.format("f%08d", idx);
            String id = md5(fId);
            String userId = USERS[rand.nextInt(USERS.length)];
            String type = TYPES[rand.nextInt(TYPES.length)];
            long size = 1024 + rand.nextInt(10 * 1024 * 1024);
            long createdAt = baseTime + (long)(rand.nextDouble() * timeSpan);
            String url = "oss://bucket/files/" + fId + "." + type;

            // Generate 2 to 4 distinct random labels, each with a score from 50 to 99
            int tagCount = 2 + rand.nextInt(3);
            Set<Integer> usedIndices = new HashSet<>();
            StringBuilder tagsJson = new StringBuilder("[");
            for (int t = 0; t < tagCount; t++) {
                int tagIdx;
                do { tagIdx = rand.nextInt(TAGS.length); } while (usedIndices.contains(tagIdx));
                usedIndices.add(tagIdx);
                long score = 50 + rand.nextInt(50);
                if (t > 0) tagsJson.append(",");
                tagsJson.append("{\"tag\":\"").append(TAGS[tagIdx])
                        .append("\",\"score\":").append(score).append("}");
            }
            tagsJson.append("]");

            PrimaryKeyBuilder pkb = PrimaryKeyBuilder.createPrimaryKeyBuilder();
            pkb.addPrimaryKeyColumn("_id", PrimaryKeyValue.fromString(id));
            RowPutChange row = new RowPutChange("meta_file", pkb.build());
            row.addColumn("fId", ColumnValue.fromString(fId));
            row.addColumn("userId", ColumnValue.fromString(userId));
            row.addColumn("tags", ColumnValue.fromString(tagsJson.toString()));
            row.addColumn("type", ColumnValue.fromString(type));
            row.addColumn("size", ColumnValue.fromLong(size));
            row.addColumn("createdAt", ColumnValue.fromLong(createdAt));
            row.addColumn("url", ColumnValue.fromString(url));
            batch.addRowChange(row);
        }
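        BatchWriteRowResponse response = client.batchWriteRow(batch);
        // BatchWriteRow can partially fail: stop instead of silently losing rows
        if (!response.isAllSucceed()) {
            throw new IllegalStateException(
                response.getFailedRows().size() + " rows failed in this batch");
        }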
        written += batchSize;
    }
    System.out.println("Written 10000 rows in " + (System.currentTimeMillis() - start) + "ms");
}

// MD5 helper: returns the lowercase hex digest of the input
private static String md5(String input) throws Exception {
    MessageDigest md = MessageDigest.getInstance("MD5");
    byte[] digest = md.digest(input.getBytes("UTF-8"));
    StringBuilder sb = new StringBuilder();
    for (byte b : digest) sb.append(String.format("%02x", b));
    return sb.toString();
}

Python

import hashlib
import json
import random

import tablestore

def write_test_data(client):
    rand = random.Random(42)
    base_time = 1704067200000  # 2024-01-01 UTC
    time_span = 2 * 365 * 24 * 3600 * 1000  # 2 years

    TAGS = ["风景", "人物", "建筑", "美食", "动物", "植物", "交通", "运动", "表格存储", "科技"]
    TYPES = ["image", "video", "document", "audio"]
    USERS = ["u00001", "u00002", "u00003", "u00004", "u00005"]
    written = 0

    while written < 10000:
        batch_size = min(200, 10000 - written)
        put_rows = []

        for i in range(batch_size):
            idx = written + i
            f_id = f"f{idx:08d}"
            _id = hashlib.md5(f_id.encode()).hexdigest()
            user_id = rand.choice(USERS)
            file_type = rand.choice(TYPES)
            size = 1024 + rand.randint(0, 10 * 1024 * 1024)
            created_at = base_time + int(rand.random() * time_span)
            url = f"oss://bucket/files/{f_id}.{file_type}"

            # Generate 2 to 4 distinct random labels, each with a score from 50 to 99
            tag_count = 2 + rand.randint(0, 2)
            tag_indices = rand.sample(range(len(TAGS)), tag_count)
            tags_list = [{"tag": TAGS[ti], "score": 50 + rand.randint(0, 49)}
                         for ti in tag_indices]
            tags_json = json.dumps(tags_list, ensure_ascii=False)

            pk = [("_id", _id)]
            cols = [("fId", f_id), ("userId", user_id), ("tags", tags_json),
                    ("type", file_type), ("size", size),
                    ("createdAt", created_at), ("url", url)]
            row = tablestore.Row(pk, cols)
            put_rows.append(tablestore.PutRowItem(
                row, tablestore.Condition(tablestore.RowExistenceExpectation.IGNORE)))

        request = tablestore.BatchWriteRowRequest()
        request.add(tablestore.TableInBatchWriteRowItem("meta_file", put_rows))
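        response = client.batch_write_row(request)
        # BatchWriteRow can partially fail: stop instead of silently losing rows
        if not response.is_all_succeed():
            raise RuntimeError("Some rows in this batch failed to write")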
        written += batch_size

Step 3: Query data

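After the data is written, wait for the search index to finish synchronizing it; rows are not searchable until they have been synchronized. The following examples demonstrate three typical query scenarios.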
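One way to wait for synchronization is to poll a readiness check with a timeout. The following minimal Python sketch is a generic polling loop; `wait_until` is a hypothetical helper, and the readiness predicate is left to the caller.

```python
import time


def wait_until(predicate, timeout_s=60.0, interval_s=2.0,
               clock=time.monotonic, sleep=time.sleep):
    """Call `predicate` until it returns True or `timeout_s` seconds elapse.

    Returns True if the predicate succeeded, False on timeout.
    `clock` and `sleep` are injectable so the loop can be tested without waiting.
    """
    deadline = clock() + timeout_s
    while True:
        if predicate():
            return True
        if clock() >= deadline:
            return False
        sleep(interval_s)
```

As an assumed example predicate, a function could run a MatchAllQuery with limit=0 and get_total_count=True against meta_file_index (the same search options used in the scenarios) and return whether result.total_count has reached 10,000.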

Scenario 1: Query files by label

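Use NestedQuery to find all files that have a specified label, sorted by creation time in descending order. NestedQuery runs a subquery against a nested field. You must specify the path of the nested field (here tags), and the subquery references sub-fields by their full path, such as tags.tag.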

Java

// Find all files that have the label "表格存储" (Tablestore)
TermQuery tagQuery = new TermQuery();
tagQuery.setFieldName("tags.tag");
tagQuery.setTerm(ColumnValue.fromString("表格存储"));

NestedQuery nestedQuery = new NestedQuery();
nestedQuery.setPath("tags");
nestedQuery.setScoreMode(ScoreMode.Avg);
nestedQuery.setQuery(tagQuery);

SearchQuery searchQuery = new SearchQuery();
searchQuery.setQuery(nestedQuery);
searchQuery.setGetTotalCount(true);
searchQuery.setLimit(5);
searchQuery.setSort(new Sort(Arrays.asList(
    new FieldSort("createdAt", SortOrder.DESC))));

SearchRequest request = new SearchRequest("meta_file", "meta_file_index", searchQuery);
SearchRequest.ColumnsToGet columnsToGet = new SearchRequest.ColumnsToGet();
columnsToGet.setReturnAll(true);
request.setColumnsToGet(columnsToGet);

SearchResponse response = client.search(request);
System.out.println("Total hits: " + response.getTotalCount());
for (Row row : response.getRows()) {
    System.out.println(row);
}

Python

# Find all files that have the label "表格存储" (Tablestore)
nested_query = tablestore.NestedQuery(
    "tags",
    tablestore.TermQuery("tags.tag", "表格存储"),
    score_mode=tablestore.ScoreMode.AVG,
)

result = client.search("meta_file", "meta_file_index",
    tablestore.SearchQuery(nested_query, get_total_count=True, limit=5,
        sort=tablestore.Sort(sorters=[
            tablestore.FieldSort("createdAt", tablestore.SortOrder.DESC)])),
    tablestore.ColumnsToGet(return_type=tablestore.ColumnReturnType.ALL))

print(f"Total hits: {result.total_count}")
for row in result.rows:
    cols = {c[0]: c[1] for c in row[1]}
    print(f"fId={cols['fId']} | type={cols['type']} | tags={cols['tags']}")
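For spot-checking results on a small sample, the semantics of this query can be reproduced in memory. The following Python sketch is a hypothetical reference (`files_with_tag` is not an SDK function) that assumes rows are dictionaries shaped like the test data: it keeps rows whose tags array contains the label, sorts them by createdAt in descending order, and returns the total match count alongside the first `limit` rows, mirroring get_total_count and limit.

```python
import json


def files_with_tag(rows, tag, limit=5):
    """In-memory reference for scenario 1: (total_matches, newest `limit` rows)."""
    hits = [r for r in rows
            if any(t["tag"] == tag for t in json.loads(r["tags"]))]
    hits.sort(key=lambda r: r["createdAt"], reverse=True)
    return len(hits), hits[:limit]


rows = [
    {"fId": "f00000000", "createdAt": 1704067200000,
     "tags": '[{"tag":"表格存储","score":70},{"tag":"科技","score":88}]'},
    {"fId": "f00000001", "createdAt": 1706745600000,
     "tags": '[{"tag":"风景","score":95}]'},
    {"fId": "f00000002", "createdAt": 1709251200000,
     "tags": '[{"tag":"表格存储","score":90}]'},
]
total, top = files_with_tag(rows, "表格存储")
print(total, [r["fId"] for r in top])  # 2 ['f00000002', 'f00000000']
```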

Scenario 2: Multi-condition query

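Use BoolQuery to combine NestedQuery (exact label match), RangeQuery (time range), and TermQuery (file type filter) for multi-dimensional retrieval. The following example finds all image files created in 2024 (createdAt in [2024-01-01, 2025-01-01) UTC) that have the label "表格存储" (Tablestore).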

Java

// Nested label query: tags.tag = "表格存储"
TermQuery tagQuery = new TermQuery();
tagQuery.setFieldName("tags.tag");
tagQuery.setTerm(ColumnValue.fromString("表格存储"));

NestedQuery nestedQuery = new NestedQuery();
nestedQuery.setPath("tags");
nestedQuery.setScoreMode(ScoreMode.Avg);
nestedQuery.setQuery(tagQuery);

// Time range query: all of 2024 (UTC), [2024-01-01, 2025-01-01)
RangeQuery timeQuery = new RangeQuery();
timeQuery.setFieldName("createdAt");
timeQuery.greaterThanOrEqual(ColumnValue.fromLong(1704067200000L)); // 2024-01-01
timeQuery.lessThan(ColumnValue.fromLong(1735689600000L));           // 2025-01-01

// File type query: type = "image"
TermQuery typeQuery = new TermQuery();
typeQuery.setFieldName("type");
typeQuery.setTerm(ColumnValue.fromString("image"));

// Combine the three conditions with BoolQuery (all must match)
BoolQuery boolQuery = new BoolQuery();
boolQuery.setMustQueries(Arrays.asList(nestedQuery, timeQuery, typeQuery));

SearchQuery searchQuery = new SearchQuery();
searchQuery.setQuery(boolQuery);
searchQuery.setGetTotalCount(true);
searchQuery.setLimit(5);
searchQuery.setSort(new Sort(Arrays.asList(
    new FieldSort("createdAt", SortOrder.DESC))));

SearchRequest request = new SearchRequest("meta_file", "meta_file_index", searchQuery);
SearchRequest.ColumnsToGet columnsToGet = new SearchRequest.ColumnsToGet();
columnsToGet.setReturnAll(true);
request.setColumnsToGet(columnsToGet);

SearchResponse response = client.search(request);
System.out.println("Total hits: " + response.getTotalCount());
for (Row row : response.getRows()) {
    System.out.println(row);
}

Python

# Nested label query: tags.tag = "表格存储"
nested_query = tablestore.NestedQuery(
    "tags",
    tablestore.TermQuery("tags.tag", "表格存储"),
    score_mode=tablestore.ScoreMode.AVG,
)
# Time range query: all of 2024 (UTC), [2024-01-01, 2025-01-01)
range_query = tablestore.RangeQuery(
    "createdAt",
    range_from=1704067200000,   # 2024-01-01
    range_to=1735689600000,     # 2025-01-01
    include_lower=True,
    include_upper=False,
)
# File type query: type = "image"
type_query = tablestore.TermQuery("type", "image")

# Combine the three conditions with BoolQuery (all must match)
bool_query = tablestore.BoolQuery(
    must_queries=[nested_query, range_query, type_query])

result = client.search("meta_file", "meta_file_index",
    tablestore.SearchQuery(bool_query, get_total_count=True, limit=5,
        sort=tablestore.Sort(sorters=[
            tablestore.FieldSort("createdAt", tablestore.SortOrder.DESC)])),
    tablestore.ColumnsToGet(return_type=tablestore.ColumnReturnType.ALL))

print(f"Total hits: {result.total_count}")
for row in result.rows:
    cols = {c[0]: c[1] for c in row[1]}
    print(f"fId={cols['fId']} | type={cols['type']} | createdAt={cols['createdAt']}")
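The two boundary timestamps hard-coded in this scenario can be derived rather than typed by hand. The following Python sketch computes them with the standard library and expresses the half-open interval that include_lower=True and include_upper=False describe; `epoch_ms` and `in_2024` are hypothetical helper names. It assumes the UTC calendar year; for a year in another time zone, such as UTC+8, build the datetime with that zone instead.

```python
from datetime import datetime, timezone


def epoch_ms(year, month=1, day=1):
    """Milliseconds since the Unix epoch for midnight UTC on the given date."""
    return int(datetime(year, month, day, tzinfo=timezone.utc).timestamp() * 1000)


lower = epoch_ms(2024)  # inclusive lower bound (include_lower=True)
upper = epoch_ms(2025)  # exclusive upper bound (include_upper=False)
print(lower, upper)     # 1704067200000 1735689600000


def in_2024(created_at):
    # Half-open interval [lower, upper), matching the RangeQuery above.
    return lower <= created_at < upper
```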

Scenario 3: Statistics by file type

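Use GroupByField to group files by type and compute the number of files and the average file size for each type. Groups are sorted by file count in descending order.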

Java

SearchRequest request = SearchRequest.newBuilder()
    .tableName("meta_file").indexName("meta_file_index")
    .searchQuery(SearchQuery.newBuilder()
        .query(QueryBuilders.matchAll())
        .limit(0)
        .addGroupBy(GroupByBuilders.groupByField("fileType", "type")
            .size(20)
            .addSubAggregation(AggregationBuilders.count("fileCount", "fId"))
            .addSubAggregation(AggregationBuilders.avg("avgSize", "size"))
            .addGroupBySorter(GroupBySorter.subAggSortInDesc("fileCount"))
            .build())
        .build())
    .build();

SearchResponse response = client.search(request);
GroupByFieldResult results = response.getGroupByResults()
    .getAsGroupByFieldResult("fileType");
for (GroupByFieldResultItem item : results.getGroupByFieldResultItems()) {
    long count = item.getSubAggregationResults()
        .getAsCountAggregationResult("fileCount").getValue();
    double avgSize = item.getSubAggregationResults()
        .getAsAvgAggregationResult("avgSize").getValue();
    System.out.println("Type: " + item.getKey()
        + " | Files: " + count
        + " | Avg size: " + String.format("%.0f", avgSize) + " bytes");
}

Python

group_by = tablestore.GroupByField("type", size=20,
    sub_aggs=[
        tablestore.Count("fId", name="fileCount"),
        tablestore.Avg("size", name="avgSize"),
    ],
    group_by_sort=[tablestore.SubAggSort(tablestore.SortOrder.DESC, "fileCount")],
    name="fileType")

result = client.search("meta_file", "meta_file_index",
    tablestore.SearchQuery(tablestore.MatchAllQuery(), limit=0,
                           group_bys=[group_by]),
    tablestore.ColumnsToGet(return_type=tablestore.ColumnReturnType.NONE))

for item in result.group_by_results[0].items:
    aggs = {a.name: a.value for a in item.sub_aggs}
    print(f"Type: {item.key} | Files: {int(aggs['fileCount'])}"
          f" | Avg size: {aggs['avgSize']:.0f} bytes")
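The aggregation in this scenario is equivalent to a simple group-by. The following in-memory Python sketch (`stats_by_type` is a hypothetical helper) computes the same three values from a list of row dictionaries, which can help when spot-checking results on a small sample. It assumes every row has an fId value, because the Count aggregation on fId counts only rows in which that field is present.

```python
from collections import defaultdict


def stats_by_type(rows):
    """In-memory reference for scenario 3: (type, file_count, avg_size) tuples,
    sorted by file count in descending order."""
    groups = defaultdict(list)
    for r in rows:
        groups[r["type"]].append(r["size"])
    stats = [(t, len(sizes), sum(sizes) / len(sizes)) for t, sizes in groups.items()]
    stats.sort(key=lambda s: s[1], reverse=True)
    return stats


rows = [
    {"type": "image", "size": 1000},
    {"type": "image", "size": 3000},
    {"type": "video", "size": 5000},
]
for file_type, count, avg_size in stats_by_type(rows):
    print(f"Type: {file_type} | Files: {count} | Avg size: {avg_size:.0f} bytes")
```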

Resource cleanup

Note

When you no longer need the resources created in this solution, clean them up as follows to avoid unnecessary charges.

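Cleanup order: delete the search index first, then delete the data table. If the instance was created specifically for this solution, release the instance last.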

Java

private static void cleanup(SyncClient client) {
    // 1. Delete the search index
    DeleteSearchIndexRequest deleteIndexReq = new DeleteSearchIndexRequest();
    deleteIndexReq.setTableName("meta_file");
    deleteIndexReq.setIndexName("meta_file_index");
    client.deleteSearchIndex(deleteIndexReq);

    // 2. Delete the data table
    client.deleteTable(new DeleteTableRequest("meta_file"));
}

Python

def cleanup(client):
    # 1. Delete the search index
    client.delete_search_index("meta_file", "meta_file_index")
    # 2. Delete the data table
    client.delete_table("meta_file")