The Lindorm vector engine supports vector data retrieval, as well as hybrid retrieval that combines scalar, vector, and full-text search. The Java High Level REST Client is a high-level REST client with a simpler, easier-to-use API. If you need to run complex query analysis, you can access the vector engine through the Java Low Level REST Client instead.
Prerequisites
Preparations
Install the Java High Level REST Client
Using a Maven project as an example, add the dependencies to the dependencies section of the pom.xml file. Sample code:
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.20.0</version>
</dependency>
Connect to the search engine
// Elasticsearch-compatible endpoint of the Lindorm search engine
String search_url = "ld-t4n5668xk31ui****-proxy-search-public.lindorm.rds.aliyuncs.com";
int search_port = 30070;
String username = "user";
String password = "test";
final CredentialsProvider credentials_provider = new BasicCredentialsProvider();
credentials_provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost( search_url, search_port, "http")).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentials_provider);
}
})
);
| Parameter | Description |
| --- | --- |
| search_url | Elasticsearch-compatible endpoint of the search engine. For how to obtain it, see Elasticsearch-compatible endpoint. |
| search_port | Elasticsearch-compatible port of the Lindorm search engine, fixed at 30070. |
| username | Username for accessing the search engine. To obtain the default username and password: in the console's left-side navigation pane, choose Database Connections, then click the Search Engine tab. |
| password | Password for accessing the search engine, obtained in the same place. |
Create a vector index
The Map.of syntax used in the examples requires JDK 9 or later. If your JDK is older, modify the code to use syntax supported by your JDK version.
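For example, on JDK 8 a call such as Map.of("type", "long") can be replaced with a small varargs helper or with Collections.singletonMap. A minimal sketch (the MapCompat class name is illustrative, not part of any library):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class MapCompat {
    // JDK 8 replacement for Map.of(k1, v1, k2, v2, ...):
    // builds a HashMap from alternating key/value arguments.
    static Map<String, Object> mapOf(Object... kv) {
        Map<String, Object> m = new HashMap<>();
        for (int i = 0; i < kv.length; i += 2) {
            m.put((String) kv[i], kv[i + 1]);
        }
        return m;
    }

    public static void main(String[] args) {
        Map<String, Object> field = mapOf("type", "long");
        System.out.println(field.get("type")); // prints "long"
        // A single-entry map can also use the JDK's own Collections.singletonMap:
        Map<String, Object> same = Collections.singletonMap("type", "long");
        System.out.println(same.equals(field)); // prints "true"
    }
}
```

Note that unlike Map.of, a HashMap built this way is mutable; that difference does not matter for the request bodies in this topic.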
HNSW index
String indexName = "vector_hnsw_test";
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
{
// Create settings
Map<String, Object> indexMap = new HashMap<>();
indexMap.put("number_of_shards", 2);
indexMap.put("knn", true);
Map<String, Object> settings = new HashMap<>();
settings.put("index", indexMap);
createIndexRequest.settings(settings);
}
{
// Create mappings
Map<String, Object> source = new HashMap<>();
source.put("excludes", new String[] { "vector1" });
// Create a map for "method.parameters"
Map<String, Object> parameters = new HashMap<>();
parameters.put("m", 24);
parameters.put("ef_construction", 500);
// Create a map for "method"
Map<String, Object> method = new HashMap<>();
method.put("engine", "lvector");
method.put("name", "hnsw");
method.put("space_type", "l2");
method.put("parameters", parameters);
// Create a map for "vector1"
Map<String, Object> vector1 = new HashMap<>();
vector1.put("type", "knn_vector");
vector1.put("dimension", 3);
vector1.put("data_type", "float");
vector1.put("method", method);
// Create a map for the properties
Map<String, Object> properties = new HashMap<>();
properties.put("vector1", vector1);
properties.put("field1", Map.of("type", "long"));
// Create the mappings map
Map<String, Object> mappings = new HashMap<>();
mappings.put("_source", source);
mappings.put("properties", properties);
createIndexRequest.mapping(mappings);
}
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
System.out.println("createIndexResponse: " + createIndexResponse);
IVFPQ index
String indexName = "vector_ivfpq_test";
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
{
// Create settings
Map<String, Object> indexMap = new HashMap<>();
indexMap.put("number_of_shards", 2);
indexMap.put("knn", true);
indexMap.put("knn.offline.construction", true);
Map<String, Object> settings = new HashMap<>();
settings.put("index", indexMap);
createIndexRequest.settings(settings);
}
{
// Create mappings
Map<String, Object> mappings = new HashMap<>();
Map<String, Object> source = new HashMap<>();
source.put("excludes", new String[] { "vector1" });
mappings.put("_source", source);
Map<String, Object> properties = new HashMap<>();
int dim = 3;
// Vector1 property
Map<String, Object> vector1 = new HashMap<>();
vector1.put("type", "knn_vector");
vector1.put("dimension", dim);
vector1.put("data_type", "float");
Map<String, Object> method = new HashMap<>();
method.put("engine", "lvector");
method.put("name", "ivfpq");
method.put("space_type", "cosinesimil");
Map<String, Object> parameters = new HashMap<>();
parameters.put("m", dim);
parameters.put("nlist", 10000);
parameters.put("centroids_use_hnsw", true);
parameters.put("centroids_hnsw_m", 48);
parameters.put("centroids_hnsw_ef_construct", 500);
parameters.put("centroids_hnsw_ef_search", 200);
method.put("parameters", parameters);
vector1.put("method", method);
properties.put("vector1", vector1);
// Field1 property
Map<String, Object> field1 = new HashMap<>();
field1.put("type", "long");
properties.put("field1", field1);
mappings.put("properties", properties);
createIndexRequest.mapping(mappings);
}
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
System.out.println("createIndexResponse: " + createIndexResponse);
Sparse vector index
String indexName = "vector_sparse_test";
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
{
// Create settings
Map<String, Object> indexMap = new HashMap<>();
indexMap.put("number_of_shards", 2);
indexMap.put("knn", true);
Map<String, Object> settings = new HashMap<>();
settings.put("index", indexMap);
createIndexRequest.settings(settings);
}
{
// Create mappings
Map<String, Object> mappings = new HashMap<>();
Map<String, Object> source = new HashMap<>();
source.put("excludes", new String[]{"vector1"});
mappings.put("_source", source);
Map<String, Object> properties = new HashMap<>();
Map<String, Object> vector1 = new HashMap<>();
vector1.put("type", "knn_vector");
vector1.put("data_type", "sparse_vector");
Map<String, Object> method = new HashMap<>();
method.put("engine", "lvector");
method.put("name", "sparse_hnsw");
method.put("space_type", "innerproduct");
Map<String, Object> parameters = new HashMap<>();
parameters.put("m", 24);
parameters.put("ef_construction", 200);
method.put("parameters", parameters);
vector1.put("method", method);
properties.put("vector1", vector1);
Map<String, Object> field1 = new HashMap<>();
field1.put("type", "long");
properties.put("field1", field1);
mappings.put("properties", properties);
createIndexRequest.mapping(mappings);
}
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
System.out.println("createIndexResponse: " + createIndexResponse);
Write data
Data is written to an index containing vector fields in the same way as to a regular index.
Write a single record
The following example writes to the index vector_test:
Map<String, Object> fieldMap = new HashMap<>();
fieldMap.put("field1", 1);
fieldMap.put("vector1", new float[]{1.2f,1.3f,1.4f});
IndexRequest indexRequest = new IndexRequest("vector_test");
indexRequest.id("1");
indexRequest.source(fieldMap);
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
Bulk write
BulkRequest bulkRequest = new BulkRequest();
{
Map<String, Object> fieldMap = new HashMap<>();
fieldMap.put("field1", 1);
fieldMap.put("vector1", new float[]{2.2f,2.3f,2.4f});
IndexRequest indexRequest = new IndexRequest("vector_test");
indexRequest.id("1");
indexRequest.source(fieldMap);
bulkRequest.add(indexRequest);
}
{
Map<String, Object> fieldMap = new HashMap<>();
fieldMap.put("field1", 1);
fieldMap.put("vector1", new float[]{2.2f,2.3f,2.4f});
IndexRequest indexRequest = new IndexRequest("vector_test");
indexRequest.id("2");
indexRequest.source(fieldMap);
bulkRequest.add(indexRequest);
}
{
Map<String, Object> fieldMap = new HashMap<>();
fieldMap.put("field1", 2);
fieldMap.put("vector1", new float[]{1.2f,1.3f,4.4f});
IndexRequest indexRequest = new IndexRequest("vector_test");
indexRequest.id("3");
indexRequest.source(fieldMap);
bulkRequest.add(indexRequest);
}
{
DeleteRequest deleteRequest = new DeleteRequest("vector_test", "2");
bulkRequest.add(deleteRequest);
}
{
Map<String, Object> fieldMap = new HashMap<>();
fieldMap.put("field1", 3);
fieldMap.put("vector1", new float[]{2.2f,3.3f,4.4f});
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("vector_test");
updateRequest.id("1");
updateRequest.doc(fieldMap);
bulkRequest.add(updateRequest);
}
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
Write sparse vectors
The write method is the same as above, but the format of vector1 must be changed.
Map<String, Object> vectorMap = new HashMap<>();
vectorMap.put("indices", Arrays.asList(10, 12, 16));
vectorMap.put("values", Arrays.asList(1.2, 1.3, 1.4));
Map<String, Object> fieldMap = new HashMap<>();
fieldMap.put("field1", 1);
fieldMap.put("vector1", vectorMap);
IndexRequest indexRequest = new IndexRequest("vector_test");
indexRequest.id("1");
indexRequest.source(fieldMap);
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
Build the index
The Java High Level REST Client does not directly support all custom plugin APIs; you need to call the _plugins APIs through the Low Level REST Client. For details, see Index building.
Query data
Pure vector query
// Contextual variables reused by the query examples in this topic
String index = "vector_test";      // index name
String vectorColumn = "vector1";   // vector field name
int topK = 10;                     // number of nearest neighbors to return
float[] vectors = new float[]{2.2f,3.3f,4.4f};
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
Map<String, Object> queryBody = new HashMap<>();
{
Map<String, Object> vectorMap = new HashMap<>();
vectorMap.put("vector", vectors);
vectorMap.put("k", topK);
queryBody.put("knn", Map.of(vectorColumn, vectorMap));
}
searchSourceBuilder.query(QueryBuilders.wrapperQuery(new Gson().toJson(queryBody)));
Map<String, String> ext = Map.of("min_score", "0.8");
searchSourceBuilder.ext(Collections.singletonList(new LVectorExtBuilder("lvector", ext)));
searchRequest.source(searchSourceBuilder);
searchRequest.indices(index);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println(searchResponse);
Parameter description
| Parameter structure | Parameter | Required | Description |
| --- | --- | --- | --- |
| knn | vector | Yes | The vector used in the query. |
| knn | k | Yes | Return the K most similar results. |
| ext | lvector.min_score | No | Similarity threshold; only vectors whose score exceeds this value are returned. Vector scores fall in the range [0,1]. Value range: [0,+inf). |
| ext | lvector.filter_type | No | Mode used for fused queries. Defaults to empty. |
| ext | lvector.ef_search | No | In the HNSW algorithm, the size of the dynamic candidate list used during search. Applies only to HNSW. Value range: [1,1000]. |
| ext | lvector.nprobe | No | Number of cluster units to query. Tune this value to meet your recall requirements: larger values give higher recall but lower search performance. Value range: [1, method.parameters.nlist]. No default. Important: applies only to the ivfpq algorithm. |
| ext | lvector.reorder_factor | No | Reorder results using the raw vectors. Distances computed by the ivfpq algorithm are quantized and lose some precision, so the raw vectors are needed for reordering. Value range: [1,200]. |
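The distance measures referenced by space_type in the index mappings (l2, cosinesimil, innerproduct) can be sketched in plain Java. This is only an illustration of the raw measures; it does not reproduce the engine's internal score normalization:

```java
public class SpaceTypes {
    // Squared L2 (Euclidean) distance: smaller means more similar.
    static float l2(float[] a, float[] b) {
        float s = 0f;
        for (int i = 0; i < a.length; i++) {
            float d = a[i] - b[i];
            s += d * d;
        }
        return s;
    }

    // Inner product: larger means more similar.
    static float innerProduct(float[] a, float[] b) {
        float s = 0f;
        for (int i = 0; i < a.length; i++) {
            s += a[i] * b[i];
        }
        return s;
    }

    // Cosine similarity: 1 means identical direction.
    static float cosine(float[] a, float[] b) {
        return (float) (innerProduct(a, b)
                / (Math.sqrt(innerProduct(a, a)) * Math.sqrt(innerProduct(b, b))));
    }

    public static void main(String[] args) {
        float[] x = {1f, 0f, 0f};
        float[] y = {0f, 1f, 0f};
        System.out.println(l2(x, y));           // 2.0
        System.out.println(innerProduct(x, y)); // 0.0
        System.out.println(cosine(x, x));       // 1.0
    }
}
```

Choose the space_type that matches how your vectors were produced: l2 for raw embeddings, cosinesimil when only direction matters, innerproduct for already-scaled similarity scores.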
Return specified fields
To return specific fields in a query, specify "_source": ["field1", "field2"], or use "_source": true to return all non-vector fields.
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
Map<String, Object> queryBody = new HashMap<>();
{
Map<String, Object> vectorMap = new HashMap<>();
vectorMap.put("vector", vectors);
vectorMap.put("k", topK);
queryBody.put("knn", Map.of(vectorColumn, vectorMap));
}
searchSourceBuilder.query(QueryBuilders.wrapperQuery(new Gson().toJson(queryBody)));
Map<String, String> ext = Map.of("min_score", "0.8");
searchSourceBuilder.ext(Collections.singletonList(new LVectorExtBuilder("lvector", ext)));
// _source can be configured to return the specified fields
searchSourceBuilder.fetchSource(new FetchSourceContext(true, new String[]{"field1", "field2"}, null));
searchRequest.source(searchSourceBuilder);
searchRequest.indices(index);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println(searchResponse);
Query with the HNSW algorithm
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
Map<String, Object> queryBody = new HashMap<>();
{
Map<String, Object> vectorMap = new HashMap<>();
vectorMap.put("vector", vectors);
vectorMap.put("k", topK);
queryBody.put("knn", Map.of(vectorColumn, vectorMap));
}
searchSourceBuilder.query(QueryBuilders.wrapperQuery(new Gson().toJson(queryBody)));
Map<String, String> ext = Map.of("ef_search", "100");
searchSourceBuilder.ext(Collections.singletonList(new LVectorExtBuilder("lvector", ext)));
// _source can be configured to return the specified fields
searchSourceBuilder.fetchSource(new FetchSourceContext(true, new String[]{"field1", "field2"}, null));
searchRequest.source(searchSourceBuilder);
searchRequest.indices(index);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println(searchResponse);
Query with the IVFPQ algorithm
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
Map<String, Object> queryBody = new HashMap<>();
{
Map<String, Object> vectorMap = new HashMap<>();
vectorMap.put("vector", vectors);
vectorMap.put("k", topK);
queryBody.put("knn", Map.of(vectorColumn, vectorMap));
}
searchSourceBuilder.query(QueryBuilders.wrapperQuery(new Gson().toJson(queryBody)));
Map<String, String> ext = Map.of("nprobe", "60", "reorder_factor", "5");
searchSourceBuilder.ext(Collections.singletonList(new LVectorExtBuilder("lvector", ext)));
// _source can be configured to return the specified fields
searchSourceBuilder.fetchSource(new FetchSourceContext(true, new String[]{"field1", "field2"}, null));
searchRequest.source(searchSourceBuilder);
searchRequest.indices(index);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println(searchResponse);
If k is relatively large (for example, greater than 100), setting reorder_factor to 1 is sufficient. When nlist is 10000, you can first set nprobe to 60 and check the retrieval results. To further improve recall, increase nprobe moderately, for example to 80, 100, 120, 140, or 160. The performance cost of increasing nprobe is far smaller than that of reorder_factor, but it should not be set too large.
Sparse vector query
The query method is the same as above, but the format of vector1 must be changed.
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
Map<String, Object> queryBody = new HashMap<>();
{
Map<String, Object> knn = new HashMap<>();
Map<String, Object> vector1 = new HashMap<>();
vector1.put("vector", new HashMap<String, Object>() {{
put("indices", Arrays.asList(10, 45, 16));
put("values", Arrays.asList(0.5, 0.5, 0.2));
}});
vector1.put("k", 10);
knn.put("vector1", vector1);
queryBody.put("knn", knn);
}
searchSourceBuilder.query(QueryBuilders.wrapperQuery(new Gson().toJson(queryBody)));
searchRequest.source(searchSourceBuilder);
searchRequest.indices(index);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println(searchResponse);
Fused query
Vector queries can be combined with query conditions on regular columns to return combined results. In practice, a Post-Filter approximate query can usually retrieve more similar results.
Pre-Filter approximate query
Add a filter inside the knn query structure and set the filter_type parameter to pre_filter to filter structured data first, then query vector data.
Currently, at most 10,000 rows of structured data can be filtered.
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// vectorColumn is the name of the vector column
// field1 is the name of the attribute column
Map<String, Object> queryBody = Map.of("knn", Map.of(vectorColumn, Map.of(
"vector", vectors,
"k", topK,
        "filter", Map.of("range", Map.of("field1", Map.of("gte", 0))))));
searchSourceBuilder.query(QueryBuilders.wrapperQuery(new Gson().toJson(queryBody)));
Map<String, String> ext = Map.of("filter_type", "pre_filter");
searchSourceBuilder.ext(Collections.singletonList(new LVectorExtBuilder("lvector", ext)));
searchRequest.source(searchSourceBuilder);
searchRequest.indices("vector_test");
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println(searchResponse);
Post-Filter approximate query
Add a filter inside the knn query structure and set the filter_type parameter to post_filter to query vector data first, then filter structured data.
When using a Post-Filter approximate query, you can set k somewhat larger so that more vector data is retrieved before filtering.
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// vectorColumn is the name of the vector column
// field1 is the name of the attribute column
Map<String, Object> queryBody = Map.of("knn", Map.of(vectorColumn, Map.of(
"vector", vectors,
"k", topK,
        "filter", Map.of("range", Map.of("field1", Map.of("gte", 0))))));
searchSourceBuilder.query(QueryBuilders.wrapperQuery(new Gson().toJson(queryBody)));
Map<String, String> ext = Map.of("filter_type", "post_filter");
searchSourceBuilder.ext(Collections.singletonList(new LVectorExtBuilder("lvector", ext)));
searchRequest.source(searchSourceBuilder);
searchRequest.indices("vector_test");
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println(searchResponse);
When using a Post-Filter approximate query, increase k appropriately; if you use the ivfpq algorithm, also adjust the value of reorder_factor. Example:
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// vectorColumn is the name of the vector column
// field1 is the name of the attribute column
Map<String, Object> queryBody = Map.of("knn", Map.of(vectorColumn, Map.of(
"vector", vectors,
"k", topK,
        "filter", Map.of("range", Map.of("field1", Map.of("gte", 0))))));
searchSourceBuilder.query(QueryBuilders.wrapperQuery(new Gson().toJson(queryBody)));
Map<String, String> ext = Map.of("filter_type", "post_filter", "nprobe", "60", "reorder_factor", "1");
searchSourceBuilder.ext(Collections.singletonList(new LVectorExtBuilder("lvector", ext)));
searchRequest.source(searchSourceBuilder);
searchRequest.indices("vector_test");
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println(searchResponse);
In Post-Filter approximate query scenarios, k can be increased to 10,000 (and should be kept within 20,000) to keep processing latency within hundreds of milliseconds. If k is relatively large, setting reorder_factor to 1 is sufficient. When nlist is 10000, first set nprobe to 60 and check the retrieval results. If the results are unsatisfactory, increase nprobe moderately, for example to 80, 100, 120, 140, or 160; its performance cost is far smaller than that of reorder_factor, but it should not be set too large.
You can also add filter conditions through post_filter to implement a Post-Filter approximate query.
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// vectorColumn is the name of the vector column
Map<String, Object> queryBody = Map.of("knn", Map.of(vectorColumn, Map.of(
"vector", vectors,
"k", topK)));
searchSourceBuilder.query(QueryBuilders.wrapperQuery(new Gson().toJson(queryBody)));
// field1 >= 0
searchSourceBuilder.postFilter(QueryBuilders.rangeQuery("field1").gte(0));
searchRequest.source(searchSourceBuilder);
searchRequest.indices(index);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println(searchResponse);
Common operations
List all indexes and their document counts.
Request request = new Request("GET", "/_cat/indices?v");
Response response = client.getLowLevelClient().performRequest(request);
String responseBody = EntityUtils.toString(response.getEntity());
System.out.println(responseBody);
Query the document count of a specified index.
CountRequest countRequest = new CountRequest(index);
CountResponse countResponse = client.count(countRequest, RequestOptions.DEFAULT);
System.out.println(countResponse.getCount());
View index creation information.
GetIndexRequest getIndexRequest = new GetIndexRequest(index);
GetIndexResponse getIndexResponse = client.indices().get(getIndexRequest, RequestOptions.DEFAULT);
getIndexResponse.getMappings().forEach((k, v) -> System.out.println(k + " " + v.getSourceAsMap()));
getIndexResponse.getSettings().forEach((k, v) -> System.out.println(k + " " + v));
Delete an entire index.
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index);
AcknowledgedResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
System.out.println("deleteIndexResponse: " + deleteIndexResponse.isAcknowledged());
Delete documents by query.
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(index);
deleteByQueryRequest.setQuery(QueryBuilders.termQuery("field1", "1"));
BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
System.out.println(bulkByScrollResponse.getTotal());