使用Tablestore存储和查询OSS文件元数据

更新时间:
复制为 MD 格式

对象存储 OSS 提供 Bucket 清单功能,可定期生成 Bucket 内所有对象的元数据快照,包括对象路径、大小、存储类型、最后修改时间、加密状态等信息。本文介绍如何利用 OSS Bucket 清单和函数计算(FC),将 OSS 文件元数据自动写入表格存储(Tablestore),实现对大规模对象存储的元数据管理与查询。

方案架构

整体数据流如下:

  1. OSS Bucket 清单:按照配置的周期(每天或每周)自动扫描 Bucket 内的对象,将元数据清单写入指定的报告 Bucket,并生成一个 manifest.json 索引文件。

  2. 函数计算(FC):配置 OSS 触发器监听报告 Bucket,当新的 manifest.json 写入时自动触发 FC 函数。FC 函数读取 manifest.json,下载并解析元数据清单,将对象元数据批量写入 Tablestore。

  3. Tablestore:作为元数据的持久化存储,利用多元索引支持按存储类型、目录前缀、文件大小、修改时间等维度灵活查询和聚合统计。

准备工作

在开始操作前,需要完成以下资源准备。

  1. OSS Bucket:已创建用于存放清单报告的目标 Bucket(可与生成清单的 Bucket 相同)。

  2. Tablestore 实例:已在 Tablestore 控制台创建实例,并记录实例名称和访问地址(Endpoint)。

  3. Python 环境:本地已安装 Python 3.8 或以上版本,用于运行初始化脚本。可通过 python3 --version 确认当前版本。

步骤一:开启 OSS Bucket 清单

参考 OSS 文档存储空间清单,在源 Bucket 上创建清单规则。本文以如下示例配置为参考:

配置项

示例值

说明

规则名称

inventory-test

清单规则名称,在FC触发器前缀中会用到

当前 Bucket

metadata-fc-test

需要采集元数据的 Bucket

清单报告存储至

metadata-fc-test

存放清单报告的 Bucket,可与源 Bucket 相同。Bucket路径为空,表示保存在Bucket根目录下

清单文件扫描范围按前缀匹配

photograph

仅统计指定前缀下的对象

清单报告导出周期导出频率

每天

清单生成周期

按以上配置生成的清单报告会存放在:

oss://metadata-fc-test/metadata-fc-test/inventory-test/{日期}/manifest.json
oss://metadata-fc-test/metadata-fc-test/inventory-test/data/*.csv.gz

其中 manifest.json 按生成日期存放在以时间戳命名的子目录中,实际的 CSV 数据文件统一存放在 data/ 目录下。

说明

清单字段中,Bucket名称Object 名称为必选字段,其余字段(文件类型、Object 大小、最后更新日期、ETag等)按需勾选。导入脚本会根据 manifest.json 中的 fileSchema 自动识别实际包含的字段,仅将存在的字段写入 Tablestore,不会写入空值占位。

步骤二:初始化 Tablestore

在正式导入前,需要在 Tablestore 实例中创建数据表、多元索引和 SQL 映射表。将以下脚本保存为 init_ots.py 并执行。

脚本的具体功能如下:

  • 数据表:以 key(对象路径)为主键,TTL 设置为永久保留(清单为全量快照,每次导入覆盖写入)。

  • 多元索引:为 Bucket名称、文件类型、存储类型、Object 大小、最后更新日期等字段建立索引,支持多维度查询和聚合统计。

  • SQL 映射表:为数据表创建 SQL 语义映射,便于通过标准 SQL 查询元数据。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
OTS 初始化脚本(OSS Inventory 版)
用于创建数据表、多元索引和 SQL 映射表
"""

import sys
import argparse
from tablestore import OTSClient, TableMeta, TableOptions, ReservedThroughput, CapacityUnit
from tablestore import FieldSchema, FieldType, SearchIndexMeta, IndexSetting, AnalyzerType, SplitAnalyzerParameter


def create_table(client, table_name):
    print(f"[1/3] 创建数据表: {table_name}")
    try:
        schema_of_primary_key = [('key', 'STRING')]
        table_meta = TableMeta(table_name, schema_of_primary_key)
        table_options = TableOptions(
            time_to_live=-1,
            max_version=1,
            allow_update=False
        )
        reserved_throughput = ReservedThroughput(CapacityUnit(0, 0))
        client.create_table(table_meta, table_options, reserved_throughput)
        print(f"✓ 数据表创建成功: {table_name}")
        return True
    except Exception as e:
        print(f"✗ 数据表创建失败: {e}")
        return False


def create_search_index(client, table_name, index_name):
    print(f"[2/3] 创建多元索引: {index_name}")
    try:
        fields = [
            # key: 路径分词,支持按路径段检索
            FieldSchema('key', FieldType.TEXT, index=True,
                        analyzer=AnalyzerType.SPLIT,
                        analyzer_parameter=SplitAnalyzerParameter("/")),
            # prefix: 目录前缀,用于按目录聚合
            FieldSchema('prefix', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),
            # bucket
            FieldSchema('bucket', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),
            # storage_class: Standard / IA / Archive / ...
            FieldSchema('storage_class', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),
            # object_type: Normal / Appendable / Multipart / Symlink
            FieldSchema('object_type', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),
            # object_acl: default / private / public-read / public-read-write
            FieldSchema('object_acl', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),
            # encryption_status
            FieldSchema('encryption_status', FieldType.BOOLEAN, index=True),
            # is_multipart_uploaded
            FieldSchema('is_multipart_uploaded', FieldType.BOOLEAN, index=True),
            # size: 字节数
            FieldSchema('size', FieldType.LONG, index=True, enable_sort_and_agg=True),
            # last_modified_ms: 最后修改时间,epoch 毫秒
            FieldSchema('last_modified_ms', FieldType.LONG, index=True, enable_sort_and_agg=True),
        ]
        index_setting = IndexSetting(routing_fields=['key'])
        index_meta = SearchIndexMeta(fields, index_setting=index_setting, index_sort=None, time_to_live=-1)
        client.create_search_index(table_name, index_name, index_meta)
        print(f"✓ 多元索引创建成功: {index_name}")
        return True
    except Exception as e:
        print(f"✗ 多元索引创建失败: {e}")
        return False


def create_mapping_table(client, table_name):
    print(f"[3/3] 创建 SQL 映射表")
    try:
        sql = f"""CREATE TABLE `{table_name}` (
            `key` VARCHAR(1024),
            `bucket` MEDIUMTEXT,
            `prefix` MEDIUMTEXT,
            `crc64` MEDIUMTEXT,
            `object_type` MEDIUMTEXT,
            `object_acl` MEDIUMTEXT,
            `encryption_status` BOOL,
            `is_multipart_uploaded` BOOL,
            `etag` MEDIUMTEXT,
            `last_modified_ms` BIGINT(20),
            `storage_class` MEDIUMTEXT,
            `size` BIGINT(20),
            PRIMARY KEY(`key`)
        )"""
        client.exe_sql_query(sql)
        print(f"✓ SQL 映射表创建成功")
        return True
    except Exception as e:
        print(f"✗ SQL 映射表创建失败: {e}")
        return False


def main():
    parser = argparse.ArgumentParser(description='初始化 OTS 数据表、多元索引和 SQL 映射表')
    parser.add_argument('--endpoint', required=True, help='OTS 实例访问地址')
    parser.add_argument('--instance-name', required=True, help='OTS 实例名称')
    parser.add_argument('--access-key-id', required=True, help='Access Key ID')
    parser.add_argument('--access-key-secret', required=True, help='Access Key Secret')
    parser.add_argument('--table-name', default='oss_inventory', help='数据表名称(默认: oss_inventory)')
    parser.add_argument('--search-index-name', default='oss_inventory_idx', help='多元索引名称(默认: oss_inventory_idx)')
    args = parser.parse_args()

    try:
        client = OTSClient(args.endpoint, args.access_key_id, args.access_key_secret, args.instance_name)
        if not create_table(client, args.table_name):
            sys.exit(1)
        if not create_search_index(client, args.table_name, args.search_index_name):
            sys.exit(1)
        if not create_mapping_table(client, args.table_name):
            sys.exit(1)
        print("\n✓ 初始化完成")
    except Exception as e:
        print(f"\n✗ 执行失败: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()

安装Tablestore依赖:

pip3 install tablestore

执行初始化脚本:

python3 init_ots.py \
  --endpoint https://<instance>.<region>.ots.aliyuncs.com \
  --instance-name <instance> \
  --access-key-id <AccessKey ID> \
  --access-key-secret <AccessKey Secret> \
  --table-name oss_inventory \
  --search-index-name oss_inventory_idx

脚本会依次创建数据表 oss_inventory、多元索引 oss_inventory_idx 和 SQL 映射表,并打印每步的执行结果。

步骤三:创建并部署函数计算

3.1 创建函数计算函数

  1. 登录函数计算控制台,单击函数管理 > 函数列表,在页面上方选择创建清单 Bucket 所在的地域。

  2. 单击创建函数,选择事件函数,然后单击创建事件函数。填写以下配置项,其他可保持默认。

    配置项

    函数名称

    填写函数名称,如 oss-inventory-import

    运行环境

    选择内置运行时 > Python > Python 3.12

    代码上传方式

    选择通过 ZIP 包上传代码,并上传本文提供的部署包oss-inventory-fc.zip

  3. 单击创建

3.2 配置环境变量

  1. 函数详情页,单击编辑环境变量,选择使用 JSON 格式编辑,输入以下配置(替换为实际值):

    {
        "OSS_ACCESS_KEY_ID": "<AccessKey ID>",
        "OSS_ACCESS_KEY_SECRET": "<AccessKey Secret>",
        "OSS_REGION": "<region>",
        "OTS_ACCESS_KEY_ID": "<AccessKey ID>",
        "OTS_ACCESS_KEY_SECRET": "<AccessKey Secret>",
        "OTS_ENDPOINT": "https://<instance>.<region>.ots.aliyuncs.com",
        "OTS_INSTANCE_NAME": "<instance>",
        "OTS_TABLE_NAME": "oss_inventory"
    }
    说明

    OTS_TABLE_NAME 为可选项,不填时默认使用 oss_inventory,需与步骤二中创建的数据表名一致。

  2. 单击部署

3.3 配置 OSS 触发器

  1. 单击触发器 > 创建触发器,选择触发器类型为双向集成触发器类型下的对象存储 OSS,并配置以下内容:

    配置项

    说明

    Bucket 名称

    metadata-fc-test

    选择清单报告存储的 Bucket

    文件前缀

    metadata-fc-test/inventory-test/

    源 Bucket 名 + 规则 ID,对应清单报告的存放路径

    文件后缀

    manifest.json

    仅在清单索引文件写入时触发

    触发事件

    oss:ObjectCreated:PutObject

    仅当OSS请求类型为PutObject时触发函数调用

    说明

    文件前缀的格式为 {清单报告保存路径}/{规则名称}/,与清单报告的实际存放路径对应。本示例中源 Bucket 和报告 Bucket 相同,均为 metadata-fc-test,规则名称为 inventory-test,因此前缀为 metadata-fc-test/inventory-test/

  2. 单击确定

完成以上配置后,每次 OSS Bucket 清单生成新的报告时,函数计算会自动触发,将本次清单中的所有对象元数据写入 Tablestore。由于清单为全量快照,每次导入会覆盖已有数据,Tablestore 中始终保留最新一次清单的完整元数据。可通过函数计算日志查看调用记录信息,fileSchema取决于清单规则配置:

2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] [START] request_id=69B2C413CB5DF73035C614EF
2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] 事件: ObjectCreated:PutObject | oss://metadata-fc-test/metadata-fc-test/inventory-test/2026-03-12T13-47Z/manifest.json
2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] OTS endpoint: https://<instance-name>.<region>.ots.aliyuncs.com | table: oss_inventory
2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] 下载: oss://metadata-fc-test/metadata-fc-test/inventory-test/2026-03-12T13-47Z/manifest.json
2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] fileSchema: ['Bucket', 'Key']
2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] 共 1 个 CSV 文件
2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] 处理: metadata-fc-test/inventory-test/data/928ea691-b31a-43d2-a3cb-64cb9b60cc90.csv.gz
2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] 下载: oss://metadata-fc-test/metadata-fc-test/inventory-test/data/928ea691-b31a-43d2-a3cb-64cb9b60cc90.csv.gz
2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] 解析 968 行,开始写入 OTS...
2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] 写入完成: 968/968 行
2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] [DONE] 共写入 968 行

使用限制

本方案示例代码仅支持全量清单。OSS 还提供增量清单功能(需申请),增量清单的目录结构和文件格式与全量清单不同,如需支持增量清单,需自行修改导入代码以适配其格式。

相关文档