基于Lindorm多模能力快速搭建智能搜索服务

更新时间:
复制为 MD 格式

本文介绍如何基于Lindorm全文、向量融合检索等多模能力,快速搭建智能搜索服务。

方案概述

智能搜索是一种基于AI大模型的新型检索方式,它通过深入理解用户意图,提供精准、高效的问答式服务。

本方案帮助您快速构建智能搜索能力,解决以下核心问题:

  • 高效搭建与迭代:无需从零开始,提供开箱即用的基础设施,降低技术门槛,节省开发和运维成本。

  • 应对数据规模增长:支持大规模数据处理,优化检索性能,减少硬件投入和运维压力。

  • 提升检索精准性与灵活性:支持自定义模型部署和调优,满足业务个性化需求,确保搜索结果更贴合实际场景。

通过本方案,您能够专注于核心业务创新,同时获得更智能、更高效的搜索体验。

方案架构

本文将通过Python代码演示如何基于Lindorm全文、向量融合检索能力,帮您快速构建智能搜索业务,本架构仅体现部署和应用中Lindorm产品内容。

image

步骤一:开通Lindorm多模能力

  1. 登录Lindorm管理控制台

  2. 单击页面左上角的创建

  3. Lindorm售卖页面,设置以下配置项:

    配置项

    说明

    商品类型

    选择Lindorm

    实例配置

    勾选宽表引擎LTS引擎搜索引擎向量引擎AI引擎

    说明
    • 开通AI引擎时需选择开通dashscope模型。

    • Lindorm向量引擎的功能实现依赖搜索引擎,因此需同时开通。

    • 创建实例的方法及配置项说明,请参见创建实例

    • 您可以根据实际业务需求,变更实例的规格与节点,具体操作请参见变更实例规格

  4. 单击立即购买,并根据售卖页的指引,完成支付。

步骤二:配置白名单

将客户端IP添加至Lindorm白名单。如何添加,请参见设置白名单

步骤三:下载代码

请下载代码示例lindorm_smart_search,用于后续配置和构建智能搜索业务。

步骤四:环境配置

运行环境

已安装Python环境,要求安装Python 3.10及以上版本。

安装依赖

 pip3 install -r requirements.txt 

配置Lindorm连接地址

在已下载代码中的env脚本里配置Lindorm各引擎的连接地址。连接地址获取方法请参见查看连接地址

# AI host(配置AI引擎连接地址)
AI_HOST="ld-bp17j28j2y7pm****-proxy-ai-pub.lindorm.aliyuncs.com"
AI_PORT="9002"

# Row host(配置宽表引擎Mysql兼容地址)
ROW_HOST="ld-bp17j28j2y7pm****-proxy-lindorm-pub.lindorm.aliyuncs.com"
ROW_PORT="33060"

# Search host(配置搜索引擎连接地址)
SEARCH_HOST="ld-bp17j28j2y7pm****-proxy-search-pub.lindorm.aliyuncs.com"
SEARCH_PORT="30070"

# Lindorm user password(配置Lindorm用户密码)
LD_USER="root"
LD_PASSWORD="test****"

# 训练数据集的位置
LOAD_FILE_PATH="data/cmrc2018_train.json"

# 返回结果的最大数量
SEARCH_TOP_K="5"

安装Jupyter Notebook

  1. 安装Jupyter。

    pip3 install jupyter
  2. 生成配置文件~/.jupyter/jupyter_notebook_config.py

    jupyter notebook --generate-config
  3. 获取要设置访问Jupyter的密码 。

    from passlib.hash import argon2
    print(argon2.hash('Vector123'))

    输出密码哈希值,用于Jupyter Notebook的密码配置。输出示例如下:

    $argon2id$v=19$m=65536,t=3,p=4$4TyndM75H8N4b+291xqjdA$n0QSxlv/uCLjGR0TX/jbD/XFlEu9BzQGI1b2Mcu6gxg
  4. 使用 vim ~/.jupyter/jupyter_notebook_config.py 命令打开并编辑配置文件。

    #文件最后几行加上如下配置
    c.NotebookApp.ip = '*'
    # 笔记本的默认打开目录, 自行设置
    # 笔记本启动后是否打开浏览器, 设为 False即可
    c.NotebookApp.open_browser = False
    
    # 默认访问端口, 可自行修改
    c.NotebookApp.port = 9000
    
    # 下方代码中argon2后面的内容替换成上一步骤已获取到的Jupyter的密码
    c.NotebookApp.password = 'argon2:$argon2id$v=19$m=65536,t=3,p=4$4TyndM75H8N4b+291xqjdA$n0QSxlv/uCLjGR0TX/jbD/XFlEu9BzQGI1b2Mcu6gxg'
    
    # 这个主目录非常重要,后续您的访问文件需要放在该目录
    c.NotebookApp.notebook_dir = u'/data/lindorm/LindormDemo'  #设置你打开jupyter notebook的时候想显示的位置,可以设置成经常使用的路径
  5. 启动Jupyter服务。前端启动,可以查看是否有启动错误。若需停止服务,可使用Ctrl+C来终止进程。

    jupyter notebook
    说明

    实际业务使用过程中建议使用后端启动Jupyter服务,可以避免因终端关闭导致的服务中断,同时便于后台管理。后端启动请执行:nohup jupyter notebook --allow-root >/tmp/jupyter.log 2>&1 &

步骤五:运行ipynb脚本

Jupyter中按照lindorm_demo.ipynb的说明,依次执行代码。

接下来将详细介绍主要的部署环节。

部署流程图

image

部署环节说明

环节

说明

涉及引擎

创建父表(知识库)

创建一个父表,用于存储知识库的原始文本。

"""
创建父表语句,可按需修改
"""
def create_parent_table(self):
    sql = """
        CREATE TABLE IF NOT EXISTS {} (
            document_id  VARCHAR, 
            title VARCHAR, 
            context VARCHAR,
            status   INT, 
            metadata JSON, 
            PRIMARY KEY (document_id)
        )
    """.format(self.parent.row_parent_table)
    print("Create parent table sql: ", sql)
    self.common_create_table(sql)

宽表引擎

创建子表(切分知识库)

创建一个子表,用于存储原文本切分后的段落,这些段落可以根据长度或者结合长度与标点符号来切分。

"""
创建子表语句,可按需修改
"""
def create_child_table(self):
    sql = """
        CREATE TABLE IF NOT EXISTS {} (
            document_id VARCHAR,
            chunking_position INT,
            title  VARCHAR,
            {}   VARCHAR,
            {} VARCHAR,
            metadata JSON,
            chunking_number INT,
            PRIMARY KEY (document_id, chunking_position)
        )
    """.format(self.parent.row_child_table, 
               self.parent.text_field, 
               self.parent.vector_field)
    print("Create child table sql: ", sql)
    self.common_create_table(sql)

宽表引擎

创建搜索Pipeline

对子表创建写入和查询Pipeline,写入时子表内容自动同步到搜索引擎,在搜索引擎内完成Embedding后,将结果写入向量引擎。

查询时自动对文本进行Embedding,通过向量引擎进行向量检索。

"""
创建pipeline,搜索内部自动对text字段调用ai引擎进行embedding,写入vector_field 字段
"""
def create_pipeline(self):
    inner_ai_host = Config.AI_HOST
    if "-pub" in inner_ai_host:
        inner_ai_host = inner_ai_host.replace("-pub", "-vpc")
                
    pipeline = {
        "description": "demo_chunking pipeline",
        "processors": [
            {
                "text-embedding": {
                    "inputFields": [self.parent.text_field],   
                    "outputFields": [self.parent.vector_field],
                    "userName": Config.LD_USER,
                    "password": Config.LD_PASSWORD,
                    "url": "http://{}:{}/dashscope/compatible-mode/v1/embeddings".format(inner_ai_host, int(Config.AI_PORT)),
                    "modeName": self.parent.embedding_model_name
                }
            }
        ]
    }    
    try:
        resp = self.client.ingest.put_pipeline(id=self.parent.pipeline_name, body=pipeline)
        if resp.get("acknowledged"):
            print("Create ingest pipeline success", resp)
        else:
            print("Create ingest pipeline error", resp)
    except Exception as e:
        print(f"Create ingest pipeline error: {e}")

    # 创建查询用的 search pipeline
    search_pipeline_url = f"http://{Config.SEARCH_HOST}:{Config.SEARCH_PORT}/_search/pipeline/{self.parent.search_pipeline_name}"
    search_pipeline_data = {
        "description": "demo multimodal embedding pipeline",
        "request_processors": [
            {
                "text-embedding": {
                    "model_config": {
                        "inputFields": [self.parent.text_field],
                        "outputFields": [self.parent.vector_field],
                        "userName": Config.LD_USER,
                        "password": Config.LD_PASSWORD,
                        "url": f"http://{inner_ai_host}:{Config.AI_PORT}/dashscope/compatible-mode/v1/embeddings",
                        "modeName": self.parent.embedding_model_name
                    }
                }
            }
        ]
    }
    try:
        resp = requests.put(search_pipeline_url, auth=(Config.LD_USER, Config.LD_PASSWORD),
                            json=search_pipeline_data, headers={"Content-Type": "application/json"})
        if resp.status_code in [200, 201]:
            print("Create search pipeline success", resp)
        else:
            print("Create search pipeline error ", resp)
    except Exception as e:
        print(f"Create search pipeline error: {e}")

宽表引擎

搜索引擎

向量引擎

创建搜索索引和向量索引

创建搜索索引和向量索引,提高检索效率。

示例中使用IVFBQ索引,如果数据量较小,可以使用HNSW索引。

"""
子表创建搜索索引,本文范例中使用的是ivfbq索引
https://help.aliyun.com/document_detail/2773371.html
"""
# Reference: https://help.aliyun.com/document_detail/260841.html
def create_child_table_index(self):
    sql = """
        CREATE INDEX IF NOT EXISTS %s USING SEARCH ON %s (
                document_id(indexed=false,columnStored=false),
                chunking_position(indexed=false,columnStored=false),
                title(type=text,analyzer=ik),
                metadata(indexed=false,columnStored=false),
                %s(type=text,analyzer=ik),
                %s(mapping='{
                    "type": "knn_vector",
                    "dimension": 1024,
                    "data_type": "float",
                    "method": {
                        "engine": "lvector",
                        "name": "ivfbq",
                        "space_type": "cosinesimil",
                        "parameters": {
                            "exbits": 2,
                            "nlist": 50
                            }
                        }
                }')) WITH (INDEX_SETTINGS='{
                "index": {
                    "knn" : "true",
                    "knn.vector_empty_value_to_keep" : true
                }}',
                    SOURCE_SETTINGS=
                    '{
                        "includes": ["document_id", "chunking_position","title", "text"]
                    }',
                    numShards=2
                )
        """.strip() % (self.parent.chunking_index_name,
                       self.parent.row_child_table,
                       self.parent.text_field,
                       self.parent.vector_field)
    print("Create search index sql: \n ", sql)
    self.common_create_table(sql)

宽表引擎

搜索引擎

向量引擎

搜索索引配置Pipeline

为搜索索引配置写入和查询Pipeline。

def set_index_pipelines(self):
    """
    通过ES API为索引设置默认的写入pipeline和查询pipeline
    """
    url = f"http://{Config.SEARCH_HOST}:{Config.SEARCH_PORT}/{self.index_name}/_settings"
    settings = {
        "index.default_pipeline": self.parent.pipeline_name,
        "index.search.default_pipeline": self.parent.search_pipeline_name
    }
    try:
        resp = requests.put(url, auth=(Config.LD_USER, Config.LD_PASSWORD),
                            json=settings, headers={"Content-Type": "application/json"})
        if resp.status_code in [200, 201]:
            print(f"Index pipelines set successfully: write={self.parent.pipeline_name}, search={self.parent.search_pipeline_name}")
        else:
            print(f"Set index pipelines error: {resp.status_code}, {resp.text}")
    except Exception as e:
        print(f"Set index pipelines error: {e}")

搜索引擎

向量引擎

构建向量索引

ivfpq、ivfbq类型的索引需要在写入一定数据量之后进行索引构建手动触发索引构建完成后,后续可正常写入和查询,无需再次构建索引,具体可参考索引构建

def build_vector_index(self):
    """触发向量索引构建并等待完成"""
        
    url_base = f"http://{Config.SEARCH_HOST}:{Config.SEARCH_PORT}"
    auth = (Config.LD_USER, Config.LD_PASSWORD)
    index_name = f"default.{self.row_child_table}.{self.chunking_index_name}"
    field_name = self.vector_field
        
    print(f"开始构建向量索引: {index_name}.{field_name}")
        
    # 1. 触发构建
    build_url = f"{url_base}/_plugins/_vector/index/build"
    build_data = {
        "indexName": index_name,
        "fieldName": field_name,
        "removeOldIndex": "true"
    }
        
    try:
        resp = requests.post(build_url, auth=auth, json=build_data, headers={"Content-Type": "application/json"})
        print(f"触发构建响应: {resp.status_code}, {resp.text}")
            
        # 2. 轮询检查状态
        task_url = f"{url_base}/_plugins/_vector/index/tasks"
        task_data = {
            "indexName": index_name,
            "fieldName": field_name,
            "taskIds": "[]"
        }
            
        max_wait = 300  # 最多等5分钟
        start_time = time.time()
            
        while time.time() - start_time < max_wait:
            time.sleep(5)  # 每5秒检查一次
                
            resp = requests.get(task_url, auth=auth, json=task_data, headers={"Content-Type": "application/json"})
            result = resp.json()
            print(f"构建状态: {result}")
                
            if "payload" in result:
                for task in result["payload"]:
                    if "FINISH" in task:
                        print("向量索引构建完成!")
                        return True
                    elif "FAILED" in task:
                        print(f"向量索引构建失败: {task}")
                        return False
                            
        print("等待构建超时")
        return False
            
    except Exception as e:
        print(f"构建向量索引出错: {e}")
        return False

搜索索引

向量索引

检索方式说明

  • 检索选项:全文检索/向量检索/全文与向量混合检索。

  • 检索重排:使用重排/不使用重排。

  • Prompt:使用父文档/不使用父文档(将Context加入到提示词中)。

检索方式

具体描述

流程图

全文与向量混合检索

全文与向量混合检索。

def demo_rrf_search():
    lindorm = Lindorm()
    query="国际初中科学奥林匹克主要比赛科目"
    results = lindorm.lindormSearch.rrf_search(query)
    display(JSON(results, expanded=True, root="rrf_search_result"))
    lindorm.close()
demo_rrf_search()
image

全文与向量混合检索+使用检索重排

先进行全文与向量混合检索,然后对混合检索的结果重排序。

def demo_rerank():
    lindorm = Lindorm()
    query="国际初中科学奥林匹克主要比赛科目"
    topk=int(Config.SEARCH_TOP_K)
    origin_result = lindorm.lindormSearch.rrf_search(query,  topk * 2)
    display(JSON(origin_result[0:topk], expanded=True, root="Before rerank result"))
    texts = [item["_source"]["text"] for item in origin_result]
    reranker_result = lindorm.lindormAI.reranker(query, texts)
    reranked_origin_result = handler_reranker(origin_result, reranker_result, topk)
    display(JSON(reranked_origin_result, expanded=True, root="After rerank result"))
    lindorm.close()
demo_rerank()
image

全文与向量混合检索+使用检索重排+Prompt

  1. 先执行全文与向量混合检索,对混合检索的结果重排序。

  2. 将问题以及检索结果一起形成Prompt,提交给大模型。

def demo_chat_with_child_chunking():
    lindorm = Lindorm()
    topk=int(Config.SEARCH_TOP_K)
    query="国际初中科学奥林匹克主要比赛科目"
    search_result = lindorm.lindormSearch.rrf_search(query, topk * 2)
    texts = [item["_source"]["text"] for item in search_result]
    reranker_result = lindorm.lindormAI.reranker(query, texts)
    prompt_context = "\n".join(texts[item['index']] for item in reranker_result[0:topk])
    prompt = lindorm.lindormAI.gen_prompt(query, prompt_context)
    output_text = ""
    for part in lindorm.lindormAI.chat_stream(prompt):
        output_text = part 
        wrapped_text = wrap_text(output_text, 80)
        clear_output(wait=True)
        display(HTML(f"<pre style='color: red;'>{wrapped_text}</pre>"))
    
    wrapped_text = wrap_text(prompt, 120)
    display(HTML(f"<pre>提示模版为:\n{wrapped_text}</pre>"))
    lindorm.close()
    
demo_chat_with_child_chunking()
image

全文与向量混合检索+使用检索重排+Context Prompt

  1. 先执行全文与向量混合检索,对混合检索的结果重排序。

  2. 根据父表(create_parent_table表)的PRIMARY KEY字段,从父表中查询初步检索结果的Context。

  3. 将问题与Context一起形成Prompt,并通过AI引擎访问大模型,获得回答。

def demo_chat_with_parent():
    lindorm = Lindorm()
    topk=int(Config.SEARCH_TOP_K)
    query="国际初中科学奥林匹克主要比赛科目"
    search_result = lindorm.lindormSearch.rrf_search(query, topk * 2)
    texts = [item["_source"]["text"] for item in search_result]
    reranker_result = lindorm.lindormAI.reranker(query, texts)
    reranked_origin_result = handler_reranker(search_result, reranker_result, topk)
    unique_document_ids = list(OrderedDict.fromkeys(item['_source']['document_id'] for item in reranked_origin_result))
    contexts = []
    for document_id in unique_document_ids:
        contexts.append(lindorm.lindormRow.get_parent_context(pk=document_id))
    prompt_context = "\n".join(contexts)        
    prompt = lindorm.lindormAI.gen_prompt(query, prompt_context)    
    # stream
    for part in lindorm.lindormAI.chat_stream(prompt):
        output_text = part 
        wrapped_text = wrap_text(output_text, 80)
        clear_output(wait=True)
        display(HTML(f"<pre style='color: red;'>{wrapped_text}</pre>"))
    
    wrapped_text = wrap_text(prompt, 120)
    display(HTML(f"<pre>提示模版为:\n{wrapped_text}</pre>")) 
    lindorm.close()

demo_chat_with_parent()
image