全文向量混合检索

全文向量混合检索结合了全文检索和纯向量检索,相较于单纯的全文检索或向量检索,其检索结果通常更加精确,相似度也更高。本文介绍如何使用Lindorm向量引擎的全文向量混合检索功能。

前提条件

  • 已安装Go环境,建议安装Go 1.17及以上版本。

  • 已开通Lindorm向量引擎。如何开通,请参见开通向量引擎

  • 已开通Lindorm搜索引擎。具体操作,请参见开通指南

  • 已将客户端的IP地址加入到Lindorm实例的白名单中。具体操作,请参见设置白名单

准备工作

在使用高级特性前,您需要先安装OpenSearch Go客户端并连接搜索引擎。

全文+向量双路召回(RRF融合检索)

在一些查询场景中,您需要综合考虑全文索引和向量索引的排序,根据一定的打分规则对各自返回的结果进一步进行加权计算,并得到最终的排名。

创建索引

以下示例使用hnsw算法。

重要

如果使用ivfpq算法,需要先将knn.offline.construction设置为true,导入离线数据后发起索引构建,构建成功后方可进行查询。详细说明请参见创建向量索引和索引构建。

// createTextVectorHybridSearchIndex creates the index used by the hybrid
// (full-text + vector) search examples: a 3-dimensional knn_vector field
// indexed with HNSW (l2 space, m=24, ef_construction=500), a text field
// analyzed with ik_max_word, and two scalar fields (field1: long,
// field2: keyword). The vector field is excluded from _source.
//
// With HNSW the vector field's meta sets "offline.construction" to "false",
// meaning the index is built online. To use ivfpq instead, set it to "true",
// write a sufficient amount of data, and then trigger index construction.
//
// It uses the package-level OpenSearch client `client`. A marshaling or
// transport failure terminates the process via log.Fatalf; an error response
// from the server is only logged.
func createTextVectorHybridSearchIndex() {
	const (
		indexName   = "vector_text_hybridSearch"
		vectorField = "vector1"
		textField   = "text_field"
	)

	hnswMethod := map[string]interface{}{
		"engine":     "lvector",
		"name":       "hnsw",
		"space_type": "l2",
		"parameters": map[string]interface{}{
			"m":               24,
			"ef_construction": 500,
		},
	}
	properties := map[string]interface{}{
		vectorField: map[string]interface{}{
			"type":      "knn_vector",
			"dimension": 3,
			"data_type": "float",
			"meta":      map[string]interface{}{"offline.construction": "false"},
			"method":    hnswMethod,
		},
		textField: map[string]interface{}{
			"type":     "text",
			"analyzer": "ik_max_word",
		},
		"field1": map[string]interface{}{"type": "long"},
		"field2": map[string]interface{}{"type": "keyword"},
	}
	indexBody := map[string]interface{}{
		"settings": map[string]interface{}{
			"index": map[string]interface{}{
				"number_of_shards": 2,
				"knn":              true,
			},
		},
		"mappings": map[string]interface{}{
			"_source":    map[string]interface{}{"excludes": []string{vectorField}},
			"properties": properties,
		},
	}

	payload, err := json.Marshal(indexBody)
	if err != nil {
		log.Fatalf("Error marshaling index body: %s", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	createReq := opensearchapi.IndicesCreateRequest{
		Index: indexName,
		Body:  bytes.NewReader(payload),
	}
	res, err := createReq.Do(ctx, client)
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer res.Body.Close()

	if !res.IsError() {
		fmt.Println("Index created successfully")
		return
	}
	log.Printf("Error creating index: %s", res.String())
}

数据写入

// bulkWriteTextVectorHybridIndex writes five sample documents into the
// hybrid-search index with a single _bulk request.
//
// Each document carries field1 (long), field2 (keyword), a 3-dimensional
// vector in vector1 and free text in text_field. Document _ids are "1".."5".
// It uses the package-level OpenSearch client `client` with a 60-second
// timeout. A transport error is printed and aborts the function. A non-2xx
// response is passed to handlerCommonError.
func bulkWriteTextVectorHybridIndex() {
	indexName := "vector_text_hybridSearch"
	vectorField := "vector1"
	textField := "text_field"

	type Data struct {
		Field1    int
		Field2    string
		Vector1   []float64
		TextField string
	}

	data := []Data{
		{Field1: 1, Field2: "flag1", Vector1: []float64{2.5, 2.3, 2.4}, TextField: "hello test5"},
		{Field1: 2, Field2: "flag1", Vector1: []float64{2.6, 2.3, 2.4}, TextField: "hello test6 test5"},
		{Field1: 3, Field2: "flag1", Vector1: []float64{2.7, 2.3, 2.4}, TextField: "hello test7"},
		{Field1: 4, Field2: "flag2", Vector1: []float64{2.8, 2.3, 2.4}, TextField: "hello test8 test7"},
		{Field1: 5, Field2: "flag2", Vector1: []float64{2.9, 2.3, 2.4}, TextField: "hello test9"},
	}

	var buf bytes.Buffer
	for i, item := range data {
		// The _id can be chosen by the application; here it is the 1-based position.
		meta := []byte(fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%d" } }`, indexName, i+1))
		// Named doc (not data) so the sample slice is not shadowed.
		doc := map[string]interface{}{
			"field1":    item.Field1,
			"field2":    item.Field2,
			vectorField: item.Vector1,
			textField:   item.TextField,
		}
		docJSON, err := json.Marshal(doc)
		if err != nil {
			fmt.Println("Error marshaling JSON:", err)
			continue
		}
		// _bulk expects NDJSON: an action line followed by a source line,
		// each terminated by '\n'.
		buf.Grow(len(meta) + len(docJSON) + 2)
		buf.Write(meta)
		buf.WriteByte('\n')
		buf.Write(docJSON)
		buf.WriteByte('\n')
	}

	bulkReq := opensearchapi.BulkRequest{
		Body:  &buf,
		Index: indexName,
	}

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	res, err := bulkReq.Do(ctx, client)
	if err != nil {
		// res is nil on a transport error, so return before touching res.Body.
		fmt.Printf("happened err, %s\n", err)
		return
	}
	defer res.Body.Close()

	// NOTE: IsError only reflects the HTTP status. _bulk can answer 200 while
	// individual items failed (reported via "errors": true in the body).
	if res.IsError() {
		handlerCommonError(res)
	}
}

数据查询(融合查询)

RRF计算方式如下:

进行查询时系统会根据传入的rrf_rank_constant参数,对全文检索和向量检索分别获得的topK结果进行处理。对于每个返回的文档_id,使用公式1/(rrf_rank_constant + rank(i))计算得分,其中rank(i)表示该文档在结果中的排名。

如果某个文档_id同时出现在全文检索和向量检索的topK结果中,其最终得分为两种检索方法计算得分之和。而仅出现在其中一种检索结果中的文档,则只保留该检索方法的得分。

以rrf_rank_constant = 1为例,假设queryA返回的文档依次为_id 1、_id 2,queryB返回的文档依次为_id 5、_id 4,计算结果如下:

# doc   | queryA     | queryB         | score
_id: 1 =  1.0/(1+1)  + 0              = 0.5
_id: 2 =  1.0/(1+2)  + 0              = 0.33
_id: 4 =    0        + 1.0/(1+2)      = 0.33
_id: 5 =    0        + 1.0/(1+1)      = 0.5

支持通过_search接口或_msearch_rrf接口进行融合查询,两种接口的对比如下:

| 接口 | 开源性 | 易读性 | 是否支持全文、向量检索比例调整 |
| --- | --- | --- | --- |
| _search | 兼容 | 不易读 | 支持 |
| _msearch_rrf | 自研接口 | 易读 | 不支持 |

以下是两种场景下使用_search接口或_msearch_rrf接口的具体写法:

无标量字段过滤的场景

使用开源_search接口

优点:兼容开源_search接口,支持通过rrf_knn_weight_factor参数调整全文检索与纯向量检索的比例。

缺点:写法较为复杂。

在ext.lvector扩展字段中不设置filter_type,则表示该RRF检索只包含全文检索和纯向量检索,同时向量检索中无需进行标量字段的过滤。

// hybridSearchNoFilterType runs an RRF hybrid search through the standard
// _search API without scalar filtering.
//
// The full-text condition (a match on text_field) is passed as the knn
// "filter", and ext.lvector.hybrid_search_type is "filter_rrf". Because
// filter_type is not set, the request combines only full-text retrieval and
// pure vector retrieval. rrf_knn_weight_factor adjusts the balance between
// the two paths. Each hit's ID and score is printed; any error is printed
// and aborts the function.
func hybridSearchNoFilterType() {
	indexName := "vector_text_hybridSearch"
	vectorField := "vector1"
	textField := "text_field"
	queryVector := []float64{2.8, 2.3, 2.4} // example vector
	fullTextQuery := "test5 test6 test7 test8 test9"

	knnClause := map[string]interface{}{
		vectorField: map[string]interface{}{
			"vector": queryVector,
			"k":      10,
			"filter": map[string]interface{}{
				"match": map[string]interface{}{textField: fullTextQuery},
			},
		},
	}
	lvectorExt := map[string]interface{}{
		"hybrid_search_type":    "filter_rrf",
		"rrf_rank_constant":     "1",
		"rrf_knn_weight_factor": "0.5",
		"ef_search":             "100",
	}

	body, err := json.Marshal(map[string]interface{}{
		"size":    10,
		"_source": true,
		"query":   map[string]interface{}{"knn": knnClause},
		"ext":     map[string]interface{}{"lvector": lvectorExt},
	})
	if err != nil {
		fmt.Printf("Error marshaling query to JSON: %s\n", err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	searchReq := opensearchapi.SearchRequest{
		Index: []string{indexName},
		Body:  bytes.NewReader(body),
	}
	resp, err := searchReq.Do(ctx, client)
	if err != nil {
		fmt.Printf("Knn err: %s\n", err)
		return
	}
	defer resp.Body.Close()

	if resp.IsError() {
		handlerCommonError(resp)
		return
	}

	var parsed KnnResponse
	if err = json.NewDecoder(resp.Body).Decode(&parsed); err != nil {
		fmt.Printf("Knn result Decode err: %s\n", err)
		return
	}
	for _, hit := range parsed.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
	}
}

使用自研_msearch_rrf接口

优点:写法较清晰。

缺点:不兼容开源_search接口,不支持调整全文检索与纯向量检索的比例。

// getDenseVectorQuery builds the vector half of an _msearch_rrf request.
// It produces an NDJSON header line selecting indexName, followed by a body
// line holding a pure k-NN query on vectorField. The query returns the top
// topK hits (field1 and field2 only) with ef_search=100. Both lines end in '\n'.
//
// denseVector must be a JSON array literal such as "[2.8, 2.3, 2.4]". It is
// embedded as raw JSON, so the request carries a numeric array rather than a
// quoted string. A malformed literal makes json.Marshal fail; that error is
// returned together with an empty buffer.
func getDenseVectorQuery(indexName string, vectorField string, denseVector string, topK int) (bytes.Buffer, error) {
	var buf bytes.Buffer
	query := map[string]interface{}{
		"size":    topK,
		"_source": []string{"field1", "field2"},
		"query": map[string]interface{}{
			"knn": map[string]interface{}{
				vectorField: map[string]interface{}{
					// RawMessage keeps the literal as a JSON array; a plain
					// string would be serialized as "vector":"[...]".
					"vector": json.RawMessage(denseVector),
					"k":      topK,
				},
			},
		},
		"ext": map[string]interface{}{
			"lvector": map[string]interface{}{
				"ef_search": "100",
			},
		},
	}
	knnBytes, err := json.Marshal(query)
	if err != nil {
		return buf, err
	}
	fmt.Fprintf(&buf, "{\"index\": \"%s\"}\n", indexName)
	buf.Write(knnBytes)
	buf.WriteByte('\n')
	return buf, nil
}

// getTextQuery assembles the full-text half of an _msearch_rrf request: an
// NDJSON header line selecting indexName, followed by a body line that runs a
// match query for queryText on textField. The body returns at most topK hits
// with only field1 and field2 in _source. Both lines end in '\n'.
func getTextQuery(indexName string, textField string, queryText string, topK int) (bytes.Buffer, error) {
	var out bytes.Buffer
	matchClause := map[string]interface{}{textField: queryText}
	payload, err := json.Marshal(map[string]interface{}{
		"size":    topK,
		"_source": []string{"field1", "field2"},
		"query":   map[string]interface{}{"match": matchClause},
	})
	if err != nil {
		return out, err
	}
	fmt.Fprintf(&out, "{\"index\": \"%s\"}\n", indexName)
	out.Write(payload)
	out.WriteByte('\n')
	return out, nil
}

// hybridSearchNoFilterTypeMSearchRRF runs an RRF hybrid search through the
// Lindorm-specific _msearch_rrf endpoint, without scalar filtering.
//
// The request body is NDJSON with two header/body pairs: a pure k-NN query
// from getDenseVectorQuery and a full-text match query from getTextQuery.
// re_score=true and rrf_rank_constant=1 are passed as URL parameters. This
// endpoint does not support adjusting the full-text/vector weight.
//
// Query-build errors, transport errors and non-2xx responses are reported and
// abort the function. Otherwise each hit's ID and score is printed.
func hybridSearchNoFilterTypeMSearchRRF() {
	indexName := "vector_text_hybridSearch"
	vectorField := "vector1"
	textField := "text_field"

	vectorQuery, err := getDenseVectorQuery(indexName, vectorField, "[2.8, 2.3, 2.4]", 10)
	if err != nil {
		fmt.Println("Build vector query err ", err)
		return
	}
	textQuery, err := getTextQuery(indexName, textField, "test5 test6 test7 test8 test9", 10)
	if err != nil {
		fmt.Println("Build text query err ", err)
		return
	}
	buf := new(bytes.Buffer)
	buf.Write(vectorQuery.Bytes())
	buf.Write(textQuery.Bytes())

	url := "/_msearch_rrf?re_score=true&rrf_rank_constant=1"
	mSearchRRFRequest, err := http.NewRequest("POST", url, buf)
	if err != nil {
		fmt.Println("Build err ", err)
		return
	}
	mSearchRRFRequest.Header.Set("Accept", "application/json")
	mSearchRRFRequest.Header.Set("Content-Type", "application/json")

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	res, err := client.Transport.Perform(mSearchRRFRequest.WithContext(ctx))
	if err != nil {
		fmt.Println("Err ", err)
		return
	}
	defer res.Body.Close()

	// Wrap the raw *http.Response so the shared error helper can be reused.
	response := &opensearchapi.Response{
		StatusCode: res.StatusCode,
		Body:       res.Body,
		Header:     res.Header,
	}
	// Do not decode an error payload as if it were a result set.
	if response.IsError() {
		handlerCommonError(response)
		return
	}

	var responseBody KnnResponse
	if err := json.NewDecoder(response.Body).Decode(&responseBody); err != nil {
		fmt.Printf("Knn result Decode err: %s\n", err)
		return
	}

	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
	}
}

包含标量字段过滤场景

使用开源_search接口

在ext.lvector扩展字段中设置filter_type参数,则表示该RRF检索中的向量检索还需进行标量字段的过滤。

说明

RRF融合检索时,如果希望携带filter过滤条件,需要将全文检索的query条件和用于过滤的filter条件分别设置到两个bool表达式中,通过bool.must进行连接。must中的第一个bool表达式将用于全文检索,计算全文匹配度得分。must中第二个bool filter表达式将用于knn检索的过滤条件。

// hybridSearchWithFilterType runs an RRF hybrid search through the standard
// _search API, where the vector side is also filtered by scalar fields.
//
// The knn "filter" is a bool.must with two clauses:
//   - the first bool clause holds the full-text match and is used for the
//     full-text relevance score;
//   - the second bool.filter clause (field1 > 2 and field2 == "flag2") is the
//     filter condition for the k-NN search.
//
// Setting ext.lvector.filter_type to "efficient_filter" enables that scalar
// filtering. Each hit's ID, score and field1 is printed; any error is printed
// and aborts the function.
func hybridSearchWithFilterType() {
	indexName := "vector_text_hybridSearch"
	size := 10
	vectorField := "vector1"
	textField := "text_field"

	// First must clause: full-text query that produces the text score.
	textClause := map[string]interface{}{
		"bool": map[string]interface{}{
			"must": []interface{}{
				map[string]interface{}{
					"match": map[string]interface{}{
						textField: map[string]interface{}{
							"query": "test5 test6 test7 test8 test9",
						},
					},
				},
			},
		},
	}
	// Second must clause: scalar conditions used to filter the k-NN search.
	scalarFilter := map[string]interface{}{
		"bool": map[string]interface{}{
			"filter": []interface{}{
				map[string]interface{}{
					"range": map[string]interface{}{
						"field1": map[string]interface{}{
							"gt": 2,
						},
					},
				},
				map[string]interface{}{
					"term": map[string]interface{}{
						"field2": "flag2",
					},
				},
			},
		},
	}

	// "_source": true returns all fields, a list returns only the listed
	// fields, and false returns only _id and _score.
	knnQuery := map[string]interface{}{
		// Use size for both the page size and k so they cannot drift apart.
		"size":    size,
		"_source": []string{"field1", "field2"},
		"query": map[string]interface{}{
			"knn": map[string]interface{}{
				vectorField: map[string]interface{}{
					"vector": []float64{2.8, 2.3, 2.4},
					"filter": map[string]interface{}{
						"bool": map[string]interface{}{
							"must": []interface{}{textClause, scalarFilter},
						},
					},
					"k": size,
				},
			},
		},
		"ext": map[string]interface{}{
			"lvector": map[string]interface{}{
				"hybrid_search_type":    "filter_rrf",
				"filter_type":           "efficient_filter",
				"rrf_rank_constant":     "1",
				"rrf_knn_weight_factor": "0.5",
				"ef_search":             "100",
				"k_expand_scope":        "1000",
			},
		},
	}

	content, err := json.Marshal(knnQuery)
	if err != nil {
		fmt.Printf("Error marshalling JSON: %s\n", err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	search := opensearchapi.SearchRequest{
		Index: []string{indexName},
		Body:  bytes.NewReader(content),
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		fmt.Printf("Knn err: %s\n", err)
		return
	}
	defer searchResponse.Body.Close()

	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Printf("Knn result Decode err: %s\n", err)
		return
	}

	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f, Field1 %d \n", hit.ID, hit.Score, hit.Source.Field1)
	}
}

使用自研_msearch_rrf接口

// getDenseVectorQueryWithFilter builds the vector half of a filtered
// _msearch_rrf request. It produces an NDJSON header line selecting
// indexName, followed by a body line holding a k-NN query on vectorField.
// The query is restricted by a bool filter (field1 > 2 and field2 == "flag2")
// and uses ext.lvector filter_type=efficient_filter, ef_search=100 and
// k_expand_scope=2000. It returns at most topK hits with only field1 and
// field2 in _source. Both lines end in '\n'.
//
// denseVector must be a JSON array literal such as "[2.8, 2.3, 2.4]". It is
// embedded as raw JSON, so the request carries a numeric array rather than a
// quoted string. A malformed literal makes json.Marshal fail; that error is
// returned together with an empty buffer.
func getDenseVectorQueryWithFilter(indexName string, vectorField string, denseVector string, topK int) (bytes.Buffer, error) {
	var buf bytes.Buffer
	query := map[string]interface{}{
		"size":    topK,
		"_source": []interface{}{"field1", "field2"},
		"query": map[string]interface{}{
			"knn": map[string]interface{}{
				vectorField: map[string]interface{}{
					// RawMessage keeps the literal as a JSON array; a plain
					// string would be serialized as "vector":"[...]".
					"vector": json.RawMessage(denseVector),
					"filter": map[string]interface{}{
						"bool": map[string]interface{}{
							"filter": []interface{}{
								map[string]interface{}{
									"range": map[string]interface{}{
										"field1": map[string]interface{}{
											"gt": 2,
										},
									},
								},
								map[string]interface{}{
									"term": map[string]interface{}{
										"field2": "flag2",
									},
								},
							},
						},
					},
					"k": topK,
				},
			},
		},
		"ext": map[string]interface{}{
			"lvector": map[string]interface{}{
				"filter_type":    "efficient_filter",
				"ef_search":      "100",
				"k_expand_scope": "2000",
			},
		},
	}
	knnBytes, err := json.Marshal(query)
	if err != nil {
		return buf, err
	}
	fmt.Fprintf(&buf, "{\"index\": \"%s\"}\n", indexName)
	buf.Write(knnBytes)
	buf.WriteByte('\n')
	return buf, nil
}

// getTextQueryWithFilter assembles the full-text half of a filtered
// _msearch_rrf request. It produces an NDJSON header line selecting
// indexName, then a body line with a bool query that:
//   - scores a match for queryText on textField (must);
//   - restricts hits to field1 > 2 and field2 == "flag2" (filter).
//
// At most topK hits are returned, with only field1 and field2 in _source.
// Both lines end in '\n'.
func getTextQueryWithFilter(indexName string, textField string, queryText string, topK int) (bytes.Buffer, error) {
	scalarConditions := []interface{}{
		map[string]interface{}{
			"range": map[string]interface{}{
				"field1": map[string]interface{}{"gt": 2},
			},
		},
		map[string]interface{}{
			"term": map[string]interface{}{"field2": "flag2"},
		},
	}
	scoringClauses := []interface{}{
		map[string]interface{}{
			"match": map[string]interface{}{textField: queryText},
		},
	}
	boolQuery := map[string]interface{}{
		"must": scoringClauses,
		"filter": map[string]interface{}{
			"bool": map[string]interface{}{"filter": scalarConditions},
		},
	}

	var out bytes.Buffer
	payload, err := json.Marshal(map[string]interface{}{
		"size":    topK,
		"_source": []string{"field1", "field2"},
		"query":   map[string]interface{}{"bool": boolQuery},
	})
	if err != nil {
		return out, err
	}
	fmt.Fprintf(&out, "{\"index\": \"%s\"}\n", indexName)
	out.Write(payload)
	out.WriteByte('\n')
	return out, nil
}

// hybridSearchNoFilterTypeMSearchRRFWithFilter runs an RRF hybrid search
// through the Lindorm-specific _msearch_rrf endpoint. Both retrieval paths
// are restricted by the same scalar conditions.
//
// The request body is NDJSON with two header/body pairs: a filtered k-NN
// query from getDenseVectorQueryWithFilter and a filtered full-text query
// from getTextQueryWithFilter. re_score=true and rrf_rank_constant=1 are
// passed as URL parameters.
//
// Query-build errors, transport errors and non-2xx responses are reported and
// abort the function. Otherwise each hit's ID and score is printed.
func hybridSearchNoFilterTypeMSearchRRFWithFilter() {
	indexName := "vector_text_hybridSearch"
	vectorField := "vector1"
	textField := "text_field"

	vectorQuery, err := getDenseVectorQueryWithFilter(indexName, vectorField, "[2.8, 2.3, 2.4]", 10)
	if err != nil {
		fmt.Println("Build vector query err ", err)
		return
	}
	textQuery, err := getTextQueryWithFilter(indexName, textField, "test5 test6 test7 test8 test9", 10)
	if err != nil {
		fmt.Println("Build text query err ", err)
		return
	}
	buf := new(bytes.Buffer)
	buf.Write(vectorQuery.Bytes())
	buf.Write(textQuery.Bytes())

	url := "/_msearch_rrf?re_score=true&rrf_rank_constant=1"
	mSearchRRFRequest, err := http.NewRequest("POST", url, buf)
	if err != nil {
		fmt.Println("Build err ", err)
		return
	}
	mSearchRRFRequest.Header.Set("Accept", "application/json")
	mSearchRRFRequest.Header.Set("Content-Type", "application/json")

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	res, err := client.Transport.Perform(mSearchRRFRequest.WithContext(ctx))
	if err != nil {
		fmt.Println("Err ", err)
		return
	}
	defer res.Body.Close()

	// Wrap the raw *http.Response so the shared error helper can be reused.
	response := &opensearchapi.Response{
		StatusCode: res.StatusCode,
		Body:       res.Body,
		Header:     res.Header,
	}
	// Do not decode an error payload as if it were a result set.
	if response.IsError() {
		handlerCommonError(response)
		return
	}

	var responseBody KnnResponse
	if err := json.NewDecoder(response.Body).Decode(&responseBody); err != nil {
		fmt.Printf("Knn result Decode err: %s\n", err)
		return
	}

	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
	}
}