The Java Low Level REST Client is the official low-level REST client for Elasticsearch; its API leaves request and response encoding and decoding to the caller. The Lindorm vector engine supports vector search, is compatible with the Elasticsearch protocol, and also supports hybrid scalar, vector, and full-text search. If you want to customize how requests and responses are handled, you can access the vector engine through the Java Low Level REST Client.
Prerequisites
Preparations
Install the Java Low Level REST Client
Taking a Maven project as an example, add the following dependencies to the dependencies section of the pom.xml file:
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.8.2</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.7</version>
</dependency>
Connect to the search engine
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

// Elasticsearch-compatible endpoint of the Lindorm search engine
String search_url = "ld-t4n5668xk31ui****-proxy-search-public.lindorm.rds.aliyuncs.com";
int search_port = 30070;
// Configure the username and password
String username = "user";
String password = "test";
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(search_url, search_port));
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
    @Override
    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
    }
});
// Build the client used by the snippets that follow
RestClient restClient = restClientBuilder.build();
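The client returned by restClientBuilder.build() holds persistent HTTP connections, so release it once your application no longer needs it (RestClient implements Closeable). A minimal sketch:
// Close the client after all requests have completed,
// releasing the underlying HTTP connections.
restClient.close();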
Parameter descriptions
Parameter | Description |
search_url | The Elasticsearch-compatible endpoint of the Lindorm search engine. For how to obtain it, see View connection addresses. |
search_port | The Elasticsearch-compatible port of the Lindorm search engine. Fixed at 30070. |
username | The username for accessing the search engine. To obtain the default username and password: in the left-side navigation pane of the console, choose Database Connections and click the Search Engine tab; the credentials are shown on that tab. |
password | The password for accessing the search engine, obtained in the same way as the username. |
Create a vector index
HNSW index
The following example creates an index named vector_test:
String indexName = "vector_test";
// Create the index
Request indexRequest = new Request("PUT", "/" + indexName);
indexRequest.setJsonEntity("{\n" +
" \"settings\" : {\n" +
" \"index\": {\n" +
" \"number_of_shards\": 2,\n" +
" \"knn\": true\n" +
" }\n" +
" },\n" +
" \"mappings\": {\n" +
" \"_source\": {\n" +
" \"excludes\": [\"vector1\"]\n" +
" },\n" +
" \"properties\": {\n" +
" \"vector1\": {\n" +
" \"type\": \"knn_vector\",\n" +
" \"dimension\": 3,\n" +
" \"data_type\": \"float\",\n" +
" \"method\": {\n" +
" \"engine\": \"lvector\",\n" +
" \"name\": \"hnsw\", \n" +
" \"space_type\": \"l2\",\n" +
" \"parameters\": {\n" +
" \"m\": 24,\n" +
" \"ef_construction\": 500\n" +
" }\n" +
" }\n" +
" },\n" +
" \"field1\": {\n" +
" \"type\": \"long\"\n" +
" }\n" +
" }\n" +
" }\n" +
"}");
Response response = restClient.performRequest(indexRequest);
String responseBody = EntityUtils.toString(response.getEntity());
System.out.println("responseBody = " + responseBody);
IVFPQ index
The following example creates an index named vector_ivfpq_test:
String indexName = "vector_ivfpq_test";
Request indexRequest = new Request("PUT", "/" + indexName);
int dim = 3;
String createIndexJson = "{\n" +
" \"settings\": {\n" +
" \"index\": {\n" +
" \"number_of_shards\": 4,\n" +
" \"knn\": true,\n" +
" \"knn.offline.construction\": true\n" +
" }\n" +
" },\n" +
" \"mappings\": {\n" +
" \"_source\": {\n" +
" \"excludes\": [\"vector1\"]\n" +
" },\n" +
" \"properties\": {\n" +
" \"vector1\": {\n" +
" \"type\": \"knn_vector\",\n" +
" \"dimension\": %d,\n" +
" \"data_type\": \"float\",\n" +
" \"method\": {\n" +
" \"engine\": \"lvector\",\n" +
" \"name\": \"ivfpq\",\n" +
" \"space_type\": \"cosinesimil\",\n" +
" \"parameters\": {\n" +
" \"m\": %d,\n" +
" \"nlist\": 10000,\n" +
" \"centroids_use_hnsw\": true,\n" +
" \"centroids_hnsw_m\": 48,\n" +
" \"centroids_hnsw_ef_construct\": 500,\n" +
" \"centroids_hnsw_ef_search\": 200\n" +
" }\n" +
" }\n" +
" },\n" +
" \"field1\": {\n" +
" \"type\": \"long\"\n" +
" }\n" +
" }\n" +
" }\n" +
"}"
createIndexJson = String.format(createIndexJson, dim, dim);
indexRequest.setJsonEntity(createIndexJson);
Response response = restClient.performRequest(indexRequest);
String responseBody = EntityUtils.toString(response.getEntity());
System.out.println("responseBody = " + responseBody);
Sparse vector index
The following example creates an index named vector_sparse_test:
String indexName = "vector_sparse_test";
// Create the index
Request indexRequest = new Request("PUT", "/" + indexName);
indexRequest.setJsonEntity("{\n" +
" \"settings\" : {\n" +
" \"index\": {\n" +
" \"number_of_shards\": 2,\n" +
" \"knn\": true\n" +
" }\n" +
" },\n" +
" \"mappings\": {\n" +
" \"_source\": {\n" +
" \"excludes\": [\"vector1\"]\n" +
" },\n" +
" \"properties\": {\n" +
" \"vector1\": {\n" +
" \"type\": \"knn_vector\",\n" +
" \"data_type\": \"sparse_vector\",\n" +
" \"method\": {\n" +
" \"engine\": \"lvector\",\n" +
" \"name\": \"sparse_hnsw\",\n" +
" \"space_type\": \"innerproduct\",\n" +
" \"parameters\": {\n" +
" \"m\": 24,\n" +
" \"ef_construction\": 200\n" +
" }\n" +
" }\n" +
" },\n" +
" \"field1\": {\n" +
" \"type\": \"long\"\n" +
" }\n" +
" }\n" +
" }\n" +
"}");
Response response = restClient.performRequest(indexRequest);
String responseBody = EntityUtils.toString(response.getEntity());
System.out.println("responseBody = " + responseBody);
Write data
Data is written to an index that contains a vector column in the same way as to an ordinary index.
Write a single document
The following example writes to the index vector_test:
String indexName = "vector_test";
String documentId = "1";
String jsonString = "{ \"field1\": 1, \"vector1\": [1.2, 1.3, 1.4] }";
Request request = new Request(
"PUT", // 指定了文档ID时使用PUT方法
"/" + indexName + "/_doc/" + documentId);
request.setJsonEntity(jsonString);
response = restClient.performRequest(request);
responseBody = EntityUtils.toString(response.getEntity());
System.out.println("writeDoc responseBody = " + responseBody);
Bulk write
// Bulk write data
Random random = new Random();
Request bulkRequest = new Request("POST", "/_bulk");
StringBuilder bulkJsonBuilder = new StringBuilder();
for (int i = 2; i < 10; i++) {
// Replace the fields and values with your actual business fields and values
bulkJsonBuilder.append("{\"index\":{\"_index\":\"").append(indexName).append("\",\"_id\":\"").append(i).append("\"}}").append("\n");
String value = String.valueOf(random.nextInt());
float[] floatArray = {random.nextFloat(), random.nextFloat(), random.nextFloat()};
String floatArrayString = Arrays.toString(floatArray);
System.out.println(i + " " + value + " " + floatArrayString);
bulkJsonBuilder.append("{\"field1\":\"").append(value).append("\",\"vector1\":\"").append(floatArrayString).append("\"}").append("\n");
}
bulkRequest.setJsonEntity(bulkJsonBuilder.toString());
response = restClient.performRequest(bulkRequest);
responseBody = EntityUtils.toString(response.getEntity());
System.out.println("bulkWriteDoc responseBody = " + responseBody);
// Send a refresh request to force the written data to become visible
response = restClient.performRequest(new Request("POST", "/" + indexName + "/_refresh"));
responseBody = EntityUtils.toString(response.getEntity());
System.out.println("responseBody = " + responseBody);
Write sparse vectors
The write method is the same as above, but the format of vector1 must be changed.
// Write a single document
String documentId = "1";
String jsonString = "{ \"field1\": 1, \"vector1\": {\"indices\": [10, 12, 16], \"values\": [1.2, 1.3, 1.4]} }";
Request request = new Request(
"PUT", // 指定了文档ID时使用PUT方法
"/" + indexName + "/_doc/" + documentId);
request.setJsonEntity(jsonString);
response = restClient.performRequest(request);
responseBody = EntityUtils.toString(response.getEntity());
System.out.println("writeDoc responseBody = " + responseBody);
Build the index
For all index types except ivfpq, index.knn.offline.construction defaults to false when the index is created; the index is built online and no manual build is needed. Before triggering an ivfpq index build, note the following: index.knn.offline.construction must be explicitly set to true when the ivfpq index is created, and before starting the build you must make sure enough data has been written: more than 256 documents and more than 30 times the value of nlist. Once the manually triggered build completes, subsequent writes and queries work normally and the index does not need to be built again.
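Before triggering the build, you can verify the data volume with the _count API (also used in the common-usage section at the end of this topic). A minimal sketch: the nlist value of 10000 is taken from the example mapping above, and the string-based count extraction is only illustrative; a JSON parser would be more robust.
// Verify that vector_ivfpq_test holds enough documents for an offline build:
// more than 256 and more than 30 x nlist (nlist = 10000 in the mapping above).
Request countRequest = new Request("GET", "/vector_ivfpq_test/_count");
Response countResponse = restClient.performRequest(countRequest);
String countBody = EntityUtils.toString(countResponse.getEntity());
long docCount = Long.parseLong(countBody.replaceAll("(?s).*\"count\"\\s*:\\s*(\\d+).*", "$1"));
long required = Math.max(256, 30L * 10000);
if (docCount <= required) {
    System.out.println("not enough documents for the ivfpq build: " + docCount);
}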
Trigger a build
The following example builds the index vector_ivfpq_test:
// Trigger the index build
Request buildIndexRequest = new Request("POST", "/_plugins/_vector/index/build");
String jsonString = "{ \"indexName\": \"vector_ivfpq_test\", \"fieldName\": \"vector1\", \"removeOldIndex\": \"true\" }";
buildIndexRequest.setJsonEntity(jsonString);
response = restClient.performRequest(buildIndexRequest);
responseBody = EntityUtils.toString(response.getEntity());
System.out.println("buildIndex responseBody = " + responseBody);
Parameter descriptions
Parameter | Required | Description |
indexName | Yes | The name of the index (table), for example vector_ivfpq_test. |
fieldName | Yes | The field on which to build the index, for example vector1. |
removeOldIndex | Yes | Whether to delete the old index when building. Valid values: true and false. |
The response is as follows:
{
"payload": ["default_vector_ivfpq_test_vector1"]
}
The returned payload is the taskId generated for the index build task.
Check the build status
// Check the index build status
Request buildIndexRequest = new Request("GET", "/_plugins/_vector/index/tasks");
String jsonString = "{ \"indexName\": \"vector_ivfpq_test\", \"fieldName\": \"vector1\", \"taskIds\": \"[default_vector_ivfpq_test_vector1]\" }";
buildIndexRequest.setJsonEntity(jsonString);
Response response = restClient.performRequest(buildIndexRequest);
String responseBody = EntityUtils.toString(response.getEntity());
System.out.println("queryBuildIndex responseBody = " + responseBody);
Here, taskIds contains the taskId generated when the build was triggered. You can also pass an empty array, for example "taskIds": "[]", which has the same effect as specifying the taskIds explicitly as above.
The response is as follows:
{
"payload": ["task: default_vector_ivfpq_test_vector1, stage: FINISH, innerTasks: xxx, info: finish building"]
}
The stage field indicates the build status. Possible values are START (build started), TRAIN (training), BUILDING (building), ABORT (build aborted), FINISH (build finished), and FAIL (build failed).
A task usually enters the ABORT state when the abort interface (see Abort a build below) is called to terminate the build.
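If you trigger builds programmatically, you can poll the task status until it leaves the in-progress stages. A minimal sketch that reuses the status request shown above; the one-second interval and the 60-attempt cap are arbitrary choices, not API requirements, and the snippet assumes the enclosing method may throw InterruptedException:
// Poll until the build task reports FINISH or FAIL (or we give up).
Request pollRequest = new Request("GET", "/_plugins/_vector/index/tasks");
pollRequest.setJsonEntity("{ \"indexName\": \"vector_ivfpq_test\", \"fieldName\": \"vector1\", \"taskIds\": \"[]\" }");
for (int attempt = 0; attempt < 60; attempt++) {
    Response pollResponse = restClient.performRequest(pollRequest);
    String pollBody = EntityUtils.toString(pollResponse.getEntity());
    if (pollBody.contains("stage: FINISH") || pollBody.contains("stage: FAIL")) {
        System.out.println("build ended: " + pollBody);
        break;
    }
    Thread.sleep(1000); // wait before checking again
}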
Abort a build
Aborts an in-progress index build task. An index whose build status is FINISH does not support this call.
// Abort the index build
Request buildIndexRequest = new Request("POST", "/_plugins/_vector/index/tasks/abort");
String jsonString = "{ \"indexName\": \"vector_ivfpq_test\", \"fieldName\": \"vector1\", \"taskIds\": \"[\"default_vector_ivfpq_test_vector1\"]\" }";
buildIndexRequest.setJsonEntity(jsonString);
Response response = restClient.performRequest(buildIndexRequest);
String responseBody = EntityUtils.toString(response.getEntity());
System.out.println("abortBuildIndex responseBody = " + responseBody);
The response is as follows:
{
"payload":["Task: default_vector_ivfpq_test_vector1 remove success"]
}
Query data
Pure vector query
Pure vector data can be queried through the knn structure.
// kNN query
Request searchRequest = new Request("GET", "/" + indexName + "/_search");
jsonString = "{"
+ "\"size\": 10,"
+ "\"query\": {"
+ "\"knn\": {"
+ "\"vector1\": {"
+ "\"vector\": [2.2, 2.3, 2.4],"
+ "\"k\": 10"
+ "}"
+ "}"
+ "},"
+ "\"ext\": {\"lvector\": {\"min_score\": \"0.1\"}}"
+ "}";
searchRequest.setJsonEntity(jsonString);
response = restClient.performRequest(searchRequest);
responseBody = EntityUtils.toString(response.getEntity());
System.out.println("search responseBody = " + responseBody);
参数说明
参数结构 | 参数 | 是否必填 | 说明 |
knn | vector | 是 | 查询时使用的向量。 |
k | 是 | 返回最相似的K个数据。 重要 在纯向量检索场景中,建议将size和k设置为相同的值。 | |
ext | lvector.min_score | 否 | 相似度阈值,要求返回的向量得分大于该值。返回的向量得分范围为[0,1]。 取值范围:[0,+inf]。默认值为 |
lvector.filter_type | 否 | 融合查询使用的模式。取值如下:
默认值为空。 | |
lvector.ef_search | 否 | HNSW算法中,索引构建时动态列表的长度。只能用于HNSW算法。 取值范围:[1,1000]。默认值为 | |
lvector.nprobe | 否 | 要查询的聚类单元(cluster units)的数量。请根据您的召回率要求,对该参数的值进行调整已达到理想效果。值越大,召回率越高,搜索性能越低。 取值范围:[1,method.parameters.nlist]。无默认值。 重要 仅适用于ivfpq算法。 | |
lvector.reorder_factor | 否 | 使用原始向量创建重排序(reorder)。ivfpq算法计算的距离为量化后的距离,会有一定的精度损失,需要使用原始向量进行重排序。比例为 取值范围:[1,200]。默认值为 重要
|
Return specified fields
If you need a query to return specific fields, specify "_source": ["field1", "field2"], or use "_source": true to return all non-vector fields. The following example queries the index vector_test:
// kNN query
Request searchRequest = new Request("GET", "/" + indexName + "/_search");
jsonString = "{"
+ "\"size\": 10,"
+ "\"_source\": [\"field1\"],"
+ "\"query\": {"
+ "\"knn\": {"
+ "\"vector1\": {"
+ "\"vector\": [2.2, 2.3, 2.4],"
+ "\"k\": 10"
+ "}"
+ "}"
+ "},"
+ "\"ext\": {\"lvector\": {\"min_score\": \"0.1\"}}"
+ "}";
searchRequest.setJsonEntity(jsonString);
response = restClient.performRequest(searchRequest);
responseBody = EntityUtils.toString(response.getEntity());
System.out.println("search responseBody = " + responseBody);
Query with the HNSW algorithm
// kNN query
Request searchRequest = new Request("GET", "/" + indexName + "/_search");
jsonString = "{"
+ "\"size\": 10,"
+ "\"query\": {"
+ "\"knn\": {"
+ "\"vector1\": {"
+ "\"vector\": [2.2, 2.3, 2.4],"
+ "\"k\": 10"
+ "}"
+ "}"
+ "},"
+ "\"ext\": {\"lvector\": {\"ef_search\": \"100\"}}"
+ "}";
searchRequest.setJsonEntity(jsonString);
response = restClient.performRequest(searchRequest);
responseBody = EntityUtils.toString(response.getEntity());
System.out.println("search responseBody = " + responseBody);
Query with the ivfpq algorithm
// kNN query
Request searchRequest = new Request("GET", "/" + indexName + "/_search");
jsonString = "{"
+ "\"size\": 10,"
+ "\"query\": {"
+ "\"knn\": {"
+ "\"vector1\": {"
+ "\"vector\": [2.2, 2.3, 2.4],"
+ "\"k\": 10"
+ "}"
+ "}"
+ "},"
+ "\"ext\": {\"lvector\": {\"nprobe\": \"60\", \"reorder_factor\": \"2\"}}"
+ "}";
searchRequest.setJsonEntity(jsonString);
response = restClient.performRequest(searchRequest);
responseBody = EntityUtils.toString(response.getEntity());
System.out.println("search responseBody = " + responseBody);
If k is relatively large (for example, greater than 100), setting reorder_factor to 1 is sufficient. When nlist is 10000, start with nprobe set to 60 and check the retrieval quality. To further improve recall, increase nprobe gradually (for example 80, 100, 120, 140, 160); the performance cost of raising nprobe is far lower than that of raising reorder_factor, but it should not be set too large either.
Sparse vector query
The query method is the same as above, but the format of vector1 must be changed.
// kNN query
Request searchRequest = new Request("GET", "/" + indexName + "/_search");
jsonString = "{"
+ "\"size\": 10,"
+ "\"query\": {"
+ "\"knn\": {"
+ "\"vector1\": {"
+ "\"vector\": {\"indices\": [10, 45, 16], \"values\": [0.5, 0.5, 0.2]},"
+ "\"k\": 10"
+ "}"
+ "}"
+ "}"
+ "}";
searchRequest.setJsonEntity(jsonString);
response = restClient.performRequest(searchRequest);
responseBody = EntityUtils.toString(response.getEntity());
System.out.println("search responseBody = " + responseBody);
Hybrid query
Queries on vector columns can be combined with query conditions on ordinary columns to return a combined result. In practice, Post-Filter approximate queries usually retrieve more similar results.
Pre-Filter approximate query
Add a filter inside the knn query structure and set the filter_type parameter to pre_filter to filter the structured data first and then query the vector data.
Currently, pre-filtering supports at most 10,000 rows of structured data.
// kNN query
Request searchRequest = new Request("GET", "/" + indexName + "/_search");
String jsonString = "{"
+ "\"size\": 10,"
+ "\"query\": {"
+ " \"knn\": {"
+ " \"vector1\": {"
+ " \"vector\": [2.2, 2.3, 2.4],"
+ " \"filter\": {"
+ " \"range\": {"
+ " \"field1\": {"
+ " \"gte\": 0"
+ " }"
+ " }"
+ " },"
+ " \"k\": 10"
+ " }"
+ " }"
+ "},"
+ "\"ext\": {\"lvector\": {\"filter_type\": \"pre_filter\"}}"
+ "}";
searchRequest.setJsonEntity(jsonString);
Response response = restClient.performRequest(searchRequest);
String responseBody = EntityUtils.toString(response.getEntity());
System.out.println("search responseBody = " + responseBody);
Post-Filter approximate query
Add a filter inside the knn query structure and set the filter_type parameter to post_filter to query the vector data first and then filter the structured data.
When using a Post-Filter approximate query, set k somewhat larger so that more vector results are retrieved before the filter is applied.
// kNN query
Request searchRequest = new Request("GET", "/" + indexName + "/_search");
String jsonString = "{\n" +
" \"size\": 10,\n" +
" \"query\": {\n" +
" \"knn\": {\n" +
" \"vector1\": {\n" +
" \"vector\": [2.2, 2.3, 2.4],\n" +
" \"filter\": {\n" +
" \"range\": {\n" +
" \"field1\": {\n" +
" \"gte\": 0\n" +
" }\n" +
" }\n" +
" },\n" +
" \"k\": 1000\n" +
" }\n" +
" }\n" +
" },\n" +
" \"ext\": {\n" +
" \"lvector\": {\n" +
" \"filter_type\": \"post_filter\"\n" +
" }\n" +
" }\n" +
"}";
searchRequest.setJsonEntity(jsonString);
Response response = restClient.performRequest(searchRequest);
String responseBody = EntityUtils.toString(response.getEntity());
System.out.println("search responseBody = " + responseBody);
When using a Post-Filter approximate query, enlarge k appropriately; if you use the ivfpq algorithm, also adjust the value of reorder_factor. For example:
// kNN query
Request searchRequest = new Request("GET", "/" + indexName + "/_search");
String jsonString = "{\n" +
" \"size\": 10,\n" +
" \"query\": {\n" +
" \"knn\": {\n" +
" \"vector1\": {\n" +
" \"vector\": [2.2, 2.3, 2.4],\n" +
" \"filter\": {\n" +
" \"range\": {\n" +
" \"field1\": {\n" +
" \"gte\": 0\n" +
" }\n" +
" }\n" +
" },\n" +
" \"k\": 1000\n" +
" }\n" +
" }\n" +
" },\n" +
" \"ext\": {\n" +
" \"lvector\": {\n" +
" \"filter_type\": \"post_filter\",\n" +
" \"nprobe\": \"60\",\n" +
" \"reorder_factor\": \"1\"\n" +
" }\n" +
" }\n" +
"}";
searchRequest.setJsonEntity(jsonString);
Response response = restClient.performRequest(searchRequest);
String responseBody = EntityUtils.toString(response.getEntity());
System.out.println("search responseBody = " + responseBody);
In Post-Filter approximate query scenarios, k can be raised to 10,000 (and should be kept within 20,000) while keeping processing latency within a few hundred milliseconds. If k is relatively large, setting reorder_factor to 1 is sufficient. When nlist is 10000, start with nprobe at 60 and check the retrieval quality; if it is not satisfactory, increase nprobe gradually (for example 80, 100, 120, 140, 160). The performance cost of raising nprobe is far lower than that of raising reorder_factor, but it should not be set too large either.
You can also add the filter condition through post_filter to implement a Post-Filter approximate query.
// kNN query
Request searchRequest = new Request("GET", "/" + indexName + "/_search");
String jsonString ="{\n" +
" \"size\": 10,\n" +
" \"query\": {\n" +
" \"knn\": {\n" +
" \"vector1\": {\n" +
" \"vector\": [2.2, 2.3, 2.4],\n" +
" \"k\": 10\n" +
" }\n" +
" }\n" +
" },\n" +
" \"post_filter\": {\n" +
" \"range\": {\n" +
" \"field1\": {\n" +
" \"gte\": 0\n" +
" }\n" +
" }\n" +
" }\n" +
"}";
searchRequest.setJsonEntity(jsonString);
Response response = restClient.performRequest(searchRequest);
String responseBody = EntityUtils.toString(response.getEntity());
System.out.println("search responseBody = " + responseBody);
Common usage
Query all indexes and their document counts.
Request request = new Request("GET", "/_cat/indices?v");
Response response = restClient.performRequest(request);
String responseBody = EntityUtils.toString(response.getEntity());
System.out.println(responseBody);
The response is as follows:
health status index       uuid        pri rep docs.count docs.deleted store.size pri.store.size
green  open   vector_test vector_test 2   0   2          0            6.8kb      6.8kb
Query the document count of a specified index.
Request request = new Request("GET", "/" + indexName + "/_count");
Response response = restClient.performRequest(request);
String responseBody = EntityUtils.toString(response.getEntity());
System.out.println(responseBody);
The response is as follows:
{
  "count" : 2,
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "skipped" : 0,
    "failed" : 0
  }
}
View the index creation details.
Request request = new Request("GET", "/" + indexName);
Response response = restClient.performRequest(request);
String responseBody = EntityUtils.toString(response.getEntity());
System.out.println(responseBody);
Delete an entire index.
Request deleteIndexRequest = new Request("DELETE", "/" + indexName);
Response response = restClient.performRequest(deleteIndexRequest);
String responseBody = EntityUtils.toString(response.getEntity());
System.out.println("delIndex responseBody = " + responseBody);
Delete by query.
request = new Request("POST", "/" + indexName + "/_delete_by_query");
jsonString = "{\n" +
        "  \"query\": {\n" +
        "    \"term\": {\n" +
        "      \"field1\": 1\n" +
        "    }\n" +
        "  }\n" +
        "}";
request.setJsonEntity(jsonString);
response = restClient.performRequest(request);
responseBody = EntityUtils.toString(response.getEntity());
System.out.println("deleteByQuery responseBody = " + responseBody);