在多租户海量索引场景下,自定义路由键是实现用户级数据隔离与精准查询的核心技术。通过将用户标识(例如ID)指定为路由键,可保证每次查询仅针对目标用户数据,在保障了数据安全性的同时进一步提升查询性能。本文介绍如何使用自定义路由键功能。
前提条件
准备工作
在创建和使用向量索引前,您需要通过opensearch-go客户端连接搜索引擎,具体操作请参见安装OpenSearch Go客户端。
连接搜索引擎,连接方式请参见连接搜索引擎。
创建索引
仅支持纯向量数据查询
在向量检索场景中,索引类型的选择直接影响查询性能和检索效果。索引选择需综合考虑数据规模、查询延迟要求、召回精度需求及资源成本,以下为各索引类型的详细对比分析:
索引类型 | 数据量范围 | 性能特点 | 准确性 | 适用场景 |
Flat | 千级~万级 | 查询延迟高 | 精确匹配 | 数据量小且需精准检索 |
HNSW | 万级~百万级 | 查询延迟中等 | 近似匹配 | 中大规模数据实时检索 |
IVFPQ | 百万级~千万级 | 查询延迟低 | 近似匹配 | 海量数据高吞吐检索 |
稀疏向量索引 | 按需适配 | 依赖向量稀疏度 | 近似匹配 | 高维稀疏向量场景 |
在自定义路由键的场景下,主键_id必须全局唯一。
创建索引时需指定"knn_routing": true,表示开启自定义路由键功能。对于ivfpq索引,还需设置"meta": {"offline.construction": "true"}。
以下为创建4种索引的示例。
创建flat routing索引
// createUgcFlat creates the "vector_routing_flat_test" index: two shards,
// kNN enabled, custom routing enabled ("knn_routing": true). Its "vector1"
// field is a 3-dimensional float vector using the lvector "flat" method with
// l2 distance, and is excluded from _source. A marshaling or transport
// failure ends the process via log.Fatalf; an error response is only logged.
func createUgcFlat() {
	const indexName = "vector_routing_flat_test"

	// Vector indexing method; "flat" takes no extra parameters here.
	method := map[string]interface{}{
		"engine":     "lvector",
		"name":       "flat",
		"space_type": "l2",
		"parameters": map[string]interface{}{},
	}
	properties := map[string]interface{}{
		"vector1": map[string]interface{}{
			"type":      "knn_vector",
			"dimension": 3,
			"data_type": "float",
			"method":    method,
		},
		"field1": map[string]interface{}{
			"type": "long",
		},
	}
	spec := map[string]interface{}{
		"settings": map[string]interface{}{
			"index": map[string]interface{}{
				"number_of_shards": 2,
				"knn":              true,
				"knn_routing":      true,
			},
		},
		"mappings": map[string]interface{}{
			"_source": map[string]interface{}{
				"excludes": []string{"vector1"},
			},
			"properties": properties,
		},
	}

	payload, err := json.Marshal(spec)
	if err != nil {
		log.Fatalf("Error marshaling index body: %s", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	createReq := opensearchapi.IndicesCreateRequest{
		Index: indexName,
		Body:  bytes.NewReader(payload),
	}
	resp, err := createReq.Do(ctx, client)
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer resp.Body.Close()

	if resp.IsError() {
		log.Printf("Error creating index: %s", resp.String())
		return
	}
	fmt.Println("Index created successfully")
}
创建hnsw routing索引
// createUgcHnsw creates the "vector_routing_hnsw_test" index: two shards,
// kNN enabled, custom routing enabled ("knn_routing": true). Its "vector1"
// field is a 3-dimensional float vector using the lvector "hnsw" method
// (l2 distance, m = 24, ef_construction = 500) and is excluded from _source.
// A marshaling or transport failure ends the process via log.Fatalf; an
// error response is only logged.
func createUgcHnsw() {
	const indexName = "vector_routing_hnsw_test"

	method := map[string]interface{}{
		"engine":     "lvector",
		"name":       "hnsw",
		"space_type": "l2",
		"parameters": map[string]interface{}{
			"m":               24,
			"ef_construction": 500,
		},
	}
	properties := map[string]interface{}{
		"vector1": map[string]interface{}{
			"type":      "knn_vector",
			"dimension": 3,
			"data_type": "float",
			"method":    method,
		},
		"field1": map[string]interface{}{
			"type": "long",
		},
	}
	spec := map[string]interface{}{
		"settings": map[string]interface{}{
			"index": map[string]interface{}{
				"number_of_shards": 2,
				"knn":              true,
				"knn_routing":      true,
			},
		},
		"mappings": map[string]interface{}{
			"_source": map[string]interface{}{
				"excludes": []string{"vector1"},
			},
			"properties": properties,
		},
	}

	payload, err := json.Marshal(spec)
	if err != nil {
		log.Fatalf("Error marshaling index body: %s", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	createReq := opensearchapi.IndicesCreateRequest{
		Index: indexName,
		Body:  bytes.NewReader(payload),
	}
	resp, err := createReq.Do(ctx, client)
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer resp.Body.Close()

	if resp.IsError() {
		log.Printf("Error creating index: %s", resp.String())
		return
	}
	fmt.Println("Index created successfully")
}
创建sparse_hnsw routing稀疏向量索引
// createUgcHnswSparse creates the "vector_routing_sparse_test" index: two
// shards, kNN enabled, custom routing enabled ("knn_routing": true). Its
// "vector1" field has data_type "sparse_vector" (no dimension is declared)
// and uses the lvector "sparse_hnsw" method with innerproduct distance,
// m = 24 and ef_construction = 200; the field is excluded from _source.
// A marshaling or transport failure ends the process via log.Fatalf; an
// error response is only logged.
func createUgcHnswSparse() {
	const indexName = "vector_routing_sparse_test"

	method := map[string]interface{}{
		"engine":     "lvector",
		"name":       "sparse_hnsw",
		"space_type": "innerproduct",
		"parameters": map[string]interface{}{
			"m":               24,
			"ef_construction": 200,
		},
	}
	properties := map[string]interface{}{
		"vector1": map[string]interface{}{
			"type":      "knn_vector",
			"data_type": "sparse_vector",
			"method":    method,
		},
		"field1": map[string]interface{}{
			"type": "long",
		},
	}
	spec := map[string]interface{}{
		"settings": map[string]interface{}{
			"index": map[string]interface{}{
				"number_of_shards": 2,
				"knn":              true,
				"knn_routing":      true,
			},
		},
		"mappings": map[string]interface{}{
			"_source": map[string]interface{}{
				"excludes": []string{"vector1"},
			},
			"properties": properties,
		},
	}

	payload, err := json.Marshal(spec)
	if err != nil {
		log.Fatalf("Error marshaling index body: %s", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	createReq := opensearchapi.IndicesCreateRequest{
		Index: indexName,
		Body:  bytes.NewReader(payload),
	}
	resp, err := createReq.Do(ctx, client)
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer resp.Body.Close()

	if resp.IsError() {
		log.Printf("Error creating index: %s", resp.String())
		return
	}
	fmt.Println("Index created successfully")
}
创建ivfpq routing索引
// createUgcIvfpq creates the "vector_routing_ivfpq_test" index: four shards,
// kNN enabled, custom routing enabled ("knn_routing": true). Its "vector1"
// field is a 3-dimensional float vector using the lvector "ivfpq" method
// with cosinesimil distance, and carries "meta": {"offline.construction":
// "true"}, which the ivfpq routing index requires. The field is excluded
// from _source. A marshaling or transport failure ends the process via
// log.Fatalf; an error response is only logged.
func createUgcIvfpq() {
	const indexName = "vector_routing_ivfpq_test"

	method := map[string]interface{}{
		"engine":     "lvector",
		"name":       "ivfpq",
		"space_type": "cosinesimil",
		"parameters": map[string]interface{}{
			"m":                           3, // set to the same value as dimension
			"nlist":                       2,
			"centroids_use_hnsw":          false,
			"centroids_hnsw_m":            48,
			"centroids_hnsw_ef_construct": 500,
			"centroids_hnsw_ef_search":    200,
		},
	}
	properties := map[string]interface{}{
		"vector1": map[string]interface{}{
			"type":      "knn_vector",
			"dimension": 3,
			"data_type": "float",
			"meta": map[string]interface{}{
				"offline.construction": "true",
			},
			"method": method,
		},
		"field1": map[string]interface{}{
			"type": "long",
		},
	}
	spec := map[string]interface{}{
		"settings": map[string]interface{}{
			"index": map[string]interface{}{
				"number_of_shards": 4,
				"knn":              true,
				"knn_routing":      true,
			},
		},
		"mappings": map[string]interface{}{
			"_source": map[string]interface{}{
				"excludes": []string{"vector1"},
			},
			"properties": properties,
		},
	}

	payload, err := json.Marshal(spec)
	if err != nil {
		log.Fatalf("Error marshaling index body: %s", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	createReq := opensearchapi.IndicesCreateRequest{
		Index: indexName,
		Body:  bytes.NewReader(payload),
	}
	resp, err := createReq.Do(ctx, client)
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer resp.Body.Close()

	if resp.IsError() {
		log.Printf("Error creating index: %s", resp.String())
		return
	}
	fmt.Println("Index created successfully")
}
支持纯向量数据查询和融合查询
如果您需要执行融合查询,创建索引时需指定全文检索字段,即创建索引时添加以下参数:
// createUgcHybridSearch creates the "vector1_routing_hnsw_hybirdSearch"
// index used for hybrid (vector + full-text) queries: two shards, kNN
// enabled, custom routing enabled ("knn_routing": true). Besides the hnsw
// "vector1" field (3-dimensional float, l2, m = 24, ef_construction = 500)
// it declares "text_field", a text field analyzed with "ik_max_word", which
// is the full-text field hybrid queries match against. A marshaling or
// transport failure ends the process via log.Fatalf; an error response is
// only logged.
//
// NOTE(review): the index name contains an uppercase letter; stock
// OpenSearch rejects such names — confirm the target engine accepts it.
func createUgcHybridSearch() {
	const indexName = "vector1_routing_hnsw_hybirdSearch"

	method := map[string]interface{}{
		"engine":     "lvector",
		"name":       "hnsw",
		"space_type": "l2",
		"parameters": map[string]interface{}{
			"m":               24,
			"ef_construction": 500,
		},
	}
	properties := map[string]interface{}{
		"vector1": map[string]interface{}{
			"type":      "knn_vector",
			"dimension": 3,
			"data_type": "float",
			"method":    method,
		},
		// Full-text field required for hybrid search.
		"text_field": map[string]interface{}{
			"type":     "text",
			"analyzer": "ik_max_word",
		},
		"field1": map[string]interface{}{
			"type": "long",
		},
	}
	spec := map[string]interface{}{
		"settings": map[string]interface{}{
			"index": map[string]interface{}{
				"number_of_shards": 2,
				"knn":              true,
				"knn_routing":      true,
			},
		},
		"mappings": map[string]interface{}{
			"_source": map[string]interface{}{
				"excludes": []string{"vector1"},
			},
			"properties": properties,
		},
	}

	payload, err := json.Marshal(spec)
	if err != nil {
		log.Fatalf("Error marshaling index body: %s", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	createReq := opensearchapi.IndicesCreateRequest{
		Index: indexName,
		Body:  bytes.NewReader(payload),
	}
	resp, err := createReq.Do(ctx, client)
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer resp.Body.Close()

	if resp.IsError() {
		log.Printf("Error creating index: %s", resp.String())
		return
	}
	fmt.Println("Index created successfully")
}
数据批量_bulk写入
通用写入
以写入索引vector_routing_ivfpq_test为例:
// randomDenseVector returns a random dim-dimensional vector rendered as
// text, for example "[0.78620350, 0.93710290, 0.50112325]". Each component
// comes from rand.Float32, is printed with 8 decimal places, and components
// are separated by ", ". A dim of 0 yields "[]".
//
// This is sample data only; real workloads should substitute their own
// vectors.
func randomDenseVector(dim int) string {
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	parts := make([]string, dim)
	for idx := range parts {
		parts[idx] = fmt.Sprintf("%.8f", rng.Float32())
	}
	return "[" + strings.Join(parts, ", ") + "]"
}
// bulkWriteUgcDenseVector writes 500 batches of 500 documents each into the
// "vector_routing_ivfpq_test" index through the bulk API. Document IDs run
// from 1 upward and are unique across batches; every action line carries a
// "routing" value (id%5 here), which a custom-routing index requires.
//
// Each batch gets its own 60-second timeout context that is released as soon
// as the batch finishes, and HTTP-level error responses are reported instead
// of being counted as success.
//
// NOTE(review): a 2xx bulk response can still report per-item failures in
// its body; this sample does not inspect them.
func bulkWriteUgcDenseVector() {
	indexName := "vector_routing_ivfpq_test"
	for i := 0; i < 500; i++ {
		var buf bytes.Buffer
		// Each outer iteration writes 500 documents.
		for j := 0; j < 500; j++ {
			// Globally unique document ID.
			id := i*500 + j + 1
			// A custom-routing index requires the routing field on every action.
			meta := []byte(fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%d", "routing": "%d" } }`, indexName, id, id%5))
			// Document payload.
			data := map[string]interface{}{
				"vector1": randomDenseVector(3),
				"field1":  i + 1,
			}
			dataJson, err := json.Marshal(data)
			if err != nil {
				fmt.Println("Error marshaling JSON:", err)
				continue
			}
			// Append the action line and the document line, newline-terminated.
			buf.Grow(len(meta) + len(dataJson) + 2)
			buf.Write(meta)
			buf.WriteByte('\n')
			buf.Write(dataJson)
			buf.WriteByte('\n')
		}

		bulkReq := opensearchapi.BulkRequest{
			Body:  &buf,
			Index: indexName,
		}

		// Run the request in its own function so the deferred cancel and
		// body close fire at the end of this batch, not at function return.
		func() {
			ctx, cancel := context.WithTimeout(context.Background(), 30*2*time.Second)
			defer cancel()

			res, err := bulkReq.Do(ctx, client)
			if res != nil && res.Body != nil {
				defer res.Body.Close()
			}
			if err != nil {
				fmt.Println("Happened err", err)
				return
			}
			if res.IsError() {
				fmt.Printf("Bulk insert failed for batch %d: %s\n", i+1, res.String())
				return
			}
			fmt.Printf("Bulk insert completed for batch %d\n", i+1)
		}()
	}
}
稀疏向量写入
以写入索引vector_routing_sparse_test为例:
// Data is the JSON shape of a sparse vector: two parallel lists where
// Values[i] is the weight stored at position Indices[i].
type Data struct {
	// Indices lists the positions of the non-zero components.
	Indices []int `json:"indices"`
	// Values lists the component values, in the same order as Indices.
	Values []float32 `json:"values"`
}
// randomSparseVector returns a random sparse vector encoded as a JSON
// string, for example
//
//	{"indices":[91,30,92],"values":[0.7862035,0.9371029,0.50112325]}
//
// The vector has between 150 and 249 entries. Indices are distinct integers
// drawn from [0, 100000), and each index is paired with one rand.Float32
// value. If JSON encoding fails, the error is printed and "" is returned.
func randomSparseVector() string {
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	target := rng.Intn(100) + 150

	// seen tracks indices already chosen so that no index repeats.
	seen := make(map[int]struct{}, target)
	sample := Data{
		Indices: make([]int, 0, target),
		Values:  make([]float32, 0, target),
	}
	for len(sample.Indices) < target {
		candidate := rng.Intn(100000)
		if _, dup := seen[candidate]; dup {
			continue
		}
		seen[candidate] = struct{}{}
		sample.Indices = append(sample.Indices, candidate)
		// Pair the new index with a random value.
		sample.Values = append(sample.Values, rng.Float32())
	}

	encoded, err := json.Marshal(sample)
	if err != nil {
		fmt.Println("Error marshaling to JSON:", err)
		return ""
	}
	return string(encoded)
}
func bulkWriteUgcSparseVector() {
indexName := "vector_routing_sparse_test"
for i := 0; i < 500; i++ {
var buf bytes.Buffer
// 每次循环写入500条数据
for j := 0; j < 500; j++ {
// 计算全局唯一的ID
id := i*500 + j + 1
// 使用ugc的索引,务必要指定 routing 字段
meta := []byte(fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%d", "routing": "%d" } }`, indexName, id, id%5))
// 生成数据
data := map[string]interface{}{
"vector1": randomSparseVector(),
"field1": i + 1,
} dataJson, err := json.Marshal(data)
if err != nil {
fmt.Println("Error marshaling JSON:", err)
continue
}
// 将操作写入缓冲区
buf.Grow(len(meta) + len(dataJson) + 2) // 预留空间
buf.Write(meta)
buf.WriteByte('\n') // 换行
buf.Write(dataJson)
buf.WriteByte('\n') // 换行
}
// 执行 Bulk 请求
bulkReq := opensearchapi.BulkRequest{
Body: &buf,
Index: indexName,
}
// 设置超时
ctx, cancel := context.WithTimeout(context.Background(), 30*2*time.Second)
defer cancel()
res, err := bulkReq.Do(ctx, client)
if err != nil {
fmt.Println("Happened err", err)
} else {
fmt.Printf("Bulk insert completed for batch %d\n", i+1)
}
if res != nil && res.Body != nil {
res.Body.Close()
}
}
}
索引构建
构建ivfpq索引
仅ivfpq索引需手动构建,且创建索引时需设置"meta": {"offline.construction": "true"},表示离线索引。
发起构建前务必确保索引已写入足够的数据量,必须大于256条且超过nlist的30倍。
// buildUgcIndex triggers an index build for field "vector1" of the
// "vector_routing_ivfpq_test" index by POSTing to
// /_plugins/_vector/index/build, then prints the raw response and the first
// entry of the response's "payload" array as the task ID.
//
// An error status or an empty payload is reported and the function returns,
// rather than indexing into an empty slice.
func buildUgcIndex() {
	indexName := "vector_routing_ivfpq_test"
	vectorField := "vector1"
	bodyBuild := map[string]interface{}{
		"indexName":      indexName,
		"fieldName":      vectorField,
		"removeOldIndex": "true",
		"ivf_train_only": "false",
	}
	url := "/_plugins/_vector/index/build"

	buf := new(bytes.Buffer)
	if err := json.NewEncoder(buf).Encode(bodyBuild); err != nil {
		fmt.Println("Encode error ", err)
		return
	}

	buildIndexRequest, err := http.NewRequest("POST", url, buf)
	if err != nil {
		fmt.Println("Build err ", err)
		return
	}
	buildIndexRequest.Header.Set("Accept", "application/json")
	buildIndexRequest.Header.Set("Content-Type", "application/json")

	res, err := client.Transport.Perform(buildIndexRequest)
	if err != nil {
		fmt.Println("Err ", err)
		return
	}
	defer res.Body.Close()

	// Wrap the raw HTTP response so the opensearchapi helpers can be used.
	response := opensearchapi.Response{
		StatusCode: res.StatusCode,
		Body:       res.Body,
		Header:     res.Header,
	}
	// NOTE(review): String() reads the body; the decode below presumably
	// relies on opensearchapi restoring Body afterwards — confirm for the
	// client version in use.
	fmt.Println("Response:", response.String())
	if response.IsError() {
		fmt.Println("Build request failed with status", response.StatusCode)
		return
	}

	// Structure used to parse the result.
	type BuildResponse struct {
		Payload []string `json:"payload"`
	}
	var taskRes BuildResponse
	if err := json.NewDecoder(response.Body).Decode(&taskRes); err != nil {
		fmt.Println("Decode error:", err)
		return
	}
	if len(taskRes.Payload) == 0 {
		fmt.Println("Build response contains no task id")
		return
	}
	fmt.Println("TaskId: ", taskRes.Payload[0])
}
参数说明:
参数 | 是否必填 | 说明 |
ivf_train_only | 是 | 是否仅训练码本。设置为true时,仅利用现有数据训练码本,不对现有数据生成索引;设置为false时,现存数据会根据训练的码本生成索引数据,并保留现有数据。 |
清理训练数据,保留索引码本
如果将ivf_train_only设置为true,则必须执行该步骤。该操作利用现有数据训练码本,不对现有数据生成索引。
其中,reserve_codebook=true为必填项,表示保存索引码本。清理训练数据后需重新写入数据才可以执行纯向量数据查询(knn检索)。
如果ivf_train_only设置为false,现存数据会根据训练的码本生成索引数据,且会保留现有的数据,您可跳过该步骤。
数据查询
纯向量数据查询
纯向量数据的查询可以通过knn结构实现。
查询时,需要在url上指定routing参数。
在自定义路由键场景中,可以将nprobe的值设置为创建索引时设置的nlist参数的值。
flat routing索引
// knnSearchUgcFlat runs a pure-vector kNN query (k = 20, size = 20) on field
// "vector1" of the "vector_routing_flat_test" index and prints the ID and
// score of each hit. The request is sent with Routing "1" under a 30-second
// timeout. Errors are printed and end the function early.
func knnSearchUgcFlat() {
	indexName := "vector_routing_flat_test"
	vectorField := "vector1"
	knnQuery := map[string]interface{}{
		"size": 20,
		"query": map[string]interface{}{
			"knn": map[string]interface{}{
				vectorField: map[string]interface{}{
					"vector": []float64{2.3, 3.3, 4.4},
					"k":      20,
				},
			},
		},
	}
	content, err := json.Marshal(knnQuery)
	if err != nil {
		fmt.Printf("Error marshalling JSON: %s\n", err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// With a custom-routing index, a search must specify the Routing field.
	search := opensearchapi.SearchRequest{
		Index:   []string{indexName},
		Body:    strings.NewReader(string(content)),
		Routing: []string{"1"},
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		// Printf, not Println: the message contains a format verb.
		fmt.Printf("Knn err %s\n", err)
		return
	}
	defer searchResponse.Body.Close()

	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}

	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Printf("Knn result Decode err %s\n", err)
		return
	}
	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f \n", hit.ID, hit.Score)
	}
}
hnsw routing索引
// knnSearchUgcHnsw runs a pure-vector kNN query (k = 20, size = 20) on field
// "vector1" of the "vector_routing_hnsw_test" index and prints the ID and
// score of each hit. The request is sent with Routing "1" under a 30-second
// timeout. Errors are printed and end the function early.
func knnSearchUgcHnsw() {
	indexName := "vector_routing_hnsw_test"
	vectorField := "vector1"
	knnQuery := map[string]interface{}{
		"size": 20,
		"query": map[string]interface{}{
			"knn": map[string]interface{}{
				vectorField: map[string]interface{}{
					"vector": []float64{2.3, 3.3, 4.4},
					"k":      20,
				},
			},
		},
	}
	content, err := json.Marshal(knnQuery)
	if err != nil {
		fmt.Printf("Error marshalling JSON: %s\n", err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// With a custom-routing index, a search must specify the Routing field.
	search := opensearchapi.SearchRequest{
		Index:   []string{indexName},
		Body:    strings.NewReader(string(content)),
		Routing: []string{"1"},
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		// Printf, not Println: the message contains a format verb.
		fmt.Printf("Knn err %s\n", err)
		return
	}
	defer searchResponse.Body.Close()

	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}

	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Printf("Knn result Decode err %s\n", err)
		return
	}
	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f \n", hit.ID, hit.Score)
	}
}
sparse_hnsw routing稀疏向量索引
// knnSearchUgcHnswSparse runs a kNN query (k = 10, size = 10) with a sparse
// query vector, given as parallel "indices" and "values" lists, on field
// "vector1" of the "vector_routing_sparse_test" index. It passes
// "ef_search": "100" under ext.lvector and prints the ID and score of each
// hit. The request is sent with Routing "1" under a 30-second timeout.
// Errors are printed and end the function early.
func knnSearchUgcHnswSparse() {
	indexName := "vector_routing_sparse_test"
	vectorField := "vector1"
	knnQuery := map[string]interface{}{
		"size": 10,
		"query": map[string]interface{}{
			"knn": map[string]interface{}{
				vectorField: map[string]interface{}{
					"vector": map[string]interface{}{
						"indices": []int{10, 45, 16},
						"values":  []float64{0.5, 0.5, 0.2},
					},
					"k": 10,
				},
			},
		},
		"ext": map[string]interface{}{
			"lvector": map[string]interface{}{
				"ef_search": "100",
			},
		},
	}
	content, err := json.Marshal(knnQuery)
	if err != nil {
		fmt.Printf("Error marshalling JSON: %s\n", err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// With a custom-routing index, a search must specify the Routing field.
	search := opensearchapi.SearchRequest{
		Index:   []string{indexName},
		Body:    strings.NewReader(string(content)),
		Routing: []string{"1"},
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		// Printf, not Println: the message contains a format verb.
		fmt.Printf("Knn err %s\n", err)
		return
	}
	defer searchResponse.Body.Close()

	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}

	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Printf("Knn result Decode err %s\n", err)
		return
	}
	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f \n", hit.ID, hit.Score)
	}
}
ivfpq routing索引
// knnSearchUgcIvfpq runs a pure-vector kNN query (k = 10, size = 10) on
// field "vector1" of the "vector_routing_ivfpq_test" index and prints the ID
// and score of each hit. Under ext.lvector it passes nprobe "2" (the nlist
// value the sample index was created with), reorder_factor "2" and
// client_refactor "true". The request is sent with Routing "1" under a
// 30-second timeout. Errors are printed and end the function early.
func knnSearchUgcIvfpq() {
	indexName := "vector_routing_ivfpq_test"
	vectorField := "vector1"
	knnQuery := map[string]interface{}{
		"size": 10,
		"query": map[string]interface{}{
			"knn": map[string]interface{}{
				vectorField: map[string]interface{}{
					"vector": []float32{2.2, 2.3, 2.4},
					"k":      10,
				},
			},
		},
		"ext": map[string]interface{}{
			"lvector": map[string]interface{}{
				"nprobe":          "2",
				"reorder_factor":  "2",
				"client_refactor": "true",
			},
		},
	}
	content, err := json.Marshal(knnQuery)
	if err != nil {
		fmt.Printf("Error marshalling JSON: %s\n", err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// With a custom-routing index, a search must specify the Routing field.
	search := opensearchapi.SearchRequest{
		Index:   []string{indexName},
		Body:    strings.NewReader(string(content)),
		Routing: []string{"1"},
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		// Printf, not Println: the message contains a format verb.
		fmt.Printf("Knn err %s\n", err)
		return
	}
	defer searchResponse.Body.Close()

	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}

	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Printf("Knn result Decode err %s\n", err)
		return
	}
	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f \n", hit.ID, hit.Score)
	}
}
融合查询
使用融合查询前请确保您的索引已指定全文检索字段。
全文向量混合检索
// ugcTextVectorHybridSearch runs a hybrid full-text + vector query (k = 10,
// size = 10, _source disabled) on the "vector1_routing_hnsw_hybirdSearch"
// index and prints the ID and score of each hit. The kNN filter combines a
// match on "text_field" with a term on "_routing". Under ext.lvector it
// selects hybrid_search_type "filter_rrf" with rrf_rank_constant "60" and
// rrf_knn_weight_factor "0.5". The request is sent with Routing "1" under a
// 30-second timeout. Errors are printed and end the function early.
//
// The "_routing" term value below is a placeholder: replace it with the
// same routing value that is passed in the request's Routing field.
func ugcTextVectorHybridSearch() {
	indexName := "vector1_routing_hnsw_hybirdSearch"
	vectorField := "vector1"
	knnQuery := map[string]interface{}{
		"size":    10,
		"_source": false,
		"query": map[string]interface{}{
			"knn": map[string]interface{}{
				vectorField: map[string]interface{}{
					"vector": []float64{2.8, 2.3, 2.4},
					"filter": map[string]interface{}{
						"bool": map[string]interface{}{
							"must": []map[string]interface{}{
								{
									"bool": map[string]interface{}{
										"must": []map[string]interface{}{
											{
												"match": map[string]interface{}{
													"text_field": map[string]interface{}{
														"query": "test1 test2",
													},
												},
											},
											{
												"term": map[string]interface{}{
													"_routing": "替换为链接地址中指定的routing值,如1、user123",
												},
											},
										},
									},
								},
							},
						},
					},
					"k": 10,
				},
			},
		},
		"ext": map[string]interface{}{
			"lvector": map[string]interface{}{
				"hybrid_search_type":    "filter_rrf",
				"rrf_rank_constant":     "60",
				"rrf_knn_weight_factor": "0.5",
			},
		},
	}
	content, err := json.Marshal(knnQuery)
	if err != nil {
		fmt.Printf("Error marshalling JSON: %s\n", err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// With a custom-routing index, a search must specify the Routing field.
	search := opensearchapi.SearchRequest{
		Index:   []string{indexName},
		Body:    strings.NewReader(string(content)),
		Routing: []string{"1"},
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		// Printf, not Println: the message contains a format verb.
		fmt.Printf("Knn err %s\n", err)
		return
	}
	defer searchResponse.Body.Close()

	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}

	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Printf("Knn result Decode err %s\n", err)
		return
	}
	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f \n", hit.ID, hit.Score)
	}
}
向量+全文融合+属性过滤
// ugcTextVectorFilterHybridSearch runs a hybrid vector + full-text query
// with an attribute filter (k = 10, size = 10, _source disabled) on the
// "vector1_routing_hnsw_hybirdSearch" index and prints the ID and score of
// each hit. The kNN filter requires a match on "text_field", a term on
// "_routing", and field1 > 2. Under ext.lvector it selects
// hybrid_search_type "filter_rrf" with rrf_rank_constant "60",
// rrf_knn_weight_factor "0.5" and filter_type "efficient_filter". The
// request is sent with Routing "1" under a 30-second timeout. Errors are
// printed and end the function early.
//
// The "_routing" term value below is a placeholder: replace it with the
// same routing value that is passed in the request's Routing field.
func ugcTextVectorFilterHybridSearch() {
	indexName := "vector1_routing_hnsw_hybirdSearch"
	vectorField := "vector1"
	knnQuery := map[string]interface{}{
		"size":    10,
		"_source": false,
		"query": map[string]interface{}{
			"knn": map[string]interface{}{
				vectorField: map[string]interface{}{
					"vector": []float64{2.8, 2.3, 2.4},
					"filter": map[string]interface{}{
						"bool": map[string]interface{}{
							"must": []map[string]interface{}{
								{
									"bool": map[string]interface{}{
										"must": []map[string]interface{}{
											{
												"match": map[string]interface{}{
													"text_field": map[string]interface{}{
														"query": "test1 test2",
													},
												},
											},
											{
												"term": map[string]interface{}{
													"_routing": "替换为连接语句中指定的routing值,如1、user123",
												},
											},
										},
									},
								},
								{
									// Attribute filter: keep documents with field1 > 2.
									"bool": map[string]interface{}{
										"filter": []map[string]interface{}{
											{
												"range": map[string]interface{}{
													"field1": map[string]interface{}{
														"gt": 2,
													},
												},
											},
										},
									},
								},
							},
						},
					},
					"k": 10,
				},
			},
		},
		"ext": map[string]interface{}{
			"lvector": map[string]interface{}{
				"hybrid_search_type":    "filter_rrf",
				"rrf_rank_constant":     "60",
				"rrf_knn_weight_factor": "0.5",
				"filter_type":           "efficient_filter",
			},
		},
	}
	content, err := json.Marshal(knnQuery)
	if err != nil {
		fmt.Printf("Error marshalling JSON: %s\n", err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// With a custom-routing index, a search must specify the Routing field.
	search := opensearchapi.SearchRequest{
		Index:   []string{indexName},
		Body:    strings.NewReader(string(content)),
		Routing: []string{"1"},
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		// Printf, not Println: the message contains a format verb.
		fmt.Printf("Knn err %s\n", err)
		return
	}
	defer searchResponse.Body.Close()

	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}

	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Printf("Knn result Decode err %s\n", err)
		return
	}
	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f \n", hit.ID, hit.Score)
	}
}