The Lindorm vector engine supports vector data retrieval, is compatible with the Elasticsearch protocol, and also supports hybrid scalar, vector, and full-text search. This topic describes how to connect to and use the vector engine in Go through the OpenSearch Go client.
Prerequisites
Preparations
Install the OpenSearch Go client
You must install the OpenSearch Go client. The following two methods are supported:
Method 1:
Modify the go.mod file to add the required dependencies:
module SearchDemo

go 1.23.0 // replace with your Go version

require (
    github.com/opensearch-project/opensearch-go/v2 v2.3.0
    github.com/google/uuid v1.6.0 // optional; only needed if your code generates UUIDs
)
Run the following command to update the go.mod file:
go mod tidy
Method 2: Run the following command to download the client directly:
go get github.com/opensearch-project/opensearch-go/v2@v2.3.0
Connect to the search engine
package main
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/opensearch-project/opensearch-go/v2"
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
"math/rand"
"net/http"
"strings"
"time"
)
var client *opensearch.Client
func init() {
// Replace with the endpoint of your search engine
cfg := opensearch.Config{
Addresses: []string{
"http://ld-t4n5668xk31ui****-proxy-search-vpc.lindorm.aliyuncs.com:30070",
},
Username: "<Username>",
Password: "<Password>",
}
var err error
client, err = opensearch.NewClient(cfg)
if err != nil {
fmt.Println("Init client error %s", err)
}
}
// Define a common error handler
func handlerCommonError(res *opensearchapi.Response) {
var errorResponse map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&errorResponse); err != nil {
fmt.Printf("Error parsing the error response: %s\n", err)
return
}
fmt.Printf("ERROR %v\n", errorResponse)
}
// Define structs for parsing knnSearch responses
type Source struct {
Field1 int `json:"field1"`
}
type KnnResponse struct {
Hits struct {
Hits []struct {
ID string `json:"_id"`
Score float64 `json:"_score"`
Source Source `json:"_source"`
} `json:"hits"`
} `json:"hits"`
}
In the configuration, Addresses, Username, and Password are the endpoint, default username, and default password of the search engine. For information about how to obtain them, see "View the endpoint of the search engine".
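After init() has created the client, you may want to verify that the connection works before running the other examples. The following is a minimal sketch that does so by listing the indexes with the same CatIndicesRequest used in the Common operations section at the end of this topic; the function name checkConnection is illustrative and not part of any product API.
// Illustrative sketch: verify connectivity by listing indexes.
func checkConnection() error {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    req := opensearchapi.CatIndicesRequest{}
    res, err := req.Do(ctx, client)
    if err != nil {
        return fmt.Errorf("connect error: %w", err)
    }
    defer res.Body.Close()
    if res.IsError() {
        return fmt.Errorf("unexpected status: %s", res.Status())
    }
    fmt.Println("Connection OK, indices:", res.String())
    return nil
}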
Create a vector index
Create a vector index in which vector1 is the vector field and field1 is a regular field. The vector field and its related parameters must be explicitly specified in the mappings structure when the index is created.
HNSW index
The following example creates an index named vector_test:
func createHnswIndex() {
indexName := "vector_test"
vectorColumn := "vector1"
indexBody := fmt.Sprintf(`
{
"settings": {
"index": {
"number_of_shards": 2,
"knn": true
}
},
"mappings": {
"_source": {
"excludes": ["%s"]
},
"properties": {
"%s": {
"type": "knn_vector",
"dimension": 3,
"data_type": "float",
"method": {
"engine": "lvector",
"name": "hnsw",
"space_type": "l2",
"parameters": {
"m": 24,
"ef_construction": 500
}
}
},
"field1": {
"type": "long"
}
}
}
}`, vectorColumn, vectorColumn)
content := strings.NewReader(indexBody)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
req := opensearchapi.IndicesCreateRequest{
Index: indexName,
Body: content,
}
res, err := req.Do(ctx, client)
if err != nil {
fmt.Println("Create index error: ", err)
return
}
defer res.Body.Close()
if res.IsError() {
handlerCommonError(res)
return
}
fmt.Println("Create response: ", res.String())
}
IVFPQ index
The following example creates an index named vector_ivfpq_test:
func createIvfPQIndex() {
indexName := "vector_ivfpq_test"
vectorColumn := "vector1"
// m in parameters must equal dimension
dim := 3
// nlist below: 1000 is acceptable for small datasets; use 10000 in production
indexBody := fmt.Sprintf(`
{
"settings": {
"index": {
"number_of_shards": 4,
"knn": true,
"knn.offline.construction": true
}
},
"mappings": {
"_source": {
"excludes": ["%s"]
},
"properties": {
"%s": {
"type": "knn_vector",
"dimension": %d,
"data_type": "float",
"method": {
"engine": "lvector",
"name": "ivfpq",
"space_type": "cosinesimil",
"parameters": {
"m": %d,
"nlist": 10000,
"centroids_use_hnsw": true,
"centroids_hnsw_m": 48,
"centroids_hnsw_ef_construct": 500,
"centroids_hnsw_ef_search": 200
}
}
},
"field1": {
"type": "long"
}
}
}
}`, vectorColumn, vectorColumn, dim, dim)
content := strings.NewReader(indexBody)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Build the index creation request
req := opensearchapi.IndicesCreateRequest{
Index: indexName,
Body: content,
}
res, err := req.Do(ctx, client)
if err != nil {
fmt.Println("Create index error: ", err)
return
}
defer res.Body.Close()
if res.IsError() {
handlerCommonError(res)
return
}
fmt.Println("Create response: ", res.String())
}
When you create an IVFPQ index, note the following:
For IVFPQ, knn.offline.construction must be set to true, and a sufficient amount of data must be written before you can trigger the index build. When you use the IVFPQ algorithm, replace dimension with the actual vector dimension of your business data, and set m to the same value as dimension.
Sparse vector index
The following example creates an index named vector_sparse_test:
func createSparseVectorIndex() {
indexName := "vector_sparse_test"
vectorColumn := "vector1"
indexBody := fmt.Sprintf(`
{
"settings": {
"index": {
"number_of_shards": 2,
"knn": true
}
},
"mappings": {
"_source": {
"excludes": ["%s"]
},
"properties": {
"%s": {
"type": "knn_vector",
"data_type": "sparse_vector",
"method": {
"engine": "lvector",
"name": "sparse_hnsw",
"space_type": "innerproduct",
"parameters": {
"m": 24,
"ef_construction": 200
}
}
},
"field1": {
"type": "long"
}
}
}
}`, vectorColumn, vectorColumn)
content := strings.NewReader(indexBody)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
req := opensearchapi.IndicesCreateRequest{
Index: indexName,
Body: content,
}
res, err := req.Do(ctx, client)
if err != nil {
fmt.Println("Create index error: ", err)
return
}
defer res.Body.Close()
if res.IsError() {
handlerCommonError(res)
return
}
fmt.Println("Create response: ", res.String())
}
When you create a sparse vector index, data_type must be set to sparse_vector, name must be set to sparse_hnsw, and space_type must be set to innerproduct.
Parameters
General parameters
Parameter | Required | Description |
type | Yes | The type of the index field. For vector fields, this is fixed to knn_vector. |
dimension | Yes | The dimension of the vector data. Valid range: [1,32768]. |
data_type | No | The data type of the vector data. Supported types include float and sparse_vector. |
method.name | Yes | The index algorithm for the vector data. Valid values include hnsw, ivfpq, and sparse_hnsw. |
method.space_type | No | The distance metric for the vector data. Valid values include l2, cosinesimil, and innerproduct. |
HNSW algorithm parameters
Parameter | Required | Description |
method.parameters.m | No | The maximum number of out-edges per node in each layer of the graph. Valid range: [1,100]. A default value is used if this parameter is not specified. |
method.parameters.ef_construction | No | The size of the dynamic list during index construction. Valid range: [1,1000]. A default value is used if this parameter is not specified. |
IVFPQ algorithm parameters
Parameter | Required | Description |
method.parameters.m | No | The number of subspaces used in quantization. Valid range: [2,32768]. A default value is used if this parameter is not specified. Important: when you create an IVFPQ index, this value must equal the value of the general parameter dimension. |
method.parameters.nlist | No | The number of cluster centroids. Valid range: [2,1000000]. A default value is used if this parameter is not specified. |
method.parameters.centroids_use_hnsw | No | Specifies whether to use the HNSW algorithm when searching among the cluster centroids. Valid values: true and false. |
method.parameters.centroids_hnsw_m | No | If HNSW is used for the centroid search, the maximum number of out-edges per node in each layer of the HNSW graph. Valid range: [1,100]. A default value is used if this parameter is not specified. |
method.parameters.centroids_hnsw_ef_construct | No | If HNSW is used for the centroid search, the size of the HNSW dynamic list during index construction. Valid range: [1,1000]. A default value is used if this parameter is not specified. |
method.parameters.centroids_hnsw_ef_search | No | If HNSW is used for the centroid search, the size of the HNSW dynamic list during queries. Valid range: [1,1000]. A default value is used if this parameter is not specified. |
Write data
Data is written to an index that contains a vector field in the same way as to a regular index.
Write a single document
You can choose insert or upsert writes based on your business needs.
Insert
If the target document already exists, the write is rejected. The following example writes to the vector_test index:
func writeDataCreate() {
// Insert write
doc := strings.NewReader(`{
"field1": 2,
"vector1": [2.2, 2.3, 2.4]
}`)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
req := opensearchapi.CreateRequest{
Index: "vector_test",
DocumentID: "2",
Body: doc,
}
res, err := req.Do(ctx, client)
if err != nil {
fmt.Println("Write index error ", err)
return
}
defer res.Body.Close()
}
Upsert
The following example writes to the vector_test index:
func writeDataIndex() {
// Upsert write
doc := strings.NewReader(`{
"field1": 1,
"vector1": [1.2, 1.3, 1.4]
}`)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
req := opensearchapi.IndexRequest{
Index: "vector_test",
DocumentID: "1",
Body: doc,
}
res, err := req.Do(ctx, client)
if err != nil {
fmt.Println("Write index error ", err)
return
}
defer res.Body.Close()
}
Bulk write
General bulk write
The _bulk API writes documents that you specify as JSON lines. The following example writes to the vector_test index:
// Manually specified JSON payload
func writeDataBulk() {
// Bulk write
doc := strings.NewReader(`
{ "index" : { "_index" : "vector_test", "_id" : "3" } }
{ "field1" : 3, "vector1": [3.2, 3.3, 3.4] }
{ "create" : { "_index" : "vector_test", "_id" : "4" } }
{ "field1" : 4, "vector1": [4.2, 4.3, 4.4] }
{ "delete" : { "_index" : "vector_test", "_id" : "2" } }
{ "update" : {"_id" : "1", "_index" : "vector_test"} }
{ "doc" : {"field1" : 3, "vector1": [2.21, 3.31, 4.41]} }
`)
// Make sure every line ends with a newline character
var buffer bytes.Buffer
scanner := bufio.NewScanner(doc)
for scanner.Scan() {
buffer.WriteString(scanner.Text())
buffer.WriteString("\n")
}
res, err := client.Bulk(&buffer)
fmt.Println(res.String())
if err != nil {
fmt.Println("Bulk write error ", err)
}
defer res.Body.Close()
}
You can also implement your own write logic and use _bulk to write a large volume of data at a time. The following example writes to the vector_ivfpq_test index:
// The sample generates a random dim-dimensional vector; replace it with your business data
// Generated format: [0.7862035,0.9371029,0.50112325]
func randomDenseVector(dim int) string {
random := rand.New(rand.NewSource(time.Now().UnixNano()))
randomArray := make([]float32, dim)
for i := range randomArray {
randomArray[i] = random.Float32()
}
var formattedArray []string
for _, num := range randomArray {
formattedArray = append(formattedArray, fmt.Sprintf("%.8f", num))
}
result := "[" + strings.Join(formattedArray, ", ") + "]"
return result
}
func bulkWriteDenseVector() {
indexName := "vector_ivfpq_test"
var buf bytes.Buffer
// Write multiple documents; also limit the number of documents per bulk request
for i := 1; i <= 20; i++ {
// Write 20 batches of 100 documents each
for j := (i-1)*100 + 1; j <= i*100; j++ {
// The document ID can be set as needed; replace the random vector with your own data. "index" can be replaced with "create"
meta := []byte(fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%d" } }`, indexName, j))
data := map[string]interface{}{
"vector1": randomDenseVector(3),
"field1": i,
}
dataJson, err := json.Marshal(data)
if err != nil {
fmt.Println("Error marshaling JSON:", err)
continue
}
// Append the action and the document to the buffer
buf.Grow(len(meta) + len(dataJson) + 2) // reserve space
buf.Write(meta)
buf.WriteByte('\n') // newline
buf.Write(dataJson)
buf.WriteByte('\n') // newline
}
// Execute the bulk request
bulkReq := opensearchapi.BulkRequest{
Body: &buf,
Index: indexName,
}
// Set the timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*2*time.Second)
defer cancel()
res, err := bulkReq.Do(ctx, client)
if err != nil {
fmt.Println("Bulk write error: ", err)
return
}
res.Body.Close()
// Reset the buffer so that the next batch does not resend the previous documents
buf.Reset()
}
}
Write sparse vectors
The following example writes to the vector_sparse_test index:
type Data struct {
Indices []int `json:"indices"`
Values []float32 `json:"values"`
}
/*
 * Sparse vector format (see the curl usage documentation for details):
 * {"indices":[91,30,92],"values":[0.7862035,0.9371029,0.50112325]}
 */
func randomSparseVector() string {
random := rand.New(rand.NewSource(time.Now().UnixNano()))
// Randomly generate indices
sparsedim := random.Intn(16) + 2
indices := make([]int, sparsedim)
for i := range indices {
indices[i] = random.Intn(100) // random integer in [0, 99]
}
// Randomly generate values
values := make([]float32, sparsedim)
for i := range values {
values[i] = random.Float32() // random float in [0.0, 1.0)
}
// Build the data structure
data := Data{
Indices: indices,
Values: values,
}
// Encode data as JSON
jsonData, err := json.Marshal(data)
if err != nil {
fmt.Println("Error marshaling to JSON:", err)
return ""
}
// Return the generated JSON string
return string(jsonData)
}
func bulkWriteSparseVector() {
indexName := "vector_sparse_test"
var buf bytes.Buffer
// Write multiple documents; limit the number of documents per bulk request and keep it moderate
for i := 5; i <= 50; i++ {
// The document ID can be set as needed; replace the random vector with your own data. "index" can be replaced with "create"
meta := []byte(fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%d" } }`, indexName, i))
data := map[string]interface{}{
// Use json.RawMessage so the sparse vector is serialized as a JSON object rather than a quoted string
"vector1": json.RawMessage(randomSparseVector()),
"field1": i,
}
dataJson, err := json.Marshal(data)
if err != nil {
fmt.Println("Error marshaling JSON: ", err)
continue
}
// Append the action and the document to the buffer
buf.Grow(len(meta) + len(dataJson) + 2) // reserve space
buf.Write(meta)
buf.WriteByte('\n') // newline
buf.Write(dataJson)
buf.WriteByte('\n') // newline
}
// Execute the bulk request
bulkReq := opensearchapi.BulkRequest{
Body: &buf,
Index: indexName,
}
// Set the timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*2*time.Second)
defer cancel()
res, err := bulkReq.Do(ctx, client)
if err != nil {
fmt.Println("Bulk write error: ", err)
return
}
defer res.Body.Close()
}
Build the index
For index types other than IVFPQ, index.knn.offline.construction defaults to false when the index is created, which means the index is built online and no manual build is required. Before you trigger an IVFPQ index build, note the following: when you create the IVFPQ index, index.knn.offline.construction must be explicitly set to true, and before you trigger the build you must make sure that enough data has been written, that is, more than 256 documents and more than 30 times the nlist value. After the manually triggered build completes, you can write and query data as usual without building the index again. A minimal helper that checks the document count before triggering a build is sketched below.
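The following is a minimal sketch of such a check, assuming the threshold stated above (more than 256 documents and more than 30 × nlist). The function name readyForIvfpqBuild is illustrative; it reuses the CountRequest shown in the Common operations section.
// Illustrative sketch: returns true when the index holds enough documents to trigger an IVFPQ build.
func readyForIvfpqBuild(indexName string, nlist int) (bool, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    req := opensearchapi.CountRequest{Index: []string{indexName}}
    res, err := req.Do(ctx, client)
    if err != nil {
        return false, err
    }
    defer res.Body.Close()
    var countResponse struct {
        Count int `json:"count"`
    }
    if err := json.NewDecoder(res.Body).Decode(&countResponse); err != nil {
        return false, err
    }
    // Required volume per this section: more than 256 documents and more than 30 * nlist.
    required := nlist * 30
    if required < 256 {
        required = 256
    }
    return countResponse.Count > required, nil
}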
Trigger a build
To trigger an index build, you can use client.Transport.Perform to send an HTTP POST request. The following example builds the vector_ivfpq_test index:
func buildIndex() {
indexName := "vector_ivfpq_test"
vectorField := "vector1"
bodyBuild := map[string]interface{}{
"indexName": indexName,
"fieldName": vectorField,
"removeOldIndex": "true",
}
url := "/_plugins/_vector/index/build"
buf := new(bytes.Buffer)
if err := json.NewEncoder(buf).Encode(bodyBuild); err != nil {
fmt.Println("Encode error ", err)
return
}
buildIndexRequest, err := http.NewRequest("POST", url, buf)
if err != nil {
fmt.Println("Build err ", err)
return
}
buildIndexRequest.Header.Set("Accept", "application/json")
buildIndexRequest.Header.Set("Content-Type", "application/json")
res, err := client.Transport.Perform(buildIndexRequest)
if err != nil {
fmt.Println("Err ", err)
return
}
defer res.Body.Close()
response := opensearchapi.Response{
StatusCode: res.StatusCode,
Body: res.Body,
Header: res.Header,
}
fmt.Println("Response:", response.String())
// Define a struct for parsing the result
type BuildResponse struct {
Payload []string `json:"payload"`
}
var taskRes BuildResponse
if err := json.NewDecoder(response.Body).Decode(&taskRes); err != nil {
fmt.Println("Decode error:", err)
return
}
if len(taskRes.Payload) > 0 {
fmt.Println("TaskId: ", taskRes.Payload[0])
}
}
Parameters
Parameter | Required | Description |
indexName | Yes | The name of the index (table), for example, vector_ivfpq_test. |
fieldName | Yes | The field for which the index is built, for example, vector1. |
removeOldIndex | Yes | Specifies whether to delete the old index during the build. Valid values: true and false. |
Check the build status
You can use client.Transport.Perform to send an HTTP GET request. The following example checks the build status of the vector_ivfpq_test index:
func getIndexStatus() {
indexName := "vector_ivfpq_test"
vectorField := "vector1"
bodyBuild := map[string]interface{}{
"indexName": indexName,
"fieldName": vectorField,
"taskIds": "[]",
}
url := "/_plugins/_vector/index/tasks"
buf := new(bytes.Buffer)
if err := json.NewEncoder(buf).Encode(bodyBuild); err != nil {
fmt.Println("Err ", err)
return
}
req, err := http.NewRequest("GET", url, buf)
if err != nil {
fmt.Println("Err ", err)
return
}
req.Header.Set("Accept", "application/json")
req.Header.Set("Content-Type", "application/json")
res, err := client.Transport.Perform(req)
if err != nil {
fmt.Println("Err ", err)
return
}
defer res.Body.Close()
response := opensearchapi.Response{
StatusCode: res.StatusCode,
Body: res.Body,
Header: res.Header,
}
responseStr := response.String()
fmt.Println("Response:", responseStr)
// Define a struct for parsing the result
type GetIndexResponse struct {
Payload []string `json:"payload"`
}
var indexRes GetIndexResponse
if err := json.NewDecoder(response.Body).Decode(&indexRes); err != nil {
fmt.Println("Decode error:", err)
return
}
if len(indexRes.Payload) == 0 {
fmt.Println("No build task found")
return
}
indexResStr := indexRes.Payload[0]
fmt.Println("GetIndex Build Status response:", indexResStr)
// If the build has not reached this stage yet, the program can sleep and poll in a loop; see the sketch below
contains := strings.Contains(indexResStr, "stage: FINISH")
if contains {
fmt.Println("Index Build Success:")
}
}
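The following is a minimal sketch of such a polling loop, built from the same tasks request as getIndexStatus above and the "stage: FINISH" check it performs. The function name waitForIndexBuild and the 5-second poll interval are illustrative choices, not part of the product API.
// Illustrative sketch: poll the build task until it reaches "stage: FINISH" or maxAttempts is exceeded.
func waitForIndexBuild(indexName, vectorField string, maxAttempts int) bool {
    for attempt := 0; attempt < maxAttempts; attempt++ {
        bodyBuild := map[string]interface{}{
            "indexName": indexName,
            "fieldName": vectorField,
            "taskIds":   "[]",
        }
        buf := new(bytes.Buffer)
        if err := json.NewEncoder(buf).Encode(bodyBuild); err != nil {
            fmt.Println("Encode error ", err)
            return false
        }
        req, err := http.NewRequest("GET", "/_plugins/_vector/index/tasks", buf)
        if err != nil {
            fmt.Println("Request error ", err)
            return false
        }
        req.Header.Set("Accept", "application/json")
        req.Header.Set("Content-Type", "application/json")
        res, err := client.Transport.Perform(req)
        if err != nil {
            fmt.Println("Perform error ", err)
            return false
        }
        var taskRes struct {
            Payload []string `json:"payload"`
        }
        err = json.NewDecoder(res.Body).Decode(&taskRes)
        res.Body.Close()
        if err != nil {
            fmt.Println("Decode error ", err)
            return false
        }
        if len(taskRes.Payload) > 0 && strings.Contains(taskRes.Payload[0], "stage: FINISH") {
            return true
        }
        time.Sleep(5 * time.Second) // poll interval; adjust to your data volume
    }
    return false
}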
Abort a build
You can use client.Transport.Perform to send an HTTP POST request. The following example aborts the build of the vector_ivfpq_test index:
func abortIndex() {
indexName := "vector_ivfpq_test"
vectorField := "vector1"
bodyBuild := map[string]interface{}{
"indexName": indexName,
"fieldName": vectorField,
"taskIds": "[\"default_vector_ivfpq_test_vector1\"]",
}
url := "/_plugins/_vector/index/tasks/abort"
buf := new(bytes.Buffer)
if err := json.NewEncoder(buf).Encode(bodyBuild); err != nil {
fmt.Println("Encode error ", err)
return
}
abortReq, err := http.NewRequest("POST", url, buf)
if err != nil {
fmt.Println("Abort index err ", err)
return
}
abortReq.Header.Set("Accept", "application/json")
abortReq.Header.Set("Content-Type", "application/json")
res, err := client.Transport.Perform(abortReq)
if err != nil {
fmt.Println("")
return
}
defer res.Body.Close()
response := opensearchapi.Response{
StatusCode: res.StatusCode,
Body: res.Body,
Header: res.Header,
}
fmt.Println("Abort index response:", response.String())
}
Query data
Pure vector query
Pure vector queries can be expressed with the knn query structure. The following example queries the vector_test index:
func knnSearch() {
indexName := "vector_test"
vectorField := "vector1"
knnString := fmt.Sprintf(`{
"size": 10,
"query": {
"knn": {
"%s": {
"vector": %s,
"k": 10
}
}
}
}`, vectorField, randomDenseVector(3))
content := strings.NewReader(knnString)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
search := opensearchapi.SearchRequest{
Index: []string{indexName},
Body: content,
}
searchResponse, err := search.Do(ctx, client)
if err != nil {
fmt.Println("Knn err ", err)
return
}
defer searchResponse.Body.Close()
// fmt.Println("knn result ", searchResponse.String())
if searchResponse.IsError() {
handlerCommonError(searchResponse)
return
}
var responseBody KnnResponse
if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
fmt.Println("Knn result Decode err ", err)
return
}
// Print the ID and score of each document
for _, hit := range responseBody.Hits.Hits {
fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
}
}
Return specified fields
To return specific fields in a query, specify "_source": ["field1", "field2"], or use "_source": true to return all non-vector fields. The following example queries the vector_test index:
func knnSearchHnswWithSource() {
indexName := "vector_test"
vectorField := "vector1"
knnString := fmt.Sprintf(`{
"size": 10,
"_source": ["field1"],
"query": {
"knn": {
"%s": {
"vector": %s,
"k": 10
}
}
}
}`, vectorField, randomDenseVector(3))
content := strings.NewReader(knnString)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
search := opensearchapi.SearchRequest{
Index: []string{indexName},
Body: content,
}
searchResponse, err := search.Do(ctx, client)
if err != nil {
fmt.Println("Knn err ", err)
return
}
defer searchResponse.Body.Close()
// fmt.Println("knn result ", searchResponse.String())
if searchResponse.IsError() {
handlerCommonError(searchResponse)
return
}
var responseBody KnnResponse
if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
fmt.Println("Knn result Decode err ", err)
return
}
for _, hit := range responseBody.Hits.Hits {
fmt.Printf("ID: %s, Score: %f, Field1 %d \n", hit.ID, hit.Score, hit.Source.Field1)
}
}
HNSW query
The following example queries the HNSW index vector_test:
func knnSearchHnsw() {
indexName := "vector_test"
vectorField := "vector1"
knnString := fmt.Sprintf(`{
"size": 10,
"query": {
"knn": {
"%s": {
"vector": %s,
"k": 10
}
}
},
"ext": {"lvector": {"ef_search": "100"}}
}`, vectorField, randomDenseVector(3))
content := strings.NewReader(knnString)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
search := opensearchapi.SearchRequest{
Index: []string{indexName},
Body: content,
}
searchResponse, err := search.Do(ctx, client)
if err != nil {
fmt.Println("Knn err ", err)
return
}
defer searchResponse.Body.Close()
// fmt.Println("Knn result ", searchResponse.String())
if searchResponse.IsError() {
handlerCommonError(searchResponse)
return
}
var responseBody KnnResponse
if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
fmt.Println("Knn result Decode err", err)
return
}
for _, hit := range responseBody.Hits.Hits {
fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
}
}
IVFPQ query
The following example queries the IVFPQ index vector_ivfpq_test:
func knnSearchIvfpq() {
indexName := "vector_test"
vectorField := "vector1"
knnString := fmt.Sprintf(`{
"size": 10,
"query": {
"knn": {
"%s": {
"vector": %s,
"k": 10
}
}
},
"ext": {"lvector": {"nprobe": "60", "reorder_factor": "5"}}
}`, vectorField, "[2.21, 3.31, 4.41]")
content := strings.NewReader(knnString)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
search := opensearchapi.SearchRequest{
Index: []string{indexName},
Body: content,
}
searchResponse, err := search.Do(ctx, client)
if err != nil {
fmt.Println("Knn err", err)
return
}
defer searchResponse.Body.Close()
// fmt.Println("Knn result", searchResponse.String())
if searchResponse.IsError() {
handlerCommonError(searchResponse)
return
}
var responseBody KnnResponse
if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
fmt.Println("Knn result Decode err", err)
return
}
for _, hit := range responseBody.Hits.Hits {
fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
}
}
Sparse vector query
The following example queries the sparse vector index vector_sparse_test:
func knnSearchSparseVector() {
indexName := "vector_sparse_test"
vectorField := "vector1"
knnString := fmt.Sprintf(`{
"size": 10,
"query": {
"knn": {
"%s": {
"vector": %s,
"k": 10
}
}
}
}`, vectorField, randomSparseVector())
content := strings.NewReader(knnString)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
search := opensearchapi.SearchRequest{
Index: []string{indexName},
Body: content,
}
searchResponse, err := search.Do(ctx, client)
if err != nil {
fmt.Println("Knn err ", err)
return
}
defer searchResponse.Body.Close()
// fmt.Println("Knn result ", searchResponse.String())
if searchResponse.IsError() {
handlerCommonError(searchResponse)
return
}
var responseBody KnnResponse
if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
fmt.Println("Knn result Decode err ", err)
return
}
for _, hit := range responseBody.Hits.Hits {
fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
}
}
Hybrid queries
Vector queries can be combined with query conditions on regular fields to return combined results. In real-world use, Post-Filter approximate queries usually retrieve more similar results.
Pre-Filter approximate query
By adding a filter inside the knn query structure and setting the filter_type parameter to pre_filter, the structured data is filtered first and the vector query then runs on the filtered results.
Currently, at most 10,000 documents can be matched by the structured filter.
The following example queries the vector_test index:
func preFilterKnnSearch() {
indexName := "vector_test"
vectorField := "vector1"
knnString := fmt.Sprintf(`{
"size": 10,
"query": {
"knn": {
"%s": {
"vector": %s,
"k": 10,
"filter": {
"range": {
"field1": {
"gte": 1
}
}
}
}
}
},
"ext": {"lvector": {"filter_type": "pre_filter"}}
}`, vectorField, randomDenseVector(3))
content := strings.NewReader(knnString)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
search := opensearchapi.SearchRequest{
Index: []string{indexName},
Body: content,
}
searchResponse, err := search.Do(ctx, client)
if err != nil {
fmt.Println("Knn err", err)
return
}
defer searchResponse.Body.Close()
// fmt.Println("Knn result", searchResponse.String())
if searchResponse.IsError() {
handlerCommonError(searchResponse)
return
}
var responseBody KnnResponse
if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
fmt.Println("Knn result Decode err", err)
return
}
for _, hit := range responseBody.Hits.Hits {
fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
}
}
Post-Filter approximate query
By adding a filter inside the knn query structure and setting the filter_type parameter to post_filter, the vector query runs first and the structured filter is then applied to its results.
When you use Post-Filter approximate queries, consider setting k to a larger value so that more vector results are retrieved before filtering.
The following example queries the vector_ivfpq_test index:
func postFilterKnnSearch() {
indexName := "vector_ivfpq_test"
vectorField := "vector1"
knnTopk := 1000
knnString := fmt.Sprintf(`{
"size": 10,
"query": {
"knn": {
"%s": {
"vector": %s,
"k": %d,
"filter": {
"range": {
"field1": {
"gte": 0
}
}
}
}
}
},
"ext": {"lvector": {"filter_type": "post_filter", "nprobe": "100", "reorder_factor": "1"}}
}`, vectorField, "[2.21, 3.31, 4.41]", knnTopk)
content := strings.NewReader(knnString)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
search := opensearchapi.SearchRequest{
Index: []string{indexName},
Body: content,
}
searchResponse, err := search.Do(ctx, client)
if err != nil {
fmt.Println("Knn err %s", err)
return
}
defer searchResponse.Body.Close()
// fmt.Println("Knn result %s", searchResponse.String())
if searchResponse.IsError() {
handlerCommonError(searchResponse)
return
}
var responseBody KnnResponse
if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
fmt.Println("Knn result Decode err %s", err)
return
}
// 打印每个文档的 ID 和得分
for _, hit := range responseBody.Hits.Hits {
fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
}
}
Common operations
This section provides basic index operations such as queries and deletion.
Query all indexes and their document counts.
func catIndices() {
    req := opensearchapi.CatIndicesRequest{}
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    res, err := req.Do(ctx, client)
    if err != nil {
        fmt.Println("CatIndices err ", err)
        return
    }
    defer res.Body.Close()
    fmt.Println("CatIndices response:", res.String())
}
Sample response:
health status index       uuid        pri rep docs.count docs.deleted store.size pri.store.size
green  open   vector_test vector_test 2   0   2          0            6.8kb      6.8kb
Query the document count of a specified index. The following example queries the vector_ivfpq_test index:
func getIndexCount() {
    indexName := "vector_ivfpq_test"
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    req := opensearchapi.CountRequest{
        Index: []string{indexName},
    }
    res, err := req.Do(ctx, client)
    if err != nil {
        fmt.Println("Count err ", err)
        return
    }
    defer res.Body.Close()
    fmt.Println("Count result ", res.String())
    type CountResponse struct {
        Count int `json:"count"`
    }
    var countResponse CountResponse
    if err := json.NewDecoder(res.Body).Decode(&countResponse); err != nil {
        fmt.Printf("Error decoding response: %s\n", err)
        return
    }
    fmt.Printf("Count result: %d\n", countResponse.Count)
}
Sample response:
{
  "count" : 2,
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "skipped" : 0,
    "failed" : 0
  }
}
Query the index creation information. The following example queries the vector_test index:
func getIndex() {
    indexName := "vector_test"
    req := opensearchapi.IndicesGetRequest{
        Index: []string{indexName},
    }
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    res, err := req.Do(ctx, client)
    if err != nil {
        fmt.Println("GetIndex err ", err)
        return
    }
    defer res.Body.Close()
    if res.IsError() {
        fmt.Printf("Error: %s\n", res.String())
        return
    }
    fmt.Println("GetIndex response:", res.String())
    var indexData map[string]interface{}
    if err := json.NewDecoder(res.Body).Decode(&indexData); err != nil {
        fmt.Println("Error parsing the response body:", err)
        return
    }
    // For example, read space_type from the mappings
    if properties, ok := indexData[indexName].(map[string]interface{})["mappings"].(map[string]interface{})["properties"].(map[string]interface{})["vector1"].(map[string]interface{})["method"].(map[string]interface{})["space_type"]; ok {
        if spaceType, ok := properties.(string); ok {
            fmt.Printf("space_type: %s\n", spaceType)
        }
    }
}
Delete an entire index. The following example deletes the vector_ivfpq_test index:
func deleteIndex() {
    indexName := "vector_ivfpq_test"
    req := opensearchapi.IndicesDeleteRequest{
        Index: []string{indexName},
    }
    // Deleting an index can take a while, so use a longer timeout
    ctx, cancel := context.WithTimeout(context.Background(), 120*2*time.Second)
    defer cancel()
    rep, err := req.Do(ctx, client)
    if err != nil {
        fmt.Println("Delete err ", err)
        return
    }
    defer rep.Body.Close()
    fmt.Println("deleteIndex response:", rep.String())
}
Delete the documents that match a query.
func deleteByQuery() {
    indexName := "vector_test"
    key := "field1"
    value := 1
    queryStr := fmt.Sprintf(`{
        "query": {
            "term": {
                "%s": %d
            }
        }
    }`, key, value)
    content := strings.NewReader(queryStr)
    req := opensearchapi.DeleteByQueryRequest{
        Index: []string{indexName},
        Body:  content,
    }
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    res, err := req.Do(ctx, client)
    if err != nil {
        fmt.Println("DeleteByQuery err ", err)
        return
    }
    defer res.Body.Close()
    // fmt.Println("DeleteByQuery response:", res.String())
}
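To put the pieces together, the following is a minimal end-to-end sketch of the IVFPQ workflow described in this topic: create the index, bulk-write data, trigger the offline build, wait for it to finish, and then query. waitForIndexBuild refers to the illustrative polling helper sketched in the Build the index section; replace the randomly generated vectors with your own data.
func main() {
    createIvfPQIndex()     // create the IVFPQ index with knn.offline.construction enabled
    bulkWriteDenseVector() // write enough data before triggering the build (see the nlist note above)
    buildIndex()           // trigger the offline index build
    if waitForIndexBuild("vector_ivfpq_test", "vector1", 60) {
        knnSearchIvfpq() // query once the build reaches "stage: FINISH"
    }
}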