Store and query OSS object metadata in Tablestore


OSS Bucket Inventory periodically exports the metadata of the objects in a bucket, such as path, size, storage class, last-modified time, and encryption status. By combining it with Function Compute (FC), you can write this metadata to Tablestore automatically for large-scale management and querying.

Solution architecture

The data flows through three services:

  1. OSS Bucket Inventory: Scans the objects in the source bucket on a configured schedule (daily or weekly). The scan can optionally be limited to a specified prefix. It writes the metadata to a designated report bucket and generates a manifest.json index file.

  2. Function Compute (FC): An OSS trigger monitors the report bucket. When a new manifest.json is written, FC automatically invokes the function. The function reads manifest.json, downloads and parses the inventory files, and writes the object metadata to Tablestore in bulk.

  3. Tablestore: Stores the object metadata and maintains a search index on fields such as storage class, directory prefix, object size, and last-modified time. The index supports multi-condition queries and aggregations.
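The parsing step in item 2 can be sketched with the standard library alone. This is a minimal sketch, not the code in the deployment package. It assumes the function has already downloaded one *.csv.gz data file as bytes, for example with the OSS SDK. It also assumes the file has no header row, so column names come from fileSchema in manifest.json. The function name iter_inventory_rows is hypothetical.

```python
import csv
import gzip
import io
from typing import Iterator, List


def iter_inventory_rows(gz_bytes: bytes) -> Iterator[List[str]]:
    """Decompress one inventory data file (*.csv.gz) and yield its CSV rows.

    Assumption: the data file has no header row; column names come from
    the fileSchema field of manifest.json instead.
    """
    # gzip.open accepts a file object, so the payload is decompressed as a stream.
    with gzip.open(io.BytesIO(gz_bytes), mode="rt", encoding="utf-8", newline="") as fh:
        # csv.reader removes the double quotes around each quoted value.
        yield from csv.reader(fh)
```

Rows are yielded one at a time, so the decompressed text is never built up as a single string before the rows are converted and written.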

Prerequisites

Ensure the following resources are ready:

  1. OSS bucket: A destination bucket for storing inventory reports. It can be the same bucket whose objects are inventoried.

  2. Tablestore instance: An instance created in the Tablestore console, with the instance name and endpoint recorded.

  3. Python environment: Python 3.8 or later. Run python3 --version to verify your installation.

Step 1: Enable OSS Bucket Inventory

Create an inventory rule on the source bucket as described in Bucket inventory in the OSS documentation. The following example shows a working configuration:

  • Rule Name: inventory-test. The inventory rule name. It also forms part of the FC trigger prefix.

  • Current bucket: metadata-fc-test. The bucket whose object metadata you want to collect.

  • Inventory Report Destination: metadata-fc-test. The bucket that stores inventory reports; it can be the same as the source bucket. Leave the bucket path blank to save reports to the root directory.

  • Scan Scope (Object Prefix): photograph. Limits the inventory to objects under this prefix.

  • Frequency: Daily. How often inventory reports are generated.

With this configuration, inventory reports are stored at:

oss://metadata-fc-test/metadata-fc-test/inventory-test/{date}/manifest.json
oss://metadata-fc-test/metadata-fc-test/inventory-test/data/*.csv.gz

manifest.json is stored in a subdirectory named after the generation timestamp, for example 2026-03-12T13-47Z. The CSV data files that it references are stored under the data/ directory.
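To locate the data files, the function first reads manifest.json. The sketch below makes two assumptions. First, fileSchema is a comma-separated string; the Step 3 log shows it parsed as ['Bucket', 'Key']. Second, each entry in the files array carries a key relative to the report bucket. parse_manifest is a hypothetical helper name.

```python
import json
from typing import List, Tuple


def parse_manifest(manifest_bytes: bytes) -> Tuple[List[str], List[str]]:
    """Return (field names from fileSchema, object keys of the CSV data files).

    Assumes fileSchema looks like "Bucket, Key, Size" and that each entry in
    "files" has a "key" relative to the report bucket.
    """
    manifest = json.loads(manifest_bytes)
    # Split the comma-separated schema and drop surrounding whitespace.
    schema = [name.strip() for name in manifest["fileSchema"].split(",") if name.strip()]
    data_keys = [entry["key"] for entry in manifest.get("files", [])]
    return schema, data_keys
```

With the report from the log, this would return ['Bucket', 'Key'] together with the single key metadata-fc-test/inventory-test/data/928ea691-b31a-43d2-a3cb-64cb9b60cc90.csv.gz.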

Note

In the inventory fields, Bucket Name and Object Name are required. All other fields (file type, object size, last-modified date, ETag, and so on) are optional. The import script reads the fileSchema field in manifest.json to determine which fields are present and writes only those fields to Tablestore. No placeholder entries are created for missing fields.
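The schema-driven behavior described in this note can be sketched as a row converter. FIELD_MAP below is a hypothetical mapping from fileSchema names to the columns of the Step 2 SQL mapping table. The sketch also assumes three things: object names in the CSV are URL-encoded, LastModifiedDate is an ISO 8601 UTC timestamp, and the prefix column holds the object's directory. Adjust these assumptions to match your actual reports.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Tuple, Union
from urllib.parse import unquote

# Hypothetical mapping from fileSchema names to Tablestore column names.
# Fields not listed here (for example, TaggingCount) are skipped.
FIELD_MAP = {
    "Bucket": "bucket",
    "Size": "size",
    "LastModifiedDate": "last_modified_ms",
    "ETag": "etag",
    "StorageClass": "storage_class",
    "IsMultipartUploaded": "is_multipart_uploaded",
    "EncryptionStatus": "encryption_status",
    "ObjectAcl": "object_acl",
    "ObjectType": "object_type",
    "Crc64": "crc64",
}
BOOL_COLUMNS = {"is_multipart_uploaded", "encryption_status"}
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
Value = Union[str, int, bool]


def to_epoch_ms(value: str) -> int:
    """Convert an ISO 8601 timestamp (UTC if no offset is given) to epoch milliseconds."""
    # datetime.fromisoformat() rejects a trailing "Z" before Python 3.11.
    dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return (dt - _EPOCH) // timedelta(milliseconds=1)


def to_tablestore_row(schema: List[str], values: List[str]) -> Tuple[str, Dict[str, Value]]:
    """Return (primary key, attribute columns) for one inventory CSV row.

    Only fields present in fileSchema are emitted; empty values are skipped.
    """
    record = dict(zip(schema, values))
    key = unquote(record["Key"])  # assumption: Key is URL-encoded in the CSV
    # Hypothetical directory prefix: everything up to the last "/", or "/" at the root.
    prefix = key.rsplit("/", 1)[0] + "/" if "/" in key else "/"
    attrs: Dict[str, Value] = {"prefix": prefix}
    for field, raw in record.items():
        column = FIELD_MAP.get(field)
        if column is None or raw == "":
            continue
        if column == "size":
            attrs[column] = int(raw)
        elif column == "last_modified_ms":
            attrs[column] = to_epoch_ms(raw)
        elif column in BOOL_COLUMNS:
            attrs[column] = raw.strip().lower() == "true"
        else:
            attrs[column] = raw
    return key, attrs
```

For a report whose fileSchema is only Bucket and Key, each row produces just the key plus the bucket and prefix attributes. No placeholder columns are written for the other fields.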

Step 2: Initialize Tablestore

Before importing data, create a data table, a search index, and a SQL mapping table in your Tablestore instance. Save the following script as init_ots.py.

The script performs the following tasks:

  • Table: Uses key (object path) as the primary key, with TTL set to permanent retention. Because each inventory is a full snapshot, every import overwrites existing records.

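  • Search index: Indexes the object path (tokenized on /), bucket name, directory prefix, storage class, object type, and object ACL. It also indexes the encryption and multipart-upload flags, object size, and last-modified time. Together these fields support multi-dimensional queries and aggregations.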

  • SQL mapping table: Maps the table to SQL so you can query metadata with standard SQL statements.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
OTS initialization script (OSS Inventory version)
Used to create a data table, search index, and SQL mapping table
"""

import sys
import argparse
from tablestore import OTSClient, TableMeta, TableOptions, ReservedThroughput, CapacityUnit
from tablestore import FieldSchema, FieldType, SearchIndexMeta, IndexSetting, AnalyzerType, SplitAnalyzerParameter

def create_table(client, table_name):
    print(f"[1/3] Creating data table: {table_name}")
    try:
        schema_of_primary_key = [('key', 'STRING')]
        table_meta = TableMeta(table_name, schema_of_primary_key)
        table_options = TableOptions(
            time_to_live=-1,
            max_version=1,
            allow_update=False
        )
        reserved_throughput = ReservedThroughput(CapacityUnit(0, 0))
        client.create_table(table_meta, table_options, reserved_throughput)
        print(f"✓ Data table created successfully: {table_name}")
        return True
    except Exception as e:
        print(f"✗ Failed to create data table: {e}")
        return False

def create_search_index(client, table_name, index_name):
    print(f"[2/3] Creating search index: {index_name}")
    try:
        fields = [
            # key: path tokenization, supports retrieval by path segment
            FieldSchema('key', FieldType.TEXT, index=True,
                        analyzer=AnalyzerType.SPLIT,
                        analyzer_parameter=SplitAnalyzerParameter("/")),
            # prefix: directory prefix, used for aggregation by directory
            FieldSchema('prefix', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),
            # bucket
            FieldSchema('bucket', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),
            # storage_class: Standard / IA / Archive / ...
            FieldSchema('storage_class', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),
            # object_type: Normal / Appendable / Multipart / Symlink
            FieldSchema('object_type', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),
            # object_acl: default / private / public-read / public-read-write
            FieldSchema('object_acl', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),
            # encryption_status
            FieldSchema('encryption_status', FieldType.BOOLEAN, index=True),
            # is_multipart_uploaded
            FieldSchema('is_multipart_uploaded', FieldType.BOOLEAN, index=True),
            # size: byte count
            FieldSchema('size', FieldType.LONG, index=True, enable_sort_and_agg=True),
            # last_modified_ms: last modified time, epoch milliseconds
            FieldSchema('last_modified_ms', FieldType.LONG, index=True, enable_sort_and_agg=True),
        ]
        index_setting = IndexSetting(routing_fields=['key'])
        index_meta = SearchIndexMeta(fields, index_setting=index_setting, index_sort=None, time_to_live=-1)
        client.create_search_index(table_name, index_name, index_meta)
        print(f"✓ Search index created successfully: {index_name}")
        return True
    except Exception as e:
        print(f"✗ Failed to create search index: {e}")
        return False

def create_mapping_table(client, table_name):
    print(f"[3/3] Creating SQL mapping table")
    try:
        sql = f"""CREATE TABLE `{table_name}` (
            `key` VARCHAR(1024),
            `bucket` MEDIUMTEXT,
            `prefix` MEDIUMTEXT,
            `crc64` MEDIUMTEXT,
            `object_type` MEDIUMTEXT,
            `object_acl` MEDIUMTEXT,
            `encryption_status` BOOL,
            `is_multipart_uploaded` BOOL,
            `etag` MEDIUMTEXT,
            `last_modified_ms` BIGINT(20),
            `storage_class` MEDIUMTEXT,
            `size` BIGINT(20),
            PRIMARY KEY(`key`)
        )"""
        client.exe_sql_query(sql)
        print(f"✓ SQL mapping table created successfully")
        return True
    except Exception as e:
        print(f"✗ Failed to create SQL mapping table: {e}")
        return False

def main():
    parser = argparse.ArgumentParser(description='Initialize OTS data table, search index, and SQL mapping table')
    parser.add_argument('--endpoint', required=True, help='OTS instance endpoint')
    parser.add_argument('--instance-name', required=True, help='OTS instance name')
    parser.add_argument('--access-key-id', required=True, help='Access Key ID')
    parser.add_argument('--access-key-secret', required=True, help='Access Key Secret')
    parser.add_argument('--table-name', default='oss_inventory', help='Data table name (default: oss_inventory)')
    parser.add_argument('--search-index-name', default='oss_inventory_idx', help='Search index name (default: oss_inventory_idx)')
    args = parser.parse_args()

    try:
        client = OTSClient(args.endpoint, args.access_key_id, args.access_key_secret, args.instance_name)
        if not create_table(client, args.table_name):
            sys.exit(1)
        if not create_search_index(client, args.table_name, args.search_index_name):
            sys.exit(1)
        if not create_mapping_table(client, args.table_name):
            sys.exit(1)
        print("\n✓ Initialization complete")
    except Exception as e:
        print(f"\n✗ Execution failed: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()

Install the Tablestore dependency:

pip3 install tablestore

Run the initialization script:

python3 init_ots.py \
  --endpoint https://<instance>.<region>.ots.aliyuncs.com \
  --instance-name <instance> \
  --access-key-id <AccessKey ID> \
  --access-key-secret <AccessKey Secret> \
  --table-name oss_inventory \
  --search-index-name oss_inventory_idx

The script creates the oss_inventory table, the oss_inventory_idx search index, and the SQL mapping table in sequence and prints the result of each step.
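After the function has imported data, the SQL mapping table can answer questions such as how many objects and bytes each storage class holds. The sketch below builds such a query against the columns defined in init_ots.py. It runs the query with the same exe_sql_query method that the script uses. The helper names are hypothetical. The shape of the return value (rows plus capacity-consumption details) and the rows' attribute_columns field are assumptions based on the Tablestore Python SDK's SQL examples.

```python
def storage_class_summary_sql(table_name: str = "oss_inventory", prefix: str = "") -> str:
    """Build SQL that counts objects and sums their size per storage class."""
    where = ""
    if prefix:
        # Keep the sketch simple: refuse values that would need quote escaping.
        if "'" in prefix or "\\" in prefix:
            raise ValueError("prefix must not contain quotes or backslashes")
        where = f" WHERE `prefix` = '{prefix}'"
    return (
        "SELECT `storage_class`, COUNT(*) AS object_count, SUM(`size`) AS total_bytes "
        f"FROM `{table_name}`{where} GROUP BY `storage_class`"
    )


def run_sql(endpoint: str, access_key_id: str, access_key_secret: str,
            instance_name: str, sql: str) -> None:
    """Execute a statement through the Tablestore Python SDK and print each row."""
    # Imported lazily so the query builder works without the SDK installed.
    from tablestore import OTSClient

    client = OTSClient(endpoint, access_key_id, access_key_secret, instance_name)
    # Assumption: exe_sql_query returns (rows, table consumption, search index consumption).
    rows, _table_consumed, _search_consumed = client.exe_sql_query(sql)
    for row in rows:
        print(row.attribute_columns)
```

For example, passing storage_class_summary_sql(prefix="photograph/") to run_sql would limit the summary to one directory. This works only if the import function populates the prefix column.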

Step 3: Create and deploy a Function Compute function

3.1 Create the function

  1. Log in to the Function Compute console and click Function Management > Functions. Select the region where the inventory report bucket is located.

  2. Click Create Function, select Event Function, and then click Create Event Function. Fill in the following settings and leave the rest as defaults.

    • Function Name: Enter a function name, such as oss-inventory-import.

    • Runtime: Select Built-in Runtimes > Python > Python 3.12.

    • Code Upload Method: Select Upload ZIP and upload the deployment package oss-inventory-fc.zip provided in this guide.

  3. Click Create.

3.2 Configure environment variables

  1. On the Function Details page, click Edit Environment Variables, select JSON Editor, and enter the following configuration (replace the placeholders with your actual values):

    {
        "OSS_ACCESS_KEY_ID": "<AccessKey ID>",
        "OSS_ACCESS_KEY_SECRET": "<AccessKey Secret>",
        "OSS_REGION": "<region>",
        "OTS_ACCESS_KEY_ID": "<AccessKey ID>",
        "OTS_ACCESS_KEY_SECRET": "<AccessKey Secret>",
        "OTS_ENDPOINT": "https://<instance>.<region>.ots.aliyuncs.com",
        "OTS_INSTANCE_NAME": "<instance>",
        "OTS_TABLE_NAME": "oss_inventory"
    }
    Note

    OTS_TABLE_NAME is optional. If omitted, it defaults to oss_inventory. This value must match the table name you created in Step 2.

  2. Click Deployment.
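Inside the function, these variables can be read once at startup. A missing value then fails fast instead of surfacing later as an authentication error. The following is a minimal sketch; ImportConfig and load_config are hypothetical names. Only OTS_TABLE_NAME falls back to a default, matching the note about that variable.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

REQUIRED_VARS = (
    "OSS_ACCESS_KEY_ID",
    "OSS_ACCESS_KEY_SECRET",
    "OSS_REGION",
    "OTS_ACCESS_KEY_ID",
    "OTS_ACCESS_KEY_SECRET",
    "OTS_ENDPOINT",
    "OTS_INSTANCE_NAME",
)


@dataclass(frozen=True)
class ImportConfig:
    oss_access_key_id: str
    oss_access_key_secret: str
    oss_region: str
    ots_access_key_id: str
    ots_access_key_secret: str
    ots_endpoint: str
    ots_instance_name: str
    ots_table_name: str


def load_config(env: Optional[Mapping[str, str]] = None) -> ImportConfig:
    """Read the function's environment variables, failing fast if a required one is unset."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    # The first seven ImportConfig fields follow the order of REQUIRED_VARS.
    values = [env[name] for name in REQUIRED_VARS]
    # OTS_TABLE_NAME is optional and defaults to oss_inventory.
    return ImportConfig(*values, ots_table_name=env.get("OTS_TABLE_NAME") or "oss_inventory")
```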

3.3 Configure the OSS trigger

  1. Click Trigger > Create Trigger. Set the trigger type to Two-way Integrated Trigger and select OSS, then configure the following settings:

    • Bucket Name: metadata-fc-test. Select the bucket where inventory reports are stored.

    • Object Prefix: metadata-fc-test/inventory-test/ (the source bucket name followed by the rule name, matching the inventory report storage path).

    • Object Suffix: manifest.json. The trigger fires only when the inventory index file is written.

    • Trigger Event: oss:ObjectCreated:PutObject. The function is invoked only for PutObject requests.

    Note

    The prefix format is {source bucket name}/{rule name}/, preceded by the report destination path if you set one. This matches the actual inventory report storage path. In this example, the source bucket and the report bucket are both metadata-fc-test, the destination path is blank, and the rule name is inventory-test, so the prefix is metadata-fc-test/inventory-test/.

  2. Click OK.
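Function Compute passes the OSS event to a Python handler(event, context) as a JSON byte string. The events array in that string describes the object that was written. The sketch below rebuilds the trigger prefix from the source bucket and rule name and re-checks each event against it. This guards against a misconfigured trigger invoking the import for unrelated objects. It assumes the standard OSS event layout, with events[].oss.bucket.name and events[].oss.object.key. The optional report_path argument assumes that a non-blank destination path is placed in front of the source bucket name. The helper names are hypothetical.

```python
import json
from typing import List, Tuple


def trigger_prefix(source_bucket: str, rule_name: str, report_path: str = "") -> str:
    """Build the report prefix: [report_path/]source_bucket/rule_name/."""
    parts = [p.strip("/") for p in (report_path, source_bucket, rule_name)]
    return "/".join(p for p in parts if p) + "/"


def manifests_from_event(event: bytes, prefix: str) -> List[Tuple[str, str]]:
    """Return (bucket, key) for every manifest.json creation in an OSS trigger event."""
    payload = json.loads(event)
    found = []
    for record in payload.get("events", []):
        if not record.get("eventName", "").startswith("ObjectCreated:"):
            continue
        bucket = record["oss"]["bucket"]["name"]
        key = record["oss"]["object"]["key"]
        # Mirror the trigger's prefix and suffix filter as a defensive check.
        if key.startswith(prefix) and key.endswith("/manifest.json"):
            found.append((bucket, key))
    return found
```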

Function Compute now invokes the function automatically each time OSS Bucket Inventory generates a new report. The function reads the inventory files and writes the object metadata to Tablestore. Because each inventory is a full snapshot, every import overwrites the existing records, so Tablestore holds the metadata from the latest inventory. To verify an invocation, check the Function Compute logs. A successful run produces output similar to the following. The fileSchema line lists the fields selected in your inventory rule:

2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] [START] request_id=69B2C413CB5DF73035C614EF
2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] event: ObjectCreated:PutObject | oss://metadata-fc-test/metadata-fc-test/inventory-test/2026-03-12T13-47Z/manifest.json
2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] OTS endpoint: https://<instance-name>.<region>.ots.aliyuncs.com | table: oss_inventory
2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] downloading: oss://metadata-fc-test/metadata-fc-test/inventory-test/2026-03-12T13-47Z/manifest.json
2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] fileSchema: ['Bucket', 'Key']
2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] 1 CSV file found
2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] processing: metadata-fc-test/inventory-test/data/928ea691-b31a-43d2-a3cb-64cb9b60cc90.csv.gz
2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] downloading: oss://metadata-fc-test/metadata-fc-test/inventory-test/data/928ea691-b31a-43d2-a3cb-64cb9b60cc90.csv.gz
2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] parsed 968 rows, writing to OTS...
2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] write complete: 968/968 rows
2026-03-12 21:48:07 69B2C413CB5DF73035C614EF [INFO] [DONE] 968 rows written in total
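The bulk-write step can be sketched with BatchWriteRow. Tablestore accepts at most 200 rows per BatchWriteRow request, so the 968 rows in the log above would be sent as five batches. PutRow with RowExistenceExpectation.IGNORE replaces any existing row with the same key, which is what turns each full-snapshot import into an overwrite. This is a minimal sketch, not the deployment package's code. The helper names chunked and write_rows are hypothetical, and write_rows expects (key, attributes) pairs for the key primary key column created in Step 2.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List, Tuple

# Tablestore limits a single BatchWriteRow request to 200 rows.
BATCH_SIZE = 200


def chunked(items: Iterable[Any], size: int = BATCH_SIZE) -> Iterator[List[Any]]:
    """Yield consecutive lists of at most `size` items."""
    it = iter(items)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def write_rows(client: Any, table_name: str, rows: Iterable[Tuple[str, Dict[str, Any]]]) -> int:
    """Put every (key, attributes) pair into `table_name`; return the number written."""
    # Imported lazily so chunked() can be used without the SDK installed.
    from tablestore import (BatchWriteRowRequest, Condition, PutRowItem, Row,
                            RowExistenceExpectation, TableInBatchWriteRowItem)

    written = 0
    for batch in chunked(rows):
        items = [
            # IGNORE: write whether or not the row exists, replacing any existing row.
            PutRowItem(Row([("key", key)], list(attrs.items())),
                       Condition(RowExistenceExpectation.IGNORE))
            for key, attrs in batch
        ]
        request = BatchWriteRowRequest()
        request.add(TableInBatchWriteRowItem(table_name, items))
        response = client.batch_write_row(request)
        if not response.is_all_succeed():
            raise RuntimeError(f"batch write failed for rows {written}..{written + len(batch) - 1}")
        written += len(batch)
    return written
```

A production version would more likely retry only the failed rows than stop at the first partially failed batch.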

Limitations

The sample code supports full inventory only. OSS also provides incremental inventory (available on request), which uses a different directory structure and file format. To support incremental inventory, you must modify the import code accordingly.

What's next