Lindorm向量引擎支持向量数据检索功能,兼容Elasticsearch协议,同时支持标量、向量、全文混合检索功能。本文介绍基于Python语言,通过opensearch-py客户端连接和使用向量引擎的方法。
前提条件
准备工作
在创建和使用向量索引前,您需要通过opensearch-py连接搜索引擎,连接方式如下:
from opensearchpy import OpenSearch, Object
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 如果使用 logging,为防止 opensearch info 日志过多,需要进行以下修改
logging.getLogger('opensearch').setLevel(logging.WARN)
def get_client() -> OpenSearch:
search_client = OpenSearch(
hosts=[{"host": "ld-t4n5668xk31ui****.lindorm.aliyuncs.com", "port": 30070}],
http_auth=("<username>", "<password>"),
http_compress=False,
use_ssl=False,
)
return search_client
其中host、username和password分别为搜索引擎的连接地址、默认用户名和默认密码,如何获取,请参见查看连接信息。
算法说明
您可以参考各个算法的特点、适用的数据量等,创建适合业务场景的向量索引。
算法 | 数据量 | 资源占用 | 算法特点 | 注意事项 |
flat | [0,1万) | 纯内存。 | 暴力检索,使用简单,适合小数据集。 | 检索性能会随着数据量增加而降低。 |
hnsw | [1万, 100万) | 纯内存,资源占用较高。 | 在线索引,适合数据集中等规模,用法相对简单。 | CPU占用量与number_of_shards(索引分片数量)的值相关,其值通常设置为向量引擎节点数。如果索引数量较多,且每个索引数据量较少,将number_of_shards设置为 |
ivfpq | 100万以上 | 磁盘索引,默认1:8压缩,即内存大小为原始数据的1/8。 | 离线索引,需要先写入一定量的数据再构建索引,适合大数据集,成本较低。 | 发起索引构建时需确保写入的数据量充足,必须大于256条且超过nlist的30倍。 建议在离线数据导入完成后再触发索引构建。索引构建完成后,您可以正常进行KNN查询和写入操作。 说明 nlist参数的说明,请参见参数说明。 |
创建向量索引
创建向量索引,其中vector1
为向量列、field1
为普通列。向量列及其相关参数必须在创建索引时通过mappings结构显式指定。
hnsw类型索引
以创建索引vector_test
为例:
index_body = {
"settings": {
"index": {
"number_of_shards": 2,
"knn": True
}
},
"mappings": {
"_source": {"excludes": ["vector1"]},
"properties": {
"vector1": {
"type": "knn_vector",
"dimension": 3,
"data_type": "float",
"method": {
"engine": "lvector",
"name": "hnsw",
"space_type": "l2",
"parameters": {
"m": 24,
"ef_construction": 200
}
}
},
"field1": {
"type": "long"
}
}
}
}
response = client.indices.create(index='vector_test', body=index_body)
ivfpq类型索引
以创建索引vector_ivfpq_test
为例:
index_body = {
"settings": {
"index": {
"number_of_shards": 4,
"knn": True,
"knn.offline.construction": True
}
},
"mappings": {
"_source": {"excludes": ["vector1"]},
"properties": {
"vector1": {
"type": "knn_vector",
"dimension": 3,
"data_type": "float",
"method": {
"engine": "lvector",
"name": "ivfpq",
"space_type": "cosinesimil",
"parameters": {
"m": 3,
"nlist": 10000,
"centroids_use_hnsw": True,
"centroids_hnsw_m": 48,
"centroids_hnsw_ef_construct": 500,
"centroids_hnsw_ef_search": 200
}
}
},
"field1": {
"type": "long"
}
}
}
}
response = client.indices.create(index='vector_ivfpq_test', body=index_body)
创建ivfpq类型索引,您必须注意以下事项:
ivfpq中knn.offline.construction务必设置为
true
,后续需要写入一定量的数据才能发起构建索引。使用ivfpq算法请将dimension替换业务真实向量维度,并将m值设置为与dimension相同的值。
稀疏向量索引
以创建索引vector_sparse_test
为例:
index_body = {
"settings": {
"index": {
"number_of_shards": 2,
"knn": True
}
},
"mappings": {
"_source": {"excludes": ["vector1"]},
"properties": {
"vector1": {
"type": "knn_vector",
"data_type": "sparse_vector",
"method": {
"engine": "lvector",
"name": "sparse_hnsw",
"space_type": "innerproduct",
"parameters": {
"m": 24,
"ef_construction": 200
}
}
},
"field1": {
"type": "long"
}
}
}
}
response = client.indices.create(index='vector_sparse_test', body=index_body)
创建稀疏向量时,data_type参数必须填写sparse_vector
,name参数必须填写sparse_hnsw
,space_type参数必须填写innerproduct
。参数的详细说明,请参见参数说明。
参数说明
通用参数
参数 | 是否必填 | 说明 |
type | 是 | 索引列的类型。对于向量列,固定为 |
dimension | 是 | 向量数据的维度。取值范围:[1,32768]。 |
data_type | 否 | 向量数据的类型。目前支持以下类型:
|
method.name | 是 | 向量数据的索引算法。取值如下:
|
method.space_type | 否 | 向量数据的距离算法。取值如下:
|
HNSW算法参数
参数 | 是否必填 | 说明 |
method.parameters.m | 否 | 每一层图的最大出边数量。 取值范围:[1,100]。默认值为 |
method.parameters.ef_construction | 否 | 索引构建时动态列表的长度。 取值范围:[1,1000]。默认值为 |
IVFPQ算法参数
参数 | 是否必填 | 说明 |
method.parameters.m | 否 | 量化中子空间的数量。取值范围:[2,32768]。默认值为 重要 创建ivfpq类型索引时,该参数值必须与通用参数dimension的值相同。 |
method.parameters.nlist | 否 | 聚类中心的数量。 取值范围:[2, 1000000]。默认值为 |
method.parameters.centroids_use_hnsw | 否 | 是否在聚类中心搜索时使用HNSW算法。 取值如下:
|
method.parameters.centroids_hnsw_m | 否 | 若在聚类中心搜索时使用HNSW算法,设定HNSW算法的每一层图的最大出边数量。 取值范围:[1,100]。默认值为 |
method.parameters.centroids_hnsw_ef_construct | 否 | 若在聚类中心搜索时使用HNSW算法,设定HNSW算法在索引构建时动态列表的长度。 取值范围:[1,1000]。默认值为 |
method.parameters.centroids_hnsw_ef_search | 否 | 若在聚类中心搜索时使用HNSW算法,设定HNSW算法在查询时动态列表的长度。 取值范围:[1,1000]。默认值为 |
数据写入
包含向量列的索引的数据写入方式与普通索引的数据写入方式一致。
单条写入
您可以根据业务需要选择插入写或更新写。
插入写
使用index写入,若目标数据已存在,则不允许写入。以写入索引vector_test
为例:
doc = {
"field1": 1,
"vector1": [1.2, 1.3, 1.4]
}
response = client.index(index='vector_test', body=doc, id=1)
更新写
使用create写入,若目标数据已存在,则执行更新操作;若目标数据不存在,写入数据。以写入索引vector_test
为例:
doc = {
"field1": 2,
"vector1": [2.2, 2.3, 2.4]
}
response = client.create(index='vector_test', body=doc, id=2)
发起单条写入请求时,需要提供索引的名称、ID和请求体(body)。id
代表本条数据在索引中的唯一标识,可填写也可以不填写,不填写时系统将自动生成。请求体代表一条数据的信息,其中每个属性代表该条数据一个字段的值,属性Key为字段名称,属性value为字段的值,例如在上述示例中field1
为字段名称,2
为该字段的值。其中向量字段的值以数组的形式提供。
请求发起后,服务端响应将返回如下结果:
{
"_index" : "vector_test",
"_type" : "_doc",
"_id" : "1",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 1,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 0,
"_primary_term" : 1
}
返回结果集说明
字段 | 说明 |
_index | 写入的索引名称。 |
_id | 本条数据的ID。 |
_version | 本条数据的版本号。多次更新同一条数据时,版本号会发生改变。 |
result | 本次操作的结果。 |
_shards.total | 参与操作的总分片数。 |
_shards.successful | 成功的分片数。 |
_shards.failed | 失败的分片数。 |
批量写入
通用写入
以写入索引vector_test
为例:
operations = """
{ "index" : { "_index" : "vector_test", "_id" : "2" } }
{ "field1" : 1, "vector1": [2.2, 2.3, 2.4]}
{ "create" : { "_index" : "vector_test", "_id" : "3" } }
{ "field1" : 2, "vector1": [1.2, 1.3, 4.4]}
{ "delete" : { "_index" : "vector_test", "_id" : "2" } }
{ "update" : {"_id" : "1", "_index" : "vector_test"} }
{ "doc" : {"field1" : 3, "vector1": [2.2, 3.3, 4.4]} }
"""
response = client.bulk(operations)
# 批量数据写入封装
def write_docs(self, index_name: str) -> None:
operations = []
for i in range(0, 1000):
id = self.random.randint(-2 ** 63, 2 ** 63 - 1)
operations.append(json.dumps({"index": {"_index": index_name, "_id": id}}))
operations.append("\n")
vector1 = []
for j in range(0, 5):
vector1.append(self.random.random())
operations.append(json.dumps({"field1": self.random.random(), "vector1": vector1}))
operations.append("\n")
response = self.client.bulk("".join(operations))
稀疏向量写入
写入方式与上述方式相同,但需要修改vector1的格式。共支持两种格式:JSON STRING和JSON Object。前者性能更优,后者格式更友好。
JSON STRING
以写入索引vector_sparse_test
为例:
doc = {
"field1": 2,
"vector1": "{\"indices\": [10, 14, 16], \"values\": [0.5, 0.5, 0.2]}"
}
response = client.index(index='vector_sparse_test', body=doc, id=1)
JSON Object
以写入索引vector_sparse_test
为例:
doc = {
"field1": 1,
"vector1": {"indices": [10, 12, 16], "values": [0.5, 0.5, 0.2]}
}
response = client.index(index='vector_sparse_test', body=doc, id=1)
索引构建
除ivfpq索引,其他类型索引创建时index.knn.offline.construction默认为
false
,即在线索引,无需手动构建。在触发ivfpq索引构建前需注意:在创建ivfpq索引时,需将index.knn.offline.construction显式指定为
true
,且在发起构建时务必确保已写入足够的数据量,必须大于256条且超过nlist的30倍。手动触发索引构建完成后,后续可正常写入和查询,无需再次构建索引。
触发构建
构建索引, 可以使用self.search_client.transport.perform_request
封装post请求。以构建索引vector_ivfpq_test
为例:
body_build = {
"indexName": "vector_ivfpq_test",
"fieldName": "vector1",
"removeOldIndex": "true"
}
build_response = client.transport.perform_request(
method="POST",
url='/_plugins/_vector/index/build',
body=body_build
)
参数说明
参数 | 是否必填 | 说明 |
indexName | 是 | 表名称,例如 |
fieldName | 是 | 针对哪个字段构建索引,例如 |
removeOldIndex | 是 | 构建索引时,是否删除旧的索引。取值如下:
|
返回结果如下:
{
"payload": ["default_vector_ivfpq_test_vector1"]
}
返回结果为索引构建生成的taskId
。
查看索引状态
您可以通过以下方式查看索引构建状态。以查看索引vector_ivfpq_test
为例:
build_query = {
"indexName": "vector_ivfpq_test",
"fieldName": "vector1",
"taskIds": "[\"default_vector_ivfpq_test_vector1\"]"
}
response = client.transport.perform_request(
method='GET',
url='/_plugins/_vector/index/tasks',
body=build_query
)
其中,taskIds为触发构建时生成的taskId
,可以填写空的数组,例如"taskIds": "[]"
,效果与上述已填写taskIds的效果一致。
返回结果如下:
{
"payload": ["task: default_vector_ivfpq_test_vector1, stage: FINISH, innerTasks: xxx, info: finish building"]
}
其中,stage表示构建状态,共包含以下几种状态:START(开始构建)、TRAIN(训练阶段)、BUILDING(构建中)、ABORT(终止构建)、FINISH(构建完成)和FAIL(构建失败)。
ABORT通常调用/index/abort接口来终止索引构建。
终止构建
终止索引的构建流程。状态为FINISH
的索引不支持调用该方法。以终止索引vector_ivfpq_test
的构建为例:
abort_body = {
"indexName": "vector_ivfpq_test",
"fieldName": "vector1",
"taskIds": "[\"default_vector_ivfpq_test_vector1\"]"
}
response = client.transport.perform_request(
method='POST',
url='/_plugins/_vector/index/tasks/abort',
body=abort_body
)
返回结果如下:
{
"payload":["Task: default_vector_ivfpq_test_vector1 remove success"]
}
数据查询
纯向量数据查询
纯向量数据的查询可以通过knn
结构实现。
通用查询方式
以下示例查询索引vector_test
的向量列vector1
字段中与向量[2.3, 3.3, 4.4]
相关的前10条数据,并要求最小得分为0.1
。
query = {
"size": 10,
"query": {
"knn": {
"vector1": {
"vector": [2.3, 3.3, 4.4],
"k": 10
}
}
},
"ext": {"lvector": {"min_score": "0.1"}}
}
response = client.search(index='vector_test', body=query)
参数说明
参数结构 | 参数 | 是否必填 | 说明 |
knn | vector | 是 | 查询时使用的向量。 |
k | 是 | 返回最相似的K个数据。 重要 在纯向量检索场景中,建议将size和k设置为相同的值。 | |
ext | lvector.min_score | 否 | 相似度阈值,要求返回的向量得分大于该值。返回的向量得分范围为[0,1]。 取值范围:[0,+inf]。默认值为 |
lvector.filter_type | 否 | 融合查询使用的模式。取值如下:
默认值为空。 | |
lvector.ef_search | 否 | HNSW算法中,索引构建时动态列表的长度。只能用于HNSW算法。 取值范围:[1,1000]。默认值为 | |
lvector.nprobe | 否 | 要查询的聚类单元(cluster units)的数量。请根据您的召回率要求,对该参数的值进行调整已达到理想效果。值越大,召回率越高,搜索性能越低。 取值范围:[1,method.parameters.nlist]。无默认值。 重要 仅适用于ivfpq算法。 | |
lvector.reorder_factor | 否 | 使用原始向量创建重排序(reorder)。ivfpq算法计算的距离为量化后的距离,会有一定的精度损失,需要使用原始向量进行重排序。比例为 取值范围:[1,200]。默认值为 重要
|
返回结果如下:
返回指定字段
如果需要在查询时返回指定字段,可以指定 "_source": ["field1", "field2"]
或使用"_source": true
返回非向量的全部字段。以查询索引vector_test
为例,使用方法如下:
query = {
"size": 10,
"_source": ["field1"],
"query": {
"knn": {
"vector1": {
"vector": [2.3, 3.3, 4.4],
"k": 10
}
}
},
"ext": {"lvector": {"min_score": "0.1"}}
}
response = client.search(index='vector_test', body=query)
返回结果如下:
hsnw算法查询
以查询hsnw索引vector_test
为例:
query = {
"size": 10,
"query": {
"knn": {
"vector1": {
"vector": [2.3, 3.3, 4.4],
"k": 10
}
}
},
"ext": {"lvector": {"ef_search": "100"}}
}
response = client.search(index='vector_test', body=query)
ivfpq算法查询
以查询ivfpq索引vector_ivfpq_test
为例:
query = {
"size": 10,
"query": {
"knn": {
"vector1": {
"vector": [2.3, 3.3, 4.4],
"k": 10
}
}
},
"ext": {"lvector": {"nprobe": "60", "reorder_factor": "5"}}
}
response = client.search(index='vector_ivfpq_test', body=query)
如果k值相对较大,如大于100,将reorder_factor的值设置为
1
即可。当nlist的值为
10000
时,可以先将nprobe设置为60
,查看检索效果。如果想继续提升召回率,可适当增加nprobe的值,如80、100、120、140、160,该值引起的性能损耗远小于reorder_factor,但也不适宜设置过大。
稀疏向量查询
查询方式与上述方式相同,但需要修改vector1的格式。共支持两种格式:JSON STRING和JSON Object。前者性能更优,后者格式更友好。
JSON STRING
以查询稀疏向量索引vector_sparse_test
为例:
query = {
"size": 10,
"query": {
"knn": {
"vector1": {
"vector": "{\"indices\": [10, 45, 16], \"values\": [0.5, 0.5, 0.2]}",
"k": 10
}
}
}
}
response = client.search(index='vector_sparse_test', body=query)
JSON Object
以查询稀疏向量索引vector_sparse_test
为例:
query = {
"size": 10,
"query": {
"knn": {
"vector1": {
"vector": {"indices": [10, 45, 16], "values": [0.5, 0.5, 0.2]},
"k": 10
}
}
}
}
response = client.search(index='vector_sparse_test', body=query)
融合查询
向量列的查询可与普通列的查询条件结合,并返回综合的查询结果。在实际业务使用时, Post_Filter近似查询通常能获取更相似的检索结果。
Pre-Filter近似查询
通过在knn查询结构内部添加过滤器filter,并指定filter_type参数的值为pre_filter
,可实现先过滤结构化数据,再查询向量数据。
目前结构化过滤数据的上限为10,000条。
以查询索引vector_test
为例:
query = {
"size": 10,
"query": {
"knn": {
"vector1": {
"vector": [2.3, 3.3, 4.4],
"filter": {
"range": {
"field1": {
"gte": 0
}
}
},
"k": 10
}
}
},
"ext": {"lvector": {"filter_type": "pre_filter"}}
}
response = client.search(index='vector_test', body=query)
Post-Filter近似查询
通过在knn查询结构内部添加过滤器filter,并指定filter_type参数的值为post_filter
,可实现先查询向量数据,再过滤结构化数据。
在使用Post_Filter近似查询时,可以适当将k的值设置大一些,以便获取更多的向量数据再进行过滤。
以查询索引vector_test
为例:
query = {
"size": 10,
"query": {
"knn": {
"vector1": {
"vector": [2.3, 3.3, 4.4],
"filter": {
"range": {
"field1": {
"gte": 0
}
}
},
"k": 10
}
}
},
"ext": {"lvector": {"filter_type": "post_filter"}}
}
response = client.search(index='vector_test', body=query)
在使用Post_Filter近似查询时需要适当放大k的值,如果使用ivfpq算法,还需要调整reorder_factor的值。以查询索引vector_test
为例,具体使用如下:
query = {
"size": 10,
"query": {
"knn": {
"vector1": {
"vector": [2.3, 3.3, 4.4],
"filter": {
"range": {
"field1": {
"gte": 0
}
}
},
"k": 1000
}
}
},
"ext": {"lvector": {"filter_type": "post_filter","nprobe": "60", "reorder_factor": "1"}}
}
response = client.search(index='vector_test', body=query)
在Post_Filter近似查询场景中,可以将k值放大至10,000、最大控制在20,000之内,从而将处理时延控制在百毫秒之内。如果k值相对较大,将reorder_factor的值设置为
1
即可。当nlist的值为
10000
时,可以先将nprobe设置为60,查看检索效果。如果检索效果不理想,可适当增加nprobe的值,如80、100、120、140、160,该值引起的性能损耗远小于reorder_factor,但也不宜设置过大。
您也可以通过post_filter添加过滤条件,实现Post-Filter近似查询。以查询索引vector_test
为例,具体使用如下:
query = {
"size": 10,
"query": {
"knn": {
"vector1": {
"vector": [2.3, 3.3, 4.4],
"k": 10
}
}
},
"post_filter": {
"range": {
"field1": {
"gte": 0
}
}
}
}
response = client.search(index='vector_test', body=query)
常规用法
以下以hnsw索引vector_test
为例,介绍索引基础的查询、删除等常规使用方法。
查询所有索引及其数据量。
info = client.cat.indices(v=True)
返回结果如下:
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size green open vector_test vector_test 2 0 2 0 6.8kb 6.8kb
查询指定索引的数据量。以查询
vector_test
为例。client.count(index="vector_test", pretty=True)
返回结果如下:
{ "count" : 2, "_shards" : { "total" : 2, "successful" : 2, "skipped" : 0, "failed" : 0 } }
查看索引创建信息。
client.indices.get(index="vector_test", pretty=True)
返回结果如下:
删除整个索引。
client.indices.delete(index="vector_test")
通过查询删除符合查询条件的指定数据。
delete_query = { "query": { "term": { "field1": 1 # 删除 field1 为 1 的文档 } } } # 执行 delete_by_query response = client.delete_by_query( index='vector_test', body=delete_query )