基于 Tablestore 向量检索能力与阿里云百炼多模态 Embedding 模型,构建多模态图片检索系统。系统支持自然语言搜图和以图搜图功能,适用于电商商品搜索、智能相册管理、媒体资产检索等场景。
方案概览
多模态图片检索系统构建流程包括以下核心步骤:
创建表和索引:创建Tablestore数据表存储图片数据,创建多元索引支持向量检索功能。
图片向量化处理:使用百炼多模态 Embedding 模型将图片转换为高维向量表示。
向量数据写入:将生成的图片向量数据及相关元数据批量存储至 Tablestore。
执行多模态检索:将查询图片或自然语言转换为向量,在多元索引中执行相似度搜索,支持通过元数据条件进行精准过滤。

准备工作
开始构建检索系统前,需要完成开发环境配置、凭证设置和数据准备。
1. 安装 SDK
确保已安装 Python 3.12 及以上版本。
执行以下命令安装 Tablestore Python SDK 和阿里云百炼 SDK。
pip install tablestore
pip install dashscope
pip install Pillow
2. 配置环境变量
将访问凭证配置为环境变量,确保代码安全性与跨环境可移植性。
配置前请先获取百炼平台的API Key、AccessKey,前往表格存储控制台创建实例并获取实例名称和访问地址。
出于安全考虑,新创建的表格存储实例默认不开启公网访问,如需使用公网访问地址,请在实例的网络管理中设置允许公网访问。
export DASHSCOPE_API_KEY=<百炼平台的API KEY>
export tablestore_end_point=<Tablestore实例访问地址>
export tablestore_instance_name=<Tablestore实例名称>
export tablestore_access_key_id=<AccessKey ID>
export tablestore_access_key_secret=<AccessKey Secret>
3. 准备图片数据
支持使用自定义图片数据或教程提供的演示数据集。
git clone https://github.com/aliyun/alibabacloud-tablestore-ai-demo.git
也可直接下载演示项目文件:alibabacloud-tablestore-ai-demo-main
步骤一:创建表和索引
创建存储图片向量数据的数据表和支持向量检索的多元索引。根据业务需求和数据特点自定义表结构和索引配置。如需快速体验演示效果,可直接使用以下示例配置。
1. 创建数据表
# -*- coding: utf-8 -*-
"""
创建 Tablestore 数据表
"""
import os
import tablestore
def main():
    """Create the ``multi_modal_retrieval`` data table in Tablestore.

    Connection settings come from the ``tablestore_*`` environment
    variables. The table is declared with a single STRING primary-key
    column, ``image_id``. A service error whose text contains
    ``OTSObjectAlreadyExist`` is reported and tolerated; any other service
    error is re-raised.
    """
    table_name = "multi_modal_retrieval"

    # Read the settings in the same order they are handed to OTSClient.
    endpoint, access_key_id, access_key_secret, instance_name = (
        os.getenv(variable)
        for variable in (
            "tablestore_end_point",
            "tablestore_access_key_id",
            "tablestore_access_key_secret",
            "tablestore_instance_name",
        )
    )
    client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # Table definition: primary key only, default options, (0, 0) reserved capacity units.
    schema = tablestore.TableMeta(table_name, [("image_id", "STRING")])
    options = tablestore.TableOptions()
    throughput = tablestore.ReservedThroughput(tablestore.CapacityUnit(0, 0))

    try:
        client.create_table(schema, options, throughput)
    except tablestore.OTSServiceError as error:
        # Only the "already exists" case is acceptable; everything else propagates.
        if "OTSObjectAlreadyExist" not in str(error):
            raise
        print(f"数据表 '{table_name}' 已存在")
    else:
        print(f"数据表 '{table_name}' 创建成功")


if __name__ == "__main__":
    main()
2. 创建多元索引
向量数据在 Tablestore 数据表中以字符串格式存储。要启用向量检索功能,必须创建多元索引并配置向量字段类型,以支持高维向量的相似度计算和快速检索。
# -*- coding: utf-8 -*-
"""
创建 Tablestore 多元索引(含向量字段)
"""
import os
import tablestore
def main():
    """Create the search index ``index`` (including a vector field) on the data table.

    The index covers four scalar columns (``image_id``, ``city``,
    ``height``, ``width``) plus a 1024-dimension float32 ``vector`` field
    using the cosine metric. A service error whose text contains
    ``OTSObjectAlreadyExist`` is reported and tolerated; any other service
    error is re-raised.
    """
    table_name = "multi_modal_retrieval"
    index_name = "index"

    client = tablestore.OTSClient(
        os.getenv("tablestore_end_point"),
        os.getenv("tablestore_access_key_id"),
        os.getenv("tablestore_access_key_secret"),
        os.getenv("tablestore_instance_name"),
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # Scalar columns all share the same flags, so build them from a table.
    scalar_columns = (
        ("image_id", tablestore.FieldType.KEYWORD),
        ("city", tablestore.FieldType.KEYWORD),
        ("height", tablestore.FieldType.LONG),
        ("width", tablestore.FieldType.LONG),
    )
    field_schemas = [
        tablestore.FieldSchema(column, field_type, index=True, enable_sort_and_agg=True)
        for column, field_type in scalar_columns
    ]

    # The vector field is what enables KNN queries against this index.
    vector_options = tablestore.VectorOptions(
        data_type=tablestore.VectorDataType.VD_FLOAT_32,
        dimension=1024,
        metric_type=tablestore.VectorMetricType.VM_COSINE,
    )
    field_schemas.append(
        tablestore.FieldSchema(
            "vector",
            tablestore.FieldType.VECTOR,
            vector_options=vector_options,
        )
    )

    try:
        index_meta = tablestore.SearchIndexMeta(field_schemas)
        client.create_search_index(table_name, index_name, index_meta)
    except tablestore.OTSServiceError as error:
        # Only the "already exists" case is acceptable; everything else propagates.
        if "OTSObjectAlreadyExist" not in str(error):
            raise
        print(f"多元索引 '{index_name}' 已存在")
    else:
        print(f"多元索引 '{index_name}' 创建成功")


if __name__ == "__main__":
    main()
步骤二:图片向量化处理
调用阿里云百炼多模态向量化模型对图片进行向量化处理。以下示例演示本地图片向量化方法,更多使用方式请参见多模态向量。
大量图片向量化处理耗时较长,演示项目提供预处理的向量数据文件 data.json,可在步骤三中直接使用。
# -*- coding: utf-8 -*-
"""
本地图片向量化演示
展示如何使用百炼多模态向量化模型对本地图片进行向量化
输出原始图片信息、向量维度、向量的前几个元素等关键信息
"""
import base64
import os
from pathlib import Path
import dashscope
from PIL import Image
def image_to_base64(image_path):
    """Read the file at ``image_path`` and return its contents base64-encoded.

    Args:
        image_path: Path of the file to read (opened in binary mode).

    Returns:
        The base64 encoding of the file's bytes, as a ``str``.
    """
    with open(image_path, "rb") as image_file:
        encoded = base64.b64encode(image_file.read())
    return encoded.decode("utf-8")
def get_image_embedding(image_path):
    """Embed a local image with the ``multimodal-embedding-v1`` model.

    The file is base64-encoded, wrapped in a ``data:`` URI whose MIME type
    is chosen from the file suffix, and sent to
    ``dashscope.MultiModalEmbedding.call``.

    Args:
        image_path: Path of the local image file.

    Returns:
        The ``embedding`` entry of the first item in the response's
        ``embeddings`` output.

    Raises:
        Exception: If the response status code is not 200.
    """
    # Encode the file first, then work out which MIME type to advertise.
    payload = image_to_base64(image_path)

    mime_by_suffix = {
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".webp": "image/webp",
    }
    # Unrecognised suffixes fall back to JPEG.
    mime_type = mime_by_suffix.get(Path(image_path).suffix.lower(), "image/jpeg")

    data_uri = f"data:{mime_type};base64,{payload}"

    resp = dashscope.MultiModalEmbedding.call(
        model="multimodal-embedding-v1",
        input=[{"image": data_uri, "factor": 1.0}]
    )
    if resp.status_code != 200:
        raise Exception(f"向量化失败: {resp.code} - {resp.message}")
    return resp.output["embeddings"][0]["embedding"]
def get_image_info(image_path):
    """Collect basic facts about an image file.

    Args:
        image_path: Path of the image to inspect.

    Returns:
        A dict with the keys ``filename``, ``format``, ``mode``, ``width``,
        ``height`` (taken from the opened image) and ``size_bytes`` (the
        file size on disk).
    """
    with Image.open(image_path) as picture:
        info = {"filename": os.path.basename(image_path)}
        info["format"] = picture.format
        info["mode"] = picture.mode
        info["width"] = picture.width
        info["height"] = picture.height
        info["size_bytes"] = os.path.getsize(image_path)
        return info
def main():
    """Run the local image vectorization demo.

    Picks the first supported image listed in ``data/photograph`` next to
    this script, prints its basic information, embeds it with
    ``get_image_embedding`` and prints the vector's dimension, element
    type, first ten and last five elements, and L2 norm.
    """
    image_dir = Path(__file__).parent / "data" / "photograph"
    heavy_rule = "=" * 60
    light_rule = "-" * 60

    print(heavy_rule)
    print("本地图片向量化演示")
    print(heavy_rule)

    # Keep only files whose suffix is one of the supported image types.
    supported_suffixes = ('.jpg', '.jpeg', '.png', '.gif', '.webp')
    image_files = [
        name for name in os.listdir(image_dir)
        if name.lower().endswith(supported_suffixes)
    ]
    if not image_files:
        print("未找到图片文件")
        return

    # The demo only processes the first listed image.
    image_path = image_dir / image_files[0]

    print("\n[1/3] 读取图片信息")
    print(light_rule)
    image_info = get_image_info(image_path)
    print(f"文件名: {image_info['filename']}")
    print(f"格式: {image_info['format']}")
    print(f"模式: {image_info['mode']}")
    print(f"宽度: {image_info['width']} px")
    print(f"高度: {image_info['height']} px")
    print(f"文件大小: {image_info['size_bytes']:,} bytes")

    print("\n[2/3] 调用向量化 API")
    print(light_rule)
    print("正在调用百炼多模态向量化模型...")
    vector = get_image_embedding(str(image_path))

    print("\n[3/3] 向量化结果")
    print(light_rule)
    print(f"向量维度: {len(vector)}")
    print(f"向量类型: {type(vector[0]).__name__}")

    print("向量前10个元素:")
    for position, component in enumerate(vector[:10]):
        print(f" [{position}] {component:.8f}")
    print(" ...")

    print("向量后5个元素:")
    tail_offset = len(vector) - 5
    for position, component in enumerate(vector[-5:], start=tail_offset):
        print(f" [{position}] {component:.8f}")

    # L2 norm: square root of the sum of squared components.
    import math
    squared_sum = sum(component * component for component in vector)
    norm = math.sqrt(squared_sum)
    print(f"\n向量L2范数: {norm:.8f}")

    print("\n" + heavy_rule)
    print("向量化演示完成!")
    print(heavy_rule)


if __name__ == "__main__":
    main()
步骤三:向量数据写入
批量导入图片向量数据至 Tablestore 数据表。以下示例直接读取演示项目中预处理的向量数据进行批量写入。如使用自定义业务数据,可将图片向量化处理与数据写入操作结合执行。
# -*- coding: utf-8 -*-
"""
批量写入图片数据到 Tablestore
"""
import json
import os
from pathlib import Path
import tablestore
def main():
    """Bulk-load the pre-computed image vectors from ``data/data.json`` into Tablestore.

    Each record becomes one row keyed by ``image_id`` with the attribute
    columns ``city``, ``vector`` (the vector serialized as a JSON string),
    ``width`` and ``height``. Rows are sent with ``batch_write_row`` in
    batches of up to ``batch_size`` rows.

    A batch may succeed only partially. Rows that were written are still
    counted, and every failed row is reported with its error code and
    message instead of being silently dropped. A final summary prints the
    number of rows written and, if any, the number of rows that failed.
    """
    # Initialise the Tablestore client from the environment.
    client = tablestore.OTSClient(
        os.getenv("tablestore_end_point"),
        os.getenv("tablestore_access_key_id"),
        os.getenv("tablestore_access_key_secret"),
        os.getenv("tablestore_instance_name"),
        retry_policy=tablestore.WriteRetryPolicy(),
    )
    table_name = "multi_modal_retrieval"
    batch_size = 100

    # Load the records from the JSON file shipped next to this script.
    data_path = Path(__file__).parent / "data" / "data.json"
    with open(data_path, "r", encoding="utf-8") as f:
        data_array = json.load(f)
    total = len(data_array)
    print(f"已加载 {total} 条记录")

    put_row_items = []
    success_count = 0
    failed_count = 0
    for idx, item in enumerate(data_array):
        primary_key = [("image_id", item["image_id"])]
        attribute_columns = [
            ("city", item.get("city", "unknown")),
            ("vector", json.dumps(item["vector"])),
            ("width", item.get("width", 0)),
            ("height", item.get("height", 0)),
        ]
        row = tablestore.Row(primary_key, attribute_columns)
        # IGNORE: write the row whether or not it already exists.
        condition = tablestore.Condition(tablestore.RowExistenceExpectation.IGNORE)
        put_row_items.append(tablestore.PutRowItem(row, condition))

        # Flush only when the batch is full or this is the last record.
        if len(put_row_items) < batch_size and idx != total - 1:
            continue

        request = tablestore.BatchWriteRowRequest()
        request.add(tablestore.TableInBatchWriteRowItem(table_name, put_row_items))
        result = client.batch_write_row(request)

        if result.is_all_succeed():
            success_count += len(put_row_items)
            print(f"进度: {idx + 1}/{total} - 写入 {len(put_row_items)} 行成功")
        else:
            # Partial failure: account for each row individually and surface
            # the per-row errors so the data loss is visible.
            succeeded, failed = result.get_put()
            success_count += len(succeeded)
            failed_count += len(failed)
            print(f"进度: {idx + 1}/{total} - 写入 {len(succeeded)} 行成功, {len(failed)} 行失败")
            for failed_item in failed:
                print(f"  写入失败: {failed_item.error_code} - {failed_item.error_message}")
        put_row_items = []

    print(f"完成: 成功写入 {success_count} 行")
    if failed_count:
        print(f"警告: {failed_count} 行写入失败")


if __name__ == "__main__":
    main()
步骤四:执行多模态检索
多模态图片检索系统支持两种检索模式:自然语言搜图和以图搜图。系统将查询内容转换为向量表示,在向量索引中执行相似度计算,返回语义最匹配的图片结果,同时支持结合元数据条件(如城市、图片尺寸等)进行精准过滤。
自然语言检索
# -*- coding: utf-8 -*-
"""
语义检索示例
包含多种查询场景:
1. 仅使用查询文本进行语义检索
2. 使用查询文本 + 过滤条件(城市、高度、宽度)
"""
import os
import dashscope
import tablestore
from dashscope import MultiModalEmbeddingItemText
def get_client():
    """Build a Tablestore client from the ``tablestore_*`` environment variables.

    Returns:
        A ``tablestore.OTSClient`` configured with ``WriteRetryPolicy`` as
        its retry policy.
    """
    return tablestore.OTSClient(
        os.getenv("tablestore_end_point"),
        os.getenv("tablestore_access_key_id"),
        os.getenv("tablestore_access_key_secret"),
        os.getenv("tablestore_instance_name"),
        retry_policy=tablestore.WriteRetryPolicy(),
    )
def text_to_embedding(text: str) -> list[float]:
    """Embed a piece of text with the ``multimodal-embedding-v1`` model.

    Args:
        text: The text to embed.

    Returns:
        The ``embedding`` entry of the first item in the response's
        ``embeddings`` output.

    Raises:
        Exception: If the response status code is not 200.
    """
    text_item = MultiModalEmbeddingItemText(text=text, factor=1.0)
    resp = dashscope.MultiModalEmbedding.call(
        model="multimodal-embedding-v1",
        input=[text_item]
    )
    if resp.status_code != 200:
        raise Exception(f"文本向量化失败: {resp.code} - {resp.message}")
    return resp.output["embeddings"][0]["embedding"]
def search_by_text_only(client, table_name, index_name, query_text: str, top_k: int = 10):
    """Scenario 1: semantic retrieval driven by the query text alone.

    The text is embedded with ``text_to_embedding`` and the vector is used
    for a KNN query on the ``vector`` field. Results are sorted by score in
    descending order, printed, and returned.

    Args:
        client: Tablestore client used to run the search.
        table_name: Name of the data table.
        index_name: Name of the search index to query.
        query_text: Natural-language description to search for.
        top_k: Number of neighbours to request; also used as the result limit.

    Returns:
        The ``search_hits`` of the search response.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("场景1: 仅使用查询文本检索")
    print(f"查询文本: '{query_text}'")
    print(f"返回数量: {top_k}")
    print(banner)

    # Turn the text into a query vector.
    query_vector = text_to_embedding(query_text)

    knn_query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
    )
    by_score_desc = tablestore.Sort(
        sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)]
    )
    search_query = tablestore.SearchQuery(
        knn_query,
        limit=top_k,
        get_total_count=False,
        sort=by_score_desc,
    )
    # Only the metadata columns are fetched.
    wanted_columns = tablestore.ColumnsToGet(
        column_names=["image_id", "city", "height", "width"],
        return_type=tablestore.ColumnReturnType.SPECIFIED
    )

    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=wanted_columns,
    )

    print(f"\nRequest ID: {search_response.request_id}")
    print("\n检索结果:")
    print("-" * 60)
    for rank, hit in enumerate(search_response.search_hits, start=1):
        print(f"{rank}. 得分: {hit.score:.4f} | {parse_search_hit(hit)}")
    return search_response.search_hits
def search_with_city_filter(client, table_name, index_name, query_text: str, city: str, top_k: int = 10):
    """Scenario 2: semantic retrieval by query text, restricted to one city.

    Works like a plain KNN query on the ``vector`` field, but attaches a
    term filter on the ``city`` field. Results are sorted by score in
    descending order, printed, and returned.

    Args:
        client: Tablestore client used to run the search.
        table_name: Name of the data table.
        index_name: Name of the search index to query.
        query_text: Natural-language description to search for.
        city: Value the ``city`` field must match.
        top_k: Number of neighbours to request; also used as the result limit.

    Returns:
        The ``search_hits`` of the search response.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("场景2: 查询文本 + 城市过滤")
    print(f"查询文本: '{query_text}'")
    print(f"城市过滤: {city}")
    print(f"返回数量: {top_k}")
    print(banner)

    query_vector = text_to_embedding(query_text)

    # The city constraint is applied as the KNN query's filter.
    city_filter = tablestore.TermQuery(field_name='city', column_value=city)
    knn_query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
        filter=city_filter
    )
    by_score_desc = tablestore.Sort(
        sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)]
    )
    search_query = tablestore.SearchQuery(
        knn_query, limit=top_k, get_total_count=False, sort=by_score_desc
    )
    wanted_columns = tablestore.ColumnsToGet(
        column_names=["image_id", "city", "height", "width"],
        return_type=tablestore.ColumnReturnType.SPECIFIED
    )

    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=wanted_columns,
    )

    print(f"\nRequest ID: {search_response.request_id}")
    print("\n检索结果:")
    print("-" * 60)
    for rank, hit in enumerate(search_response.search_hits, start=1):
        print(f"{rank}. 得分: {hit.score:.4f} | {parse_search_hit(hit)}")
    return search_response.search_hits
def search_with_size_filter(client, table_name, index_name, query_text: str,
                            height_range: tuple = None, width_range: tuple = None, top_k: int = 10):
    """Scenario 3: semantic retrieval by query text, restricted by image size.

    Each supplied range becomes an inclusive range filter on the ``height``
    or ``width`` field. The filters are combined in a bool query attached
    to the KNN query. When neither range is given the KNN query runs with
    no filter.

    Args:
        client: Tablestore client used to run the search.
        table_name: Name of the data table.
        index_name: Name of the search index to query.
        query_text: Natural-language description to search for.
        height_range: Optional ``(low, high)`` bounds for ``height``.
        width_range: Optional ``(low, high)`` bounds for ``width``.
        top_k: Number of neighbours to request; also used as the result limit.

    Returns:
        The ``search_hits`` of the search response.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("场景3: 查询文本 + 尺寸过滤")
    print(f"查询文本: '{query_text}'")
    print(f"高度范围: {height_range}")
    print(f"宽度范围: {width_range}")
    print(f"返回数量: {top_k}")
    print(banner)

    query_vector = text_to_embedding(query_text)

    # One inclusive range clause per dimension that was actually supplied.
    requested_ranges = (("height", height_range), ("width", width_range))
    must_queries = [
        tablestore.RangeQuery(
            field_name=field,
            range_from=bounds[0],
            range_to=bounds[1],
            include_lower=True,
            include_upper=True
        )
        for field, bounds in requested_ranges
        if bounds
    ]
    vector_filter = None
    if must_queries:
        vector_filter = tablestore.BoolQuery(must_queries=must_queries)

    knn_query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
        filter=vector_filter
    )
    by_score_desc = tablestore.Sort(
        sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)]
    )
    search_query = tablestore.SearchQuery(
        knn_query, limit=top_k, get_total_count=False, sort=by_score_desc
    )
    wanted_columns = tablestore.ColumnsToGet(
        column_names=["image_id", "city", "height", "width"],
        return_type=tablestore.ColumnReturnType.SPECIFIED
    )

    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=wanted_columns,
    )

    print(f"\nRequest ID: {search_response.request_id}")
    print("\n检索结果:")
    print("-" * 60)
    for rank, hit in enumerate(search_response.search_hits, start=1):
        print(f"{rank}. 得分: {hit.score:.4f} | {parse_search_hit(hit)}")
    return search_response.search_hits
def search_with_combined_filters(client, table_name, index_name, query_text: str,
                                 cities: list = None, height_range: tuple = None,
                                 width_range: tuple = None, top_k: int = 10):
    """Scenario 4: semantic retrieval by query text with combined filters.

    Supports a list of accepted cities (terms filter on ``city``) together
    with inclusive range filters on ``height`` and ``width``. All supplied
    conditions are combined in a bool query attached to the KNN query. When
    no condition is given the KNN query runs with no filter.

    Args:
        client: Tablestore client used to run the search.
        table_name: Name of the data table.
        index_name: Name of the search index to query.
        query_text: Natural-language description to search for.
        cities: Optional list of accepted ``city`` values.
        height_range: Optional ``(low, high)`` bounds for ``height``.
        width_range: Optional ``(low, high)`` bounds for ``width``.
        top_k: Number of neighbours to request; also used as the result limit.

    Returns:
        The ``search_hits`` of the search response.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("场景4: 查询文本 + 组合过滤条件")
    print(f"查询文本: '{query_text}'")
    print(f"城市列表: {cities}")
    print(f"高度范围: {height_range}")
    print(f"宽度范围: {width_range}")
    print(f"返回数量: {top_k}")
    print(banner)

    query_vector = text_to_embedding(query_text)

    # Collect the filter clauses: cities first, then height, then width.
    must_queries = []
    if cities:
        must_queries.append(tablestore.TermsQuery(field_name='city', column_values=cities))
    for field, bounds in (("height", height_range), ("width", width_range)):
        if not bounds:
            continue
        must_queries.append(tablestore.RangeQuery(
            field_name=field,
            range_from=bounds[0],
            range_to=bounds[1],
            include_lower=True,
            include_upper=True
        ))
    vector_filter = None
    if must_queries:
        vector_filter = tablestore.BoolQuery(must_queries=must_queries)

    knn_query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
        filter=vector_filter
    )
    by_score_desc = tablestore.Sort(
        sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)]
    )
    search_query = tablestore.SearchQuery(
        knn_query, limit=top_k, get_total_count=False, sort=by_score_desc
    )
    wanted_columns = tablestore.ColumnsToGet(
        column_names=["image_id", "city", "height", "width"],
        return_type=tablestore.ColumnReturnType.SPECIFIED
    )

    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=wanted_columns,
    )

    print(f"\nRequest ID: {search_response.request_id}")
    print("\n检索结果:")
    print("-" * 60)
    for rank, hit in enumerate(search_response.search_hits, start=1):
        print(f"{rank}. 得分: {hit.score:.4f} | {parse_search_hit(hit)}")
    return search_response.search_hits
def parse_search_hit(hit):
    """Flatten a search hit into a plain ``{name: value}`` dict.

    ``hit.row[0]`` is treated as the primary-key columns and ``hit.row[1]``
    as the attribute columns. The value of the first primary-key column is
    stored under ``"image_id"``, then every attribute column contributes
    its first element as the key and its second element as the value.

    Args:
        hit: Object exposing a ``row`` sequence as described above.

    Returns:
        The flattened dict.
    """
    pk_columns = hit.row[0]
    parsed = {"image_id": pk_columns[0][1]}
    # Attribute columns are applied after the key, so they win on a name clash.
    parsed.update((column[0], column[1]) for column in hit.row[1])
    return parsed
def main():
    """Run the four text-to-image retrieval scenarios against the demo index.

    Creates a client with ``get_client`` and then calls, in order,
    ``search_by_text_only``, ``search_with_city_filter``,
    ``search_with_size_filter`` and ``search_with_combined_filters``, each
    with a full natural-language sentence as the query and ``top_k=5``.
    """
    table_name = "multi_modal_retrieval"
    index_name = "index"
    banner = "=" * 60

    print(banner)
    print("Tablestore 多模态语义检索示例")
    print(banner)

    client = get_client()
    print("Tablestore 客户端创建成功")

    # Every scenario targets the same client, table and index.
    target = (client, table_name, index_name)

    # Scenario 1: natural-language description only.
    search_by_text_only(*target, "一只毛茸茸的小狗在草地上奔跑", top_k=5)

    # Scenario 2: description plus a single-city filter.
    search_with_city_filter(
        *target,
        "湖边有一棵柳树,远处是连绵的山脉",
        city="hangzhou",
        top_k=5,
    )

    # Scenario 3: description plus size filter (height 500-1024, width 800-1024).
    search_with_size_filter(
        *target,
        "夜晚灯火通明的现代化城市天际线",
        height_range=(500, 1024),
        width_range=(800, 1024),
        top_k=5,
    )

    # Scenario 4: description plus city list and both size ranges.
    search_with_combined_filters(
        *target,
        "远处是白雪覆盖的山峰,阳光洒在雪地上闪闪发光",
        cities=["hangzhou", "shanghai", "beijing"],
        height_range=(0, 1024),
        width_range=(0, 1024),
        top_k=5,
    )

    print("\n" + banner)
    print("所有检索场景演示完成!")
    print(banner)


if __name__ == "__main__":
    main()
以图搜图
# -*- coding: utf-8 -*-
"""
以图搜图示例
使用本地图片进行向量化,然后在 Tablestore 中检索相似图片
"""
import base64
import os
from pathlib import Path
import dashscope
import tablestore
def get_client():
    """Build a Tablestore client from the ``tablestore_*`` environment variables.

    Returns:
        A ``tablestore.OTSClient`` configured with ``WriteRetryPolicy`` as
        its retry policy.
    """
    # Listed in the order the values are passed to OTSClient.
    variable_names = (
        "tablestore_end_point",
        "tablestore_access_key_id",
        "tablestore_access_key_secret",
        "tablestore_instance_name",
    )
    connection_args = [os.getenv(name) for name in variable_names]
    return tablestore.OTSClient(
        *connection_args,
        retry_policy=tablestore.WriteRetryPolicy(),
    )
def image_to_embedding(image_path: str) -> list[float]:
    """Embed a local image with the ``multimodal-embedding-v1`` model.

    The file is read, base64-encoded, wrapped in a ``data:`` URI whose MIME
    type is chosen from the file suffix, and sent to
    ``dashscope.MultiModalEmbedding.call``.

    Args:
        image_path: Path of the local image file.

    Returns:
        The ``embedding`` entry of the first item in the response's
        ``embeddings`` output.

    Raises:
        Exception: If the response status code is not 200.
    """
    with open(image_path, "rb") as image_file:
        raw_bytes = image_file.read()
    base64_image = base64.b64encode(raw_bytes).decode("utf-8")

    # Map the suffix to a MIME type; unrecognised suffixes fall back to JPEG.
    known_mime_types = {
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".webp": "image/webp",
    }
    suffix = Path(image_path).suffix.lower()
    mime_type = known_mime_types.get(suffix, "image/jpeg")

    data_uri = f"data:{mime_type};base64,{base64_image}"

    resp = dashscope.MultiModalEmbedding.call(
        model="multimodal-embedding-v1",
        input=[{"image": data_uri, "factor": 1.0}]
    )
    if resp.status_code != 200:
        raise Exception(f"图片向量化失败: {resp.code} - {resp.message}")
    return resp.output["embeddings"][0]["embedding"]
def search_by_image(client, table_name, index_name, image_path: str, top_k: int = 10):
    """Image-to-image search using a local query image.

    The image is embedded with ``image_to_embedding`` and the vector is
    used for a KNN query on the ``vector`` field. Results are sorted by
    score in descending order, printed, and returned.

    Args:
        client: Tablestore client used to run the search.
        table_name: Name of the data table.
        index_name: Name of the search index to query.
        image_path: Path of the local query image.
        top_k: Number of neighbours to request; also used as the result limit.

    Returns:
        The ``search_hits`` of the search response.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("以图搜图")
    print(f"查询图片: {image_path}")
    print(f"返回数量: {top_k}")
    print(banner)

    # Embed the query image.
    print("正在对查询图片进行向量化...")
    query_vector = image_to_embedding(image_path)
    print(f"向量化完成,维度: {len(query_vector)}")

    knn_query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
    )
    by_score_desc = tablestore.Sort(
        sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)]
    )
    search_query = tablestore.SearchQuery(
        knn_query,
        limit=top_k,
        get_total_count=False,
        sort=by_score_desc,
    )
    # Only the metadata columns are fetched.
    wanted_columns = tablestore.ColumnsToGet(
        column_names=["image_id", "city", "height", "width"],
        return_type=tablestore.ColumnReturnType.SPECIFIED
    )

    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=wanted_columns,
    )

    print(f"\nRequest ID: {search_response.request_id}")
    print("\n检索结果:")
    print("-" * 60)
    for rank, hit in enumerate(search_response.search_hits, start=1):
        print(f"{rank}. 得分: {hit.score:.4f} | {parse_search_hit(hit)}")
    return search_response.search_hits
def search_by_image_with_filter(client, table_name, index_name, image_path: str,
                                cities: list = None, height_range: tuple = None,
                                width_range: tuple = None, top_k: int = 10):
    """Image-to-image search using a local query image, with metadata filters.

    Supports a list of accepted cities (terms filter on ``city``) together
    with inclusive range filters on ``height`` and ``width``. All supplied
    conditions are combined in a bool query attached to the KNN query. When
    no condition is given the KNN query runs with no filter.

    Args:
        client: Tablestore client used to run the search.
        table_name: Name of the data table.
        index_name: Name of the search index to query.
        image_path: Path of the local query image.
        cities: Optional list of accepted ``city`` values.
        height_range: Optional ``(low, high)`` bounds for ``height``.
        width_range: Optional ``(low, high)`` bounds for ``width``.
        top_k: Number of neighbours to request; also used as the result limit.

    Returns:
        The ``search_hits`` of the search response.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("以图搜图 + 过滤条件")
    print(f"查询图片: {image_path}")
    print(f"城市列表: {cities}")
    print(f"高度范围: {height_range}")
    print(f"宽度范围: {width_range}")
    print(f"返回数量: {top_k}")
    print(banner)

    # Embed the query image.
    print("正在对查询图片进行向量化...")
    query_vector = image_to_embedding(image_path)
    print(f"向量化完成,维度: {len(query_vector)}")

    # Collect the filter clauses: cities first, then height, then width.
    must_queries = []
    if cities:
        must_queries.append(tablestore.TermsQuery(field_name='city', column_values=cities))
    for field, bounds in (("height", height_range), ("width", width_range)):
        if not bounds:
            continue
        must_queries.append(tablestore.RangeQuery(
            field_name=field,
            range_from=bounds[0],
            range_to=bounds[1],
            include_lower=True,
            include_upper=True
        ))
    vector_filter = None
    if must_queries:
        vector_filter = tablestore.BoolQuery(must_queries=must_queries)

    knn_query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
        filter=vector_filter
    )
    by_score_desc = tablestore.Sort(
        sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)]
    )
    search_query = tablestore.SearchQuery(
        knn_query, limit=top_k, get_total_count=False, sort=by_score_desc
    )
    wanted_columns = tablestore.ColumnsToGet(
        column_names=["image_id", "city", "height", "width"],
        return_type=tablestore.ColumnReturnType.SPECIFIED
    )

    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=wanted_columns,
    )

    print(f"\nRequest ID: {search_response.request_id}")
    print("\n检索结果:")
    print("-" * 60)
    for rank, hit in enumerate(search_response.search_hits, start=1):
        print(f"{rank}. 得分: {hit.score:.4f} | {parse_search_hit(hit)}")
    return search_response.search_hits
def parse_search_hit(hit):
    """Flatten a search hit into a plain ``{name: value}`` dict.

    ``hit.row[0]`` is treated as the primary-key columns and ``hit.row[1]``
    as the attribute columns. The value of the first primary-key column is
    stored under ``"image_id"``, then every attribute column contributes
    its first element as the key and its second element as the value.

    Args:
        hit: Object exposing a ``row`` sequence as described above.

    Returns:
        The flattened dict.
    """
    image_id = hit.row[0][0][1]
    attributes = {column[0]: column[1] for column in hit.row[1]}
    # Attributes are merged last, so they win on a name clash with the key.
    return {"image_id": image_id, **attributes}
def main():
    """Run the image-to-image retrieval scenarios against the demo index.

    Uses the first ``*.jpg`` file found in ``data/photograph`` next to this
    script as the query image, then runs an unfiltered search, a search
    restricted to a list of cities, and a search restricted by width. If no
    jpg file is found, an error message is printed and nothing is searched.
    """
    table_name = "multi_modal_retrieval"
    index_name = "index"
    banner = "=" * 60

    print(banner)
    print("Tablestore 以图搜图示例")
    print(banner)

    client = get_client()
    print("Tablestore 客户端创建成功")

    # Take the first jpg in the demo image directory as the query image.
    data_dir = Path(__file__).parent / "data" / "photograph"
    first_sample = next(data_dir.glob("*.jpg"), None)
    if first_sample is None:
        print("错误: 未找到示例图片,请确保 data/photograph 目录下有 jpg 图片")
        return
    query_image_path = str(first_sample)
    print(f"\n使用示例图片: {query_image_path}")

    # Scenario 1: query image only, no filter.
    search_by_image(client, table_name, index_name, query_image_path, top_k=5)

    # Scenario 2: query image, restricted to the listed cities.
    search_by_image_with_filter(
        client, table_name, index_name,
        query_image_path,
        cities=["hangzhou", "shanghai"],
        top_k=5,
    )

    # Scenario 3: query image, restricted to images whose width is 800-1024.
    search_by_image_with_filter(
        client, table_name, index_name,
        query_image_path,
        width_range=(800, 1024),
        top_k=5,
    )

    print("\n" + banner)
    print("以图搜图演示完成!")
    print(banner)


if __name__ == "__main__":
    main()
可视化检索界面
构建基于 Gradio 的交互式检索界面,提供直观的图形化操作体验。此界面依赖演示项目中的本地图片目录,适用于快速体验和演示。使用自定义数据时,可参考代码实现相应的界面功能。
安装 Gradio 相关依赖。
pip install gradio gradio_rangeslider
启动可视化界面。
python src/gradio_app.py
启动成功后,访问应用地址(如 http://localhost:7860)进入检索界面。
功能
说明
以图搜图
上传本地图片,查询相似的图片。
自然语言搜索
输入自然语言描述,如“远处是白雪覆盖的山峰”、“一只毛茸茸的小狗在草地上奔跑”等。
Top K
设置返回结果数量(1-30)。
高度/宽度范围
按图片尺寸进行筛选。
城市过滤
按城市过滤(支持多选)。