Writing and Querying Automatically Vectorized Data

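The Lindorm vector engine can use an Embedding model to vectorize text automatically at write time and at query time, which removes the tedious manual definition of vector fields that traditional approaches require. This topic describes how to write and query automatically vectorized data in the Lindorm vector engine with Python.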

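Prerequisites

  • Python 3.6 or later is installed.

  • The wide table engine is activated.

  • The AI engine is activated.

  • The vector engine is activated.

  • The search engine is activated and runs version 3.9.10 or later. To check or upgrade the current version, see "Search engine release notes" and "Upgrade the minor version".

    Important

    If your search engine is earlier than 3.9.10 but the console shows that it is already the latest version, contact Lindorm technical support (DingTalk ID: s0s3eg3).

  • The ECS instance and the Lindorm instance are in the same VPC.

  • The client IP address has been added to the Lindorm whitelist.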

Preparation

Before you create and use a vector index, connect to the search engine through opensearch-py as follows:

import logging

from opensearchpy import OpenSearch

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# The opensearch logger is verbose at INFO level; raise its threshold to keep the log readable.
logging.getLogger('opensearch').setLevel(logging.WARNING)


def get_client() -> OpenSearch:
    search_client = OpenSearch(
        hosts=[{"host": "ld-t4n5668xk31ui****.lindorm.aliyuncs.com", "port": 30070}],
        http_auth=("<username>", "<password>"),
        http_compress=False,
        use_ssl=False,
    )
    return search_client

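Here host, <username>, and <password> are the connection address, default username, and default password of the search engine. To obtain them, see "View connection information".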

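Overview of steps

| Step | Engines involved | Description |
| --- | --- | --- |
| Step 1: Deploy an Embedding model in the AI engine | AI engine | Call the AI engine RESTful API to deploy the BGE-M3 Embedding model, which converts text into vectors. |
| Step 2: Create pipelines in the search engine | Search engine | Define the write flow and the query flow in the search engine. Each flow calls the BGE-M3 model in the AI engine to convert the written text or the query text into a vector. |
| Step 3: Create a vector index and specify the pipelines | Vector engine, search engine | Create or modify a vector index in the vector engine and bind the write pipeline and the query pipeline to it. |
| Step 4: Write data | Vector engine, search engine | Run the write pipeline. The raw input text triggers the BGE-M3 model to generate embeddings, and the text is written in bulk together with its vectors. |
| Step 5: Query data | Vector engine, search engine | Run the query pipeline. The query text is converted into a query vector, which is matched in the vector engine, and the results are returned. |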

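Step 1: Deploy an Embedding model in the AI engine

This topic deploys the BGE-M3 model as an example. For parameter details, see "Model management". For the full list of supported models, see "Model list".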

import json

import requests

# VPC endpoint of the AI engine
url = "http://ld-t4n5668xk31ui****-proxy-ai-vpc.lindorm.aliyuncs.com:9002/v1/ai/models/create"
headers = {
    "Content-Type": "application/json; charset=utf-8",
    "x-ld-ak": "<username>",  # AI engine username
    "x-ld-sk": "<password>"  # AI engine password
}
data = {
    "model_name": "bge_m3_model",
    "model_path": "huggingface://BAAI/bge-m3",
    "task": "FEATURE_EXTRACTION",
    "algorithm": "BGE_M3",
    "settings": {"instance_count": "2"}
}
response = requests.post(url, data=json.dumps(data), headers=headers)
json_response = response.json()
# Report both the HTTP status code and the message returned by the AI engine.
if response.status_code != 200 or json_response["success"] is False:
    print("http request failed, status code: {}, msg: {}".format(response.status_code, json_response["msg"]))
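The request above can be split into a part that builds the call and a part that checks the reply, which makes both easy to test without a live AI engine. The following is a minimal sketch under that assumption: the helper names build_create_model_request and check_ai_response are hypothetical, and only the URL path, headers, and payload fields are taken from the snippet above.

```python
import json


def build_create_model_request(ai_endpoint, username, password,
                               model_name="bge_m3_model", instance_count=2):
    """Return (url, headers, body) for the model-creation call (hypothetical helper)."""
    url = "{}/v1/ai/models/create".format(ai_endpoint.rstrip("/"))
    headers = {
        "Content-Type": "application/json; charset=utf-8",
        "x-ld-ak": username,
        "x-ld-sk": password,
    }
    payload = {
        "model_name": model_name,
        "model_path": "huggingface://BAAI/bge-m3",
        "task": "FEATURE_EXTRACTION",
        "algorithm": "BGE_M3",
        # The snippet above passes the instance count as a string.
        "settings": {"instance_count": str(instance_count)},
    }
    return url, headers, json.dumps(payload)


def check_ai_response(status_code, json_response):
    """Raise RuntimeError unless the reply is HTTP 200 and "success" is not False."""
    if status_code != 200 or json_response.get("success") is False:
        raise RuntimeError("model deployment failed, status code: {}, msg: {}".format(
            status_code, json_response.get("msg")))
    return json_response
```

The returned tuple can be passed to requests.post(url, data=body, headers=headers), and the reply to check_ai_response(response.status_code, response.json()).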

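Step 2: Create pipelines in the search engine

Create a write pipeline and a query pipeline in the search engine. Both call the Embedding model deployed in the AI engine automatically, so the whole path from text parsing to vector representation needs no manual processing.

Create the write pipeline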

pipeline = {
    "description": "demo_chunking pipeline",
    "processors": [
        {
            "text-embedding": {
                "inputFields": ["text_field"],
                "outputFields": ["text_field_embedding"],
                "userName": "root",   # AI engine username
                "password":  "test****",  # AI engine password
                "url": "http://ld-t4n5668xk31ui****-proxy-ai-vpc.lindorm.aliyuncs.com:9002",   # VPC endpoint of the AI engine
                "modelName": "bge_m3_model"
            }
        }
    ]
}
try:
    response = self.client.ingest.put_pipeline(id=self.write_embedding_pipeline, body=pipeline)
    print("Create pipeline success", response)
except Exception as e:
    print("Create pipeline error ", e)   

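Parameters

| Parameter | Description |
| --- | --- |
| processors | Processors that handle write requests. |
| text-embedding | Fixed key that marks the pipeline as automatic embedding for written text. |
| inputFields | The text field in the search index to vectorize. It must match text_field in the vector index created later. |
| outputFields | The vector field that stores the generated vectors. It must match text_field_embedding in the vector index created later. |
| userName | Default username of the Lindorm AI engine. |
| password | Default initial password of the Lindorm AI engine. |
| url | Connection address of the AI engine. It must be the VPC address. |
| modeName | Model name. This topic uses bge_m3_model. |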

Create the query pipeline

pipeline = {
    "request_processors": [
        {
            "text-embedding": {
                "tag": "auto-query-embedding",
                "description": "Auto query embedding",
                "model_config": {
                    "inputFields": ["text_field"],
                    "outputFields": ["text_field_embedding"],
                    "userName": "root",   # AI engine username
                    "password":  "test****",  # AI engine password
                    "url": "http://ld-t4n5668xk31ui****-proxy-ai-vpc.lindorm.aliyuncs.com:9002",   # VPC endpoint of the AI engine
                    "modelName": "bge_m3_model"
                }
            }
        }
    ]
}
try:
    response = self.client.search_pipeline.put(id=self.knn_embedding_pipeline, body=pipeline)
    print("Create pipeline success", response)
except Exception as e:
    print("Create pipeline error ", e)

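Parameters

| Parameter | Description |
| --- | --- |
| request_processors | Text processors that handle query requests. |
| text-embedding | Fixed key that marks the pipeline as automatic embedding for query text. |
| tag | Processor identifier. The value is user-defined but must be unique. |
| description | Description of the processor. |
| model_config | Dictionary that contains the model parameters. |
| inputFields | Same as inputFields in the write pipeline. It must match text_field in the vector index created later. |
| outputFields | The vector field that corresponds to the text field in inputFields. It must match text_field_embedding in the vector index created later. |
| userName | Default username of the Lindorm AI engine. |
| password | Default initial password of the Lindorm AI engine. |
| url | Connection address of the AI engine. It must be the VPC address. |
| modeName | Model name. This topic uses bge_m3_model. |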

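Step 3: Create a vector index and specify the pipelines

When you create a vector index or modify the settings of an existing one, specify the pipelines to use. For parameter details, see "Create a vector index".

Create a vector index

Note

The text_field and text_field_embedding fields of the vector index must match the inputFields and outputFields specified in the write and query pipelines.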

index_body = {
    "settings": {
        "index": {
            "number_of_shards": 2,
            "knn": True,
            "default_pipeline": self.write_embedding_pipeline,
            "search.default_pipeline": self.knn_embedding_pipeline
        }
    },
    "mappings": {
        "_source": {
            "excludes": ["text_field_embedding"]
        },
        "properties": {
            "text_field": {
                "type": "text",
                "analyzer": "ik_max_word"
            },
            "text_field_embedding": {
                "type": "knn_vector",
                "dimension": 1024,
                "data_type": "float",
                "method": {
                    "engine": "lvector",
                    "name": "hnsw",
                    "space_type": "cosinesimil",
                    "parameters": {
                        "m": 24,
                        "ef_construction": 500
                    }
                }
            },
            "tag": {
                "type": "keyword"
            },
            "brand": {
                "type": "keyword"
            },
            "merit": {
                "type": "text",
                "analyzer": "ik_max_word"
            }
        }
    }
}
try:
    response = self.client.indices.create(index=self.index, body=index_body)
    print("Create index success", response)
except Exception as e:
    print("Create index error ", e)
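Because the pipeline field names and the index mapping have to agree, it can help to check them before the index is created. The following is a minimal sketch, assuming the index body has the same shape as index_body above; the helper name check_fields_match is hypothetical and the check runs locally without contacting any engine.

```python
def check_fields_match(index_body, input_fields, output_fields):
    """Return a list of problems; an empty list means the names line up.

    input_fields / output_fields are the inputFields / outputFields lists
    used in the write and query pipelines.
    """
    properties = index_body.get("mappings", {}).get("properties", {})
    problems = []
    for name in input_fields:
        # Text to be vectorized is mapped as a "text" field in this topic.
        if properties.get(name, {}).get("type") != "text":
            problems.append("{} is not mapped as text".format(name))
    for name in output_fields:
        # Generated vectors are stored in a "knn_vector" field.
        if properties.get(name, {}).get("type") != "knn_vector":
            problems.append("{} is not mapped as knn_vector".format(name))
    return problems
```

For the mapping in this topic, check_fields_match(index_body, ["text_field"], ["text_field_embedding"]) returns an empty list.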

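Modify the settings of an existing vector index

If the vector index already exists, modify its settings as follows to specify the pipelines used for writes and queries.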

body = {
    "index": {
        "default_pipeline": self.write_embedding_pipeline,
        "search.default_pipeline": self.knn_embedding_pipeline
    }
}
print(self.client.indices.put_settings(index=self.index, body=body))

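Step 4: Write data

When data is written, the raw text is first stored in the text_field field. The write pipeline configured on the vector index then calls the BGE-M3 model in the AI engine automatically, encodes text_field into a vector, and writes the vector to the text_field_embedding field.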

bulk_data = [
    {
        "index": {"_index": "search_vector_pipeline_test", "_id": "3982"},
        "text_field": "品牌A 时尚节能无线鼠标(草绿)(眩光.悦动.时尚炫舞鼠标 12个月免换电池 高精度光学寻迹引擎 超细微接收器10米传输距离)",
        "tag": ["鼠标", "电子产品"],
        "brand": "品牌A",
        "merit": "好用、外观漂亮"
    },
    {
        "index": {"_index": "search_vector_pipeline_test", "_id": "323519"},
        "text_field": "品牌B 光学鼠标(经典黑)(智能自动对码/1000DPI高精度光学引擎)",
        "tag": ["鼠标", "电子产品"],
        "brand": "品牌B",
        "merit": "质量好、到货速度快、外观漂亮、好用"
    },
    {
        "index": {"_index": "search_vector_pipeline_test", "_id": "300265"},
        "text_field": "品牌C 耳塞式耳机 白色(经典时尚)",
        "tag": ["耳机", "电子产品"],
        "brand": "品牌C",
        "merit": "外观漂亮、质量好"
    },
    {
        "index": {"_index": "search_vector_pipeline_test", "_id": "6797"},
        "text_field": "品牌D 两刀头充电式电动剃须刀",
        "tag": ["家用电器", "电动剃须刀"],
        "brand": "品牌D",
        "merit": "好用、外观漂亮"
    },
    {
        "index": {"_index": "search_vector_pipeline_test", "_id": "8195"},
        "text_field": "品牌E Class4 32G TF卡(micro SD)手机存储卡",
        "tag": ["存储设备", "存储卡", "SD卡"],
        "brand": "品牌E",
        "merit": "容量挺大的、速度快、好用、质量好"
    },
    {
        "index": {"_index": "search_vector_pipeline_test", "_id": "13316"},
        "text_field": "品牌E 101 G2 32GB 优盘",
        "tag": ["存储设备", "U盘", "优盘"],
        "brand": "品牌E",
        "merit": "好用、容量挺大的、速度快"
    },
    {
        "index": {"_index": "search_vector_pipeline_test", "_id": "14103"},
        "text_field": "品牌B 64GB至尊高速移动存储卡 UHS-1制式 读写速度最高可达30MB",
        "tag": ["存储设备", "存储卡", "SD卡"],
        "brand": "品牌B",
        "merit": "容量挺大的、速度快、好用"
    }
]
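# A bulk body alternates an action line with a document line, so split each
# entry into its {"index": ...} action followed by the document source.
bulk_body = []
for doc in bulk_data:
    source = dict(doc)
    bulk_body.append({"index": source.pop("index")})
    bulk_body.append(source)
response = self.client.bulk(body=bulk_body, refresh=True)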
print(response)
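Printing the bulk response shows everything at once, but a rejected document is easy to miss in that output. The following is a minimal sketch that lists only the failed items. It assumes the response follows the OpenSearch bulk response layout (a top-level "errors" flag and an "items" list keyed by action); the helper name failed_bulk_items is hypothetical.

```python
def failed_bulk_items(response):
    """Return (doc_id, status, error) for every bulk item that was rejected."""
    failures = []
    # "errors" is False when every item succeeded, so there is nothing to scan.
    if not response.get("errors"):
        return failures
    for item in response.get("items", []):
        # Each item has a single key that names the action, for example "index".
        for result in item.values():
            if "error" in result:
                failures.append((result.get("_id"), result.get("status"), result["error"]))
    return failures
```

Calling failed_bulk_items(response) after the bulk request returns an empty list when all documents were written.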

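Step 5: Query data

The following three query types are supported:

  • Pure vector search: returns the results that match the query vector, ranked by vector similarity alone.

  • Vector search with attribute filtering: adds metadata conditions (such as time, category, or tag) on top of vector similarity search, so that only similar results with the required attributes are returned.

  • Vector search with full-text and attribute filtering: combines vector similarity, full-text matching, and attribute conditions to retrieve results under several constraints at once.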
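The three query types differ only in the filter clause and in the ext.lvector settings. The following is a minimal sketch of a builder that produces all three request bodies; it mirrors the bodies used in the complete example of this topic. The function name build_knn_query and its parameters are hypothetical, and the fixed values ("200" for ef_search, "1" for rrf_rank_constant) are copied from this topic rather than recommended settings.

```python
def build_knn_query(query_text, k=10, attribute_filters=None, full_text=None,
                    vector_field="text_field_embedding", text_field="text_field"):
    """Build a kNN request body for one of the three query types (hypothetical helper)."""
    knn = {"query_text": query_text, "k": k}
    lvector = {"ef_search": "200"}
    attribute_filters = list(attribute_filters or [])

    if full_text is not None:
        # Vector + full-text (+ attribute) search: full-text match goes in "must",
        # attribute conditions in a nested "filter" clause.
        must = [{"bool": {"must": [{"match": {text_field: {"query": full_text}}}]}}]
        if attribute_filters:
            must.append({"bool": {"filter": attribute_filters}})
        knn["filter"] = {"bool": {"must": must}}
        lvector.update({
            "filter_type": "efficient_filter",
            "hybrid_search_type": "filter_rrf",
            "rrf_rank_constant": "1",
        })
    elif attribute_filters:
        # Vector + attribute filtering only.
        knn["filter"] = {"bool": {"filter": attribute_filters}}
        lvector["filter_type"] = "efficient_filter"

    return {
        "size": k,
        "_source": True,
        "query": {"knn": {vector_field: knn}},
        "ext": {"lvector": lvector},
    }
```

For example, build_knn_query("存储卡", attribute_filters=[{"term": {"brand": "品牌E"}}]) yields a body that can be passed as body= to the client's search call.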

Pure vector search

query = {
    "size": 10,
    "_source": True,
    "query": {
        "knn": {
            "text_field_embedding": {
                "query_text": "存储卡",
                "k": 10
            }
        }
    },
    "ext": {"lvector": {"ef_search": "200"}}
}
response = self.client.search(index=self.index, body=query)
print(response)

Response

{
    "took": 64,
    "timed_out": false,
    "terminated_early": false,
    "num_reduce_phases": 0,
    "_shards": {
        "total": 2,
        "successful": 2,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 3,
            "relation": "eq"
        },
        "max_score": 0.6915956,
        "hits": [
            {
                "_index": "search_vector_pipeline_test",
                "_id": "8195",
                "_score": 0.6915956,
                "_source": {
                    "index": {
                        "_index": "search_vector_pipeline_test",
                        "_id": "13316"
                    },
                    "text_field": "品牌E 101 G2 32GB 优盘",
                    "merit": "好用、容量挺大的、速度快",
                    "tag": ["存储设备", "U盘", "优盘"],
                    "brand": "品牌E"
                }
            },
            {
                "_index": "search_vector_pipeline_test",
                "_id": "300265",
                "_score": 0.6262013,
                "_source": {
                    "index": {
                        "_index": "search_vector_pipeline_test",
                        "_id": "6797"
                    },
                    "text_field": "品牌D 两刀头充电式电动剃须刀",
                    "merit": "好用、外观漂亮",
                    "tag": ["家用电器", "电动剃须刀"],
                    "brand": "品牌D"
                }
            },
            {
                "_index": "search_vector_pipeline_test",
                "_id": "3982",
                "_score": 0.624792,
                "_source": {
                    "index": {
                        "_index": "search_vector_pipeline_test",
                        "_id": "323519"
                    },
                    "text_field": "品牌B 光学鼠标(经典黑)(智能自动对码/1000DPI高精度光学引擎)",
                    "merit": "质量好、到货速度快、外观漂亮、好用",
                    "tag": ["鼠标", "电子产品"],
                    "brand": "品牌B"
                }
            }
        ]
    }
}
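The full response is verbose, and usually only the ID, score, and text of each hit are needed. The following is a minimal sketch that extracts them, assuming the response has the hits.hits layout shown above; the helper name summarize_hits is hypothetical.

```python
def summarize_hits(response, text_field="text_field"):
    """Return (document id, score, text) for each hit in a search response."""
    return [
        (hit["_id"], hit["_score"], hit.get("_source", {}).get(text_field))
        for hit in response["hits"]["hits"]
    ]
```

Calling summarize_hits(response) on the result of a search call gives one tuple per hit, in the order the engine returned them.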

Vector search with attribute filtering

query = {
    "size": 10,
    "_source": True,
    "query": {
        "knn": {
            "text_field_embedding": {
                "query_text": "存储卡",
                "k": 10,
                "filter": {
                    "bool": {
                        "filter": [{
                            "match": {
                                "merit": "质量好"
                            }
                        },
                            {
                                "term": {
                                    "brand": "品牌E"
                                }
                            },
                            {
                                "terms": {
                                    "tag": ["SD卡", "存储卡"]
                                }
                            }]
                    }
                }
            }
        }
    },
    "ext": {
        "lvector": {
            "filter_type": "efficient_filter",
            "ef_search": "200"
        }
    }
}
response = self.client.search(index=self.index, body=query)
print(response)

Response

{
    "took": 3,
    "timed_out": false,
    "_shards": {
        "total": 2,
        "successful": 2,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 0,
            "relation": "eq"
        },
        "max_score": null,
        "hits": []
    }
}

Vector search with full-text and attribute filtering

query = {
    "size": 10,
    "_source": True,
     "query": {
        "knn": {
            "text_field_embedding": {
                 "query_text": "存储卡",
                "filter": {
                    "bool": {
                         "must": [{
                            "bool": {
                                "must": [{
                                    "match": {
                                        "text_field": {
                                            "query": "存储卡"
                                        }
                                    }
                                }]
                            }
                        },
                            {
                                "bool": {
                                    "filter": [{
                                        "match": {
                                            "merit": "质量好"
                                        }
                                    },
                                        {
                                            "term": {
                                                "brand": "品牌E"
                                            }
                                        },
                                        {
                                            "terms": {
                                                "tag": ["SD卡", "存储卡"]
                                            }
                                        }]
                                }
                            }]
                    }
                },
                "k": 10
            }
        }
    },
    "ext": {
        "lvector": {
            "filter_type": "efficient_filter",
            "hybrid_search_type": "filter_rrf",
            "rrf_rank_constant": "1",
            "ef_search": "200"
        }
    }
}
response = self.client.search(index=self.index, body=query)
print(response)

Response

{
    "took": 5,
    "timed_out": false,
    "terminated_early": false,
    "num_reduce_phases": 0,
    "_shards": {
        "total": 2,
        "successful": 2,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 0,
            "relation": "eq"
        },
        "max_score": 0.0,
        "hits": []
    }
}
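The hybrid query sets hybrid_search_type to filter_rrf together with rrf_rank_constant. As background, the following is a minimal sketch of standard reciprocal rank fusion (RRF), in which each document scores the sum of 1 / (rank_constant + rank) over the rankings it appears in. It illustrates the general technique only; how Lindorm applies the constant internally is not described in this topic, and the function name rrf_fuse is hypothetical.

```python
def rrf_fuse(rankings, rank_constant=60):
    """Fuse several ranked lists of document IDs with reciprocal rank fusion.

    rankings: list of lists, each ordered from best to worst.
    Returns (doc_id, score) pairs sorted by descending fused score.
    """
    scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            # A smaller rank_constant gives top-ranked documents more weight.
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (rank_constant + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)
```

With one ranking from vector similarity and one from full-text matching, a document that ranks well in both lists ends up ahead of one that ranks well in only one.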

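Complete code example

LindormSearch.py

This module is the base utility class for interacting with the Lindorm search engine. It covers connecting to the search engine, index operations, and query logic.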

import json
import random

from opensearchpy import OpenSearch
from ldconfig import Config


class LindormSearch:
    def __init__(self):
        self.random = random.Random(0)
        self.top_k = 5
        try:
            self.client = OpenSearch(
                # Use the attribute names defined in ldconfig.py.
                hosts=[{"host": Config.LD_ES_HOST, "port": Config.LD_ES_PORT}],
                http_auth=(Config.LD_USER, Config.LD_PASSWORD),
                http_compress=False,
                use_ssl=False,
            )
        except Exception as e:
            print("Connection search error", e)

    def get_index(self, index_name):
        print(index_name, self.client.indices.get(index_name))

    def bulk_insert(self, index_name, count=1000):
        operations = []
        for i in range(0, count):
            id = self.random.randint(-2 ** 63, 2 ** 63 - 1)
            operations.append(json.dumps({"index": {"_index": index_name, "_id": id}}))
            operations.append("\n")
            vector1 = []
            for j in range(0, 3):
                vector1.append(self.random.random())
            operations.append(json.dumps({"field1": self.random.random(), "vector1": vector1}))
            operations.append("\n")
        response = self.client.bulk("".join(operations), refresh=True)
        print(response)

    def knn_search(self, index_name):
        query = {
            "size": 10,
            "query": {
                "knn": {
                    "vector1": {
                        "vector": [2.3, 3.3, 4.4],
                        "k": 10
                    }
                }
            },
            "ext": {"lvector": {"min_score": "0.001"}}
        }
        response = self.client.search(index=index_name, body=query)
        print(response)

    def knn_search_with_filter(self, index_name):
        query = {
            "size": 10,
            "query": {
                "knn": {
                    "vector1": {
                        "vector": [2.3, 3.3, 4.4],
                        "filter": {
                            "range": {
                                "field1": {
                                    "gte": 0
                                }
                            }
                        },
                        "k": 10
                    }
                }
            },
            "ext": {"lvector": {"filter_type": "efficient_filter", "ef_search": "100", "k_expand_scope": "2000"}}
        }
        response = self.client.search(index=index_name, body=query)
        print(response)


    def text_search(self, text_query, k=int(Config.SEARCH_TOP_K)):
        query_body = {
            "size": k,
            "_source": ["document_id", "chunking_position", "text"],
            "query": {
                "match": {
                    self.parent.text_field: text_query
                }
            }
        }
        res = self.client.search(index=self.index_name, body=query_body)
        return res['hits']['hits']


    def vector_search(self, text_query, k=int(Config.SEARCH_TOP_K)):
        vector = self.parent.lindormAI.text_embedding(text_query)
        query_body = {
            "size": k,
            "_source": ["document_id", "chunking_position", "text"],
            "query": {
                "knn": {
                    self.parent.vector_field: {
                        "vector": vector,
                        "k": k
                    }
                }
            },
            "ext": {"lvector": {"ef_search": "200"}}
        }
        res = self.client.search(index=self.index_name, body=query_body)
        return res['hits']['hits']


    def rrf_search(self, text_query, k=int(Config.SEARCH_TOP_K)):
        vector = self.parent.lindormAI.text_embedding(text_query)
        query_body = {
            "size": k,
            "_source": ["document_id", "chunking_position", "text"],
            "query": {
                "knn": {
                    self.parent.vector_field: {
                        "vector": vector,
                        "filter": {
                            "match": {
                                self.parent.text_field: text_query,
                            }
                        },
                        "k": k
                    }
                }
            },
            "ext": {"lvector": {
                "hybrid_search_type": "filter_rrf",
                "rrf_rank_constant": "60",
                "ef_search": "200"
            }}
        }
        res = self.client.search(index=self.index_name, body=query_body)
        return res['hits']['hits']         

ldconfig.py

Configuration file that stores global parameters such as the connection settings.

class Config:
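    # Global settings (replace with your actual values)
    LD_ES_HOST = "ld-t4n5668xk31ui****-proxy-search-vpc.lindorm.aliyuncs.com"  # Search engine endpoint
    LD_ES_PORT = 30070  # Search engine port
    LD_USER = "root"  # Default username of the search and AI engines
    LD_PASSWORD = "test****"  # Default initial password of the search and AI engines

    # AI engine connection settings (replace with your actual values)
    # Host name only: the calling code prepends "http://" when it builds the URL.
    AI_HOST = "ld-t4n5668xk31ui****-proxy-ai-vpc.lindorm.aliyuncs.com"  # AI engine endpoint
    AI_PORT = 9002  # AI engine port

    SEARCH_TOP_K = 10  # Adjust to your needs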
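Hard-coded credentials are convenient for a demo but awkward to share or deploy. The following is a minimal sketch that reads the same settings from environment variables instead. The function name load_config and the choice to reuse the attribute names as variable names are assumptions for illustration; the default ports and top-k value are the ones used in this topic.

```python
import os


def load_config(environ=None):
    """Read connection settings from environment variables (hypothetical helper)."""
    env = os.environ if environ is None else environ
    return {
        # Required values raise KeyError when missing, so misconfiguration fails early.
        "LD_ES_HOST": env["LD_ES_HOST"],
        "LD_ES_PORT": int(env.get("LD_ES_PORT", "30070")),
        "LD_USER": env["LD_USER"],
        "LD_PASSWORD": env["LD_PASSWORD"],
        "AI_HOST": env["AI_HOST"],
        "AI_PORT": int(env.get("AI_PORT", "9002")),
        "SEARCH_TOP_K": int(env.get("SEARCH_TOP_K", "10")),
    }
```

Passing a plain dictionary as environ makes the function easy to exercise in tests without touching the real environment.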

HnswIndexWithPipeline.py

This module implements the complete automatic vectorization flow and wraps all operations from Step 1 to Step 5.

import json

import requests

from LindormSearch import LindormSearch
from ldconfig import Config


class HnswIndexWithPipeline(LindormSearch):
    def __init__(self):
        super().__init__()
        self.index = "search_vector_pipeline_test"
        self.write_embedding_pipeline = "write_embedding_pipeline"
        self.knn_embedding_pipeline = "knn_embedding_pipeline"

    def create_ai_model(self):
        url = "http://{}:{}/v1/ai/models/create".format(Config.AI_HOST, int(Config.AI_PORT))
        headers = {
            "Content-Type": "application/json; charset=utf-8",
            "x-ld-ak": Config.LD_USER,
            "x-ld-sk": Config.LD_PASSWORD
        }
        data = {
            "model_name": "bge_m3_model",
            "model_path": "huggingface://BAAI/bge-m3",
            "task": "FEATURE_EXTRACTION",
            "algorithm": "BGE_M3",
            "settings": {"instance_count": "2"}
        }
        response = requests.post(url, data=json.dumps(data), headers=headers)
        json_response = response.json()
        if response.status_code != 200 or json_response["success"] is False:
            print("http request failed, status code: {}, msg: {}".format(response.status_code, json_response["msg"]))

    def check_pipeline_exists(self) -> bool:
        try:
            print(self.client.ingest.get_pipeline(id=self.write_embedding_pipeline))
            return True
        except Exception as e:
            return False

    def create_write_embedding_pipeline(self):

        if self.check_pipeline_exists():
            print("Pipeline {} exists".format(self.write_embedding_pipeline))
            return
        inner_ai_host = Config.AI_HOST
        if "-pub" in inner_ai_host:
            inner_ai_host = inner_ai_host.replace("-pub", "-vpc")

        pipeline = {
            "description": "demo_chunking pipeline",
            "processors": [
                {
                    "text-embedding": {
                        "inputFields": ["text_field"],
                        "outputFields": ["text_field_embedding"],
                        "userName": Config.LD_USER,
                        "password": Config.LD_PASSWORD,
                        "url": "http://{}:{}".format(inner_ai_host, int(Config.AI_PORT)),
                        "modeName": "bge_m3_model"
                    }
                }
            ]
        }
        try:
            response = self.client.ingest.put_pipeline(id=self.write_embedding_pipeline, body=pipeline)
            print("Create pipeline success", response)
        except Exception as e:
            print("Create pipeline error ", e)

    def check_knn_pipeline_exists(self) -> bool:
        try:
            response = self.client.search_pipeline.get(id=self.knn_embedding_pipeline)
            print(response)
            return True
        except Exception as e:
            return False

    def create_knnsearch_pipeline(self):
        if self.check_knn_pipeline_exists():
            print("Pipeline {} exists".format(self.knn_embedding_pipeline))
            # If the pipeline already exists, it is skipped. To recreate it with changed parameters, comment out the return below.
            return
        inner_ai_host = Config.AI_HOST
        if "-pub" in inner_ai_host:
            inner_ai_host = inner_ai_host.replace("-pub", "-vpc")

        pipeline = {
            "request_processors": [
                {
                    "text-embedding": {
                        "tag": "auto-query-embedding",
                        "description": "Auto query embedding",
                        "model_config": {
                            "inputFields": ["text_field"],
                            "outputFields": ["text_field_embedding"],
                            "userName": Config.LD_USER,
                            "password": Config.LD_PASSWORD,
                            "url": "http://{}:{}".format(inner_ai_host, int(Config.AI_PORT)),
                            "modeName": "bge_m3_model"
                        }
                    }
                }
            ]
        }
        try:
            response = self.client.search_pipeline.put(id=self.knn_embedding_pipeline, body=pipeline)
            print("Create pipeline success", response)
        except Exception as e:
            print("Create pipeline error ", e)

    def create_index_with_pipeline(self):
        index_body = {
            "settings": {
                "index": {
                    "number_of_shards": 2,
                    "knn": True,
                    "default_pipeline": self.write_embedding_pipeline,
                    "search.default_pipeline": self.knn_embedding_pipeline
                }
            },
            "mappings": {
                "_source": {
                    "excludes": ["text_field_embedding"]
                },
                "properties": {
                    "text_field": {
                        "type": "text",
                        "analyzer": "ik_max_word"
                    },
                    "text_field_embedding": {
                        "type": "knn_vector",
                        "dimension": 1024,
                        "data_type": "float",
                        "method": {
                            "engine": "lvector",
                            "name": "hnsw",
                            "space_type": "cosinesimil",
                            "parameters": {
                                "m": 24,
                                "ef_construction": 500
                            }
                        }
                    },
                    "tag": {
                        "type": "keyword"
                    },
                    "brand": {
                        "type": "keyword"
                    },
                    "merit": {
                        "type": "text",
                        "analyzer": "ik_max_word"
                    }
                }
            }
        }
        try:
            response = self.client.indices.create(index=self.index, body=index_body)
            print("Create index success", response)
        except Exception as e:
            print("Create index error ", e)

    def update_index_setting(self):
        body = {
            "index": {
                "default_pipeline": self.write_embedding_pipeline,
                "search.default_pipeline": self.knn_embedding_pipeline
            }
        }
        print(self.client.indices.put_settings(index=self.index, body=body))

    def bulk_insert_docs(self):
        bulk_data = [
            {
                "index": {"_index": "search_vector_pipeline_test", "_id": "3982"},
                "text_field": "品牌A 时尚节能无线鼠标(草绿)(眩光.悦动.时尚炫舞鼠标 12个月免换电池 高精度光学寻迹引擎 超细微接收器10米传输距离)",
                "tag": ["鼠标", "电子产品"],
                "brand": "品牌A",
                "merit": "好用、外观漂亮"
            },
            {
                "index": {"_index": "search_vector_pipeline_test", "_id": "323519"},
                "text_field": "品牌B 1090光学鼠标(经典黑)(智能自动对码/1000DPI高精度光学引擎)",
                "tag": ["鼠标", "电子产品"],
                "brand": "品牌B",
                "merit": "质量好、到货速度快、外观漂亮、好用"
            },
            {
                "index": {"_index": "search_vector_pipeline_test", "_id": "300265"},
                "text_field": "品牌C 耳塞式耳机 白色(经典时尚)",
                "tag": ["耳机", "电子产品"],
                "brand": "品牌C",
                "merit": "外观漂亮、质量好"
            },
            {
                "index": {"_index": "search_vector_pipeline_test", "_id": "6797"},
                "text_field": "品牌D 两刀头充电式电动剃须刀",
                "tag": ["家用电器", "电动剃须刀"],
                "brand": "品牌D",
                "merit": "好用、外观漂亮"
            },
            {
                "index": {"_index": "search_vector_pipeline_test", "_id": "8195"},
                "text_field": "品牌E Class4 32G TF卡(micro SD)手机存储卡",
                "tag": ["存储设备", "存储卡", "SD卡"],
                "brand": "品牌E",
                "merit": "容量挺大的、速度快、好用、质量好"
            },
            {
                "index": {"_index": "search_vector_pipeline_test", "_id": "13316"},
                "text_field": "品牌E 101 G2 32GB 优盘",
                "tag": ["存储设备", "U盘", "优盘"],
                "brand": "品牌E",
                "merit": "好用、容量挺大的、速度快"
            },
            {
                "index": {"_index": "search_vector_pipeline_test", "_id": "14103"},
                "text_field": "品牌B 64GB至尊高速移动存储卡 UHS-1制式 读写速度最高可达30MB",
                "tag": ["存储设备", "存储卡", "SD卡"],
                "brand": "品牌B",
                "merit": "容量挺大的、速度快、好用"
            }
        ]
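        # A bulk body alternates an action line with a document line, so split
        # each entry into its {"index": ...} action followed by the document source.
        bulk_body = []
        for doc in bulk_data:
            source = dict(doc)
            bulk_body.append({"index": source.pop("index")})
            bulk_body.append(source)
        response = self.client.bulk(body=bulk_body, refresh=True)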
        print(response)

    def knn_search(self, index_name):
        query = {
            "size": 10,
            "_source": True,
            "query": {
                "knn": {
                    "text_field_embedding": {
                        "query_text": "存储卡",
                        "k": 10
                    }
                }
            },
            "ext": {"lvector": {"ef_search": "200"}}
        }
        response = self.client.search(index=index_name, body=query)
        print(response)

    def knn_search_with_filter(self, index_name):
        query = {
            "size": 10,
            "_source": True,
            "query": {
                "knn": {
                    "text_field_embedding": {
                        "query_text": "存储卡",
                        "k": 10,
                        "filter": {
                            "bool": {
                                "filter": [{
                                    "match": {
                                        "merit": "质量好"
                                    }
                                },
                                    {
                                        "term": {
                                            "brand": "品牌E"
                                        }
                                    },
                                    {
                                        "terms": {
                                            "tag": ["SD卡", "存储卡"]
                                        }
                                    }]
                            }
                        }
                    }
                }
            },
            "ext": {
                "lvector": {
                    "filter_type": "efficient_filter",
                    "ef_search": "200"
                }
            }
        }
        response = self.client.search(index=index_name, body=query)
        print(response)

    def rrf_search_with_filter(self):
        query = {
            "size": 10,
            "_source": True,
            "query": {
                "knn": {
                    "text_field_embedding": {
                        "query_text": "存储卡",
                        "filter": {
                            "bool": {
                                "must": [{
                                    "bool": {
                                        "must": [{
                                            "match": {
                                                "text_field": {
                                                    "query": "存储卡"
                                                }
                                            }
                                        }]
                                    }
                                },
                                    {
                                        "bool": {
                                            "filter": [{
                                                "match": {
                                                    "merit": "质量好"
                                                }
                                            },
                                                {
                                                    "term": {
                                                        "brand": "品牌E"
                                                    }
                                                },
                                                {
                                                    "terms": {
                                                        "tag": ["SD卡", "存储卡"]
                                                    }
                                                }]
                                        }
                                    }]
                            }
                        },
                        "k": 10
                    }
                }
            },
            "ext": {
                "lvector": {
                    "filter_type": "efficient_filter",
                    "hybrid_search_type": "filter_rrf",
                    "rrf_rank_constant": "1",
                    "ef_search": "200"
                }
            }
        }
        response = self.client.search(index=self.index, body=query)
        print(response)


if __name__ == '__main__':
    hnsw = HnswIndexWithPipeline()
    hnsw.create_ai_model()
    hnsw.create_write_embedding_pipeline()
    hnsw.create_knnsearch_pipeline()
    hnsw.create_index_with_pipeline()
    hnsw.update_index_setting()
    hnsw.get_index(hnsw.index)
    hnsw.bulk_insert_docs()
    hnsw.knn_search(hnsw.index)
    hnsw.knn_search_with_filter(hnsw.index)
    hnsw.rrf_search_with_filter()