Multimodal Image Retrieval System Based on Tablestore

Build a multimodal image retrieval system based on the vector search capability of Tablestore and the multimodal embedding models of Alibaba Cloud Bailian. The system supports both natural-language image search and image-to-image search, and suits scenarios such as e-commerce product search, smart photo album management, and media asset retrieval.

Solution overview

Building the multimodal image retrieval system involves the following core steps:

  1. Create a table and an index: create a Tablestore data table to store image data, and create a search index to enable vector retrieval.

  2. Vectorize the images: use the Bailian multimodal embedding model to convert images into high-dimensional vector representations.

  3. Write the vector data: batch-write the generated image vectors and their metadata to Tablestore.

  4. Run multimodal retrieval: convert a query image or a natural-language query into a vector, run a similarity search against the search index, and optionally filter the results by metadata conditions.


Preparations

Before building the retrieval system, set up the development environment, configure the credentials, and prepare the data.

1. Install the SDKs

  1. Make sure Python 3.12 or later is installed.

  2. Run the following commands to install the Tablestore Python SDK, the Alibaba Cloud Bailian (DashScope) SDK, and Pillow.

    pip install tablestore
    pip install dashscope
    pip install Pillow

2. Configure environment variables

Store the access credentials in environment variables to keep them out of your code and to make the code portable across environments.

Before configuring, obtain an API key for the Bailian platform and an Alibaba Cloud AccessKey pair, then go to the Tablestore console to create an instance and obtain its name and endpoint.
Note

For security reasons, a newly created Tablestore instance does not allow access over the Internet by default. To use the public endpoint, allow Internet access in the instance's network settings.

export DASHSCOPE_API_KEY=<Bailian API key>
export tablestore_end_point=<Tablestore instance endpoint>
export tablestore_instance_name=<Tablestore instance name>
export tablestore_access_key_id=<AccessKey ID>
export tablestore_access_key_secret=<AccessKey Secret>

3. Prepare image data

You can use your own image data or the demo dataset provided with this tutorial.

git clone https://github.com/aliyun/alibabacloud-tablestore-ai-demo.git

You can also download the demo project archive directly: alibabacloud-tablestore-ai-demo-main

Step 1: Create a table and an index

Create a data table to store the image vector data and a search index that supports vector retrieval. Customize the table schema and index configuration to fit your business requirements and data characteristics. To try the demo quickly, you can use the following example configuration as-is.

1. Create the data table

# -*- coding: utf-8 -*-
"""
创建 Tablestore 数据表
"""

import os
import tablestore


def get_client():
    """创建 Tablestore 客户端"""
    endpoint = os.getenv("tablestore_end_point")
    instance_name = os.getenv("tablestore_instance_name")
    access_key_id = os.getenv("tablestore_access_key_id")
    access_key_secret = os.getenv("tablestore_access_key_secret")

    client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )
    return client


def create_table(client, table_name):
    """创建数据表"""
    try:
        # 主键定义
        schema_of_primary_key = [("image_id", "STRING")]
        table_meta = tablestore.TableMeta(table_name, schema_of_primary_key)
        table_options = tablestore.TableOptions()
        reserved_throughput = tablestore.ReservedThroughput(tablestore.CapacityUnit(0, 0))
        client.create_table(table_meta, table_options, reserved_throughput)
        print(f"表 '{table_name}' 创建成功")
    except tablestore.OTSClientError as e:
        print(f"创建表失败 (客户端错误): {e}")
    except tablestore.OTSServiceError as e:
        if "already exist" in str(e).lower() or "OTSObjectAlreadyExist" in str(e):
            print(f"表 '{table_name}' 已存在,跳过创建")
        else:
            print(f"创建表失败 (服务端错误): {e}")


def main():
    # 配置参数
    table_name = "multi_modal_retrieval"

    print("=" * 50)
    print("创建 Tablestore 数据表")
    print("=" * 50)

    # 创建客户端
    client = get_client()
    print("Tablestore 客户端创建成功")

    # 创建表
    print("\n创建数据表...")
    create_table(client, table_name)

    print("\n" + "=" * 50)
    print("数据表创建完成!")
    print("=" * 50)


if __name__ == "__main__":
    main()

2. Create the search index

Vector data is stored as strings in the Tablestore data table. To enable vector retrieval, you must create a search index and configure a vector field type, which supports similarity computation and fast retrieval over high-dimensional vectors.
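
For context on the storage format: the vector column value is simply the JSON text of a float list, which Step 3 produces with json.dumps. A minimal sketch (values truncated for display):

# Example of the string form in which a vector is stored. Values are truncated;
# a real multimodal-embedding-v1 vector holds 1024 floats, matching the index dimension below.
import json

embedding = [0.0123, -0.0456, 0.0789]
vector_column_value = json.dumps(embedding)
print(vector_column_value)  # -> [0.0123, -0.0456, 0.0789]

The index definition below declares this column as a VECTOR field with 1024 dimensions and cosine similarity so that it can be searched efficiently.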

# -*- coding: utf-8 -*-
"""
创建 Tablestore 多元索引
"""

import os
import tablestore


def get_client():
    """创建 Tablestore 客户端"""
    endpoint = os.getenv("tablestore_end_point")
    instance_name = os.getenv("tablestore_instance_name")
    access_key_id = os.getenv("tablestore_access_key_id")
    access_key_secret = os.getenv("tablestore_access_key_secret")

    client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )
    return client


def create_search_index(client, table_name, index_name, dimension=1024):
    """创建多元索引"""
    try:
        # 索引字段定义
        metadata_mappings = [
            tablestore.FieldSchema(
                "image_id",
                tablestore.FieldType.KEYWORD,
                index=True,
                enable_sort_and_agg=True,
            ),
            tablestore.FieldSchema(
                "city",
                tablestore.FieldType.KEYWORD,
                index=True,
                enable_sort_and_agg=True,
            ),
            tablestore.FieldSchema(
                "height",
                tablestore.FieldType.LONG,
                index=True,
                enable_sort_and_agg=True,
            ),
            tablestore.FieldSchema(
                "width",
                tablestore.FieldType.LONG,
                index=True,
                enable_sort_and_agg=True,
            ),
            tablestore.FieldSchema(
                "vector",
                tablestore.FieldType.VECTOR,
                vector_options=tablestore.VectorOptions(
                    data_type=tablestore.VectorDataType.VD_FLOAT_32,
                    dimension=dimension,
                    metric_type=tablestore.VectorMetricType.VM_COSINE,
                ),
            ),
        ]

        index_meta = tablestore.SearchIndexMeta(metadata_mappings)
        client.create_search_index(table_name, index_name, index_meta)
        print(f"多元索引 '{index_name}' 创建成功")
    except tablestore.OTSClientError as e:
        print(f"创建索引失败 (客户端错误): {e}")
    except tablestore.OTSServiceError as e:
        if "already exist" in str(e).lower() or "OTSObjectAlreadyExist" in str(e):
            print(f"多元索引 '{index_name}' 已存在,跳过创建")
        else:
            print(f"创建索引失败 (服务端错误): {e}")


def main():
    # 配置参数
    table_name = "multi_modal_retrieval"
    index_name = "index"
    dimension = 1024

    print("=" * 50)
    print("创建 Tablestore 多元索引")
    print("=" * 50)

    # 创建客户端
    client = get_client()
    print("Tablestore 客户端创建成功")

    # 创建索引
    print("\n创建多元索引...")
    create_search_index(client, table_name, index_name, dimension)

    print("\n" + "=" * 50)
    print("多元索引创建完成!")
    print("=" * 50)


if __name__ == "__main__":
    main()
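
Index creation is asynchronous on the server side. As an optional sanity check, you can list the search indexes on the table; this is a minimal sketch, assuming the get_client() helper above and that your tablestore SDK version provides list_search_index:

# Optional check: list the search indexes on the table.
client = get_client()
print(client.list_search_index(table_name="multi_modal_retrieval"))
# The output is expected to include the index named 'index'.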

Step 2: Vectorize the images

Call the Alibaba Cloud Bailian multimodal embedding model to vectorize the images. The following example shows how to vectorize a local image; for more usage options, see Multimodal Embedding.

Vectorizing a large number of images takes a long time, so the demo project provides a preprocessed vector data file, data.json, which can be used directly in Step 3.
# -*- coding: utf-8 -*-
"""
本地图片向量化演示
展示如何使用百炼多模态向量化模型对本地图片进行向量化
输出原始图片信息、向量维度、向量的前几个元素等关键信息
"""

import base64
import os
from pathlib import Path

import dashscope
from PIL import Image


def image_to_base64(image_path):
    """将图片文件转换为 base64 编码"""
    with open(image_path, "rb") as f:
        image_data = f.read()
    return base64.b64encode(image_data).decode("utf-8")


def get_image_embedding(image_path):
    """
    调用百炼多模态向量化模型,以本地图片方式进行向量化
    """
    # 将本地图片转换为 base64
    base64_image = image_to_base64(image_path)

    # 获取图片格式
    suffix = Path(image_path).suffix.lower()
    if suffix in [".jpg", ".jpeg"]:
        mime_type = "image/jpeg"
    elif suffix == ".png":
        mime_type = "image/png"
    elif suffix == ".gif":
        mime_type = "image/gif"
    elif suffix == ".webp":
        mime_type = "image/webp"
    else:
        mime_type = "image/jpeg"  # 默认使用 jpeg

    # 构造 data URI
    data_uri = f"data:{mime_type};base64,{base64_image}"

    # 调用多模态向量化 API
    resp = dashscope.MultiModalEmbedding.call(
        model="multimodal-embedding-v1",
        input=[{"image": data_uri, "factor": 1.0}]
    )

    if resp.status_code == 200:
        return resp.output["embeddings"][0]["embedding"]
    else:
        raise Exception(f"向量化失败: {resp.code} - {resp.message}")


def get_image_info(image_path):
    """获取图片基本信息"""
    with Image.open(image_path) as img:
        return {
            "filename": os.path.basename(image_path),
            "format": img.format,
            "mode": img.mode,
            "width": img.width,
            "height": img.height,
            "size_bytes": os.path.getsize(image_path),
        }


def main():
    # 路径配置
    current_dir = Path(__file__).parent
    project_root = current_dir
    image_dir = project_root / "data" / "photograph"

    print("=" * 60)
    print("本地图片向量化演示")
    print("=" * 60)

    # 获取图片列表
    image_files = [f for f in os.listdir(image_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp'))]
    
    if not image_files:
        print("未找到图片文件")
        return

    # 选择第一张图片进行演示
    demo_image = image_files[0]
    image_path = image_dir / demo_image

    print(f"\n[1/3] 读取图片信息")
    print("-" * 60)
    
    # 获取图片信息
    image_info = get_image_info(image_path)
    print(f"文件名: {image_info['filename']}")
    print(f"格式: {image_info['format']}")
    print(f"模式: {image_info['mode']}")
    print(f"宽度: {image_info['width']} px")
    print(f"高度: {image_info['height']} px")
    print(f"文件大小: {image_info['size_bytes']:,} bytes")

    print(f"\n[2/3] 调用向量化 API")
    print("-" * 60)
    print("正在调用百炼多模态向量化模型...")
    
    # 向量化
    vector = get_image_embedding(str(image_path))

    print(f"\n[3/3] 向量化结果")
    print("-" * 60)
    print(f"向量维度: {len(vector)}")
    print(f"向量类型: {type(vector[0]).__name__}")
    print(f"向量前10个元素:")
    for i, v in enumerate(vector[:10]):
        print(f"  [{i}] {v:.8f}")
    print("  ...")
    print(f"向量后5个元素:")
    for i, v in enumerate(vector[-5:], start=len(vector)-5):
        print(f"  [{i}] {v:.8f}")

    # 计算向量范数
    import math
    norm = math.sqrt(sum(v * v for v in vector))
    print(f"\n向量L2范数: {norm:.8f}")

    print("\n" + "=" * 60)
    print("向量化演示完成!")
    print("=" * 60)


if __name__ == "__main__":
    main()
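
The preprocessed data/data.json mentioned above is the input for Step 3. Its exact contents come from the demo project; judging by the fields the Step 3 import script reads, each record presumably has a shape like the following (hypothetical, truncated values):

# Hypothetical shape of one data.json record, inferred from the fields that the
# Step 3 import script reads (image_id, vector, city, width, height).
sample_record = {
    "image_id": "example_0001.jpg",       # used as the Tablestore primary key
    "city": "hangzhou",                   # metadata for term filtering
    "width": 1024,                        # pixel width, used for range filtering
    "height": 683,                        # pixel height, used for range filtering
    "vector": [0.0123, -0.0456, 0.0789],  # truncated; the real list holds 1024 floats
}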

Step 3: Write the vector data

Batch-import the image vector data into the Tablestore data table. The following example reads the preprocessed vector data from the demo project and writes it in batches. If you use your own business data, you can combine image vectorization with the write operation, as sketched after the example below.

# -*- coding: utf-8 -*-
"""
从 data.json 读取数据并写入 Tablestore
直接使用 data.json 中已有的向量数据,无需重新调用向量化 API
"""

import json
import os
from pathlib import Path

import tablestore


def get_client():
    """创建 Tablestore 客户端"""
    endpoint = os.getenv("tablestore_end_point")
    instance_name = os.getenv("tablestore_instance_name")
    access_key_id = os.getenv("tablestore_access_key_id")
    access_key_secret = os.getenv("tablestore_access_key_secret")

    client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )
    return client


def load_data(data_json_path):
    """从 data.json 加载全部数据"""
    with open(data_json_path, "r", encoding="utf-8") as f:
        return json.load(f)


def batch_write_rows(client, table_name, put_row_items):
    """批量写入数据到 Tablestore"""
    request = tablestore.BatchWriteRowRequest()
    request.add(tablestore.TableInBatchWriteRowItem(table_name, put_row_items))
    result = client.batch_write_row(request)
    return result.is_all_succeed()


def main():
    # 配置参数
    table_name = "multi_modal_retrieval"
    batch_size = 100  # 每批写入数量

    # 路径配置
    current_dir = Path(__file__).parent
    project_root = current_dir
    data_json_path = project_root / "data" / "data.json"

    print("=" * 60)
    print("从 data.json 读取数据并写入 Tablestore")
    print("=" * 60)

    # 创建 Tablestore 客户端
    client = get_client()
    print("Tablestore 客户端创建成功")

    # 加载数据
    print(f"\n[1/2] 加载数据: {data_json_path}")
    data_array = load_data(data_json_path)
    print(f"已加载 {len(data_array)} 条数据记录")

    # 写入数据
    print(f"\n[2/2] 开始写入数据...")
    put_row_items = []
    success_count = 0
    error_count = 0
    total = len(data_array)

    for idx, item in enumerate(data_array):
        try:
            image_id = item["image_id"]
            vector = item["vector"]
            city = item.get("city", "unknown")
            width = item.get("width", 0)
            height = item.get("height", 0)

            # 构造行数据
            primary_key = [("image_id", image_id)]
            attribute_columns = [
                ("city", city),
                ("vector", json.dumps(vector)),
                ("width", width),
                ("height", height),
            ]
            row = tablestore.Row(primary_key, attribute_columns)
            condition = tablestore.Condition(tablestore.RowExistenceExpectation.IGNORE)
            item_row = tablestore.PutRowItem(row, condition)
            put_row_items.append(item_row)

            # 批量写入
            if len(put_row_items) >= batch_size or (idx == total - 1 and len(put_row_items) > 0):
                is_success = batch_write_rows(client, table_name, put_row_items)
                if is_success:
                    success_count += len(put_row_items)
                    print(f"进度: {idx + 1}/{total} - 批量写入 {len(put_row_items)} 行成功")
                else:
                    error_count += len(put_row_items)
                    print(f"进度: {idx + 1}/{total} - 批量写入 {len(put_row_items)} 行失败")
                put_row_items = []

        except Exception as e:
            error_count += 1
            print(f"处理数据 {item.get('image_id', 'unknown')} 失败: {e}")
            continue

    print("\n" + "=" * 60)
    print(f"数据写入完成!")
    print(f"成功: {success_count} 条")
    print(f"失败: {error_count} 条")
    print("=" * 60)


if __name__ == "__main__":
    main()
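
If you are importing your own images instead of data.json, vectorization and writing can be combined in one pass, as noted above. The following is an illustrative sketch only: it assumes the get_image_embedding() helper from Step 2 and the get_client() helper from this step are available in the same module, writes rows one at a time with put_row for simplicity, and uses the file name as image_id with a fixed city value. Adapt these choices to your own data.

# Illustrative sketch: vectorize local images and write them to Tablestore in one pass.
# Assumes get_image_embedding() from Step 2 and get_client() from this step.
import json
from pathlib import Path

import tablestore
from PIL import Image


def import_custom_images(image_dir, table_name="multi_modal_retrieval", city="unknown"):
    client = get_client()
    for image_path in sorted(Path(image_dir).glob("*.jpg")):
        # 1. Vectorize the image with the Bailian multimodal embedding model
        vector = get_image_embedding(str(image_path))

        # 2. Read pixel dimensions with Pillow for metadata filtering
        with Image.open(image_path) as img:
            width, height = img.width, img.height

        # 3. Write a single row; the vector is stored as a JSON string
        primary_key = [("image_id", image_path.name)]
        attribute_columns = [
            ("city", city),
            ("vector", json.dumps(vector)),
            ("width", width),
            ("height", height),
        ]
        row = tablestore.Row(primary_key, attribute_columns)
        condition = tablestore.Condition(tablestore.RowExistenceExpectation.IGNORE)
        client.put_row(table_name, row, condition)
        print(f"written: {image_path.name}")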

Step 4: Run multimodal retrieval

The multimodal image retrieval system supports two retrieval modes: natural-language image search and image-to-image search. The system converts the query into a vector representation, performs a similarity search against the vector index, and returns the most semantically relevant images; metadata conditions (such as city and image dimensions) can also be applied for precise filtering.

Natural-language search

# -*- coding: utf-8 -*-
"""
语义检索示例
包含多种查询场景:
1. 仅使用查询文本进行语义检索
2. 使用查询文本 + 过滤条件(城市、高度、宽度)
"""

import os

import dashscope
import tablestore
from dashscope import MultiModalEmbeddingItemText


def get_client():
    """创建 Tablestore 客户端"""
    endpoint = os.getenv("tablestore_end_point")
    instance_name = os.getenv("tablestore_instance_name")
    access_key_id = os.getenv("tablestore_access_key_id")
    access_key_secret = os.getenv("tablestore_access_key_secret")

    client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )
    return client


def text_to_embedding(text: str) -> list[float]:
    """将文本转换为向量"""
    resp = dashscope.MultiModalEmbedding.call(
        model="multimodal-embedding-v1",
        input=[MultiModalEmbeddingItemText(text=text, factor=1.0)]
    )
    if resp.status_code == 200:
        return resp.output["embeddings"][0]["embedding"]
    else:
        raise Exception(f"文本向量化失败: {resp.code} - {resp.message}")


def search_by_text_only(client, table_name, index_name, query_text: str, top_k: int = 10):
    """
    场景1: 仅使用查询文本进行语义检索
    """
    print(f"\n{'='*60}")
    print(f"场景1: 仅使用查询文本检索")
    print(f"查询文本: '{query_text}'")
    print(f"返回数量: {top_k}")
    print("="*60)

    # 文本向量化
    query_vector = text_to_embedding(query_text)

    # 构建向量查询
    query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
    )

    # 按分数排序
    sort = tablestore.Sort(sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)])
    search_query = tablestore.SearchQuery(
        query,
        limit=top_k,
        get_total_count=False,
        sort=sort
    )

    # 执行搜索
    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=tablestore.ColumnsToGet(
            column_names=["image_id", "city", "height", "width"],
            return_type=tablestore.ColumnReturnType.SPECIFIED
        )
    )

    print(f"\nRequest ID: {search_response.request_id}")
    print(f"\n检索结果:")
    print("-" * 60)

    for idx, hit in enumerate(search_response.search_hits):
        row_item = parse_search_hit(hit)
        print(f"{idx + 1}. 得分: {hit.score:.4f} | {row_item}")

    return search_response.search_hits


def search_with_city_filter(client, table_name, index_name, query_text: str, city: str, top_k: int = 10):
    """
    场景2: 使用查询文本 + 城市过滤条件
    """
    print(f"\n{'='*60}")
    print(f"场景2: 查询文本 + 城市过滤")
    print(f"查询文本: '{query_text}'")
    print(f"城市过滤: {city}")
    print(f"返回数量: {top_k}")
    print("="*60)

    query_vector = text_to_embedding(query_text)

    # 构建带城市过滤的向量查询
    query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
        filter=tablestore.TermQuery(field_name='city', column_value=city)
    )

    sort = tablestore.Sort(sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)])
    search_query = tablestore.SearchQuery(query, limit=top_k, get_total_count=False, sort=sort)

    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=tablestore.ColumnsToGet(
            column_names=["image_id", "city", "height", "width"],
            return_type=tablestore.ColumnReturnType.SPECIFIED
        )
    )

    print(f"\nRequest ID: {search_response.request_id}")
    print(f"\n检索结果:")
    print("-" * 60)

    for idx, hit in enumerate(search_response.search_hits):
        row_item = parse_search_hit(hit)
        print(f"{idx + 1}. 得分: {hit.score:.4f} | {row_item}")

    return search_response.search_hits


def search_with_size_filter(client, table_name, index_name, query_text: str,
                             height_range: tuple = None, width_range: tuple = None, top_k: int = 10):
    """
    场景3: 使用查询文本 + 尺寸过滤条件(高度、宽度)
    """
    print(f"\n{'='*60}")
    print(f"场景3: 查询文本 + 尺寸过滤")
    print(f"查询文本: '{query_text}'")
    print(f"高度范围: {height_range}")
    print(f"宽度范围: {width_range}")
    print(f"返回数量: {top_k}")
    print("="*60)

    query_vector = text_to_embedding(query_text)

    # 构建过滤条件
    must_queries = []
    if height_range:
        must_queries.append(tablestore.RangeQuery(
            field_name='height',
            range_from=height_range[0],
            range_to=height_range[1],
            include_lower=True,
            include_upper=True
        ))
    if width_range:
        must_queries.append(tablestore.RangeQuery(
            field_name='width',
            range_from=width_range[0],
            range_to=width_range[1],
            include_lower=True,
            include_upper=True
        ))

    vector_filter = tablestore.BoolQuery(must_queries=must_queries) if must_queries else None

    query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
        filter=vector_filter
    )

    sort = tablestore.Sort(sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)])
    search_query = tablestore.SearchQuery(query, limit=top_k, get_total_count=False, sort=sort)

    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=tablestore.ColumnsToGet(
            column_names=["image_id", "city", "height", "width"],
            return_type=tablestore.ColumnReturnType.SPECIFIED
        )
    )

    print(f"\nRequest ID: {search_response.request_id}")
    print(f"\n检索结果:")
    print("-" * 60)

    for idx, hit in enumerate(search_response.search_hits):
        row_item = parse_search_hit(hit)
        print(f"{idx + 1}. 得分: {hit.score:.4f} | {row_item}")

    return search_response.search_hits


def search_with_combined_filters(client, table_name, index_name, query_text: str,
                                  cities: list = None, height_range: tuple = None,
                                  width_range: tuple = None, top_k: int = 10):
    """
    场景4: 使用查询文本 + 组合过滤条件(城市列表、高度、宽度)
    """
    print(f"\n{'='*60}")
    print(f"场景4: 查询文本 + 组合过滤条件")
    print(f"查询文本: '{query_text}'")
    print(f"城市列表: {cities}")
    print(f"高度范围: {height_range}")
    print(f"宽度范围: {width_range}")
    print(f"返回数量: {top_k}")
    print("="*60)

    query_vector = text_to_embedding(query_text)

    # 构建组合过滤条件
    must_queries = []

    if cities and len(cities) > 0:
        must_queries.append(tablestore.TermsQuery(field_name='city', column_values=cities))

    if height_range:
        must_queries.append(tablestore.RangeQuery(
            field_name='height',
            range_from=height_range[0],
            range_to=height_range[1],
            include_lower=True,
            include_upper=True
        ))

    if width_range:
        must_queries.append(tablestore.RangeQuery(
            field_name='width',
            range_from=width_range[0],
            range_to=width_range[1],
            include_lower=True,
            include_upper=True
        ))

    vector_filter = tablestore.BoolQuery(must_queries=must_queries) if must_queries else None

    query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
        filter=vector_filter
    )

    sort = tablestore.Sort(sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)])
    search_query = tablestore.SearchQuery(query, limit=top_k, get_total_count=False, sort=sort)

    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=tablestore.ColumnsToGet(
            column_names=["image_id", "city", "height", "width"],
            return_type=tablestore.ColumnReturnType.SPECIFIED
        )
    )

    print(f"\nRequest ID: {search_response.request_id}")
    print(f"\n检索结果:")
    print("-" * 60)

    for idx, hit in enumerate(search_response.search_hits):
        row_item = parse_search_hit(hit)
        print(f"{idx + 1}. 得分: {hit.score:.4f} | {row_item}")

    return search_response.search_hits


def parse_search_hit(hit):
    """解析搜索结果"""
    row_item = {}
    primary_key = hit.row[0]
    row_item["image_id"] = primary_key[0][1]
    attribute_columns = hit.row[1]
    for col in attribute_columns:
        key = col[0]
        val = col[1]
        row_item[key] = val
    return row_item


def main():
    # 配置参数
    table_name = "multi_modal_retrieval"
    index_name = "index"

    print("=" * 60)
    print("Tablestore 多模态语义检索示例")
    print("=" * 60)

    # 创建客户端
    client = get_client()
    print("Tablestore 客户端创建成功")

    # 场景1: 仅使用自然语言描述进行语义检索
    # 使用完整的自然语言句子,而不是简单的关键词
    search_by_text_only(
        client, table_name, index_name,
        "一只毛茸茸的小狗在草地上奔跑",
        top_k=5
    )

    # 场景2: 自然语言描述 + 城市过滤
    search_with_city_filter(
        client, table_name, index_name,
        "湖边有一棵柳树,远处是连绵的山脉",
        city="hangzhou",
        top_k=5
    )

    # 场景3: 自然语言描述 + 尺寸过滤
    # 查找高分辨率的横向图片
    search_with_size_filter(
        client, table_name, index_name,
        "夜晚灯火通明的现代化城市天际线",
        height_range=(500, 1024),
        width_range=(800, 1024),
        top_k=5
    )

    # 场景4: 自然语言描述 + 组合过滤条件
    search_with_combined_filters(
        client, table_name, index_name,
        "远处是白雪覆盖的山峰,阳光洒在雪地上闪闪发光",
        cities=["hangzhou", "shanghai", "beijing"],
        height_range=(0, 1024),
        width_range=(0, 1024),
        top_k=5
    )

    print("\n" + "=" * 60)
    print("所有检索场景演示完成!")
    print("=" * 60)


if __name__ == "__main__":
    main()

Image-to-image search

# -*- coding: utf-8 -*-
"""
以图搜图示例
使用本地图片进行向量化,然后在 Tablestore 中检索相似图片
"""

import base64
import os
from pathlib import Path

import dashscope
import tablestore


def get_client():
    """创建 Tablestore 客户端"""
    endpoint = os.getenv("tablestore_end_point")
    instance_name = os.getenv("tablestore_instance_name")
    access_key_id = os.getenv("tablestore_access_key_id")
    access_key_secret = os.getenv("tablestore_access_key_secret")

    client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )
    return client


def image_to_embedding(image_path: str) -> list[float]:
    """
    将本地图片转换为向量
    """
    # 读取图片并转换为 base64
    with open(image_path, "rb") as f:
        image_data = f.read()
    base64_image = base64.b64encode(image_data).decode("utf-8")

    # 根据文件后缀确定 MIME 类型
    suffix = Path(image_path).suffix.lower()
    if suffix in [".jpg", ".jpeg"]:
        mime_type = "image/jpeg"
    elif suffix == ".png":
        mime_type = "image/png"
    elif suffix == ".gif":
        mime_type = "image/gif"
    elif suffix == ".webp":
        mime_type = "image/webp"
    else:
        mime_type = "image/jpeg"  # 默认使用 jpeg

    # 构造 data URI
    data_uri = f"data:{mime_type};base64,{base64_image}"

    # 调用多模态向量化 API
    resp = dashscope.MultiModalEmbedding.call(
        model="multimodal-embedding-v1",
        input=[{"image": data_uri, "factor": 1.0}]
    )

    if resp.status_code == 200:
        return resp.output["embeddings"][0]["embedding"]
    else:
        raise Exception(f"图片向量化失败: {resp.code} - {resp.message}")


def search_by_image(client, table_name, index_name, image_path: str, top_k: int = 10):
    """
    以图搜图: 使用本地图片进行语义检索
    """
    print(f"\n{'='*60}")
    print(f"以图搜图")
    print(f"查询图片: {image_path}")
    print(f"返回数量: {top_k}")
    print("="*60)

    # 图片向量化
    print("正在对查询图片进行向量化...")
    query_vector = image_to_embedding(image_path)
    print(f"向量化完成,维度: {len(query_vector)}")

    # 构建向量查询
    query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
    )

    # 按分数排序
    sort = tablestore.Sort(sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)])
    search_query = tablestore.SearchQuery(
        query,
        limit=top_k,
        get_total_count=False,
        sort=sort
    )

    # 执行搜索
    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=tablestore.ColumnsToGet(
            column_names=["image_id", "city", "height", "width"],
            return_type=tablestore.ColumnReturnType.SPECIFIED
        )
    )

    print(f"\nRequest ID: {search_response.request_id}")
    print(f"\n检索结果:")
    print("-" * 60)

    for idx, hit in enumerate(search_response.search_hits):
        row_item = parse_search_hit(hit)
        print(f"{idx + 1}. 得分: {hit.score:.4f} | {row_item}")

    return search_response.search_hits


def search_by_image_with_filter(client, table_name, index_name, image_path: str,
                                 cities: list = None, height_range: tuple = None,
                                 width_range: tuple = None, top_k: int = 10):
    """
    以图搜图 + 过滤条件: 使用本地图片进行语义检索,同时应用过滤条件
    """
    print(f"\n{'='*60}")
    print(f"以图搜图 + 过滤条件")
    print(f"查询图片: {image_path}")
    print(f"城市列表: {cities}")
    print(f"高度范围: {height_range}")
    print(f"宽度范围: {width_range}")
    print(f"返回数量: {top_k}")
    print("="*60)

    # 图片向量化
    print("正在对查询图片进行向量化...")
    query_vector = image_to_embedding(image_path)
    print(f"向量化完成,维度: {len(query_vector)}")

    # 构建过滤条件
    must_queries = []

    if cities and len(cities) > 0:
        must_queries.append(tablestore.TermsQuery(field_name='city', column_values=cities))

    if height_range:
        must_queries.append(tablestore.RangeQuery(
            field_name='height',
            range_from=height_range[0],
            range_to=height_range[1],
            include_lower=True,
            include_upper=True
        ))

    if width_range:
        must_queries.append(tablestore.RangeQuery(
            field_name='width',
            range_from=width_range[0],
            range_to=width_range[1],
            include_lower=True,
            include_upper=True
        ))

    vector_filter = tablestore.BoolQuery(must_queries=must_queries) if must_queries else None

    # 构建向量查询
    query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
        filter=vector_filter
    )

    # 按分数排序
    sort = tablestore.Sort(sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)])
    search_query = tablestore.SearchQuery(query, limit=top_k, get_total_count=False, sort=sort)

    # 执行搜索
    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=tablestore.ColumnsToGet(
            column_names=["image_id", "city", "height", "width"],
            return_type=tablestore.ColumnReturnType.SPECIFIED
        )
    )

    print(f"\nRequest ID: {search_response.request_id}")
    print(f"\n检索结果:")
    print("-" * 60)

    for idx, hit in enumerate(search_response.search_hits):
        row_item = parse_search_hit(hit)
        print(f"{idx + 1}. 得分: {hit.score:.4f} | {row_item}")

    return search_response.search_hits


def parse_search_hit(hit):
    """解析搜索结果"""
    row_item = {}
    primary_key = hit.row[0]
    row_item["image_id"] = primary_key[0][1]
    attribute_columns = hit.row[1]
    for col in attribute_columns:
        key = col[0]
        val = col[1]
        row_item[key] = val
    return row_item


def main():
    # 配置参数
    table_name = "multi_modal_retrieval"
    index_name = "index"

    print("=" * 60)
    print("Tablestore 以图搜图示例")
    print("=" * 60)

    # 创建客户端
    client = get_client()
    print("Tablestore 客户端创建成功")

    # 获取项目根目录
    current_dir = Path(__file__).parent
    data_dir = current_dir / "data" / "photograph"

    # 获取一张示例图片作为查询图片
    sample_images = list(data_dir.glob("*.jpg"))
    if not sample_images:
        print("错误: 未找到示例图片,请确保 data/photograph 目录下有 jpg 图片")
        return

    # 使用第一张图片作为查询示例
    query_image_path = str(sample_images[0])
    print(f"\n使用示例图片: {query_image_path}")

    # 场景1: 仅使用图片进行以图搜图
    search_by_image(client, table_name, index_name, query_image_path, top_k=5)

    # 场景2: 以图搜图 + 过滤条件
    # 只搜索特定城市的相似图片
    search_by_image_with_filter(
        client, table_name, index_name,
        query_image_path,
        cities=["hangzhou", "shanghai"],
        top_k=5
    )

    # 场景3: 以图搜图 + 尺寸过滤
    # 只搜索横向的相似图片(宽度大于高度)
    search_by_image_with_filter(
        client, table_name, index_name,
        query_image_path,
        width_range=(800, 1024),
        top_k=5
    )

    print("\n" + "=" * 60)
    print("以图搜图演示完成!")
    print("=" * 60)


if __name__ == "__main__":
    main()

Visual retrieval interface

Build an interactive retrieval interface based on Gradio for an intuitive, graphical experience. The interface relies on the local image directory in the demo project and is intended for quick trials and demos. If you use your own data, refer to the demo code to implement the corresponding interface features; a minimal sketch of such an interface follows the feature table below.

  1. Install the Gradio dependencies.

    pip install gradio gradio_rangeslider
  2. Start the visual interface.

    python src/gradio_app.py

    After the application starts, open its URL (for example, http://localhost:7860) to access the retrieval interface.

    The interface provides the following features:

    Image-to-image search: upload a local image to retrieve similar images.
    Natural-language search: enter a natural-language description, such as "snow-capped mountains in the distance" or "a fluffy puppy running on the grass".
    Top K: the number of results to return (1-30).
    Height/width range: filter by image dimensions.
    City filter: filter by city (multiple selections supported).
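
The full interface is implemented in src/gradio_app.py in the demo project. As a rough idea of how such an interface wires into the retrieval code, here is a minimal, hypothetical sketch that connects a text query box to the get_client(), search_by_text_only(), and parse_search_hit() functions from the natural-language search example (assuming they are defined or imported in the same file):

# Minimal, hypothetical sketch; the demo's real interface is src/gradio_app.py.
# Assumes get_client(), search_by_text_only() and parse_search_hit() from the
# natural-language search example are available in this module.
import gradio as gr


def run_text_search(query_text, top_k):
    client = get_client()
    hits = search_by_text_only(
        client, "multi_modal_retrieval", "index", query_text, top_k=int(top_k)
    )
    return [parse_search_hit(hit) for hit in hits]


with gr.Blocks(title="Tablestore multimodal image search") as demo:
    query = gr.Textbox(label="Natural-language query")
    top_k = gr.Slider(minimum=1, maximum=30, value=5, step=1, label="Top K")
    results = gr.JSON(label="Results")
    gr.Button("Search").click(run_text_search, inputs=[query, top_k], outputs=results)

demo.launch()  # serves on http://localhost:7860 by default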

Related documents