Java High Level REST Client

Java High Level REST Client是Elasticsearch官方提供的高级别REST客户端,支持更简单易用的API。Lindorm搜索引擎兼容Elasticsearch 7.10及更早版本的功能,如果您想要进行复杂查询分析或使用Elasticsearch的一些高级特性,可以通过Java High Level REST Client连接搜索引擎,轻松实现搜索索引及索引文档的设计和管理。

前提条件

  • 已安装Java环境,要求安装JDK 1.8及以上版本。

  • 已开通搜索引擎。如何开通,请参见开通指南

  • 已将客户端IP地址添加至Lindorm白名单,具体操作请参见设置白名单

操作步骤

  1. 安装High Level Rest Client。以Maven项目为例,在pom.xml文件的dependencies中添加依赖项。示例代码如下:

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.20.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>2.20.0</version>
    </dependency>
    重要

    Java High Level REST Client能够向上兼容,例如6.7.0版本的Java High Level REST Client可以与6.7.0及以上版本的Elasticsearch集群进行通信。为保证最大程度地使用新版客户端的特性,推荐使用7.10.0版本或更低版本的Java High Level REST Client。

  2. 配置连接参数并使用RestClient.builder()方式创建RestHighLevelClient对象。

    //Lindorm搜索引擎的Elasticsearch兼容地址
    String search_url = "ld-t4n5668xk31ui****-proxy-search-public.lindorm.rds.aliyuncs.com";
    int search_port = 30070;
    
    String username = "user";
    String password = "test";
    final CredentialsProvider credentials_provider = new BasicCredentialsProvider();
    credentials_provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
    
    
    RestHighLevelClient highClient = new RestHighLevelClient(
      RestClient.builder(new HttpHost( search_url, search_port, "http")).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
          return httpClientBuilder.setDefaultCredentialsProvider(credentials_provider);
        }
      })
    );

    参数说明

    参数

    说明

    search_url

    搜索引擎的Elasticsearch兼容连接地址。如何获取,请参见Elasticsearch兼容地址

    重要
    • 如果应用部署在ECS实例,建议您通过专有网络访问Lindorm实例,可获得更高的安全性和更低的网络延迟。

    • 如果应用部署在本地,在通过公网连接Lindorm实例前,需在控制台开通公网地址。开通方式:在控制台的左侧导航栏,选择数据库连接,单击搜索引擎页签,在页签右上角单击开通公网地址

    • 通过专有网络访问Lindorm实例,search_url请填写Elasticsearch兼容地址对应的专有网络地址。通过公网访问Lindorm实例,search_url请填写Elasticsearch兼容地址对应的公网地址。

    search_port

    Lindorm搜索引擎Elasticsearch兼容的端口,固定为30070。

    username

    访问搜索引擎的用户名和密码。

    默认用户名和密码的获取方式:在控制台的左侧导航栏,选择数据库连接,单击搜索引擎页签,在搜索引擎页签可获取。

    password

  3. 使用搜索引擎。

    示例代码分为以下几部分:

    • 创建搜索索引:创建搜索索引lindorm_index。

    • 数据写入:使用单条写入方式,将数据写入ID为test的文档。再使用批量写入方式,写入100000条文档。

    • 数据查询:刷新请求,强制写入的数据可见。执行两个请求,分别查询索引中的全部文档和ID为test的文档。

    • 数据删除:删除ID为test的文档,并删除索引lindorm_index。

    try {
      String index_name = "lindorm_index";
    
      // 构造创建索引的CreateIndex请求
      CreateIndexRequest createIndexRequest = new CreateIndexRequest(index_name);
      // 指定索引的Settings
      Map<String, Object> settingsMap = new HashMap<>();
      settingsMap.put("index.number_of_shards", 4);
      createIndexRequest.settings(settingsMap);
      CreateIndexResponse createIndexResponse = highClient.indices().create(createIndexRequest, COMMON_OPTIONS);
      if (createIndexResponse.isAcknowledged()) {
        System.out.println("Create index [" + index_name + "] successfully.");
      }
    
      // 文档id。若不指定会使用系统自生成的id,写入性能更优
      String doc_id = "test";
      // 文档字段。请将field和value替换为实际业务字段与值
      Map<String, Object> jsonMap = new HashMap<>();
      jsonMap.put("field1", "value1");
      jsonMap.put("field2", "value2");
    
      // 构造写入单条文档的Index请求,指定文档的id和要写入的字段
      IndexRequest indexRequest = new IndexRequest(index_name);
      indexRequest.id(doc_id).source(jsonMap);
      IndexResponse indexResponse = highClient.index(indexRequest, COMMON_OPTIONS);
      System.out.println("Index document with id[" + indexResponse.getId() + "] successfully.");
    
      // 使用批量写入的方式写数据
      int bulkTotal = 100000;
      AtomicLong failedBulkItemCount = new AtomicLong();
      // 创建一个BulkProcessor,用于执行Bulk请求
      BulkProcessor.Builder builder = BulkProcessor.builder((request, bulkListener) -> highClient.bulkAsync(request, COMMON_OPTIONS, bulkListener),
        new BulkProcessor.Listener() {
          @Override
          public void beforeBulk(long executionId, BulkRequest request) {}
    
          @Override
          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            // 可以获取到Bulk请求中每个请求的执行结果,此处统计执行失败的BulkItem请求个数
            for (BulkItemResponse bulkItemResponse : response) {
              if (bulkItemResponse.isFailed()) {
                failedBulkItemCount.incrementAndGet();
              }
            }
          }
    
          @Override
          public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            // 此处的失败意味着整个Bulk请求都没有被执行
            if (null != failure) {
              failedBulkItemCount.addAndGet(request.numberOfActions());
            }
          }
        });
      // 设置Bulk请求的并发数,默认为1
      builder.setConcurrentRequests(10);
      // 设置BulkProcessor发送Bulk请求的阈值,可以是:时间间隔,操作数量或者请求大小
      builder.setFlushInterval(TimeValue.timeValueSeconds(5));
      builder.setBulkActions(5000);
      builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
      BulkProcessor bulkProcessor = builder.build();
      Random random = new Random();
      for (int i = 0; i < bulkTotal; i++) {
        // 请将field和value替换为实际业务字段与值
        Map<String, Object> map = new HashMap<>();
        map.put("field1", random.nextInt() + "");
        map.put("field2", random.nextInt() + "");
        IndexRequest bulkItemRequest = new IndexRequest(index_name);
        bulkItemRequest.source(map);
        // 添加操作到BulkProcessor
        bulkProcessor.add(bulkItemRequest);
      }
      // 如果需要的话,可以使用awaitClose方法来等待所有的操作都被执行完
      bulkProcessor.awaitClose(120, TimeUnit.SECONDS);
      long failure = failedBulkItemCount.get(),
        success = bulkTotal - failure;
      System.out.println("Bulk using BulkProcessor finished with [" + success + "] requests succeeded, [" + failure + "] requests failed.");
    
      // 构造强制已写数据可见的Refresh请求
      RefreshRequest refreshRequest = new RefreshRequest(index_name);
      RefreshResponse refreshResponse = highClient.indices().refresh(refreshRequest, COMMON_OPTIONS);
      System.out.println("Refresh on index [" + index_name + "] successfully.");
    
      // 构造查询所有数据的Search请求
      SearchRequest searchRequest = new SearchRequest(index_name);
      SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
      QueryBuilder queryMatchAllBuiler = new MatchAllQueryBuilder();
      searchSourceBuilder.query(queryMatchAllBuiler);
      searchRequest.source(searchSourceBuilder);
      SearchResponse searchResponse = highClient.search(searchRequest, COMMON_OPTIONS);
      long totalHit = searchResponse.getHits().getTotalHits().value;
      System.out.println("Search query match all hits [" + totalHit + "] in total.");
    
      // 构造通过id查询数据的Search请求
      QueryBuilder queryByIdBuilder = new MatchQueryBuilder("_id", doc_id);
      searchSourceBuilder.query(queryByIdBuilder);
      searchRequest.source(searchSourceBuilder);
      searchResponse = highClient.search(searchRequest, COMMON_OPTIONS);
      for (SearchHit searchHit : searchResponse.getHits()) {
        System.out.println("Search query by id response [" + searchHit.getSourceAsString() + "]");
      }
    
      // 构造删除单条文档的Delete请求,指定要删除文档的id
      DeleteRequest deleteRequest = new DeleteRequest(index_name);
      deleteRequest.id(doc_id);
      DeleteResponse deleteResponse = highClient.delete(deleteRequest, COMMON_OPTIONS);
      System.out.println("Delete document with id [" + deleteResponse.getId() + "] successfully.");
    
      // 构造删除整个索引的DeleteIndex请求
      DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index_name);
      AcknowledgedResponse deleteIndexResponse = highClient.indices().delete(deleteIndexRequest, COMMON_OPTIONS);
      if (deleteIndexResponse.isAcknowledged()) {
        System.out.println("Delete index [" + index_name + "] successfully.");
      }
    
      highClient.close();
    } catch (Exception exception) {
      // 异常处理
      System.out.println("msg " + exception);
    }

完整示例

完整示例代码如下:

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class RestHClientTest {
  private static final RequestOptions COMMON_OPTIONS;
  static {
    // 创建请求配置参数,默认缓存限制为100MB,此处修改为30MB
    RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
    builder.setHttpAsyncResponseConsumerFactory(
      new HttpAsyncResponseConsumerFactory
        .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
    COMMON_OPTIONS = builder.build();
  }

  public static void main(String[] args) {
    // Lindorm搜索引擎的Elasticsearch兼容地址
    String search_url = "ld-t4n5668xk31ui****-proxy-search-public.lindorm.rds.aliyuncs.com";
    int search_port = 30070;

    // Lindorm搜索引擎的访问用户名和密码,通过控制台获取
    String username = "user";
    String password = "test";

    final CredentialsProvider credentials_provider = new BasicCredentialsProvider();
    credentials_provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
    RestHighLevelClient highClient = new RestHighLevelClient(
      RestClient.builder(new HttpHost( search_url, search_port, "http")).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
          return httpClientBuilder.setDefaultCredentialsProvider(credentials_provider);
        }
      })
    );

    try {
      String index_name = "lindorm_index";

      // 构造创建索引的CreateIndex请求
      CreateIndexRequest createIndexRequest = new CreateIndexRequest(index_name);
      // 指定索引的Settings
      Map<String, Object> settingsMap = new HashMap<>();
      settingsMap.put("index.number_of_shards", 4);
      createIndexRequest.settings(settingsMap);
      CreateIndexResponse createIndexResponse = highClient.indices().create(createIndexRequest, COMMON_OPTIONS);
      if (createIndexResponse.isAcknowledged()) {
        System.out.println("Create index [" + index_name + "] successfully.");
      }

      // 文档id。若不指定会使用系统自生成的id,写入性能更优
      String doc_id = "test";
      // 文档字段。请将field和value替换为实际业务字段与值
      Map<String, Object> jsonMap = new HashMap<>();
      jsonMap.put("field1", "value1");
      jsonMap.put("field2", "value2");

      // 构造写入单条文档的Index请求,指定文档的id和要写入的字段
      IndexRequest indexRequest = new IndexRequest(index_name);
      indexRequest.id(doc_id).source(jsonMap);
      IndexResponse indexResponse = highClient.index(indexRequest, COMMON_OPTIONS);
      System.out.println("Index document with id[" + indexResponse.getId() + "] successfully.");

      // 使用批量写入的方式写数据
      int bulkTotal = 100000;
      AtomicLong failedBulkItemCount = new AtomicLong();
      // 创建一个BulkProcessor,用于执行Bulk请求
      BulkProcessor.Builder builder = BulkProcessor.builder((request, bulkListener) -> highClient.bulkAsync(request, COMMON_OPTIONS, bulkListener),
        new BulkProcessor.Listener() {
          @Override
          public void beforeBulk(long executionId, BulkRequest request) {}

          @Override
          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            // 可以获取到Bulk请求中每个请求的执行结果,此处统计执行失败的BulkItem请求个数
            for (BulkItemResponse bulkItemResponse : response) {
              if (bulkItemResponse.isFailed()) {
                failedBulkItemCount.incrementAndGet();
              }
            }
          }

          @Override
          public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            // 此处的失败意味着整个Bulk请求都没有被执行
            if (null != failure) {
              failedBulkItemCount.addAndGet(request.numberOfActions());
            }
          }
        });
      // 设置Bulk请求的并发数,默认为1
      builder.setConcurrentRequests(10);
      // 设置BulkProcessor发送Bulk请求的阈值,可以是:时间间隔,操作数量或者请求大小
      builder.setFlushInterval(TimeValue.timeValueSeconds(5));
      builder.setBulkActions(5000);
      builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
      BulkProcessor bulkProcessor = builder.build();
      Random random = new Random();
      for (int i = 0; i < bulkTotal; i++) {
        // 请将field和value替换为实际业务字段与值
        Map<String, Object> map = new HashMap<>();
        map.put("field1", random.nextInt() + "");
        map.put("field2", random.nextInt() + "");
        IndexRequest bulkItemRequest = new IndexRequest(index_name);
        bulkItemRequest.source(map);
        // 添加操作到BulkProcessor
        bulkProcessor.add(bulkItemRequest);
      }
      // 如果需要的话,可以使用awaitClose方法来等待所有的操作都被执行完
      bulkProcessor.awaitClose(120, TimeUnit.SECONDS);
      long failure = failedBulkItemCount.get(),
        success = bulkTotal - failure;
      System.out.println("Bulk using BulkProcessor finished with [" + success + "] requests succeeded, [" + failure + "] requests failed.");

      // 构造强制已写数据可见的Refresh请求
      RefreshRequest refreshRequest = new RefreshRequest(index_name);
      RefreshResponse refreshResponse = highClient.indices().refresh(refreshRequest, COMMON_OPTIONS);
      System.out.println("Refresh on index [" + index_name + "] successfully.");

      // 构造查询所有数据的Search请求
      SearchRequest searchRequest = new SearchRequest(index_name);
      SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
      QueryBuilder queryMatchAllBuiler = new MatchAllQueryBuilder();
      searchSourceBuilder.query(queryMatchAllBuiler);
      searchRequest.source(searchSourceBuilder);
      SearchResponse searchResponse = highClient.search(searchRequest, COMMON_OPTIONS);
      long totalHit = searchResponse.getHits().getTotalHits().value;
      System.out.println("Search query match all hits [" + totalHit + "] in total.");

      // 构造通过id查询数据的Search请求
      QueryBuilder queryByIdBuilder = new MatchQueryBuilder("_id", doc_id);
      searchSourceBuilder.query(queryByIdBuilder);
      searchRequest.source(searchSourceBuilder);
      searchResponse = highClient.search(searchRequest, COMMON_OPTIONS);
      for (SearchHit searchHit : searchResponse.getHits()) {
        System.out.println("Search query by id response [" + searchHit.getSourceAsString() + "]");
      }

      // 构造删除单条文档的Delete请求,指定要删除文档的id
      DeleteRequest deleteRequest = new DeleteRequest(index_name);
      deleteRequest.id(doc_id);
      DeleteResponse deleteResponse = highClient.delete(deleteRequest, COMMON_OPTIONS);
      System.out.println("Delete document with id [" + deleteResponse.getId() + "] successfully.");

      // 构造删除整个索引的DeleteIndex请求
      DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index_name);
      AcknowledgedResponse deleteIndexResponse = highClient.indices().delete(deleteIndexRequest, COMMON_OPTIONS);
      if (deleteIndexResponse.isAcknowledged()) {
        System.out.println("Delete index [" + index_name + "] successfully.");
      }

      highClient.close();
    } catch (Exception exception) {
      // 异常处理
      System.out.println("msg " + exception);
    }
  }
}

执行成功后将返回如下结果:

Create index [lindorm_index] successfully.
Index document with id[test] successfully.
Bulk using BulkProcessor finished with [100000] requests succeeded, [0] requests failed.
Refresh on index [lindorm_index] successfully.
Search query match all hits [10000] in total.
Search query by id response [{"field1":"value1","field2":"value2"}]
Delete document with id [test] successfully.
Delete index [lindorm_index] successfully.
说明

从返回结果可以看到已成功写入了100,000条数据,但查询所有数据的请求响应结果显示仅匹配到10,000条数据。这是由于服务端默认情况下的hit上限为10,000,因此导致了查询结果与预期不符。

如果您希望能命中所有数据,可以将SearchSourceBuilder对象的trackTotalHits属性设置为true,即searchSourceBuilder.trackTotalHits(true);