The Lindorm vector engine can automatically vectorize text at write and query time based on an embedding model, which removes the tedious manual vectorization steps required by traditional solutions. This topic describes how to write and query automatically vectorized data in the Lindorm vector engine with Python.
Prerequisites
Before you begin, make sure that the search engine and AI engine are enabled for your Lindorm instance, and that the Python client opensearch-py is installed.
Preparations
Before creating and using a vector index, connect to the search engine through opensearch-py as follows:
from opensearchpy import OpenSearch
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# When logging is enabled, raise the opensearch logger level to avoid excessive INFO logs
logging.getLogger('opensearch').setLevel(logging.WARN)
def get_client() -> OpenSearch:
search_client = OpenSearch(
hosts=[{"host": "ld-t4n5668xk31ui****.lindorm.aliyuncs.com", "port": 30070}],
http_auth=("<username>", "<password>"),
http_compress=False,
use_ssl=False,
)
return search_client
Here, host, username, and password are the connection address, default username, and default password of the search engine. For how to obtain them, see View connection information.
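To verify that the connection works, you can send a lightweight request with the client returned by get_client(). The following is a minimal sketch; it assumes the Lindorm search engine responds to the standard OpenSearch ping/info requests exposed by opensearch-py.

client = get_client()
if client.ping():  # True if the search engine is reachable with the given credentials
    print("Connected:", client.info())
else:
    print("Failed to connect to the search engine, check the endpoint and credentials")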
Procedure overview

| Step | Engine involved | Description |
| --- | --- | --- |
| Step 1: Deploy the embedding model in the AI engine | AI engine | Call the AI engine's RESTful API to deploy the BGE-M3 embedding model, which converts text into vectors. |
| Step 2: Create pipelines in the search engine | Search engine | Define the write and query flows in the search engine. The pipelines call the BGE-M3 model in the AI engine to convert written and queried text into vectors. |
| Step 3: Create a vector index and specify the pipelines | Vector engine | Create or modify a vector index in the vector engine and bind the write pipeline and query pipeline to the index. |
| Step 4: Write data | Vector engine | Execute the write pipeline: raw text is ingested, the BGE-M3 model generates embeddings, and the text together with its vectors is written in bulk. |
| Step 5: Query data | Vector engine | Execute the query pipeline: the query text is converted into a query vector, matched in the vector engine, and the results are returned. |
Step 1: Deploy the embedding model in the AI engine
This topic uses the BGE-M3 model as an example. For parameter details, see Model management. For the list of supported models, see Model list.
url = "http://ld-t4n5668xk31ui****-proxy-ai-vpc.lindorm.aliyuncs.com:9002/v1/ai/models/create" # AI引擎的专用网络连接地址
self.headers = {
"Content-Type": "application/json; charset=utf-8",
"x-ld-ak": "<username>", # AI引擎的用户名
"x-ld-sk": "<password>" # AI引擎的密码
}
data = {
"model_name": "bge_m3_model",
"model_path": "huggingface://BAAI/bge-m3",
"task": "FEATURE_EXTRACTION",
"algorithm": "BGE_M3",
"settings": {"instance_count": "2"}
}
response = requests.post(url, data=json.dumps(data), headers=self.headers)
json_response = response.json()
if response.status_code != 200 or json_response["success"] is False:
print("http request failed, status code: {}".format(json_response["msg"]))
Step 2: Create pipelines in the search engine
Create a write pipeline and a query pipeline in the search engine. They automatically call the embedding model deployed in the AI engine, providing a fully automated flow from text parsing to vector representation.
Create the write pipeline
pipeline = {
"description": "demo_chunking pipeline",
"processors": [
{
"text-embedding": {
"inputFields": ["text_field"],
"outputFields": ["text_field_embedding"],
"userName": "root", # AI引擎的用户名
"password": "test****", # AI引擎的密码
"url": "http://ld-t4n5668xk31ui****-proxy-ai-vpc.lindorm.aliyuncs.com:9002", # AI引擎的专有网络连接地址
"modelName": "bge_m3_model"
}
}
]
}
try:
response = self.client.ingest.put_pipeline(id=self.write_embedding_pipeline, body=pipeline)
print("Create pipeline success", response)
except Exception as e:
print("Create pipeline error ", e)
Parameters

| Parameter | Description | Required |
| --- | --- | --- |
| processors | Processors that handle write requests. | Yes |
| text-embedding | Fixed key that identifies this pipeline as automatic embedding for written text. | Yes |
| inputFields | The text field in the search index to be vectorized. It must match the text_field field defined when the vector index is created later. | Yes |
| outputFields | The vector field that stores the generated vector data. It must match the text_field_embedding field defined when the vector index is created later. | Yes |
| userName | Default username of the Lindorm AI engine. | Yes |
| password | Default initial password of the Lindorm AI engine. | Yes |
| url | Endpoint of the AI engine. Be sure to use the VPC endpoint. | Yes |
| modelName | Model name. This topic uses bge_m3_model as an example. | Yes |
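To confirm that the write pipeline was registered, you can read it back by ID, as the complete example below also does in check_pipeline_exists(). A minimal sketch:

try:
    print(self.client.ingest.get_pipeline(id=self.write_embedding_pipeline))
except Exception as e:
    print("Write pipeline not found", e)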
Create the query pipeline
pipeline = {
"request_processors": [
{
"text-embedding": {
"tag": "auto-query-embedding",
"description": "Auto query embedding",
"model_config": {
"inputFields": ["text_field"],
"outputFields": ["text_field_embedding"],
"userName": "root", # AI引擎的用户名
"password": "test****", # AI引擎的密码
"url": "http://ld-t4n5668xk31ui****-proxy-ai-vpc.lindorm.aliyuncs.com:9002", # AI引擎的专有网络连接地址
"modelName": "bge_m3_model"
}
}
}
]
}
try:
response = self.client.search_pipeline.put(id=self.knn_embedding_pipeline, body=pipeline)
print("Create pipeline success", response)
except Exception as e:
print("Create pipeline error ", e)
Parameters

| Parameter | Description | Required |
| --- | --- | --- |
| request_processors | Processors that handle query requests. | Yes |
| text-embedding | Fixed key that identifies this pipeline as automatic embedding for query text. | Yes |
| tag | Identifier of the processor. The value can be customized but must be unique. | No |
| description | Description of the processor. | No |
| model_config | Dictionary that contains the model parameters. | Yes |
| inputFields | Must be the same as inputFields in the write pipeline, and must match the text_field field defined when the vector index is created later. | Yes |
| outputFields | The vector field corresponding to the text field in inputFields. It must match the text_field_embedding field defined when the vector index is created later. | Yes |
| userName | Default username of the Lindorm AI engine. | Yes |
| password | Default initial password of the Lindorm AI engine. | Yes |
| url | Endpoint of the AI engine. Be sure to use the VPC endpoint. | Yes |
| modelName | Model name. This topic uses bge_m3_model as an example. | Yes |
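Similarly, the query pipeline can be read back with the search_pipeline API, mirroring check_knn_pipeline_exists() in the complete example below:

try:
    print(self.client.search_pipeline.get(id=self.knn_embedding_pipeline))
except Exception as e:
    print("Query pipeline not found", e)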
Step 3: Create a vector index and specify the pipelines
When you create a vector index, or when you modify the settings of an existing vector index, specify the required pipelines. For parameter details, see Create a vector index.
Create a vector index
The text_field and text_field_embedding fields defined when creating the vector index must match the inputFields and outputFields specified in the write and query pipelines.
index_body = {
"settings": {
"index": {
"number_of_shards": 2,
"knn": True,
"default_pipeline": self.write_embedding_pipeline,
"search.default_pipeline": self.knn_embedding_pipeline
}
},
"mappings": {
"_source": {
"excludes": ["text_field_embedding"]
},
"properties": {
"text_field": {
"type": "text",
"analyzer": "ik_max_word"
},
"text_field_embedding": {
"type": "knn_vector",
"dimension": 1024,
"data_type": "float",
"method": {
"engine": "lvector",
"name": "hnsw",
"space_type": "cosinesimil",
"parameters": {
"m": 24,
"ef_construction": 500
}
}
},
"tag": {
"type": "keyword"
},
"brand": {
"type": "keyword"
},
"merit": {
"type": "text",
"analyzer": "ik_max_word"
}
}
}
}
try:
response = self.client.indices.create(index=self.index, body=index_body)
print("Create index success", response)
except Exception as e:
print("Create index error ", e)
Modify the settings of an existing vector index
If you have already created a vector index, you can modify its settings as follows to specify the pipelines used for writes and queries.
body = {
"index": {
"default_pipeline": self.write_embedding_pipeline,
"search.default_pipeline": self.knn_embedding_pipeline
}
}
print(self.client.indices.put_settings(index=self.index, body=body))
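To confirm that both pipelines are bound to the index, you can read the index settings back. A minimal sketch; it assumes the standard opensearch-py indices.get_settings() API:

settings = self.client.indices.get_settings(index=self.index)
print(settings)  # default_pipeline and search.default_pipeline should reference the two pipelines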
Step 4: Write data
When data is written, the raw text is first written to the text_field field. Based on the write pipeline configured on the vector index, the system then automatically calls the BGE-M3 model in the AI engine to encode text_field into a vector and writes the generated vector to the text_field_embedding field.
# Bulk format: for each record, an action line is followed by the document source.
bulk_data = [
    {"index": {"_index": "search_vector_pipeline_test", "_id": "3982"}},
    {
        "text_field": "品牌A 时尚节能无线鼠标(草绿)(眩光.悦动.时尚炫舞鼠标 12个月免换电池 高精度光学寻迹引擎 超细微接收器10米传输距离)",
        "tag": ["鼠标", "电子产品"],
        "brand": "品牌A",
        "merit": "好用、外观漂亮"
    },
    {"index": {"_index": "search_vector_pipeline_test", "_id": "323519"}},
    {
        "text_field": "品牌B 光学鼠标(经典黑)(智能自动对码/1000DPI高精度光学引擎)",
        "tag": ["鼠标", "电子产品"],
        "brand": "品牌B",
        "merit": "质量好、到货速度快、外观漂亮、好用"
    },
    {"index": {"_index": "search_vector_pipeline_test", "_id": "300265"}},
    {
        "text_field": "品牌C 耳塞式耳机 白色(经典时尚)",
        "tag": ["耳机", "电子产品"],
        "brand": "品牌C",
        "merit": "外观漂亮、质量好"
    },
    {"index": {"_index": "search_vector_pipeline_test", "_id": "6797"}},
    {
        "text_field": "品牌D 两刀头充电式电动剃须刀",
        "tag": ["家用电器", "电动剃须刀"],
        "brand": "品牌D",
        "merit": "好用、外观漂亮"
    },
    {"index": {"_index": "search_vector_pipeline_test", "_id": "8195"}},
    {
        "text_field": "品牌E Class4 32G TF卡(micro SD)手机存储卡",
        "tag": ["存储设备", "存储卡", "SD卡"],
        "brand": "品牌E",
        "merit": "容量挺大的、速度快、好用、质量好"
    },
    {"index": {"_index": "search_vector_pipeline_test", "_id": "13316"}},
    {
        "text_field": "品牌E 101 G2 32GB 优盘",
        "tag": ["存储设备", "U盘", "优盘"],
        "brand": "品牌E",
        "merit": "好用、容量挺大的、速度快"
    },
    {"index": {"_index": "search_vector_pipeline_test", "_id": "14103"}},
    {
        "text_field": "品牌B 64GB至尊高速移动存储卡 UHS-1制式 读写速度最高可达30MB",
        "tag": ["存储设备", "存储卡", "SD卡"],
        "brand": "品牌B",
        "merit": "容量挺大的、速度快、好用"
    }
]
response = self.client.bulk(bulk_data, refresh=True)
print(response)
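The bulk response reports per-document status, so it is worth checking the errors flag rather than only printing the raw response. A minimal sketch based on the standard bulk response format:

if response.get("errors"):
    for item in response.get("items", []):
        result = item.get("index", {})
        if result.get("error"):
            print("Failed to index doc {}: {}".format(result.get("_id"), result["error"]))
else:
    print("Indexed {} documents".format(len(response.get("items", []))))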
Step 5: Query data
The following three query modes are supported:
Pure vector search: computes vector similarity and directly returns the results that match the query vector.
Vector search with attribute filtering: combines vector similarity search with metadata filters (such as time, category, or tags) to return similar results that satisfy specific attributes.
Vector search with full-text and attribute filtering: combines vector similarity, full-text matching, and attribute filters for precise retrieval under multiple constraints.
Pure vector search
query = {
"size": 10,
"_source": True,
"query": {
"knn": {
"text_field_embedding": {
"query_text": "存储卡",
"k": 10
}
}
},
"ext": {"lvector": {"ef_search": "200"}}
}
response = self.client.search(index=index_name, body=query)
print(response)
Sample response
{
"took": 64,
"timed_out": false,
"terminated_early": false,
"num_reduce_phases": 0,
"_shards": {
"total": 2,
"successful": 2,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 3,
"relation": "eq"
},
"max_score": 0.6915956,
"hits": [
{
"_index": "search_vector_pipeline_test",
"_id": "8195",
"_score": 0.6915956,
"_source": {
"index": {
"_index": "search_vector_pipeline_test",
"_id": "13316"
},
"text_field": "品牌E 101 G2 32GB 优盘",
"merit": "好用、容量挺大的、速度快",
"tag": ["存储设备", "U盘", "优盘"],
"brand": "品牌E"
}
},
{
"_index": "search_vector_pipeline_test",
"_id": "300265",
"_score": 0.6262013,
"_source": {
"index": {
"_index": "search_vector_pipeline_test",
"_id": "6797"
},
"text_field": "品牌D 两刀头充电式电动剃须刀",
"merit": "好用、外观漂亮",
"tag": ["家用电器", "电动剃须刀"],
"brand": "品牌D"
}
},
{
"_index": "search_vector_pipeline_test",
"_id": "3982",
"_score": 0.624792,
"_source": {
"index": {
"_index": "search_vector_pipeline_test",
"_id": "323519"
},
"text_field": "品牌B 光学鼠标(经典黑)(智能自动对码/1000DPI高精度光学引擎)",
"merit": "质量好、到货速度快、外观漂亮、好用",
"tag": ["鼠标", "电子产品"],
"brand": "品牌B"
}
}
]
}
}
Vector search with attribute filtering
query = {
"size": 10,
"_source": True,
"query": {
"knn": {
"text_field_embedding": {
"query_text": "存储卡",
"k": 10,
"filter": {
"bool": {
"filter": [{
"match": {
"merit": "质量好"
}
},
{
"term": {
"brand": "品牌E"
}
},
{
"terms": {
"tag": ["SD卡", "存储卡"]
}
}]
}
}
}
}
},
"ext": {
"lvector": {
"filter_type": "efficient_filter",
"hybrid_search_type": "filter_rrf",
"rrf_rank_constant": "1",
"ef_search": "200"
}
}
}
response = self.client.search(index=index_name, body=query)
print(response)
Sample response
{
"took": 3,
"timed_out": false,
"_shards": {
"total": 2,
"successful": 2,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 0,
"relation": "eq"
},
"max_score": null,
"hits": []
}
}
Vector search with full-text and attribute filtering
query = {
"size": 10,
"_source": True,
"query": {
"knn": {
"text_field_embedding": {
"query_text": "存储卡",
"filter": {
"bool": {
"must": [{
"bool": {
"must": [{
"match": {
"text_field": {
"query": "存储卡"
}
}
}]
}
},
{
"bool": {
"filter": [{
"match": {
"merit": "质量好"
}
},
{
"term": {
"brand": "品牌E"
}
},
{
"terms": {
"tag": ["SD卡", "存储卡"]
}
}]
}
}]
}
},
"k": 10
}
}
},
"ext": {
"lvector": {
"filter_type": "efficient_filter",
"hybrid_search_type": "filter_rrf",
"rrf_rank_constant": "1",
"ef_search": "200"
}
}
}
response = self.client.search(index=self.index, body=query)
print(response)
Sample response
{
"took": 5,
"timed_out": false,
"terminated_early": false,
"num_reduce_phases": 0,
"_shards": {
"total": 2,
"successful": 2,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 0,
"relation": "eq"
},
"max_score": 0.0,
"hits": []
}
}
Complete code example
LindormSearch.py
This module is a basic utility class for interacting with the Lindorm search engine. Its core functions include connecting to the search engine, index operations, and query logic.
import json
import random
from opensearchpy import OpenSearch
from ldconfig import Config
class LindormSearch:
def __init__(self):
self.random = random.Random(0)
self.top_k = 5
try:
self.client = OpenSearch(
hosts=[{"host": Config.SEARCH_HOST, "port": Config.SEARCH_PORT}],
http_auth=(Config.LD_USER, Config.LD_PASSWORD),
http_compress=False,
use_ssl=False,
)
except Exception as e:
print("Connection search error", e)
def get_index(self, index_name):
print(index_name, self.client.indices.get(index_name))
def bulk_insert(self, index_name, count=1000):
operations = []
for i in range(0, count):
id = self.random.randint(-2 ** 63, 2 ** 63 - 1)
operations.append(json.dumps({"index": {"_index": index_name, "_id": id}}))
operations.append("\n")
vector1 = []
for j in range(0, 3):
vector1.append(self.random.random())
operations.append(json.dumps({"field1": self.random.random(), "vector1": vector1}))
operations.append("\n")
response = self.client.bulk("".join(operations), refresh=True)
print(response)
def knn_search(self, index_name):
query = {
"size": 10,
"query": {
"knn": {
"vector1": {
"vector": [2.3, 3.3, 4.4],
"k": 10
}
}
},
"ext": {"lvector": {"min_score": "0.001"}}
}
response = self.client.search(index=index_name, body=query)
print(response)
def knn_search_with_filter(self, index_name):
query = {
"size": 10,
"query": {
"knn": {
"vector1": {
"vector": [2.3, 3.3, 4.4],
"filter": {
"range": {
"field1": {
"gte": 0
}
}
},
"k": 10
}
}
},
"ext": {"lvector": {"filter_type": "efficient_filter", "ef_search": "100", "k_expand_scope": "2000"}}
}
response = self.client.search(index=index_name, body=query)
print(response)
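    # Note: text_search, vector_search, and rrf_search below assume that self.parent
    # (providing text_field, vector_field, and a lindormAI helper with text_embedding())
    # and self.index_name are set by the surrounding application; they are not defined
    # in this class.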
def text_search(self, text_query, k=int(Config.SEARCH_TOP_K)):
query_body = {
"size": k,
"_source": ["document_id", "chunking_position", "text"],
"query": {
"match": {
self.parent.text_field: text_query
}
}
}
res = self.client.search(index=self.index_name, body=query_body)
return res['hits']['hits']
def vector_search(self, text_query, k=int(Config.SEARCH_TOP_K)):
vector = self.parent.lindormAI.text_embedding(text_query)
query_body = {
"size": k,
"_source": ["document_id", "chunking_position", "text"],
"query": {
"knn": {
self.parent.vector_field: {
"vector": vector,
"k": k
}
}
},
"ext": {"lvector": {"ef_search": "200"}}
}
res = self.client.search(index=self.index_name, body=query_body)
return res['hits']['hits']
def rrf_search(self, text_query, k=int(Config.SEARCH_TOP_K)):
vector = self.parent.lindormAI.text_embedding(text_query)
query_body = {
"size": k,
"_source": ["document_id", "chunking_position", "text"],
"query": {
"knn": {
self.parent.vector_field: {
"vector": vector,
"filter": {
"match": {
self.parent.text_field: text_query,
}
},
"k": k
}
}
},
"ext": {"lvector": {
"hybrid_search_type": "filter_rrf",
"rrf_rank_constant": "60",
"ef_search": "200"
}}
}
res = self.client.search(index=self.index_name, body=query_body)
return res['hits']['hits']
ldconfig.py
Configuration file that stores global parameters, such as connection information.
class Config:
    # Global settings (replace with your actual information)
    LD_ES_HOST = "ld-t4n5668xk31ui****-proxy-search-vpc.lindorm.aliyuncs.com"  # endpoint of the search engine
    LD_ES_PORT = 30070  # port of the search engine
    LD_USER = "root"  # default username of the search/AI engine
    LD_PASSWORD = "test****"  # default initial password of the search/AI engine
    # AI engine connection information (replace with your actual information)
    AI_HOST = "ld-t4n5668xk31ui****-proxy-ai-vpc.lindorm.aliyuncs.com"  # endpoint of the AI engine (host only, the scheme is added in code)
    AI_PORT = 9002  # port of the AI engine
    SEARCH_TOP_K = 10  # set an appropriate value for your scenario
HnswIndexWithPipeline.py
This module implements the complete automatic vectorization flow and wraps all operations from Step 1 through Step 5.
import json
import requests
from LindormSearch import LindormSearch
from ldconfig import Config
class HnswIndexWithPipeline(LindormSearch):
def __init__(self):
super().__init__()
self.index = "search_vector_pipeline_test"
self.write_embedding_pipeline = "write_embedding_pipeline"
self.knn_embedding_pipeline = "knn_embedding_pipeline"
def create_ai_model(self):
url = "http://{}:{}/v1/ai/models/create".format(Config.AI_HOST, int(Config.AI_PORT))
headers = {
"Content-Type": "application/json; charset=utf-8",
"x-ld-ak": Config.LD_USER,
"x-ld-sk": Config.LD_PASSWORD
}
data = {
"model_name": "bge_m3_model",
"model_path": "huggingface://BAAI/bge-m3",
"task": "FEATURE_EXTRACTION",
"algorithm": "BGE_M3",
"settings": {"instance_count": "2"}
}
response = requests.post(url, data=json.dumps(data), headers=headers)
json_response = response.json()
if response.status_code != 200 or json_response["success"] is False:
print("http request failed, status code: {}".format(json_response["msg"]))
def check_pipeline_exists(self) -> bool:
try:
print(self.client.ingest.get_pipeline(id=self.write_embedding_pipeline))
return True
except Exception as e:
return False
def create_write_embedding_pipeline(self):
if self.check_pipeline_exists():
print("Pipeline {} exists".format(self.write_embedding_pipeline))
return
inner_ai_host = Config.AI_HOST
if "-pub" in inner_ai_host:
inner_ai_host = inner_ai_host.replace("-pub", "-vpc")
pipeline = {
"description": "demo_chunking pipeline",
"processors": [
{
"text-embedding": {
"inputFields": ["text_field"],
"outputFields": ["text_field_embedding"],
"userName": Config.LD_USER,
"password": Config.LD_PASSWORD,
"url": "http://{}:{}".format(inner_ai_host, int(Config.AI_PORT)),
"modeName": "bge_m3_model"
}
}
]
}
try:
response = self.client.ingest.put_pipeline(id=self.write_embedding_pipeline, body=pipeline)
print("Create pipeline success", response)
except Exception as e:
print("Create pipeline errr ", e)
def check_knn_pipeline_exists(self) -> bool:
try:
response = self.client.search_pipeline.get(id=self.knn_embedding_pipeline)
print(response)
return True
except Exception as e:
return False
def create_knnsearch_pipeline(self):
if self.check_knn_pipeline_exists():
print("Pipeline {} exists".format(self.knn_embedding_pipeline))
            # If the pipeline already exists, the current strategy is to skip creation.
            # To recreate it with adjusted parameters, comment out the return below.
return
inner_ai_host = Config.AI_HOST
if "-pub" in inner_ai_host:
inner_ai_host = inner_ai_host.replace("-pub", "-vpc")
pipeline = {
"request_processors": [
{
"text-embedding": {
"tag": "auto-query-embedding",
"description": "Auto query embedding",
"model_config": {
"inputFields": ["text_field"],
"outputFields": ["text_field_embedding"],
"userName": Config.LD_USER,
"password": Config.LD_PASSWORD,
"url": "http://{}:{}".format(inner_ai_host, int(Config.AI_PORT)),
"modeName": "bge_m3_model"
}
}
}
]
}
try:
response = self.client.search_pipeline.put(id=self.knn_embedding_pipeline, body=pipeline)
print("Create pipeline success", response)
except Exception as e:
print("Create pipeline errr ", e)
def create_index_with_pipeline(self):
index_body = {
"settings": {
"index": {
"number_of_shards": 2,
"knn": True,
"default_pipeline": self.write_embedding_pipeline,
"search.default_pipeline": self.knn_embedding_pipeline
}
},
"mappings": {
"_source": {
"excludes": ["text_field_embedding"]
},
"properties": {
"text_field": {
"type": "text",
"analyzer": "ik_max_word"
},
"text_field_embedding": {
"type": "knn_vector",
"dimension": 1024,
"data_type": "float",
"method": {
"engine": "lvector",
"name": "hnsw",
"space_type": "cosinesimil",
"parameters": {
"m": 24,
"ef_construction": 500
}
}
},
"tag": {
"type": "keyword"
},
"brand": {
"type": "keyword"
},
"merit": {
"type": "text",
"analyzer": "ik_max_word"
}
}
}
}
try:
response = self.client.indices.create(index=self.index, body=index_body)
print("Create index success", response)
except Exception as e:
print("Create index errr ", e)
def update_index_setting(self):
body = {
"index": {
"default_pipeline": self.write_embedding_pipeline,
"search.default_pipeline": self.knn_embedding_pipeline
}
}
print(self.client.indices.put_settings(index=self.index, body=body))
def bulk_insert_docs(self):
        # Bulk format: for each record, an action line is followed by the document source.
        bulk_data = [
            {"index": {"_index": "search_vector_pipeline_test", "_id": "3982"}},
            {
                "text_field": "品牌A 时尚节能无线鼠标(草绿)(眩光.悦动.时尚炫舞鼠标 12个月免换电池 高精度光学寻迹引擎 超细微接收器10米传输距离)",
                "tag": ["鼠标", "电子产品"],
                "brand": "品牌A",
                "merit": "好用、外观漂亮"
            },
            {"index": {"_index": "search_vector_pipeline_test", "_id": "323519"}},
            {
                "text_field": "品牌B 1090光学鼠标(经典黑)(智能自动对码/1000DPI高精度光学引擎)",
                "tag": ["鼠标", "电子产品"],
                "brand": "品牌B",
                "merit": "质量好、到货速度快、外观漂亮、好用"
            },
            {"index": {"_index": "search_vector_pipeline_test", "_id": "300265"}},
            {
                "text_field": "品牌C 耳塞式耳机 白色(经典时尚)",
                "tag": ["耳机", "电子产品"],
                "brand": "品牌C",
                "merit": "外观漂亮、质量好"
            },
            {"index": {"_index": "search_vector_pipeline_test", "_id": "6797"}},
            {
                "text_field": "品牌D 两刀头充电式电动剃须刀",
                "tag": ["家用电器", "电动剃须刀"],
                "brand": "品牌D",
                "merit": "好用、外观漂亮"
            },
            {"index": {"_index": "search_vector_pipeline_test", "_id": "8195"}},
            {
                "text_field": "品牌E Class4 32G TF卡(micro SD)手机存储卡",
                "tag": ["存储设备", "存储卡", "SD卡"],
                "brand": "品牌E",
                "merit": "容量挺大的、速度快、好用、质量好"
            },
            {"index": {"_index": "search_vector_pipeline_test", "_id": "13316"}},
            {
                "text_field": "品牌E 101 G2 32GB 优盘",
                "tag": ["存储设备", "U盘", "优盘"],
                "brand": "品牌E",
                "merit": "好用、容量挺大的、速度快"
            },
            {"index": {"_index": "search_vector_pipeline_test", "_id": "14103"}},
            {
                "text_field": "品牌B 64GB至尊高速移动存储卡 UHS-1制式 读写速度最高可达30MB",
                "tag": ["存储设备", "存储卡", "SD卡"],
                "brand": "品牌B",
                "merit": "容量挺大的、速度快、好用"
            }
        ]
response = self.client.bulk(bulk_data, refresh=True)
print(response)
def knn_search(self, index_name):
query = {
"size": 10,
"_source": True,
"query": {
"knn": {
"text_field_embedding": {
"query_text": "存储卡",
"k": 10
}
}
},
"ext": {"lvector": {"ef_search": "200"}}
}
response = self.client.search(index=index_name, body=query)
print(response)
def knn_search_with_filter(self, index_name):
query = {
"size": 10,
"_source": True,
"query": {
"knn": {
"text_field_embedding": {
"query_text": "存储卡",
"k": 10,
"filter": {
"bool": {
"filter": [{
"match": {
"merit": "质量好"
}
},
{
"term": {
"brand": "品牌E"
}
},
{
"terms": {
"tag": ["SD卡", "存储卡"]
}
}]
}
}
}
}
},
"ext": {
"lvector": {
"filter_type": "efficient_filter",
"ef_search": "200"
}
}
}
response = self.client.search(index=index_name, body=query)
print(response)
def rrf_search_with_filter(self):
query = {
"size": 10,
"_source": True,
"query": {
"knn": {
"text_field_embedding": {
"query_text": "存储卡",
"filter": {
"bool": {
"must": [{
"bool": {
"must": [{
"match": {
"text_field": {
"query": "存储卡"
}
}
}]
}
},
{
"bool": {
"filter": [{
"match": {
"merit": "质量好"
}
},
{
"term": {
"brand": "品牌E"
}
},
{
"terms": {
"tag": ["SD卡", "存储卡"]
}
}]
}
}]
}
},
"k": 10
}
}
},
"ext": {
"lvector": {
"filter_type": "efficient_filter",
"hybrid_search_type": "filter_rrf",
"rrf_rank_constant": "1",
"ef_search": "200"
}
}
}
response = self.client.search(index=self.index, body=query)
print(response)
if __name__ == '__main__':
hnsw = HnswIndexWithPipeline()
hnsw.create_ai_model()
hnsw.create_write_embedding_pipeline()
hnsw.create_knnsearch_pipeline()
hnsw.create_index_with_pipeline()
hnsw.update_index_setting()
hnsw.get_index(hnsw.index)
hnsw.bulk_insert_docs()
hnsw.knn_search(hnsw.index)
hnsw.knn_search_with_filter(hnsw.index)
hnsw.rrf_search_with_filter()
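If you need to re-run the example from scratch (for instance after changing pipeline parameters), you can first delete the index and both pipelines. The following is a minimal sketch; it assumes the standard opensearch-py delete APIs (indices.delete, ingest.delete_pipeline, search_pipeline.delete) and simply skips resources that do not exist:

def cleanup(hnsw: HnswIndexWithPipeline):
    # Remove the demo index and both pipelines so the example can be re-created cleanly.
    for delete_call in (
        lambda: hnsw.client.indices.delete(index=hnsw.index),
        lambda: hnsw.client.ingest.delete_pipeline(id=hnsw.write_embedding_pipeline),
        lambda: hnsw.client.search_pipeline.delete(id=hnsw.knn_embedding_pipeline),
    ):
        try:
            print(delete_call())
        except Exception as e:
            print("Cleanup skipped:", e)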