通过Go(OpenSearch Go)访问向量引擎

Lindorm向量引擎支持向量数据检索功能,兼容Elasticsearch协议,同时支持标量、向量、全文混合检索功能。本文介绍基于Go语言,通过OpenSearch Go客户端连接和使用向量引擎的方法。

前提条件

  • 已安装Go环境,建议安装Go 1.17及以上版本。

  • 已开通Lindorm向量引擎。如何开通,请参见开通向量引擎

  • 已开通Lindorm搜索引擎。具体操作,请参见开通指南

  • 已将客户端的IP地址加入到Lindorm实例的白名单中。具体操作,请参见设置白名单

准备工作

安装OpenSearch Go客户端

您需要安装OpenSearch Go客户端,支持以下两种方式:

  • 方式一:

    1. 修改go.mod文件配置,添加相关依赖,具体如下:

      module SearchDemo 
      
      go 1.23.0 //请替换为您的Go版本
      
      require (
      	github.com/opensearch-project/opensearch-go/v2 v2.3.0
      	github.com/google/uuid v1.6.0 // 非必须,示例代码中生成uuid需要该项,请根据实际情况配置
      )
    2. 执行以下命令,更新go.mod文件。

      go mod tidy
  • 方式二:执行以下命令直接下载。

    go get github.com/opensearch-project/opensearch-go/v2@v2.3.0

连接搜索引擎

package main

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"github.com/opensearch-project/opensearch-go/v2"
	"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
	"math/rand"
	"net/http"
	"strings"
	"time"
)

var client *opensearch.Client

func init() {
    // 替换为搜索引擎域名
	cfg := opensearch.Config{
		Addresses: []string{
			"http://ld-t4n5668xk31ui****-proxy-search-vpc.lindorm.aliyuncs.com:30070",
		},
		Username: "<Username>",
		Password: "<Password>",
	}
	var err error
	client, err = opensearch.NewClient(cfg)
	if err != nil {
		fmt.Println("Init client error %s", err)
	}
}

// 定义错误处理函数
func handlerCommonError(res *opensearchapi.Response) {
	var errorResponse map[string]interface{}
	if err := json.NewDecoder(res.Body).Decode(&errorResponse); err != nil {
		fmt.Printf("Error parsing the error response: %s\n", err)
		return
	}
	fmt.Printf("ERROR %v\n", errorResponse)
}

// 定义knnSearch返回的结构体
type Source struct {
	Field1 int `json:"field1"`
}
type KnnResponse struct {
	Hits struct {
		Hits []struct {
			ID     string  `json:"_id"`
			Score  float64 `json:"_score"`
			Source Source  `json:"_source"`
		} `json:"hits"`
	} `json:"hits"`
}

其中AddressesUsernamePassword分别为搜索引擎的连接地址、默认用户名和默认密码,如何获取,请参见查看搜索引擎连接地址

创建向量索引

创建向量索引,其中vector1为向量列、field1为普通列。向量列及其相关参数必须在创建索引时通过mappings结构显式指定。

hnsw类型索引

以创建索引vector_test为例:

func createHnswIndex() {
	indexName := "vector_test"
	vectorColumn := "vector1"
	indexBody := fmt.Sprintf(`
	{
	  "settings": {
		"index": {
		  "number_of_shards": 2,
		  "knn": true
		}
	  },
	  "mappings": {
		"_source": {
		  "excludes": ["%s"]
		},
		"properties": {
		  "%s": {
			"type": "knn_vector",
			"dimension": 3,
			"data_type": "float",
			"method": {
			  "engine": "lvector",
			  "name": "hnsw",
			  "space_type": "l2",
			  "parameters": {
				"m": 24,
				"ef_construction": 500
			  }
			}
		  },
		  "field1": {
			"type": "long"
		  }
		}
	  }
    }`, vectorColumn, vectorColumn)
	content := strings.NewReader(indexBody)
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	
	req := opensearchapi.IndicesCreateRequest{
		Index: indexName,
		Body:  content,
	}
	
	res, err := req.Do(ctx, client)
	defer res.Body.Close()
	if err != nil {
		fmt.Println("Create index error: ", err)
		return
	}
	if res.IsError() {
		handlerCommonError(res)
		return
	}
	fmt.Println("Create response: ", res.String())
}

ivfpq类型索引

以创建索引vector_ivfpq_test为例:

func createIvfPQIndex() {
	indexName := "vector_ivfpq_test"
	vectorColumn := "vector1"
	// 下方的nlist参数,在实际业务中请务必设置为10000
	indexBody := fmt.Sprintf(`
	{
	  "settings": {
		"index": {
		  "number_of_shards": 4,
		  "knn": true,
		  "knn.offline.construction": true
		}
	  },
	  "mappings": {
		"_source": {
		  "excludes": ["%s"]
		},
		"properties": {
		  "%s": {
			"type": "knn_vector",
			"dimension": 3,
			"data_type": "float",
			"method": {
			  "engine": "lvector",
			  "name": "ivfpq",
			  "space_type": "cosinesimil",
			  "parameters": {
				"m": 3,
				"nlist": 100,
				"centroids_use_hnsw": true,
				"centroids_hnsw_m": 48,
				"centroids_hnsw_ef_construct": 500,
				"centroids_hnsw_ef_search": 200
			  }
			}
		  },
		  "field1": {
			"type": "long"
		  }
		}
	  }
	}`, vectorColumn, vectorColumn)
	content := strings.NewReader(indexBody)
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	// 创建索引请求
	req := opensearchapi.IndicesCreateRequest{
		Index: indexName,
		Body:  content,
	}
	res, err := req.Do(ctx, client)
	defer res.Body.Close()
	if err != nil {
		fmt.Println("Create index error ", err)
		return
	}
	fmt.Println("Create response %s", res.String())
}
重要

创建ivfpq类型索引,您必须注意以下事项:

  • ivfpq中knn.offline.construction务必设置为true,后续需要写入一定量的数据才能发起构建索引。

  • 使用ivfpq算法请将dimension替换业务真实向量维度,并将m值设置为与dimension相同的值。

稀疏向量索引

以创建索引vector_sparse_test为例:

func createSparseVectorIndex() {
	indexName := "vector_sparse_test"
	vectorColumn := "vector1"
	indexBody := fmt.Sprintf(`
	{
	  "settings": {
		"index": {
		  "number_of_shards": 2,
		  "knn": true
		}
	  },
	  "mappings": {
		"_source": {
		  "excludes": ["%s"]
		},
		"properties": {
		  "%s": {
			"type": "knn_vector",
			"data_type": "sparse_vector",
			"method": {
			  "engine": "lvector",
			  "name": "sparse_hnsw",
			  "space_type": "innerproduct",
			  "parameters": {
				"m": 24,
				"ef_construction": 200
			  }
			}
		  },
		  "field1": {
			"type": "long"
		  }
		}
	  }
	}`, vectorColumn, vectorColumn)
	content := strings.NewReader(indexBody)
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	req := opensearchapi.IndicesCreateRequest{
		Index: indexName,
		Body:  content,
	}
	res, err := req.Do(ctx, client)
	defer res.Body.Close()
	if err != nil {
		fmt.Println("Create index error: ", err)
		return
	}
	if res.IsError() {
		handlerCommonError(res)
		return
	}
	fmt.Println("Create response: ", res.String())
}
重要

创建稀疏向量时,data_type参数必须填写sparse_vector,name参数必须填写sparse_hnsw,space_type参数必须填写innerproduct

参数说明

通用参数

参数

是否必填

说明

type

索引列的类型。对于向量列,固定为knn_vector

dimension

向量数据的维度。取值范围:[1,32768]。

data_type

向量数据的类型。目前支持以下类型:

  • float

  • float16

  • sparse_vector(仅稀疏向量索引支持该类型)

method.name

向量数据的索引算法。取值如下:

  • flat

  • hnsw

  • ivfpq

  • sparse_hnsw(仅稀疏向量索引支持该算法)

method.space_type

向量数据的距离算法。取值如下:

  • l2(默认值):欧式距离。

  • cosinesimil:余弦距离。

  • innerproduct:内积距离。

HNSW算法参数

参数

是否必填

说明

method.parameters.m

每一层图的最大出边数量。

取值范围:[1,100]。默认值为16

method.parameters.ef_construction

索引构建时动态列表的长度。

取值范围:[1,1000]。默认值为100

IVFPQ算法参数

参数

是否必填

说明

method.parameters.m

量化中子空间的数量。取值范围:[2,32768]。默认值为16

重要

创建ivfpq类型索引时,该参数值必须与通用参数dimension的值相同。

method.parameters.nlist

聚类中心的数量。

取值范围:[2, 1000000]。默认值为10000

method.parameters.centroids_use_hnsw

是否在聚类中心搜索时使用HNSW算法。

取值如下:

  • true(默认值

  • false

method.parameters.centroids_hnsw_m

若在聚类中心搜索时使用HNSW算法,设定HNSW算法的每一层图的最大出边数量。

取值范围:[1,100]。默认值为16

method.parameters.centroids_hnsw_ef_construct

若在聚类中心搜索时使用HNSW算法,设定HNSW算法在索引构建时动态列表的长度。

取值范围:[1,1000]。默认值为100

method.parameters.centroids_hnsw_ef_search

若在聚类中心搜索时使用HNSW算法,设定HNSW算法在查询时动态列表的长度。

取值范围:[1,1000]。默认值为100

数据写入

包含向量列的索引的数据写入方式与普通索引的数据写入方式一致。

单条写入

您可以根据业务需要选择插入写或更新写。

插入写

若目标数据已存在,则不允许写入。以写入索引vector_test为例:

func writeDataCreate() {
	// 插入写
	doc := strings.NewReader(`{
		"field1": 2,
		"vector1": [2.2, 2.3, 2.4]
	}`)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	req := opensearchapi.CreateRequest{
		Index:      "vector_test",
		DocumentID: "2",
		Body:       doc,
	}

	res, err := req.Do(ctx, client)
	defer res.Body.Close()
	if err != nil {
		fmt.Println("Write index error ", err)
	}
}

更新写

以写入索引vector_test为例:

func writeDataIndex() {
	// 更新写
	doc := strings.NewReader(`{
		"field1": 1,
		"vector1": [1.2, 1.3, 1.4]
	}`)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	req := opensearchapi.IndexRequest{
		Index:      "vector_test",
		DocumentID: "1",
		Body:       doc,
	}
	res, err := req.Do(ctx, client)
	defer res.Body.Close()
	if err != nil {
		fmt.Println("Write index error ", err)
	}
}

批量写入

通用写入

_bulk写入方式为指定JSON写入。以写入索引vector_test为例,具体如下:

// 人为指定json串
func writeDataBulk() {
	// bulk 写入
	doc := strings.NewReader(`
		{ "index" : { "_index" : "vector_test", "_id" : "3" } }
		{ "field1" : 3, "vector1": [3.2, 3.3, 3.4] }
		{ "create" : { "_index" : "vector_test", "_id" : "4" } }
		{ "field1" : 4, "vector1": [4.2, 4.3, 4.4] }
		{ "delete" : { "_index" : "vector_test", "_id" : "2" } }
		{ "update" : {"_id" : "1", "_index" : "vector_test"} }
		{ "doc" : {"field1" : 3, "vector1": [2.21, 3.31, 4.41]} }
	`)

	// 确保每行后都有换行符
	var buffer bytes.Buffer
	scanner := bufio.NewScanner(doc)
	for scanner.Scan() {
		buffer.WriteString(scanner.Text())
		buffer.WriteString("\n")
	}

	res, err := client.Bulk(&buffer)
	defer res.Body.Close()
	fmt.Println(res.String())
	if err != nil {
		fmt.Println("Bulk write error ", err)
	}
}

您也可以定义数据写入逻辑,通过_bulk一次性写入大量数据。以写入索引vector_ivfpq_test为例:

// 代码样例是随机生成dim维度的向量,实际业务替换
// 生成格式 [0.7862035,0.9371029,0.50112325]
func randomDenseVector(dim int) string {
	random := rand.New(rand.NewSource(time.Now().UnixNano()))
	randomArray := make([]float32, dim)
	for i := range randomArray {
		randomArray[i] = random.Float32()
	}
	var formattedArray []string
	for _, num := range randomArray {
		formattedArray = append(formattedArray, fmt.Sprintf("%.8f", num))
	}
	result := "[" + strings.Join(formattedArray, ", ") + "]"
	return result
}

func bulkWriteDenseVector() {
	indexName := "vector_ivfpq_test"
	var buf bytes.Buffer
	// 添加多条数据,bulk写入也要控制每次写入的条数
	for i := 1; i <= 20; i++ {
		// 写20次,每次写入100条
		for j := (i-1)*100 + 1; j <= i*100; j++ {
			// id 可以根据业务自己指定,向量的random请替换为业务自己的。create可以替换为index
			meta := []byte(fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%d" } }`, indexName, j))
			data := map[string]interface{}{
				"vector1": randomDenseVector(3),
				"field1":  i,
			}
			dataJson, err := json.Marshal(data)
			if err != nil {
				fmt.Println("Error marshaling JSON:", err)
				continue
			}
			// 将操作写入缓冲区
			buf.Grow(len(meta) + len(dataJson) + 2) // 预留空间
			buf.Write(meta)
			buf.WriteByte('\n') // 换行
			buf.Write(dataJson)
			buf.WriteByte('\n') // 换行
		}

		// 执行 Bulk 请求
		bulkReq := opensearchapi.BulkRequest{
			Body:  &buf,
			Index: indexName,
		}

		// 设置超时
		ctx, cancel := context.WithTimeout(context.Background(), 30*2*time.Second)
		defer cancel()

		res, err := bulkReq.Do(ctx, client)
		if err != nil {
			fmt.Println("happend err", err)
		}

		defer res.Body.Close()
	}
}

稀疏向量写入

以写入索引vector_sparse_test为例:

type Data struct {
	Indices []int     `json:"indices"`
	Values  []float32 `json:"values"`
}

/*
* 格式参考curl使用文档,稀疏向量的格式说明
{"indices":[91,30,92],"values":[0.7862035,0.9371029,0.50112325]}
*/
func randomSparseVector() string {
	random := rand.New(rand.NewSource(time.Now().UnixNano()))
	// 随机生成 indices
	sparsedim := random.Intn(16) + 2
	indices := make([]int, sparsedim)
	for i := range indices {
		indices[i] = random.Intn(100) // 生成 0-99 之间的随机整数
	}
	// 随机生成 values
	values := make([]float32, sparsedim)
	for i := range values {
		values[i] = random.Float32() // 生成 0.0-1.0 之间的随机浮点数
	}
	// 创建数据结构
	data := Data{
		Indices: indices,
		Values:  values,
	}
	// 将 data 编码为 JSON
	jsonData, err := json.Marshal(data)
	if err != nil {
		fmt.Println("Error marshaling to JSON:", err)
		return ""
	}
	// 打印生成的 JSON 字符串
	return string(jsonData)
}

func bulkWriteSparseVector() {
	indexName := "vector_sparse_test"
	var buf bytes.Buffer
	// 添加多条数据,bulk写入也要控制每次写入的条数,不能过大
	for i := 5; i <= 50; i++ {
		// id 可以根据业务自己指定,向量的random请替换为业务自己的。create可以替换为index
		meta := []byte(fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%d" } }`, indexName, i))
		data := map[string]interface{}{
			"vector1": randomSparseVector(),
			"field1":  i,
		}
		dataJson, err := json.Marshal(data)
		if err != nil {
			fmt.Println("Error marshaling JSON: ", err)
			continue
		}
		// 将操作写入缓冲区
		buf.Grow(len(meta) + len(dataJson) + 2) // 预留空间
		buf.Write(meta)
		buf.WriteByte('\n') // 换行
		buf.Write(dataJson)
		buf.WriteByte('\n') // 换行
	}

	// 执行 Bulk 请求
	bulkReq := opensearchapi.BulkRequest{
		Body:  &buf,
		Index: indexName,
	}

	// 设置超时
	ctx, cancel := context.WithTimeout(context.Background(), 30*2*time.Second)
	defer cancel()

	res, err := bulkReq.Do(ctx, client)
	if err != nil {
		fmt.Println("happend err", err)
	}
	defer res.Body.Close()
}

索引构建

重要
  • 除ivfpq索引,其他类型索引创建时index.knn.offline.construction默认为false,即在线索引,无需手动构建。

  • 在触发ivfpq索引构建前需注意:在创建ivfpq索引时,需将index.knn.offline.construction显式指定为true,且在发起构建时务必确保已写入足够的数据量,必须大于256条且超过nlist的30倍。

  • 手动触发索引构建完成后,后续可正常写入和查询,无需再次构建索引

触发构建

构建索引,可以使用client.Transport.Perform封装HTTP POST请求。以构建索引vector_ivfpq_test为例:

func buildIndex() {
	indexName := "vector_ivfpq_test"
	vectorField := "vector1"
	bodyBuild := map[string]interface{}{
		"indexName":      indexName,
		"fieldName":      vectorField,
		"removeOldIndex": "true",
	}
	url := "/_plugins/_vector/index/build"
	buf := new(bytes.Buffer)
	if err := json.NewEncoder(buf).Encode(bodyBuild); err != nil {
		fmt.Println("Encode error ", err)
		return
	}
	buildIndexRequest, err := http.NewRequest("POST", url, buf)
	if err != nil {
		fmt.Println("Build err ", err)
		return
	}
	buildIndexRequest.Header.Set("Accept", "application/json")
	buildIndexRequest.Header.Set("Content-Type", "application/json")
	res, err := client.Transport.Perform(buildIndexRequest)
	if err != nil {
		fmt.Println("Err ", err)
		return
	}
	defer res.Body.Close()
	response := opensearchapi.Response{
		StatusCode: res.StatusCode,
		Body:       res.Body,
		Header:     res.Header,
	}
  
	fmt.Println("Response:", response.String())
    // 定义结构体解析结果
	type BuildResponse struct {
		Payload []string `json:"payload"`
	}

	var taskRes BuildResponse
	if err := json.NewDecoder(response.Body).Decode(&taskRes); err != nil {
		fmt.Println("Decode error:", err)
		return
	}

	fmt.Println("TaskId: ", taskRes.Payload[0])
}

参数说明

参数

是否必填

说明

indexName

表名称,例如vector_ivfpq_test

fieldName

针对哪个字段构建索引,例如vector1

removeOldIndex

构建索引时,是否删除旧的索引。取值如下:

  • true:在触发构建时,会删除旧的索引数据,在构建完成后才能进行knn查询。

    重要

    实际业务使用,建议设置为true

  • false(默认值):会保留旧的索引,但会影响检索性能。

查看索引状态

可以使用client.Transport.Perform封装HTTP GET请求。以查看索引vector_ivfpq_test为例:

func getIndexStatus() {
	indexName := "vector_ivfpq_test"
	vectorField := "vector1"
	bodyBuild := map[string]interface{}{
		"indexName": indexName,
		"fieldName": vectorField,
		"taskIds":   "[]",
	}
	url := "/_plugins/_vector/index/tasks"
	buf := new(bytes.Buffer)
	if err := json.NewEncoder(buf).Encode(bodyBuild); err != nil {
		fmt.Println("Err ", err)
		return
	}
	req, err := http.NewRequest("GET", url, buf)
	if err != nil {
		fmt.Println("Err ", err)
	}
	req.Header.Set("Accept", "application/json")
	req.Header.Set("Content-Type", "application/json")
	res, err := client.Transport.Perform(req)
	if err != nil {
		fmt.Println("Err ", err)
	}
	defer res.Body.Close()
	response := opensearchapi.Response{
		StatusCode: res.StatusCode,
		Body:       res.Body,
		Header:     res.Header,
	}
	responseStr := response.String()
	fmt.Println("Response:", responseStr)
	// 定义结构体解析结果
	type GetIndexResponse struct {
		Payload []string `json:"payload"`
	}
	var indexRes GetIndexResponse
	if err := json.NewDecoder(response.Body).Decode(&indexRes); err != nil {
		fmt.Println("Decode error:", err)
		return
	}
	indexResStr := indexRes.Payload[0]
	fmt.Println("GetIndex Build Status response:", indexResStr)
    // 如果不是这个状态,程序可以循环 sleep 等待
	contains := strings.Contains(indexResStr, "stage: FINISH")
	if contains {
		fmt.Println("Index Build Success:")
	}
}

终止构建

可以使用client.Transport.Perform封装HTTP POST请求。以终止索引vector_ivfpq_test的构建为例:

func abortIndex() {
   indexName := "vector_ivfpq_test"
   vectorField := "vector1"
   bodyBuild := map[string]interface{}{
      "indexName": indexName,
      "fieldName": vectorField,
      "taskIds":   "[\"default_vector_ivfpq_test_vector1\"]",
   }
   url := "/_plugins/_vector/index/tasks/abort"
   buf := new(bytes.Buffer)
   if err := json.NewEncoder(buf).Encode(bodyBuild); err != nil {
      fmt.Println("Encode error ", err)
      return
   }
   abortReq, err := http.NewRequest("POST", url, buf)
   if err != nil {
      fmt.Println("Abort index err ", err)
      return
   }
   abortReq.Header.Set("Accept", "application/json")
   abortReq.Header.Set("Content-Type", "application/json")
   res, err := client.Transport.Perform(abortReq)
   if err != nil {
      fmt.Println("")
      return
   }
   defer res.Body.Close()
   response := opensearchapi.Response{
      StatusCode: res.StatusCode,
      Body:       res.Body,
      Header:     res.Header,
   }
   fmt.Println("Abort index response:", response.String())
}

数据查询

纯向量数据查询

纯向量数据的查询可以通过knn结构实现。以查询索引vector_test为例:

func knnSearch() {
	indexName := "vector_test"
	vectorField := "vector1"
	knnString := fmt.Sprintf(`{
	  "size": 10,
	  "query": {
		"knn": {
		  "%s": {
			"vector": %s,
			"k": 10
		  }
		}
	  }
	}`, vectorField, randomDenseVector(3))

	content := strings.NewReader(knnString)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	search := opensearchapi.SearchRequest{
		Index: []string{indexName},
		Body:  content,
	}
	searchResponse, err := search.Do(ctx, client)
	defer searchResponse.Body.Close()
	if err != nil {
		fmt.Println("Knn err ", err)
		return
	}
 	// fmt.Println("knn result ", searchResponse.String())
    if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Println("Knn result Decode err ", err)
		return
	}
	// 打印每个文档的 ID 和得分
	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
	}
}

返回指定字段

如果需要在查询时返回指定字段,可以指定 "_source": ["field1", "field2"] 或使用"_source": true 返回非向量的全部字段。以查询索引vector_test为例,使用方法如下:

func knnSearchHnswWithSource() {
	indexName := "vector_test"
	vectorField := "vector1"
	knnString := fmt.Sprintf(`{
	  "size": 10,
      "_source": ["field1"],
	  "query": {
		"knn": {
		  "%s": {
			"vector": %s,
			"k": 10
		  }
		}
	  }
	}`, vectorField, randomDenseVector(3))

	content := strings.NewReader(knnString)
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	search := opensearchapi.SearchRequest{
		Index: []string{indexName},
		Body:  content,
	}
	searchResponse, err := search.Do(ctx, client)
	defer searchResponse.Body.Close()
	if err != nil {
		fmt.Println("Knn err ", err)
		return
	}
	// fmt.Println("knn result ", searchResponse.String())
	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Println("Knn result Decode err ", err)
		return
	}

	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f, Field1 %d \n", hit.ID, hit.Score, hit.Source.Field1)
	}
}

返回结果如下:

单击展开返回结果

{
  "took" : 35,
  "timed_out" : false,
  "terminated_early" : false,
  "num_reduce_phases" : 0,
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 3,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "vector_test",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "field1" : 1
        }
      },
      {
        "_index" : "vector_test",
        "_id" : "1",
        "_score" : 0.25,
        "_source" : {
          "field1" : 1
        }
      },
      {
        "_index" : "vector_test",
        "_id" : "3",
        "_score" : 0.14285715,
        "_source" : {
          "field1" : 2
        }
      }
    ]
  }
}

hsnw算法查询

以查询hsnw索引vector_test为例:

func knnSearchHnsw() {
	indexName := "vector_test"
	vectorField := "vector1"
	knnString := fmt.Sprintf(`{
	  "size": 10,
	  "query": {
		"knn": {
		  "%s": {
			"vector": %s,
			"k": 10
		  }
		}
	  },
     "ext": {"lvector": {"ef_search": "100"}}
	}`, vectorField, randomDenseVector(3))

	content := strings.NewReader(knnString)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	search := opensearchapi.SearchRequest{
		Index: []string{indexName},
		Body:  content,
	}
	searchResponse, err := search.Do(ctx, client)
	defer searchResponse.Body.Close()
	if err != nil {
		fmt.Println("Knn err ", err)
		return
	}
	// fmt.Println("Knn result ", searchResponse.String())
	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Println("Knn result Decode err", err)
		return
	}
	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
	}
}

ivfpq算法查询

以查询ivfpq索引vector_ivfpq_test为例:

func knnSearchIvfpq() {
	indexName := "vector_test"
	vectorField := "vector1"
	knnString := fmt.Sprintf(`{
	  "size": 10,
	  "query": {
		"knn": {
		  "%s": {
			"vector": %s,
			"k": 10
		  }
		}
	  },
      "ext": {"lvector": {"nprobe": "60", "reorder_factor": "5"}}
	}`, vectorField, "[2.21, 3.31, 4.41]")

	content := strings.NewReader(knnString)
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	search := opensearchapi.SearchRequest{
		Index: []string{indexName},
		Body:  content,
	}
	searchResponse, err := search.Do(ctx, client)
	defer searchResponse.Body.Close()
	if err != nil {
		fmt.Println("Knn err", err)
		return
	}
	// fmt.Println("Knn result", searchResponse.String())
    if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Println("Knn result Decode err", err)
		return
	}
	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
	}
}

稀疏向量查询

以查询稀疏向量索引vector_sparse_test为例:

func knnSearchSparseVector() {
	indexName := "vector_sparse_test"
	vectorField := "vector1"
	knnString := fmt.Sprintf(`{
	  "size": 10,
	  "query": {
		"knn": {
		  "%s": {
			"vector": %s,
			"k": 10
		  }
		}
	  }
	}`, vectorField, randomSparseVector())

	content := strings.NewReader(knnString)
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	search := opensearchapi.SearchRequest{
		Index: []string{indexName},
		Body:  content,
	}
	searchResponse, err := search.Do(ctx, client)
	defer searchResponse.Body.Close()
	if err != nil {
		fmt.Println("Knn err ", err)
		return
	}
	// fmt.Println("Knn result ", searchResponse.String())
	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Println("Knn result Decode err ", err)
		return
	}
	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
	}
}

融合查询

向量列的查询可与普通列的查询条件结合,并返回综合的查询结果。在实际业务使用时, Post_Filter近似查询通常能获取更相似的检索结果。

Pre-Filter近似查询

通过在knn查询结构内部添加过滤器filter,并指定filter_type参数的值为pre_filter,可实现先过滤结构化数据,再查询向量数据。

说明

目前结构化过滤数据的上限为10,000条。

以查询索引vector_test为例:

func preFilterKnnSearch() {
	indexName := "vector_test"
	vectorField := "vector1"
	knnString := fmt.Sprintf(`{
	  "size": 10,
	  "query": {
		"knn": {
		  "%s": {
			"vector": %s,
			"k": 10,
			"filter": {
			  "range": {
				"field1": {
				  "gte": 1
				}
			  }
			}
		  }
		}
	  },
     "ext": {"lvector": {"filter_type": "pre_filter"}}
	}`, vectorField, randomDenseVector(3))

	content := strings.NewReader(knnString)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	search := opensearchapi.SearchRequest{
		Index: []string{indexName},
		Body:  content,
	}
	searchResponse, err := search.Do(ctx, client)
	defer searchResponse.Body.Close()
	if err != nil {
		fmt.Println("Knn err", err)
		return
	}
	// fmt.Println("Knn result", searchResponse.String())
    if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Println("Knn result Decode err", err)
		return
	}
	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
	}
}

Post-Filter近似查询

通过在knn查询结构内部添加过滤器filter,并指定filter_type参数的值为post_filter,可实现先查询向量数据,再过滤结构化数据。

说明

在使用Post_Filter近似查询时,可以适当将k的值设置大一些,以便获取更多的向量数据再进行过滤。

以查询索引vector_ivfpq_test为例:

func postFilterKnnSearch() {
	indexName := "vector_ivfpq_test"
	vectorField := "vector1"
	knnTopk := 1000
	knnString := fmt.Sprintf(`{
	  "size": 10,
	  "query": {
		"knn": {
		  "%s": {
			"vector": %s,
			"k": %d,
	        "filter": {
			  "range": {
				"field1": {
				  "gte": 0
				}
			  }
			}
		  }
		}
	  },
      "ext": {"lvector": {"filter_type": "post_filter", "nprobe": "100", "reorder_factor": "1"}}
	}`, vectorField, "[2.21, 3.31, 4.41]", knnTopk)

	content := strings.NewReader(knnString)
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	search := opensearchapi.SearchRequest{
		Index: []string{indexName},
		Body:  content,
	}
	searchResponse, err := search.Do(ctx, client)
	defer searchResponse.Body.Close()
	if err != nil {
		fmt.Println("Knn err %s", err)
		return
	}
	// fmt.Println("Knn result %s", searchResponse.String())
	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Println("Knn result Decode err %s", err)
		return
	}
	// 打印每个文档的 ID 和得分
	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
	}
}

常规用法

常规用法提供索引基础的查询、删除等使用方法。

  • 查询所有索引及其数据量。

    func catIndices() {
    	req := opensearchapi.CatIndicesRequest{}
    	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    	defer cancel()
    	res, err := req.Do(ctx, client)
    	defer res.Body.Close()
    	if err != nil {
    		fmt.Println("CatIndices err ", err)
    		return
    	}
    	fmt.Println("CatIndices  response:", res.String())
    }

    返回结果:

    health status index        uuid        pri rep docs.count docs.deleted store.size pri.store.size
    green  open   vector_test  vector_test 2   0          2            0      6.8kb          6.8kb
  • 查询指定索引的数据量。以查询索引vector_ivfpq_test为例:

    func getIndexCount() {
    	indexName := "vector_ivfpq_test"
    	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    	defer cancel()
    	req := opensearchapi.CountRequest{
    		Index: []string{indexName},
    	}
    	res, err := req.Do(ctx, client)
    	defer res.Body.Close()
    	if err != nil {
    		fmt.Println("Count err %s", err)
    		return
    	}
    	fmt.Println("Count result %s", res.String())
    	
    	type CountResponse struct {
    		Count int `json:"count"`
    	}
    	var countResponse CountResponse
    	if err := json.NewDecoder(res.Body).Decode(&countResponse); err != nil {
    		fmt.Printf("Error decoding response: %s\n", err)
    		return
    	}
    	fmt.Printf("Count result: %d\n", countResponse.Count)
    }

    返回结果:

    {
      "count" : 2,
      "_shards" : {
        "total" : 2,
        "successful" : 2,
        "skipped" : 0,
        "failed" : 0
      }
    }
  • 查看索引创建信息。以查询索引vector_test为例:

    func getIndex() {
    	indexName := "vector_test"
    	req := opensearchapi.IndicesGetRequest{
    		Index: []string{indexName},
    	}
    	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    	defer cancel()
    	res, err := req.Do(ctx, client)
    	defer res.Body.Close()
    	if err != nil {
    		fmt.Println("DeleteByQuery err ", err)
    		return
    	}
    	if res.IsError() {
    		fmt.Printf("Error: %s\n", res.String())
    		return
    	}
    	fmt.Println("GetIndex  response:", res.String())
    	var indexData map[string]interface{}
    	if err := json.NewDecoder(res.Body).Decode(&indexData); err != nil {
    		fmt.Println("Error parsing the response body:", err)
    		return
    	}
    
    	// 如获取 space_type
    	if properties, ok := indexData[indexName].(map[string]interface{})["mappings"].(map[string]interface{})["properties"].(map[string]interface{})["vector1"].(map[string]interface{})["method"].(map[string]interface{})["space_type"]; ok {
    		if spaceType, ok := properties.(string); ok {
    			fmt.Printf("space_type: %s\n", spaceType)
    		}
    	}
    }

    返回结果:

    单击展开返回结果

    {
      "vector_test" : {
        "aliases" : { },
        "mappings" : {
          "_source" : {
            "excludes" : [
              "vector1"
            ]
          },
          "properties" : {
            "field1" : {
              "type" : "long"
            },
            "vector1" : {
              "type" : "knn_vector",
              "dimension" : 3,
              "data_type" : "float",
              "method" : {
                "engine" : "lvector",
                "space_type" : "l2",
                "name" : "hnsw",
                "parameters" : {
                  "ef_construction" : 200,
                  "m" : 24
                }
              }
            }
          }
        },
        "settings" : {
          "index" : {
            "search" : {
              "slowlog" : {
                "level" : "DEBUG",
                "threshold" : {
                  "fetch" : {
                    "warn" : "1s",
                    "trace" : "200ms",
                    "debug" : "500ms",
                    "info" : "800ms"
                  },
                  "query" : {
                    "warn" : "10s",
                    "trace" : "500ms",
                    "debug" : "1s",
                    "info" : "5s"
                  }
                }
              }
            },
            "indexing" : {
              "slowlog" : {
                "level" : "DEBUG",
                "threshold" : {
                  "index" : {
                    "warn" : "10s",
                    "trace" : "500ms",
                    "debug" : "2s",
                    "info" : "5s"
                  }
                }
              }
            },
            "number_of_shards" : "2",
            "provided_name" : "vector_test",
            "knn" : "true",
            "creation_date" : "1727169417350",
            "number_of_replicas" : "0",
            "uuid" : "vector_test",
            "version" : {
              "created" : "136287927"
            }
          }
        }
      }
    }
  • 删除整个索引。以删除索引vector_ivfpq_test为例:

    func deleteIndex() {
    	indexName := "vector_ivfpq_test"
    	req := opensearchapi.IndicesDeleteRequest{
    		Index: []string{indexName},
    	}
    	// 删除索引需要设置较长的超时值
    	ctx, cancel := context.WithTimeout(context.Background(), 120*2*time.Second)
    	defer cancel()
    	rep, err := req.Do(ctx, client)
    	defer rep.Body.Close()
    	if err != nil {
    		fmt.Println("Delete err %s", err)
    	}
    	fmt.Println("deleteIndex  response:", rep.String())
    }
  • 通过查询删除符合查询条件的指定数据。

    func deleteByQuery() {
    	indexName := "vector_test"
    	key := "field1"
    	value := 1
    	queryStr := fmt.Sprintf(`{
    	  "query": {
    		"term": {
    		  "%s": %d
    		}
    	  }
    	}`, key, value)
    
    	content := strings.NewReader(queryStr)
    	req := opensearchapi.DeleteByQueryRequest{
    		Index: []string{indexName},
    		Body:  content,
    	}
    	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    	defer cancel()
    	res, err := req.Do(ctx, client)
    	defer res.Body.Close()
    	if err != nil {
    		fmt.Println("DeleteByQuery err %v", err)
    		return
    	}
    	// fmt.Println("DeleteByQuery  response:", res.String())
    }