自动Embedding技术通过内置预训练模型,将文本自动转化为向量,消除了传统方案中手动定义向量字段的繁琐流程。本文介绍基于Go语言,如何在Lindorm系统中通过ElasticSearch Go客户端实现自动Embedding数据的写入与查询方法。
前提条件
准备工作
安装ElasticSearch Go客户端
您需要安装ElasticSearch Go客户端,支持以下两种方式:
方式一:
修改
go.mod
文件配置,添加相关依赖,具体如下:module EsGoClient go 1.23.0 //请替换为您的Go版本 require ( github.com/elastic/go-elasticsearch/v7 v7.10.0 github.com/google/uuid v1.6.0 // 非必须,示例代码中生成uuid需要该项,请根据实际情况配置 )
执行以下命令,更新
go.mod
文件。go mod tidy
方式二:执行以下命令直接下载。
go get github.com/elastic/go-elasticsearch/v7@v7.10.0
连接搜索引擎
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"log"
"net/http"
"strings"
"time"
)
var client *elasticsearch.Client
func init() {
cfg := elasticsearch.Config{
Addresses: []string{
"http://ld-t4n5668xk31ui****-proxy-search-pub.lindorm.aliyuncs.com:30070",
},
Username: "<Username>",
Password: "<Password>",
Transport: &http.Transport{
MaxIdleConnsPerHost: 20,
},
}
var err error
client, err = elasticsearch.NewClient(cfg)
if err != nil {
fmt.Println("Init client error %s", err)
}
}
func handlerCommonError(res *esapi.Response) {
var errorResponse map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&errorResponse); err != nil {
fmt.Printf("Error parsing the error response: %s\n", err)
return
}
fmt.Printf("ERROR %v\n", errorResponse)
}
type Source struct {
Field1 int `json:"field1"`
Field2 string `json:"field2"`
Brand string `json:"brand"`
Merit string `json:"merit"`
Tag []string `json:"tag"`
}
type KnnResponse struct {
Hits struct {
Hits []struct {
ID string `json:"_id"`
Score float64 `json:"_score"`
Source Source `json:"_source"`
} `json:"hits"`
} `json:"hits"`
}
其中Addresses、Username和Password分别为搜索引擎的连接地址、默认用户名和默认密码,如何获取,请参见查看连接地址。
操作步骤概览
操作步骤 | 涉及引擎 | 说明 |
操作步骤 | 涉及引擎 | 说明 |
AI引擎 | 通过curl命令调用AI引擎RESTful API,部署Embedding模型BGE-M3,用于将文本数据转换为向量。 | |
搜索引擎 | 在搜索引擎中创建写入Pipeline,用于在写入数据时,自动将文本数据转换为向量数据(Embedding)。 | |
搜索引擎 | 在搜索引擎中创建查询Pipeline,用于在查询数据时,自动将文本数据转化为向量数据。 | |
向量引擎,搜索引擎 | 在创建或修改向量索引时,需指定写入和查询Pipeline,用于将写入与查询数据自动转换为向量数据。 | |
向量引擎,搜索引擎 | 使用指定的写入Pipeline,将写入的文本数据自动转化为向量数据。 | |
向量引擎,搜索引擎 | 使用指定的查询Pipeline,将查询的文本数据自动转化为向量数据。 |
AI引擎部署Embedding模型
AI引擎部署模型的具体操作请参见模型管理和通过curl命令使用AI引擎RESTful API示例。
部署BGE-M3模型示例如下,参数详情请参见模型管理。
curl请求地址URL使用AI引擎的专用网络连接地址。
curl -i -k --location --header 'x-ld-ak:<username>' --header 'x-ld-sk:<password>' -X POST http://<URL>/v1/ai/models/create -H "Content-Type: application/json" -d '{
"model_name": "bge_m3_model",
"model_path": "huggingface://BAAI/bge-m3",
"task": "FEATURE_EXTRACTION",
"algorithm": "BGE_M3",
"settings": {"instance_count": "2"}
}'
搜索引擎创建Pipeline
在搜索引擎中创建两种Pipeline,分别用于实现数据写入和查询的自动向量化处理。
创建写入Pipeline
func createWritePipeline() {
password := "test****" //AI引擎的密码
wirtePipelineID := "write_embedding_pipeline"
url := "http://ld-2zeyhrpnv9n7a****-proxy-ai-vpc.lindorm.aliyuncs.com:9002" //AI引擎的专有网络连接地址
writePipeline := map[string]interface{}{
"description": "demo_chunking pipeline",
"processors": []interface{}{
map[string]interface{}{
"text-embedding": map[string]interface{}{
"inputFields": []interface{}{"text_field"},
"outputFields": []interface{}{"text_field_embedding"},
"userName": "root", //AI引擎的用户名
"password": "test****", //AI引擎的密码
"url": "http://ld-2zeyhrpnv9n7a****-proxy-ai-vpc.lindorm.aliyuncs.com:9002", // AI引擎的专有网络连接地址
"modeName": "bge_m3_model",
},
},
},
}
content, err := json.Marshal(writePipeline)
if err != nil {
fmt.Printf("Error marshalling JSON: %s\n", err)
return
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
request := esapi.IngestPutPipelineRequest{
PipelineID: wirtePipelineID,
Body: bytes.NewReader(content),
}
response, err := request.Do(ctx, client)
if err != nil {
fmt.Println("Knn err %s", err)
return
}
defer response.Body.Close()
if response.IsError() {
handlerCommonError(response)
return
}
fmt.Println("Response: ", response.String())
}
参数说明
参数 | 说明 |
参数 | 说明 |
processors | 对写入进行Pipeline操作。 |
text-embedding | 固定Key,必须填写。 |
inputFields | 需要进行向量化的文本字段。 |
outputFields | 向量化后的向量字段。 |
userName | Lindorm AI引擎的用户名。 |
password | Lindorm AI引擎的密码。 |
url | AI引擎的连接地址,务必使用专有网络连接地址。 |
modeName | 模型名称,本文对应 |
写入和查询Pipeline中指定的inputFields
和outputFields
,必须与创建向量索引时填写的text_field
和text_field_embedding
保持一致。
创建查询Pipeline
func createSearchPipeline() {
password := "test****" //AI引擎的密码
searchPipelineID := "knnsearch_pipeline"
url := "http://ld-2zeyhrpnv9n7a****-proxy-ai-vpc.lindorm.aliyuncs.com:9002" //AI引擎的专有网络连接地址
searchPipeline := map[string]interface{}{
"request_processors": []interface{}{
map[string]interface{}{
"text-embedding": map[string]interface{}{
"tag": "auto-query-embedding",
"description": "Auto query embedding",
"model_config": map[string]interface{}{
"inputFields": []interface{}{"text_field"},
"outputFields": []interface{}{"text_field_embedding"},
"userName": "root", //AI引擎的用户名
"password": "test****", //AI引擎的密码
"url": "http://ld-2zeyhrpnv9n7a****-proxy-ai-vpc.lindorm.aliyuncs.com:9002", //AI引擎的专有网络连接地址
"modeName": "bge_m3_model",
},
},
},
},
}
searchPielineUrl := fmt.Sprintf("_search/pipeline/%s", searchPipelineID)
buf := new(bytes.Buffer)
if err := json.NewEncoder(buf).Encode(searchPipeline); err != nil {
fmt.Println("Encode error ", err)
return
}
request, err := http.NewRequest("PUT", searchPielineUrl, buf)
if err != nil {
fmt.Println("Err ", err)
return
}
request.Header.Set("Accept", "application/json")
request.Header.Set("Content-Type", "application/json")
res, err := client.Transport.Perform(request)
if err != nil {
fmt.Println("Err ", err)
return
}
defer res.Body.Close()
response := esapi.Response{
StatusCode: res.StatusCode,
Body: res.Body,
Header: res.Header,
}
fmt.Println("Response:", response.String())
}
参数说明
参数 | 说明 |
参数 | 说明 |
request_processors | 表示对搜索请求进行Pipeline操作。 |
text-embedding | 固定Key,必须填写。 |
inputFields | 需要进行向量化的文本字段,起到占位作用。 |
outputFields | 向量化以后的向量字段。 |
userName | Lindorm AI引擎的用户名。 |
password | Lindorm AI引擎的密码。 |
url | AI引擎的连接地址,务必使用专有网络连接地址。 |
modeName | 模型名称,本文对应 |
写入和查询Pipeline中指定的inputFields
和outputFields
,必须与创建向量索引时填写的text_field
和text_field_embedding
保持一致。
创建索引并指定Pipeline
在创建向量索引或修改现有向量索引设置时,请指定所需的Pipeline。
创建向量索引
// 创建索引,指定写入与查询pipeline
func createAutoEmbeddingPipelineIndex() {
indexName := "search_vector_test"
writePipelineID := "write_embedding_pipeline"
searchPipelineID := "knnsearch_pipeline"
createMapping := map[string]interface{}{
"settings": map[string]interface{}{
"index": map[string]interface{}{
"number_of_shards": 2,
"knn": true,
"default_pipeline": writePipelineID,
"search.default_pipeline": searchPipelineID,
},
},
"mappings": map[string]interface{}{
"_source": map[string]interface{}{
"excludes": []interface{}{"text_field_embedding"},
},
"properties": map[string]interface{}{
"text_field": map[string]interface{}{
"type": "text",
"analyzer": "ik_max_word",
},
"text_field_embedding": map[string]interface{}{
"type": "knn_vector",
"dimension": 1024,
"data_type": "float",
"method": map[string]interface{}{
"engine": "lvector",
"name": "hnsw",
"space_type": "cosinesimil",
"parameters": map[string]interface{}{
"m": 24,
"ef_construction": 500,
},
},
},
"tag": map[string]interface{}{
"type": "keyword",
},
"brand": map[string]interface{}{
"type": "keyword",
},
"merit": map[string]interface{}{
"type": "text",
"analyzer": "ik_max_word",
},
},
},
}
body, err := json.Marshal(createMapping)
if err != nil {
log.Fatalf("Error marshaling index body: %s", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
req := esapi.IndicesCreateRequest{
Index: indexName,
Body: bytes.NewReader(body),
}
res, err := req.Do(ctx, client)
defer res.Body.Close()
if err != nil {
fmt.Println("Create index error: ", err)
return
}
if res.IsError() {
handlerCommonError(res)
return
}
fmt.Println("Create response: ", res.String())
}
修改现有向量索引设置
如果您已经创建了向量索引,可以通过以下方式修改其配置,指定写入和查询时使用的 Pipeline,以满足特定的业务需求。
func updateIndexAddAutoEmbeddingPipeline() {
// 假设 search_vector_test2 没有指定 default_pipeline 与 search.default_pipeline
indexName := "search_vector_test2"
putSetting := map[string]interface{}{
"index": map[string]interface{}{
"default_pipeline": "write_embedding_pipeline",
"search.default_pipeline": "knnsearch_pipeline",
},
}
body, err := json.Marshal(putSetting)
if err != nil {
log.Fatalf("Error marshaling index body: %s", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
req := esapi.IndicesPutSettingsRequest{
Index: []string{indexName},
Body: bytes.NewReader(body),
}
res, err := req.Do(ctx, client)
defer res.Body.Close()
if err != nil {
fmt.Println("Create index error: ", err)
return
}
if res.IsError() {
handlerCommonError(res)
return
}
fmt.Println("Put setting: ", res.String())
}
数据写入
由于指定了写入的Pipeline,因此,在写入过程中,除了将文本字段text_field
写入外,还会根据该Pipeline将text_field
编码成向量形式,并将其作为text_field_embedding
一并写入。
func bulkWriteAutoEmbeddingIndex() {
indexName := "search_vector_test"
textField := "text_field"
var buf bytes.Buffer
type Data struct {
ID string
TextField string
Tag []string
Brand string
Merit string
}
data := []Data{
{ID: "3982", TextField: "品牌A 时尚节能无线鼠标(草绿)(眩光.悦动.时尚炫舞鼠标 12个月免换电池 高精度光学寻迹引擎 超细微接收器10米传输距离)", Tag: []string{"鼠标", "电子产品"}, Brand: "品牌A", Merit: "好用、外观漂亮"},
{ID: "323519", TextField: "品牌B 光学鼠标(经典黑)(智能自动对码/1000DPI高精度光学引擎)", Tag: []string{"鼠标", "电子产品"}, Brand: "品牌B", Merit: "质量好、到货速度快、外观漂亮、好用"},
{ID: "300265", TextField: "品牌C 耳塞式耳机 白色(经典时尚)", Tag: []string{"耳机", "电子产品"}, Brand: "品牌C", Merit: "外观漂亮、质量好"},
{ID: "6797", TextField: "品牌D 两刀头充电式电动剃须刀", Tag: []string{"家用电器", "电动剃须刀"}, Brand: "品牌D", Merit: "好用、外观漂亮"},
{ID: "8195", TextField: "品牌E Class4 32G TF卡(micro SD)手机存储卡", Tag: []string{"存储设备", "存储卡", "SD卡"}, Brand: "品牌E", Merit: "容量挺大的、速度快、好用、质量好"},
{ID: "13316", TextField: "品牌E 101 G2 32GB 优盘", Tag: []string{"存储设备", "U盘", "优盘"}, Brand: "品牌E", Merit: "好用、容量挺大的、速度快"},
{ID: "14103", TextField: "品牌B 64GB至尊高速移动存储卡 UHS-1制式 读写速度最高可达30MB", Tag: []string{"存储设备", "存储卡", "SD卡"}, Brand: "品牌B", Merit: "容量挺大的、速度快、好用"},
}
for _, item := range data {
// id 可以根据业务自己指定
meta := []byte(fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%s" } }`, indexName, item.ID))
data := map[string]interface{}{
textField: item.TextField,
"tag": item.Tag,
"brand": item.Brand,
"merit": item.Merit,
}
dataJson, err := json.Marshal(data)
if err != nil {
fmt.Println("Error marshaling JSON:", err)
continue
}
buf.Grow(len(meta) + len(dataJson) + 2)
buf.Write(meta)
buf.WriteByte('\n')
buf.Write(dataJson)
buf.WriteByte('\n')
}
bulkReq := esapi.BulkRequest{
Body: &buf,
Index: indexName,
}
// 设置超时
ctx, cancel := context.WithTimeout(context.Background(), 30*2*time.Second)
defer cancel()
res, err := bulkReq.Do(ctx, client)
defer res.Body.Close()
if err != nil {
fmt.Println("happend err, %s", err)
}
if res.IsError() {
handlerCommonError(res)
}
}
数据查询
func autoEmbeddingKnnQueryWithoutFilter() {
indexName := "search_vector_test"
vectorField := "text_field_embedding"
knnQuery := map[string]interface{}{
"size": 10,
"_source": true,
"query": map[string]interface{}{
"knn": map[string]interface{}{
vectorField: map[string]interface{}{
"query_text": "存储卡",
"k": 10,
},
},
},
"ext": map[string]interface{}{
"lvector": map[string]interface{}{
"ef_search": "200",
},
},
}
content, err := json.Marshal(knnQuery)
if err != nil {
fmt.Printf("Error marshalling JSON: %s\n", err)
return
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
search := esapi.SearchRequest{
Index: []string{indexName},
Body: strings.NewReader(string(content)),
}
searchResponse, err := search.Do(ctx, client)
if err != nil {
fmt.Println("Knn err %s", err)
return
}
defer searchResponse.Body.Close()
if searchResponse.IsError() {
handlerCommonError(searchResponse)
return
}
var responseBody KnnResponse
if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
fmt.Println("Knn result Decode err %s", err)
return
}
for _, hit := range responseBody.Hits.Hits {
fmt.Printf("ID: %s, Score: %f, Brand %s, Tag %v, Metric: %s \n", hit.ID, hit.Score, hit.Source.Brand, hit.Source.Tag, hit.Source.Merit)
}
}
返回结果
ID: 8195, Score: 0.743359, Brand 品牌E, Tag [存储设备 存储卡 SD卡], Metric: 容量挺大的、速度快、好用、质量好
ID: 14103, Score: 0.711654, Brand 品牌B, Tag [存储设备 存储卡 SD卡], Metric: 容量挺大的、速度快、好用
ID: 13316, Score: 0.683168, Brand 品牌E, Tag [存储设备 U盘 优盘], Metric: 好用、容量挺大的、速度快
ID: 3982, Score: 0.642342, Brand 品牌A, Tag [鼠标 电子产品], Metric: 好用、外观漂亮
ID: 6797, Score: 0.635721, Brand 品牌D, Tag [家用电器 电动剃须刀], Metric: 好用、外观漂亮
ID: 323519, Score: 0.624451, Brand 品牌B, Tag [鼠标 电子产品], Metric: 质量好、到货速度快、外观漂亮、好用
ID: 300265, Score: 0.621442, Brand 品牌C, Tag [耳机 电子产品], Metric: 外观漂亮、质量好
func autoEmbeddingKnnQueryWithFilter() {
indexName := "search_vector_test"
vectorField := "text_field_embedding"
knnQuery := map[string]interface{}{
"size": 10,
"_source": true,
"query": map[string]interface{}{
"knn": map[string]interface{}{
vectorField: map[string]interface{}{
"query_text": "存储卡",
"k": 10,
"filter": map[string]interface{}{
"bool": map[string]interface{}{
"filter": []interface{}{
map[string]interface{}{
"match": map[string]interface{}{
"merit": "质量好",
},
},
map[string]interface{}{
"term": map[string]interface{}{
"brand": "品牌E",
},
},
map[string]interface{}{
"terms": map[string]interface{}{
"tag": []interface{}{"SD卡", "存储卡"},
},
},
},
},
},
},
},
},
"ext": map[string]interface{}{
"lvector": map[string]interface{}{
"filter_type": "efficient_filter",
"ef_search": "200",
},
},
}
content, err := json.Marshal(knnQuery)
if err != nil {
fmt.Printf("Error marshalling JSON: %s\n", err)
return
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
search := esapi.SearchRequest{
Index: []string{indexName},
Body: strings.NewReader(string(content)),
}
searchResponse, err := search.Do(ctx, client)
if err != nil {
fmt.Println("Knn err %s", err)
return
}
defer searchResponse.Body.Close()
if searchResponse.IsError() {
handlerCommonError(searchResponse)
return
}
var responseBody KnnResponse
if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
fmt.Println("Knn result Decode err %s", err)
return
}
for _, hit := range responseBody.Hits.Hits {
fmt.Printf("ID: %s, Score: %f, Brand %s, Tag %v, Metric: %s \n", hit.ID, hit.Score, hit.Source.Brand, hit.Source.Tag, hit.Source.Merit)
}
}
返回结果
ID: 8195, Score: 0.743359, Brand 品牌E, Tag [存储设备 存储卡 SD卡], Metric: 容量挺大的、速度快、好用、质量好
func autoEmbeddingHybridSearchQuery() {
indexName := "search_vector_test"
vectorField := "text_field_embedding"
knnQuery := map[string]interface{}{
"size": 10,
"_source": true,
"query": map[string]interface{}{
"knn": map[string]interface{}{
vectorField: map[string]interface{}{
"query_text": "存储卡",
"filter": map[string]interface{}{
"bool": map[string]interface{}{
"must": []interface{}{
map[string]interface{}{
"bool": map[string]interface{}{
"must": []interface{}{
map[string]interface{}{
"match": map[string]interface{}{
"text_field": map[string]interface{}{
"query": "存储卡",
},
},
},
},
},
},
map[string]interface{}{
"bool": map[string]interface{}{
"filter": []interface{}{
map[string]interface{}{
"match": map[string]interface{}{
"merit": "质量好",
},
},
map[string]interface{}{
"term": map[string]interface{}{
"brand": "品牌E",
},
},
map[string]interface{}{
"terms": map[string]interface{}{
"tag": []interface{}{"SD卡", "存储卡"},
},
},
},
},
},
},
},
},
"k": 10,
},
},
},
"ext": map[string]interface{}{
"lvector": map[string]interface{}{
"filter_type": "efficient_filter",
"hybrid_search_type": "filter_rrf",
"rrf_rank_constant": "1",
"ef_search": "200",
},
},
}
content, err := json.Marshal(knnQuery)
if err != nil {
fmt.Printf("Error marshalling JSON: %s\n", err)
return
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
search := esapi.SearchRequest{
Index: []string{indexName},
Body: strings.NewReader(string(content)),
}
searchResponse, err := search.Do(ctx, client)
if err != nil {
fmt.Println("Knn err %s", err)
return
}
defer searchResponse.Body.Close()
if searchResponse.IsError() {
handlerCommonError(searchResponse)
return
}
var responseBody KnnResponse
if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
fmt.Println("Knn result Decode err %s", err)
return
}
for _, hit := range responseBody.Hits.Hits {
fmt.Printf("ID: %s, Score: %f, Brand %s, Tag %v, Metric: %s \n", hit.ID, hit.Score, hit.Source.Brand, hit.Source.Tag, hit.Source.Merit)
}
}
返回结果
ID: 8195, Score: 1.000000, Brand 品牌E, Tag [存储设备 存储卡 SD卡], Metric: 容量挺大的、速度快、好用、质量好
- 本页导读 (1)
- 前提条件
- 准备工作
- 安装ElasticSearch Go客户端
- 连接搜索引擎
- 操作步骤概览
- AI引擎部署Embedding模型
- 搜索引擎创建Pipeline
- 创建写入Pipeline
- 创建查询Pipeline
- 创建索引并指定Pipeline
- 创建向量索引
- 修改现有向量索引设置
- 数据写入
- 数据查询