自定义路由键

在多租户海量索引场景下,自定义路由键是实现用户级数据隔离与精准查询的核心技术。通过将用户标识(例如ID)指定为路由键,可保证每次查询仅针对目标用户数据,在保障了数据安全性的同时进一步提升查询性能。本文介绍如何使用自定义路由键功能。

前提条件

  • 已安装Go环境,建议安装Go 1.17及以上版本。

  • 已开通Lindorm向量引擎。如何开通,请参见开通向量引擎

  • 已开通Lindorm搜索引擎。具体操作,请参见开通指南

  • 已将客户端的IP地址加入到Lindorm实例的白名单中。具体操作,请参见设置白名单

准备工作

创建索引

仅支持纯向量数据查询

在向量检索场景中,索引类型的选择直接影响查询性能和检索效果。索引选择需综合考虑数据规模、查询延迟要求、召回精度需求及资源成本,以下为各索引类型的详细对比分析:

索引类型

数据量范围

性能特点

准确性

适用场景

Flat

千级~万级

查询延迟高

精确匹配

数据量小且需精准检索

HNSW

万级~百万级

查询延迟中等

近似匹配

中大规模数据实时检索

IVFPQ

百万级~千万级

查询延迟低

近似匹配

海量数据高吞吐检索

稀疏向量索引

按需适配

依赖向量稀疏度

近似匹配

高维稀疏向量场景

重要
  • 在自定义路由键的场景下,主键_id必须保证全局唯一。

  • 创建索引时需指定 "knn_routing": true,表示开启自定义路由键功能。对于ivfpq索引,还需设置"meta": {"offline.construction": "true"}

以下为创建4种索引的示例。

创建flat routing索引

// createUgcFlat creates the "vector_routing_flat_test" index: a flat lvector
// index over the 3-dimensional float field "vector1" with custom routing
// ("knn_routing") switched on. The outcome is printed; a marshalling or
// transport failure terminates the process through log.Fatalf.
func createUgcFlat() {
	const index = "vector_routing_flat_test"

	// Index-level settings; knn_routing enables the custom routing key feature.
	settings := map[string]interface{}{
		"index": map[string]interface{}{
			"number_of_shards": 2,
			"knn":              true,
			"knn_routing":      true,
		},
	}

	// Definition of the vector field.
	vectorField := map[string]interface{}{
		"type":      "knn_vector",
		"dimension": 3,
		"data_type": "float",
		"method": map[string]interface{}{
			"engine":     "lvector",
			"name":       "flat",
			"space_type": "l2",
			"parameters": map[string]interface{}{},
		},
	}

	mappings := map[string]interface{}{
		// The vector field is listed under the _source excludes.
		"_source": map[string]interface{}{
			"excludes": []string{"vector1"},
		},
		"properties": map[string]interface{}{
			"vector1": vectorField,
			"field1":  map[string]interface{}{"type": "long"},
		},
	}

	payload, err := json.Marshal(map[string]interface{}{
		"settings": settings,
		"mappings": mappings,
	})
	if err != nil {
		log.Fatalf("Error marshaling index body: %s", err)
	}

	// Bound the create-index call to 60 seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	createReq := opensearchapi.IndicesCreateRequest{
		Index: index,
		Body:  bytes.NewReader(payload),
	}
	resp, err := createReq.Do(ctx, client)
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer resp.Body.Close()

	if !resp.IsError() {
		fmt.Println("Index created successfully")
		return
	}
	log.Printf("Error creating index: %s", resp.String())
}

创建hnsw routing索引

// createUgcHnsw creates the "vector_routing_hnsw_test" index: an hnsw lvector
// index (m=24, ef_construction=500) over the 3-dimensional float field
// "vector1" with custom routing ("knn_routing") switched on. The outcome is
// printed; a marshalling or transport failure terminates the process through
// log.Fatalf.
func createUgcHnsw() {
	const index = "vector_routing_hnsw_test"

	// Index-level settings; knn_routing enables the custom routing key feature.
	settings := map[string]interface{}{
		"index": map[string]interface{}{
			"number_of_shards": 2,
			"knn":              true,
			"knn_routing":      true,
		},
	}

	// Definition of the vector field, including the hnsw build parameters.
	vectorField := map[string]interface{}{
		"type":      "knn_vector",
		"dimension": 3,
		"data_type": "float",
		"method": map[string]interface{}{
			"engine":     "lvector",
			"name":       "hnsw",
			"space_type": "l2",
			"parameters": map[string]interface{}{
				"m":               24,
				"ef_construction": 500,
			},
		},
	}

	mappings := map[string]interface{}{
		// The vector field is listed under the _source excludes.
		"_source": map[string]interface{}{
			"excludes": []string{"vector1"},
		},
		"properties": map[string]interface{}{
			"vector1": vectorField,
			"field1":  map[string]interface{}{"type": "long"},
		},
	}

	payload, err := json.Marshal(map[string]interface{}{
		"settings": settings,
		"mappings": mappings,
	})
	if err != nil {
		log.Fatalf("Error marshaling index body: %s", err)
	}

	// Bound the create-index call to 60 seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	createReq := opensearchapi.IndicesCreateRequest{
		Index: index,
		Body:  bytes.NewReader(payload),
	}
	resp, err := createReq.Do(ctx, client)
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer resp.Body.Close()

	if !resp.IsError() {
		fmt.Println("Index created successfully")
		return
	}
	log.Printf("Error creating index: %s", resp.String())
}

创建sparse_hnsw routing稀疏向量索引

// createUgcHnswSparse creates the "vector_routing_sparse_test" index: a
// sparse_hnsw lvector index (m=24, ef_construction=200, inner-product space)
// over the sparse vector field "vector1" with custom routing ("knn_routing")
// switched on. The outcome is printed; a marshalling or transport failure
// terminates the process through log.Fatalf.
func createUgcHnswSparse() {
	const index = "vector_routing_sparse_test"

	// Index-level settings; knn_routing enables the custom routing key feature.
	settings := map[string]interface{}{
		"index": map[string]interface{}{
			"number_of_shards": 2,
			"knn":              true,
			"knn_routing":      true,
		},
	}

	// Definition of the sparse vector field; no "dimension" is declared here.
	vectorField := map[string]interface{}{
		"type":      "knn_vector",
		"data_type": "sparse_vector",
		"method": map[string]interface{}{
			"engine":     "lvector",
			"name":       "sparse_hnsw",
			"space_type": "innerproduct",
			"parameters": map[string]interface{}{
				"m":               24,
				"ef_construction": 200,
			},
		},
	}

	mappings := map[string]interface{}{
		// The vector field is listed under the _source excludes.
		"_source": map[string]interface{}{
			"excludes": []string{"vector1"},
		},
		"properties": map[string]interface{}{
			"vector1": vectorField,
			"field1":  map[string]interface{}{"type": "long"},
		},
	}

	payload, err := json.Marshal(map[string]interface{}{
		"settings": settings,
		"mappings": mappings,
	})
	if err != nil {
		log.Fatalf("Error marshaling index body: %s", err)
	}

	// Bound the create-index call to 60 seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	createReq := opensearchapi.IndicesCreateRequest{
		Index: index,
		Body:  bytes.NewReader(payload),
	}
	resp, err := createReq.Do(ctx, client)
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer resp.Body.Close()

	if !resp.IsError() {
		fmt.Println("Index created successfully")
		return
	}
	log.Printf("Error creating index: %s", resp.String())
}

创建ivfpq routing索引

// createUgcIvfpq creates the "vector_routing_ivfpq_test" index: an ivfpq
// lvector index (cosine similarity) over the 3-dimensional float field
// "vector1" with custom routing ("knn_routing") switched on and the field
// flagged with "offline.construction". The outcome is printed; a marshalling
// or transport failure terminates the process through log.Fatalf.
func createUgcIvfpq() {
	const index = "vector_routing_ivfpq_test"

	// Index-level settings; knn_routing enables the custom routing key feature.
	settings := map[string]interface{}{
		"index": map[string]interface{}{
			"number_of_shards": 4,
			"knn":              true,
			"knn_routing":      true,
		},
	}

	// ivfpq build parameters.
	ivfpqParams := map[string]interface{}{
		"m":                           3, // set to the same value as "dimension"
		"nlist":                       2,
		"centroids_use_hnsw":          false,
		"centroids_hnsw_m":            48,
		"centroids_hnsw_ef_construct": 500,
		"centroids_hnsw_ef_search":    200,
	}

	// Definition of the vector field.
	vectorField := map[string]interface{}{
		"type":      "knn_vector",
		"dimension": 3,
		"data_type": "float",
		"meta": map[string]interface{}{
			"offline.construction": "true",
		},
		"method": map[string]interface{}{
			"engine":     "lvector",
			"name":       "ivfpq",
			"space_type": "cosinesimil",
			"parameters": ivfpqParams,
		},
	}

	mappings := map[string]interface{}{
		// The vector field is listed under the _source excludes.
		"_source": map[string]interface{}{
			"excludes": []string{"vector1"},
		},
		"properties": map[string]interface{}{
			"vector1": vectorField,
			"field1":  map[string]interface{}{"type": "long"},
		},
	}

	payload, err := json.Marshal(map[string]interface{}{
		"settings": settings,
		"mappings": mappings,
	})
	if err != nil {
		log.Fatalf("Error marshaling index body: %s", err)
	}

	// Bound the create-index call to 60 seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	createReq := opensearchapi.IndicesCreateRequest{
		Index: index,
		Body:  bytes.NewReader(payload),
	}
	resp, err := createReq.Do(ctx, client)
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer resp.Body.Close()

	if !resp.IsError() {
		fmt.Println("Index created successfully")
		return
	}
	log.Printf("Error creating index: %s", resp.String())
}

支持纯向量数据查询和融合查询

如果您需要执行融合查询,创建索引时需指定全文检索字段,即创建索引时添加以下参数:

// createUgcHybridSearch creates the "vector1_routing_hnsw_hybirdSearch" index
// used by the hybrid-search samples: an hnsw lvector index over "vector1" with
// custom routing ("knn_routing") switched on, plus the full-text field
// "text_field" analysed with ik_max_word. The outcome is printed; a
// marshalling or transport failure terminates the process through log.Fatalf.
func createUgcHybridSearch() {
	const index = "vector1_routing_hnsw_hybirdSearch"

	// Index-level settings; knn_routing enables the custom routing key feature.
	settings := map[string]interface{}{
		"index": map[string]interface{}{
			"number_of_shards": 2,
			"knn":              true,
			"knn_routing":      true,
		},
	}

	// Definition of the vector field, including the hnsw build parameters.
	vectorField := map[string]interface{}{
		"type":      "knn_vector",
		"dimension": 3,
		"data_type": "float",
		"method": map[string]interface{}{
			"engine":     "lvector",
			"name":       "hnsw",
			"space_type": "l2",
			"parameters": map[string]interface{}{
				"m":               24,
				"ef_construction": 500,
			},
		},
	}

	// Full-text field that hybrid queries match against.
	textField := map[string]interface{}{
		"type":     "text",
		"analyzer": "ik_max_word",
	}

	mappings := map[string]interface{}{
		// The vector field is listed under the _source excludes.
		"_source": map[string]interface{}{
			"excludes": []string{"vector1"},
		},
		"properties": map[string]interface{}{
			"vector1":    vectorField,
			"text_field": textField,
			"field1":     map[string]interface{}{"type": "long"},
		},
	}

	payload, err := json.Marshal(map[string]interface{}{
		"settings": settings,
		"mappings": mappings,
	})
	if err != nil {
		log.Fatalf("Error marshaling index body: %s", err)
	}

	// Bound the create-index call to 60 seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	createReq := opensearchapi.IndicesCreateRequest{
		Index: index,
		Body:  bytes.NewReader(payload),
	}
	resp, err := createReq.Do(ctx, client)
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer resp.Body.Close()

	if !resp.IsError() {
		fmt.Println("Index created successfully")
		return
	}
	log.Printf("Error creating index: %s", resp.String())
}

数据批量_bulk写入

通用写入

以写入索引vector_routing_ivfpq_test为例:

// randomDenseVector returns a randomly generated vector with dim components,
// rendered as text with eight decimals per component, for example
// "[0.78620350, 0.93710291, 0.50112325]". It only produces sample data;
// replace it with real vectors in production code.
func randomDenseVector(dim int) string {
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	// Draw every component first, then render them.
	components := make([]float32, dim)
	for i := 0; i < dim; i++ {
		components[i] = rng.Float32()
	}

	var sb strings.Builder
	sb.WriteString("[")
	for i, c := range components {
		if i > 0 {
			sb.WriteString(", ")
		}
		sb.WriteString(fmt.Sprintf("%.8f", c))
	}
	sb.WriteString("]")
	return sb.String()
}

// bulkWriteUgcDenseVector writes 500 batches of 500 randomly generated
// documents into "vector_routing_ivfpq_test" through the _bulk API. Document
// IDs run from 1 to 250000 and each document is routed by id%5. Failures are
// printed and the loop moves on to the next batch.
func bulkWriteUgcDenseVector() {
	indexName := "vector_routing_ivfpq_test"

	for i := 0; i < 500; i++ {
		var buf bytes.Buffer

		// Each batch carries 500 documents.
		for j := 0; j < 500; j++ {
			// Globally unique document ID.
			id := i*500 + j + 1

			// With a custom-routing index every action must carry the routing field.
			meta := fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%d", "routing": "%d" } }`, indexName, id, id%5)

			// Document source.
			data := map[string]interface{}{
				"vector1": randomDenseVector(3),
				"field1":  i + 1,
			}
			dataJson, err := json.Marshal(data)
			if err != nil {
				fmt.Println("Error marshaling JSON:", err)
				continue
			}

			// Bulk bodies are newline-delimited: action line, then source line.
			buf.WriteString(meta)
			buf.WriteByte('\n')
			buf.Write(dataJson)
			buf.WriteByte('\n')
		}

		// Send the batch inside a closure so that the timeout context is
		// released and the response body closed at the end of every batch,
		// instead of piling up 500 deferred calls until the function returns.
		func() {
			ctx, cancel := context.WithTimeout(context.Background(), 30*2*time.Second)
			defer cancel()

			bulkReq := opensearchapi.BulkRequest{
				Body:  &buf,
				Index: indexName,
			}
			res, err := bulkReq.Do(ctx, client)
			if err != nil {
				fmt.Println("Happened err", err)
				return
			}
			if res.Body != nil {
				defer res.Body.Close()
			}

			// A request that reached the server can still be rejected.
			if res.IsError() {
				fmt.Println("Happened err", res.String())
				return
			}
			fmt.Printf("Bulk insert completed for batch %d\n", i+1)
		}()
	}
}

稀疏向量写入

以写入索引vector_routing_sparse_test为例:

// Data is the JSON shape of a sparse vector: two slices serialized under the
// keys "indices" and "values".
type Data struct {
	// Indices is serialized as "indices".
	Indices []int     `json:"indices"`
	// Values is serialized as "values".
	Values  []float32 `json:"values"`
}

// randomSparseVector returns a randomly generated sparse vector encoded as a
// JSON string, for example
//
//	{"indices":[91,30,92],"values":[0.7862035,0.9371029,0.50112325]}
//
// The vector holds between 150 and 249 entries; indices are distinct and lie
// in [0, 100000). An empty string is returned if JSON encoding fails.
func randomSparseVector() string {
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	want := rng.Intn(100) + 150

	picked := make([]int, 0, want)
	weights := make([]float32, 0, want)
	seen := make(map[int]bool, want)

	for len(picked) < want {
		candidate := rng.Intn(100000)
		if seen[candidate] {
			// Already used; draw another index.
			continue
		}
		seen[candidate] = true
		picked = append(picked, candidate)
		// Random value paired with the freshly picked index.
		weights = append(weights, rng.Float32())
	}

	encoded, err := json.Marshal(Data{Indices: picked, Values: weights})
	if err != nil {
		fmt.Println("Error marshaling to JSON:", err)
		return ""
	}
	return string(encoded)
}

func bulkWriteUgcSparseVector() {
	indexName := "vector_routing_sparse_test"

	for i := 0; i < 500; i++ {
		var buf bytes.Buffer

		// 每次循环写入500条数据
		for j := 0; j < 500; j++ {
			// 计算全局唯一的ID
			id := i*500 + j + 1

			// 使用ugc的索引,务必要指定 routing 字段
			meta := []byte(fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%d", "routing": "%d" } }`, indexName, id, id%5))

			// 生成数据
			data := map[string]interface{}{
				"vector1": randomSparseVector(),
				"field1":  i + 1,
			}			dataJson, err := json.Marshal(data)
			if err != nil {
				fmt.Println("Error marshaling JSON:", err)
				continue
			}

			// 将操作写入缓冲区
			buf.Grow(len(meta) + len(dataJson) + 2) // 预留空间
			buf.Write(meta)
			buf.WriteByte('\n') // 换行
			buf.Write(dataJson)
			buf.WriteByte('\n') // 换行
		}

		// 执行 Bulk 请求
		bulkReq := opensearchapi.BulkRequest{
			Body:  &buf,
			Index: indexName,
		}

		// 设置超时
		ctx, cancel := context.WithTimeout(context.Background(), 30*2*time.Second)
		defer cancel()

		res, err := bulkReq.Do(ctx, client)
		if err != nil {
			fmt.Println("Happened err", err)
		} else {
			fmt.Printf("Bulk insert completed for batch %d\n", i+1)
		}
		if res != nil && res.Body != nil {
			res.Body.Close()
		}
	}
}

索引构建

构建ivfpq索引

ivfpq索引需手动构建,且创建索引时需设置 "meta": {"offline.construction": "true"},表示离线索引。

发起构建前务必确保索引已写入足够的数据量,必须大于256条且超过nlist的30倍。

// buildUgcIndex triggers an index build for field "vector1" of
// "vector_routing_ivfpq_test" by POSTing to /_plugins/_vector/index/build,
// then prints the raw response and the first entry of its "payload" array as
// the task ID. Every failure is printed and ends the function early.
func buildUgcIndex() {
	indexName := "vector_routing_ivfpq_test"
	vectorField := "vector1"
	bodyBuild := map[string]interface{}{
		"indexName":      indexName,
		"fieldName":      vectorField,
		"removeOldIndex": "true",
		"ivf_train_only": "false",
	}
	url := "/_plugins/_vector/index/build"
	buf := new(bytes.Buffer)
	if err := json.NewEncoder(buf).Encode(bodyBuild); err != nil {
		fmt.Println("Encode error ", err)
		return
	}
	buildIndexRequest, err := http.NewRequest("POST", url, buf)
	if err != nil {
		fmt.Println("Build err ", err)
		return
	}
	buildIndexRequest.Header.Set("Accept", "application/json")
	buildIndexRequest.Header.Set("Content-Type", "application/json")
	res, err := client.Transport.Perform(buildIndexRequest)
	if err != nil {
		fmt.Println("Err ", err)
		return
	}
	defer res.Body.Close()

	// Wrap the raw HTTP response to reuse the client's response helpers.
	response := opensearchapi.Response{
		StatusCode: res.StatusCode,
		Body:       res.Body,
		Header:     res.Header,
	}

	fmt.Println("Response:", response.String())
	// The response has already been printed above; an error status carries
	// no task ID, so there is nothing further to decode.
	if response.IsError() {
		return
	}

	// Shape of the part of the build response that is read here.
	type BuildResponse struct {
		Payload []string `json:"payload"`
	}

	var taskRes BuildResponse
	if err := json.NewDecoder(response.Body).Decode(&taskRes); err != nil {
		fmt.Println("Decode error:", err)
		return
	}

	// Guard the index access: an empty payload would otherwise panic.
	if len(taskRes.Payload) == 0 {
		fmt.Println("Build response contains no task id")
		return
	}

	fmt.Println("TaskId: ", taskRes.Payload[0])
}

参数说明:

参数

是否必填

说明

ivf_train_only

  • true:使用现存数据训练码本,现存数据不生成索引。设置为true后,您需要使用_truncate清理训练数据,保留索引码本,之后再重新写入数据。新写入的数据会自动生成索引,索引生成后可执行近似检索。

  • false:训练码本,现存数据生成索引,可直接执行近似检索,无需使用_truncate清理训练数据。

    重要

    仅向量引擎3.9.24及以上版本支持设置为false

无论设置为true还是false,索引构建完成后新写入的数据均可生成索引,区别为是否对现有数据根据训练的码本生成索引数据。

清理训练数据,保留索引码本

如果将ivf_train_only设置为true,则必须执行该步骤。该操作利用现有数据训练码本,不对现有数据生成索引。

其中,reserve_codebook=true为必填项,表示保存索引码本。清理训练数据后需重新写入数据才可以执行纯向量数据查询(knn检索)。

说明

如果ivf_train_only设置为false,现存数据会根据训练的码本生成索引数据,且会保留现有的数据,您可跳过该步骤。

数据查询

纯向量数据查询

纯向量数据的查询可以通过knn结构实现。

重要
  • 查询时,需要在url上指定routing参数。

  • 在自定义路由键场景中,可以将nprobe的值设置为创建索引时设置的nlist参数的值。

flat routing索引

// knnSearchUgcFlat runs a pure vector (knn) query for the top 20 neighbours of
// a fixed vector against "vector_routing_flat_test", restricted to routing
// value "1", and prints the ID and score of every hit. Failures are printed
// and end the function early.
func knnSearchUgcFlat() {
	indexName := "vector_routing_flat_test"
	vectorField := "vector1"
	knnQuery := map[string]interface{}{
		"size": 20,
		"query": map[string]interface{}{
			"knn": map[string]interface{}{
				vectorField: map[string]interface{}{
					"vector": []float64{2.3, 3.3, 4.4},
					"k":      20,
				},
			},
		},
	}
	content, err := json.Marshal(knnQuery)
	if err != nil {
		fmt.Printf("Error marshalling JSON: %s\n", err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// With a custom-routing index the search must specify Routing.
	search := opensearchapi.SearchRequest{
		Index:   []string{indexName},
		Body:    strings.NewReader(string(content)),
		Routing: []string{"1"},
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		// Printf rather than Println: the message contains a format verb.
		fmt.Printf("Knn err %s\n", err)
		return
	}
	defer searchResponse.Body.Close()

	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Printf("Knn result Decode err %s\n", err)
		return
	}

	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f \n", hit.ID, hit.Score)
	}
}

hnsw routing索引

// knnSearchUgcHnsw runs a pure vector (knn) query for the top 20 neighbours of
// a fixed vector against "vector_routing_hnsw_test", restricted to routing
// value "1", and prints the ID and score of every hit. Failures are printed
// and end the function early.
func knnSearchUgcHnsw() {
	indexName := "vector_routing_hnsw_test"
	vectorField := "vector1"
	knnQuery := map[string]interface{}{
		"size": 20,
		"query": map[string]interface{}{
			"knn": map[string]interface{}{
				vectorField: map[string]interface{}{
					"vector": []float64{2.3, 3.3, 4.4},
					"k":      20,
				},
			},
		},
	}
	content, err := json.Marshal(knnQuery)
	if err != nil {
		fmt.Printf("Error marshalling JSON: %s\n", err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// With a custom-routing index the search must specify Routing.
	search := opensearchapi.SearchRequest{
		Index:   []string{indexName},
		Body:    strings.NewReader(string(content)),
		Routing: []string{"1"},
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		// Printf rather than Println: the message contains a format verb.
		fmt.Printf("Knn err %s\n", err)
		return
	}
	defer searchResponse.Body.Close()

	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Printf("Knn result Decode err %s\n", err)
		return
	}

	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f \n", hit.ID, hit.Score)
	}
}

sparse_hnsw routing稀疏向量索引

// knnSearchUgcHnswSparse runs a sparse-vector knn query for the top 10
// neighbours against "vector_routing_sparse_test" with ef_search set to
// "100", restricted to routing value "1", and prints the ID and score of
// every hit. Failures are printed and end the function early.
func knnSearchUgcHnswSparse() {
	indexName := "vector_routing_sparse_test"
	vectorField := "vector1"
	knnQuery := map[string]interface{}{
		"size": 10,
		"query": map[string]interface{}{
			"knn": map[string]interface{}{
				vectorField: map[string]interface{}{
					// Sparse query vector given as parallel indices/values.
					"vector": map[string]interface{}{
						"indices": []int{10, 45, 16},
						"values":  []float64{0.5, 0.5, 0.2},
					},
					"k": 10,
				},
			},
		},
		"ext": map[string]interface{}{
			"lvector": map[string]interface{}{
				"ef_search": "100",
			},
		},
	}
	content, err := json.Marshal(knnQuery)
	if err != nil {
		fmt.Printf("Error marshalling JSON: %s\n", err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// With a custom-routing index the search must specify Routing.
	search := opensearchapi.SearchRequest{
		Index:   []string{indexName},
		Body:    strings.NewReader(string(content)),
		Routing: []string{"1"},
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		// Printf rather than Println: the message contains a format verb.
		fmt.Printf("Knn err %s\n", err)
		return
	}
	defer searchResponse.Body.Close()

	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Printf("Knn result Decode err %s\n", err)
		return
	}

	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f \n", hit.ID, hit.Score)
	}
}

ivfpq routing索引

// knnSearchUgcIvfpq runs a pure vector (knn) query for the top 10 neighbours
// of a fixed vector against "vector_routing_ivfpq_test", passing nprobe,
// reorder_factor and client_refactor through "ext.lvector". The search is
// restricted to routing value "1" and the ID and score of every hit are
// printed. Failures are printed and end the function early.
func knnSearchUgcIvfpq() {
	indexName := "vector_routing_ivfpq_test"
	vectorField := "vector1"
	knnQuery := map[string]interface{}{
		"size": 10,
		"query": map[string]interface{}{
			"knn": map[string]interface{}{
				vectorField: map[string]interface{}{
					"vector": []float32{2.2, 2.3, 2.4},
					"k":      10,
				},
			},
		},
		"ext": map[string]interface{}{
			"lvector": map[string]interface{}{
				"nprobe":          "2",
				"reorder_factor":  "2",
				"client_refactor": "true",
			},
		},
	}
	content, err := json.Marshal(knnQuery)
	if err != nil {
		fmt.Printf("Error marshalling JSON: %s\n", err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// With a custom-routing index the search must specify Routing.
	search := opensearchapi.SearchRequest{
		Index:   []string{indexName},
		Body:    strings.NewReader(string(content)),
		Routing: []string{"1"},
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		// Printf rather than Println: the message contains a format verb.
		fmt.Printf("Knn err %s\n", err)
		return
	}
	defer searchResponse.Body.Close()

	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Printf("Knn result Decode err %s\n", err)
		return
	}

	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f \n", hit.ID, hit.Score)
	}
}

融合查询

使用融合查询前请确保您的索引已指定全文检索字段

全文向量混合检索

// ugcTextVectorHybridSearch runs a hybrid full-text + vector query against
// "vector1_routing_hnsw_hybirdSearch": a knn query on "vector1" whose filter
// combines a match on "text_field" with a term on "_routing", fused with
// hybrid_search_type "filter_rrf". The ID and score of every hit are printed.
// Failures are printed and end the function early.
func ugcTextVectorHybridSearch() {
	indexName := "vector1_routing_hnsw_hybirdSearch"
	vectorField := "vector1"
	// Routing key of the tenant being queried (for example "1" or "user123").
	// The same value is used for the request's Routing parameter and for the
	// "_routing" term filter so the two cannot drift apart.
	routing := "1"
	knnQuery := map[string]interface{}{
		"size":    10,
		"_source": false,
		"query": map[string]interface{}{
			"knn": map[string]interface{}{
				vectorField: map[string]interface{}{
					"vector": []float64{2.8, 2.3, 2.4},
					"filter": map[string]interface{}{
						"bool": map[string]interface{}{
							"must": []map[string]interface{}{
								{
									"bool": map[string]interface{}{
										"must": []map[string]interface{}{
											{
												"match": map[string]interface{}{
													"text_field": map[string]interface{}{
														"query": "test1 test2",
													},
												},
											},
											{
												"term": map[string]interface{}{
													"_routing": routing,
												},
											},
										},
									},
								},
							},
						},
					},
					"k": 10,
				},
			},
		},
		"ext": map[string]interface{}{
			"lvector": map[string]interface{}{
				"hybrid_search_type":    "filter_rrf",
				"rrf_rank_constant":     "60",
				"rrf_knn_weight_factor": "0.5",
			},
		},
	}
	content, err := json.Marshal(knnQuery)
	if err != nil {
		fmt.Printf("Error marshalling JSON: %s\n", err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// With a custom-routing index the search must specify Routing.
	search := opensearchapi.SearchRequest{
		Index:   []string{indexName},
		Body:    strings.NewReader(string(content)),
		Routing: []string{routing},
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		// Printf rather than Println: the message contains a format verb.
		fmt.Printf("Knn err %s\n", err)
		return
	}
	defer searchResponse.Body.Close()

	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Printf("Knn result Decode err %s\n", err)
		return
	}

	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f \n", hit.ID, hit.Score)
	}
}

向量+全文融合+属性过滤

// ugcTextVectorFilterHybridSearch runs a hybrid full-text + vector query with
// an additional attribute filter against "vector1_routing_hnsw_hybirdSearch":
// a knn query on "vector1" whose filter combines a match on "text_field", a
// term on "_routing" and a range filter "field1 > 2", fused with
// hybrid_search_type "filter_rrf" and filter_type "efficient_filter". The ID
// and score of every hit are printed. Failures are printed and end the
// function early.
func ugcTextVectorFilterHybridSearch() {
	indexName := "vector1_routing_hnsw_hybirdSearch"
	vectorField := "vector1"
	// Routing key of the tenant being queried (for example "1" or "user123").
	// The same value is used for the request's Routing parameter and for the
	// "_routing" term filter so the two cannot drift apart.
	routing := "1"
	knnQuery := map[string]interface{}{
		"size":    10,
		"_source": false,
		"query": map[string]interface{}{
			"knn": map[string]interface{}{
				vectorField: map[string]interface{}{
					"vector": []float64{2.8, 2.3, 2.4},
					"filter": map[string]interface{}{
						"bool": map[string]interface{}{
							"must": []map[string]interface{}{
								{
									// Full-text condition plus routing restriction.
									"bool": map[string]interface{}{
										"must": []map[string]interface{}{
											{
												"match": map[string]interface{}{
													"text_field": map[string]interface{}{
														"query": "test1 test2",
													},
												},
											},
											{
												"term": map[string]interface{}{
													"_routing": routing,
												},
											},
										},
									},
								},
								{
									// Attribute filter.
									"bool": map[string]interface{}{
										"filter": []map[string]interface{}{
											{
												"range": map[string]interface{}{
													"field1": map[string]interface{}{
														"gt": 2,
													},
												},
											},
										},
									},
								},
							},
						},
					},
					"k": 10,
				},
			},
		},
		"ext": map[string]interface{}{
			"lvector": map[string]interface{}{
				"hybrid_search_type":    "filter_rrf",
				"rrf_rank_constant":     "60",
				"rrf_knn_weight_factor": "0.5",
				"filter_type":           "efficient_filter",
			},
		},
	}

	content, err := json.Marshal(knnQuery)
	if err != nil {
		fmt.Printf("Error marshalling JSON: %s\n", err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// With a custom-routing index the search must specify Routing.
	search := opensearchapi.SearchRequest{
		Index:   []string{indexName},
		Body:    strings.NewReader(string(content)),
		Routing: []string{routing},
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		// Printf rather than Println: the message contains a format verb.
		fmt.Printf("Knn err %s\n", err)
		return
	}
	defer searchResponse.Body.Close()

	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Printf("Knn result Decode err %s\n", err)
		return
	}

	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f \n", hit.ID, hit.Score)
	}
}