通过阿里云Milvus与通义千问实现多模态搜索

更新时间:

本文通过代码示例展示了如何结合阿里云向量检索服务Milvus与通义千问VL大模型,以提取图片特征,并利用多模态Embedding模型实现高效的多模态搜索,涵盖了以文搜图、以文搜文、以图搜图以及以图搜文等多种检索方式。

背景信息

在多模态搜索场景中,图片和文本的非结构化数据需要被转换为向量表示,然后通过向量检索技术快速找到相似的内容。本实践使用以下工具:

  • 向量检索服务Milvus:高效的向量数据库,用于存储和检索向量。

  • 通义千问VL:提取图片描述和关键词。更多详情请参见通义千问VL

  • DashScope Embedding API:将图片和文本转换为向量。更多详情请参见Multimodal-Embedding API详情

其功能包括:

  • 以文搜图:输入文本查询,搜索最相似的图片。

  • 以文搜文:输入文本查询,搜索最相似的图片描述。

  • 以图搜图:输入图片查询,搜索最相似的图片。

  • 以图搜文:输入图片查询,搜索最相似的图片描述。

系统架构

下图展示了本文中使用的多模态搜索系统的整体架构。

image

前提条件

  • 已创建Milvus实例。具体操作,请参见快速创建Milvus实例

  • 已开通百炼服务并获得API-KEY。具体操作,请参见获取API Key

  • 已安装所需的依赖包。

    pip3 install dashscope pymilvus==2.5.0

    本文示例基于 Python 3.9 环境运行。

  • 已下载并解压缩示例数据集。

    wget https://github.com/milvus-io/pymilvus-assets/releases/download/imagedata/reverse_image_search.zip
    unzip -q -o reverse_image_search.zip

    示例数据集包含一个CSV文件reverse_image_search.csv和若干图片文件。

    说明

    本文使用的示例数据集及其图片来源于开源项目Milvus

核心代码介绍

在本文示例中,首先利用通义千问VL模型提取图片描述信息,并将其存储在image_description字段中。接着,通过多模态Embedding模型,将图片及其描述分别转换为对应的向量表示(image_embeddingtext_embedding),以便后续进行跨模态检索或分析。

为了简化演示,本示例仅从前200张图片中提取数据并完成上述流程。

import base64
import csv
import dashscope
import os
import pandas as pd
import sys
import time
from tqdm import tqdm
from pymilvus import (
    connections,
    FieldSchema,
    CollectionSchema,
    DataType,
    Collection,
    MilvusException,
    utility,
)

from http import HTTPStatus
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class FeatureExtractor:
    def __init__(self, DASHSCOPE_API_KEY):
        self._api_key = DASHSCOPE_API_KEY  # 使用环境变量存储API密钥

    def __call__(self, input_data, input_type):
        if input_type not in ("image", "text"):
            raise ValueError("Invalid input type. Must be 'image' or 'text'.")

        try:
            if input_type == "image":
                _, ext = os.path.splitext(input_data)
                image_format = ext.lstrip(".").lower()
                with open(input_data, "rb") as image_file:
                    base64_image = base64.b64encode(image_file.read()).decode("utf-8")
                input_data = f"data:image/{image_format};base64,{base64_image}"
                payload = [{"image": input_data}]
            else:
                payload = [{"text": input_data}]

            resp = dashscope.MultiModalEmbedding.call(
                model="multimodal-embedding-v1",
                input=payload,
                api_key=self._api_key,
            )

            if resp.status_code == HTTPStatus.OK:
                return resp.output["embeddings"][0]["embedding"]
            else:
                raise RuntimeError(
                    f"API调用失败,状态码: {resp.status_code}, 错误信息: {resp.message}"
                )
        except Exception as e:
            logger.error(f"处理失败: {str(e)}")
            raise


class FeatureExtractorVL:
    def __init__(self, DASHSCOPE_API_KEY):
        self._api_key = DASHSCOPE_API_KEY  # 使用环境变量存储API密钥

    def __call__(self, input_data, input_type):
        if input_type not in ("image"):
            raise ValueError("Invalid input type. Must be 'image'.")

        try:
            if input_type == "image":
                payload=[
                            {
                                "role": "system",
                                "content": [{"type":"text","text": "You are a helpful assistant."}]
                            },
                            {
                                "role": "user",
                                "content": [
                                            # {"image": "https://dashscope.oss-cn-beijing.aliyuncs.com/images/dog_and_girl.jpeg"},
                                            {"image": input_data},
                                            {"text": "先用50字内的文字描述这张图片,然后再给出5个关键词"}
                                            ],
                            }
                        ]

            resp = dashscope.MultiModalConversation.call(
                model="qwen-vl-plus",
                messages=payload,
                api_key=self._api_key,
            )

            if resp.status_code == HTTPStatus.OK:
                return resp.output["choices"][0]["message"].content[0]["text"]
            else:
                raise RuntimeError(
                    f"API调用失败,状态码: {resp.status_code}, 错误信息: {resp.message}"
                )
        except Exception as e:
            logger.error(f"处理失败: {str(e)}")
            raise


class MilvusClient:
    def __init__(self, MILVUS_TOKEN, MILVUS_HOST, MILVUS_PORT, INDEX, COLLECTION_NAME):
        self._token = MILVUS_TOKEN
        self._host = MILVUS_HOST
        self._port = MILVUS_PORT
        self._index = INDEX
        self._collection_name = COLLECTION_NAME

        self._connect()
        self._create_collection_if_not_exists()

    def _connect(self):
        try:
            connections.connect(alias="default", host=self._host, port=self._port, token=self._token)
            logger.info("Connected to Milvus successfully.")
        except Exception as e:
            logger.error(f"连接Milvus失败: {str(e)}")
            sys.exit(1)

    def _collection_exists(self):
        return self._collection_name in utility.list_collections()
    
    def _create_collection_if_not_exists(self):
        try:
            if not self._collection_exists():
                fields = [
                    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
                    FieldSchema(name="origin", dtype=DataType.VARCHAR, max_length=512),
                    FieldSchema(name="image_description", dtype=DataType.VARCHAR, max_length=1024),
                    FieldSchema(name="image_embedding", dtype=DataType.FLOAT_VECTOR, dim=1024),
                    FieldSchema(name="text_embedding", dtype=DataType.FLOAT_VECTOR, dim=1024)
                ]

                schema = CollectionSchema(fields)

                self._collection = Collection(self._collection_name, schema)

                if self._index == 'IVF_FLAT':
                    self._create_ivf_index()
                else:
                    self._create_hnsw_index()   
                logger.info("Collection created successfully.")
            else:
                self._collection = Collection(self._collection_name)
                logger.info("Collection already exists.")
        except Exception as e:
            logger.error(f"创建或加载集合失败: {str(e)}")
            sys.exit(1)


    def _create_ivf_index(self):
        index_params = {
            "index_type": "IVF_FLAT",
            "params": {
                        "nlist": 1024, # Number of clusters for the index
                    },
            "metric_type": "L2",
        }
        self._collection.create_index("image_embedding", index_params)
        self._collection.create_index("text_embedding", index_params)
        logger.info("Index created successfully.")

    def _create_hnsw_index(self):
        index_params = {
            "index_type": "HNSW",
            "params": {
                        "M": 64, # Maximum number of neighbors each node can connect to in the graph
                        "efConstruction": 100, # Number of candidate neighbors considered for connection during index construction
                    },
            "metric_type": "L2",
        }
        self._collection.create_index("image_embedding", index_params)
        self._collection.create_index("text_embedding", index_params)
        logger.info("Index created successfully.")
    
    def insert(self, data):
        try:
            self._collection.insert(data)
            self._collection.load()
            logger.info("数据插入并加载成功.")
        except MilvusException as e:
            logger.error(f"插入数据失败: {str(e)}")
            raise

    def search(self, query_embedding, field, limit=3):
        try:
            if self._index == 'IVF_FLAT':
                param={"metric_type": "L2", "params": {"nprobe": 10}}
            else:
                param={"metric_type": "L2", "params": {"ef": 10}}

            result = self._collection.search(
                data=[query_embedding],
                anns_field=field,
                param=param,
                limit=limit,
                output_fields=["origin", "image_description"],
            )
            return [{"id": hit.id, "distance": hit.distance, "origin": hit.origin, "image_description": hit.image_description} for hit in result[0]]
        except Exception as e:
            logger.error(f"搜索失败: {str(e)}")
            return None


#  数据加载与嵌入生成
def load_image_embeddings(extractor, extractorVL, csv_path):
    df = pd.read_csv(csv_path)
    image_embeddings = {}

    for image_path in tqdm(df["path"].tolist()[:200], desc="生成图像embedding"): # 仅用前200张图进行演示
        try:
            desc = extractorVL(image_path, "image")
            image_embeddings[image_path] = [desc, extractor(image_path, "image"), extractor(desc, "text")]
            time.sleep(1)  # 控制API调用频率
        except Exception as e:
            logger.warning(f"处理{image_path}失败,已跳过: {str(e)}")

    return [{"origin": k, 'image_description':v[0], "image_embedding": v[1], 'text_embedding': v[2]} for k, v in image_embeddings.items()]
    

其中:

  • FeatureExtractor:该类用于调用DashScope Embedding API,将图片或文本转换为向量表示。

  • FeatureExtractorVL:该类调用通义千问VL模型,提取图片的文字描述和关键词。

  • MilvusClient:该类封装了Milvus的操作,包括连接、集合创建、索引构建、数据插入和搜索。

操作流程

步骤一:加载数据集

if __name__ == "__main__":
    # 配置Milvus和DashScope API
    MILVUS_TOKEN = "root:****"
    MILVUS_HOST = "c-0aa16b1****.milvus.aliyuncs.com"
    MILVUS_PORT = "19530"
    COLLECTION_NAME = "multimodal_search"
    INDEX = "IVF_FLAT"  # IVF_FLAT OR HNSW  
    script_dir = os.path.dirname(os.path.abspath(__file__))
    csv_path = os.path.join(script_dir, "reverse_image_search.csv")



    # Step 1:初始化Milvus客户端
    milvus_client = MilvusClient(MILVUS_TOKEN, MILVUS_HOST, MILVUS_PORT, INDEX, COLLECTION_NAME)

    # Step 2:初始化千问VL大模型与多模态Embedding模型
    extractor = FeatureExtractor(DASHSCOPE_API_KEY)
    extractorVL = FeatureExtractorVL(DASHSCOPE_API_KEY)

    # Step 3:将图片数据集Embedding后插入到Milvus
    embeddings = load_image_embeddings(extractor, extractorVL, csv_path)
    milvus_client.insert(embeddings)

涉及以下参数,请您根据实际情况替换。

参数名称

说明

DASHSCOPE_API_KEY

DashScopeAPI密钥,用于调用通义千问VL和多模态Embedding模型。

MILVUS_TOKEN

Milvus实例的访问凭证,格式为username:password

MILVUS_HOST

Milvus实例的内网或者公网地址,例如c-xxxxxxxxxxxx.milvus.aliyuncs.com。您可以在Milvus实例的实例详情页面查看。

MILVUS_PORT

Milvus实例的端口号,默认为19530

COLLECTION_NAME

Milvus集合名称,用于存储图片和文本的向量数据。

执行以上Python文件,当回显信息中包含以下内容,则说明加载成功。

生成图像embedding:  100%
INFO:__main__:数据插入并加载成功

您也可以访问Attu页面,在数据页签,进一步验证加载后的数据集信息。

image

例如,通过通义千问VL大模型对图片进行分析后,提取出的文本总结生动地描绘了画面内容为“站在海滩上的人穿着牛仔裤和绿色靴子。沙滩上有水迹覆盖。关键词:海滩、脚印、沙地、鞋子、裤子”。

image

图片描述以简洁而形象的语言展现了图片的主要特征,使人能够清晰地想象出画面场景。

image

步骤二:多模态向量检索

示例 1:以文搜图与以文搜文

在本示例中,查询文本query为“棕色的狗”。首先,通过多模态向量模型将query转换为向量表示(Embedding)。随后,基于生成的向量,在image_embedding中进行以文搜图操作,同时在text_embedding中进行以文搜文操作,最终分别得到图像和文本的检索结果。

在上述的Python文件中,替换main部分信息,然后执行。

if __name__ == "__main__":
    MILVUS_HOST = "c-xxxxxxxxxxxx.milvus.aliyuncs.com"
    MILVUS_PORT = "19530"
    MILVUS_TOKEN = "root:****"
    COLLECTION_NAME = "multimodal_search"
    INDEX = "IVF_FLAT" # IVF_FLAT OR HNSW
    DASHSCOPE_API_KEY = "<YOUR_DASHSCOPE_API_KEY >"
    
    # Step1:初始化Milvus客户端
    milvus_client = MilvusClient(MILVUS_TOKEN, MILVUS_HOST, MILVUS_PORT, INDEX, COLLECTION_NAME)
    
    # Step2:初始化多模态Embedding模型
    extractor = FeatureExtractor(DASHSCOPE_API_KEY)

    # Step4:多模态搜索示例,以文搜图和以文搜文
    text_query = "棕色的狗"
    text_embedding = extractor(text_query, "text")
    text_results_1 = milvus_client.search(text_embedding, field = 'image_embedding')
    logger.info(f"以文搜图查询结果: {text_results_1}")
    text_results_2 = milvus_client.search(text_embedding, field = 'text_embedding')
    logger.info(f"以文搜文查询结果: {text_results_2}")
  

返回信息如下所示。

说明

由于大模型的输出存在一定的随机性,本示例的结果可能无法完全复现。

INFO:__main__:以文搜图查询结果: [
{'id': 456882250782308942, 'distance': 1.338853359222412, 'origin': './train/Rhodesian_ridgeback/n02087394_9675.JPEG', 'image_description': '一张小狗站在地毯上的照片 
。它有着棕色的毛发和蓝色的眼睛。\n关键词:小狗、地毯、眼睛、毛色、站立'}, 
{'id': 456882250782308933, 'distance': 1.3568601608276367, 'origin': './train/Rhodesian_ridgeback/n02087394_6382.JPEG', 'image_description': '这是一只棕色的猎犬,耳朵垂下,脖子上戴着项圈。它正直视前方。\n\n关键词:狗、棕色、猎犬、耳朵、项链'}, 
{'id': 456882250782308940, 'distance': 1.3838427066802979, 'origin': './train/Rhodesian_ridgeback/n02087394_5846.JPEG', 'image_description': '两只小狗在毛毯上玩耍。一只狗躺在另一只上面,背景中有一个玩具熊。\n\n关键词:小狗、玩闹、毛毯、玩具熊、互动'}]
INFO:__main__:以文搜文查询结果: [
{'id': 456882250782309025, 'distance': 0.6969608068466187, 'origin': './train/mongoose/n02137549_7552.JPEG', 'image_description': '这是一张棕色的小动物的特写照片。它 
有着圆润的脸庞和大大的眼睛。\n\n关键词:小动物、棕毛、圆形脸、大眼、自然背景'}, 
{'id': 456882250782308933, 'distance': 0.7110348343849182, 'origin': './train/Rhodesian_ridgeback/n02087394_6382.JPEG', 'image_description': '这是一只棕色的猎犬,耳朵垂下,脖子上戴着项圈。它正直视前方。\n\n关键词:狗、棕色、猎犬、耳朵、项链'}, 
{'id': 456882250782308992, 'distance': 0.7725887298583984, 'origin': './train/lion/n02129165_19310.JPEG', 'image_description': '这是一张狮子的特写照片。它有着浓密的鬃毛和锐利的眼神。\n\n关键词:狮子、眼神、鬃毛、自然环境、野生动物'}]

示例2:以图搜图与以图搜文

本示例中,使用test目录中的狮子图片(路径为test\lion\n02129165_13728.JPEG)进行相似性检索。

image

通过以图搜图和以图搜文两种方式,分别从图像和文本的角度挖掘与目标图片相关的内容,实现多维度的相似性匹配。

if __name__ == "__main__":
    # 配置Milvus和DashScope API
    MILVUS_TOKEN = "root:****"
    MILVUS_HOST = "c-0aa16b1****.milvus.aliyuncs.com"
    MILVUS_PORT = "19530"
    COLLECTION_NAME = "multimodal_search"
    INDEX = "IVF_FLAT"  # IVF_FLAT OR HNSW
    DASHSCOPE_API_KEY = "<YOUR_DASHSCOPE_API_KEY >"

    # Step1:初始化Milvus客户端
    milvus_client = MilvusClient(MILVUS_TOKEN, MILVUS_HOST, MILVUS_PORT, INDEX, COLLECTION_NAME)
  
    # Step2:初始化多模态Embedding模型
    extractor = FeatureExtractor(DASHSCOPE_API_KEY)

    # Step5:多模态搜索示例,以图搜图和以图搜文
    image_query_path = "./test/lion/n02129165_13728.JPEG"
    image_embedding = extractor(image_query_path, "image")
    image_results_1 = milvus_client.search(image_embedding, field = 'image_embedding')
    logger.info(f"以图搜图查询结果: {image_results_1}")
    image_results_2 = milvus_client.search(image_embedding, field = 'text_embedding')
    logger.info(f"以图搜文查询结果: {image_results_2}")

返回信息如下所示。

说明

由于大模型的输出存在一定的随机性,本示例的结果可能无法完全复现。

INFO:__main__:以图搜图查询结果: [
{'id': 456882250782308987, 'distance': 0.23892249166965485, 'origin': './train/lion/n02129165_19953.JPEG', 'image_description': '这是一只雄壮的狮子站在岩石旁,背景是
树木和灌木丛。阳光洒在它的身上。\n\n关键词:狮子、岩石、森林、阳光、野性'}, 
{'id': 456882250782308989, 'distance': 0.4113130569458008, 'origin': './train/lion/n02129165_1142.JPEG', 'image_description': '一只狮子在茂密的绿色植物中休息。背景是竹子和树木。\n\n关键词:狮子、草地、绿植、树干、自然环境'}, 
{'id': 456882250782308984, 'distance': 0.5206397175788879, 'origin': './train/lion/n02129165_16.JPEG', 'image_description': '图中是一对狮子在草地上站立。雄狮鬃毛浓密,雌狮则显得更为瘦弱。\n\n关键词:狮子、草地、雄性、雌性、自然环境'}]
INFO:__main__:以图搜文查询结果: 
[{'id': 456882250782308989, 'distance': 1.0935896635055542, 'origin': './train/lion/n02129165_1142.JPEG', 'image_description': '一只狮子在茂密的绿色植物中休息。背景是
竹子和树木。\n\n关键词:狮子、草地、绿植、树干、自然环境'}, 
{'id': 456882250782308987, 'distance': 1.2102885246276855, 'origin': './train/lion/n02129165_19953.JPEG', 'image_description': '这是一只雄 
壮的狮子站在岩石旁,背景是树木和灌木丛。阳光洒在它的身上。\n\n关键词:狮子、岩石、森林、阳光、野性'}, 
{'id': 456882250782308992, 'distance': 1.2725986242294312, 'origin': './train/lion/n02129165_19310.JPEG', 'image_description': '这是一张狮子的特写照片。它有着浓密的鬃毛和锐利的眼神。\n\n关键词:狮子、眼神、鬃毛、自然环境、野生动物'}]