Java API Client(8.x)

本文基于Java API Client 8.x版本,为您介绍Elasticsearch Java API的用法。

背景信息

Elasticsearch在7.17版本之前使用的Java客户端是Java REST Client,从7.17版本开始Elastic官方将Java REST Client标记为弃用(deprecated),并推荐使用新版Java客户端Java API Client。

Java API Client简介

Java API Client是一个用于与Elasticsearch服务器进行通信的Java客户端库,帮助开发人员与Elasticsearch服务器进行通信,开发人员可以更加轻松地开发和维护代码。

Java API Client主要包含三个部分:

  1. ElasticsearchClient类:Java API Client的核心类,提供与Elasticsearch服务器进行通信的方法。该类封装了底层的Transport通信,并提供了同步和异步调用、流式和函数式调用等方法。

  2. JSON object mapper:处理数据序列化和反序列化的库。JSON object mapper与Jackson无缝集成,可以将Java对象映射到JSON格式。

  3. 通用能力:提供了连接池、重试、JSON序列化等通用能力,提高了代码的可读性和可维护性,便于开发人员进行开发。

准备工作

  • 安装Java,要求JDK版本为1.8及以上。安装方法请参见安装JDK

  • 创建阿里云Elasticsearch实例,版本需大于或等于Java API Client的版本。本文创建一个8.x版本的实例,创建方法请参见创建阿里云Elasticsearch实例

    说明

    为了保证最大程度地使用新版客户端的特性,推荐Java API Client版本与集群版本一致。

  • 开启阿里云Elasticsearch实例的自动创建索引功能,具体操作请参见配置YML参数。如果未开启会提示如下报错。报错

  • 配置阿里云Elasticsearch实例的白名单,确保网络互通。

    • 如果运行Java代码的服务器在公网环境下,可通过阿里云Elasticsearch实例的公网地址进行连通。连通前,需要开启阿里云Elasticsearch实例的公网地址,并修改公网地址访问白名单,将服务器的公网IP地址加入白名单中。具体操作步骤请参见配置实例公网或私网访问白名单

      重要
      • 如果您的客户端处在家庭网络或公司局域网中,您需要将局域网的公网出口IP地址添加到白名单中,而非客户端机器的内网机制。建议您通过浏览器访问cip.cc获取您当前使用的公网IP地址。

      • 您也可以将白名单配置为0.0.0.0/0,允许所有IPv4地址访问阿里云Elasticsearch实例。此配置会导致实例完全暴露在公网中,增加安全风险,配置前请确认您是否可以接受这个风险。

      • 如果未配置白名单或白名单配置错误,系统会提示连接超时报错(Timeout connecting)。

      • 如果您需要通过客户端访问Kibana节点,还需要配置Kibana的访问白名单,详细信息请参见配置Kibana公网或私网访问白名单

    • 如果运行Java代码的服务器与阿里云Elasticsearch实例在同一专有网络VPC(Virtual Private Cloud)中,可通过阿里云Elasticsearch实例的私网地址进行连通。连通前,需要确保VPC私网访问白名单(默认为0.0.0.0/0)中已添加了服务器的内网IP地址。

客户端和ES服务连接配置

Elasticsearch初始化了三种客户端:

  • 低级客户端。

        // Create the low-level client
        RestClient restClient = RestClient.builder(
            new HttpHost("localhost", 9200,"http")).build();
  • 通信Transport,并利用JacksonJsonpMapper做数据的解析。

        // Create the transport with a Jackson mapper
        ElasticsearchTransport transport = new RestClientTransport(
            restClient, new JacksonJsonpMapper());
  • 阻塞的Java客户端。

        // And create the API client
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(transport);
        System.out.println("elasticsearchClient = " + elasticsearchClient);

pom依赖

使用时,您需要将pom依赖中的8.x版本号替换为具体的版本号。

<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.x</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.12.3</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.20.0</version>
</dependency>
重要

Log4j可能存在远程代码执行漏洞,详细信息请参见漏洞公告 | Apache Log4j2远程代码执行漏洞

示例

以下示例代码中带{}的参数需要替换为具体业务的参数。

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.*;


import java.io.IOException;

public class RestClientTest {
    public static void main(String[] args) {

        // 阿里云Elasticsearch集群需要basic auth验证。
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();

        //访问用户名和密码为您创建阿里云Elasticsearch实例时设置的用户名和密码,也是Kibana控制台的登录用户名和密码。
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("{访问用户名}", "{访问密码}"));
        // 通过builder创建rest client,配置http client的HttpClientConfigCallback。
        // 单击所创建的Elasticsearch实例ID,在基本信息页面获取公网地址,即为ES集群地址。
        RestClient restClient = RestClient.builder(new HttpHost("{ES集群地址}", 9200, "http"))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                }).build();

        // 使用 Jackson 映射器创建传输
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());

        // 创建 API 客户端
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(transport);
        //System.out.println("elasticsearchClient = " + elasticsearchClient);


        // 创建、删除索引和查看所有索引信息
        // 创建了一个名为 "foo" 的别名,并将其设置为写入别名 (write index),这意味着所有写入操作(例如索引、更新、删除文档)将自动路由到这个别名所对应的索引上。
        try {
            //创建索引
            CreateIndexResponse indexRequest = elasticsearchClient.indices().create(createIndexBuilder -> createIndexBuilder
                    .index("{index_name}")
                    .aliases("{foo}", aliasBuilder -> aliasBuilder
                            .isWriteIndex(true)
                    )
                );
            //检查“indexRequest”请求的操作是否已被Elasticsearch集群确认
            boolean acknowledged = indexRequest.acknowledged();
            System.out.println("Index document successfully! " + acknowledged);

            //删除索引
            DeleteIndexResponse deleteResponse = elasticsearchClient.indices().delete(createIndexBuilder -> createIndexBuilder
                    .index("{index_name}")
            );
            System.out.println("Delete document successfully! \n" + deleteResponse.toString());

            //查看所有索引信息(health status index uuid pri rep)
            IndicesResponse indicesResponse = elasticsearchClient.cat().indices();
            indicesResponse.valueBody().forEach(info -> System.out.println(info.health() + "\t"+  info.status() + "\t" + info.index() + "\t" + info.uuid() +"\t" + info.pri() + "\t" + info.rep()));


            transport.close();
            restClient.close();
        } catch (IOException ioException) {
            // 异常处理。
        }
    }
}

高并发场景需要增加客户端连接数,具体配置如下:

httpClientBuilder.setMaxConnTotal(500); 
httpClientBuilder.setMaxConnPerRoute(300);

连接代码示例:

String host = "localhost";
int port = 9200;
String username = "elastic";
String password = "passwd";
final int max_conn_total = 500;
final int max_conn_per_route = 300;

    RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http"))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        httpClientBuilder.setMaxConnTotal(max_conn_total);
                        httpClientBuilder.setMaxConnPerRoute(max_conn_per_route);
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                }).build();

更多Java API Client的使用特性,请参见Java API Client官方文档