使用客户端连接PolarSearch

PolarSearch完全兼容OpenSearch的官方客户端,您可以使用相应的客户端与PolarSearch进行交互。这使得您能够通过Java、Python等常用编程语言高效地管理索引、操作文档(包括增、删、改、查)以及执行复杂搜索,从而将搜索功能无缝集成到您的应用程序中。

准备工作

  1. 已创建含有PolarSearch节点的集群,并设置了节点的管理员账号。

  2. 获取连接地址:在集群的数据库节点区域,将鼠标悬浮在搜索节点,根据您的业务环境,获取PolarSearch节点的私网或公网地址。

连接集群

JAVA

1. 选择传输层并添加依赖

OpenSearch Java客户端需要搭配一个传输层框架来处理HTTP请求。您可以根据项目需求选择Apache HttpClient5或RestClient。

【推荐】Apache HttpClient5

  • Maven
    在pom.xml文件中添加以下依赖:

    <!-- OpenSearch Java Client核心库 -->
    <dependency>
      <groupId>org.opensearch.client</groupId>
      <artifactId>opensearch-java</artifactId>
      <version>2.19.0</version>
    </dependency>
    
    <!-- Apache HttpClient5 传输层 -->
    <dependency>
      <groupId>org.apache.httpcomponents.client5</groupId>
      <artifactId>httpclient5</artifactId>
      <version>5.2.1</version>
    </dependency>
  • Gradle
    在build.gradle文件中添加以下依赖:

    dependencies {
        // OpenSearch Java Client核心库
        implementation 'org.opensearch.client:opensearch-java:2.19.0'
        
        // Apache HttpClient5 传输层
        implementation 'org.apache.httpcomponents.client5:httpclient5:5.2.1'
    }

RestClient

  • Maven
    在pom.xml文件中添加以下依赖:

    <!-- OpenSearch Java Client核心库 -->
    <dependency>
      <groupId>org.opensearch.client</groupId>
      <artifactId>opensearch-java</artifactId>
      <version>2.19.0</version>
    </dependency>
    
    <!-- RestClient 传输层 -->
    <dependency>
      <groupId>org.opensearch.client</groupId>
      <artifactId>opensearch-rest-client</artifactId>
      <version>2.19.0</version>
    </dependency>
  • Gradle
    在build.gradle文件中添加以下依赖:

    dependencies {
        // OpenSearch Java Client核心库
        implementation 'org.opensearch.client:opensearch-java:2.19.0'
    
        // RestClient 传输层
        implementation 'org.opensearch.client:opensearch-rest-client:2.19.0'
    }

2. 设置数据类

创建一个测试数据类,以供后续测试PolarSearch功能使用。

/**
 * Simple test document used by the PolarSearch samples: a title and a text value.
 *
 * <p>Offers a no-argument constructor plus a getter/setter pair per field
 * (presumably what the client's JSON mapper relies on - confirm).
 */
static class IndexData {
    private String title;
    private String text;

    /** Creates an empty document; both fields stay {@code null} until set. */
    public IndexData() {
    }

    /** Creates a document holding the given title and text. */
    public IndexData(String title, String text) {
        this.title = title;
        this.text = text;
    }

    /** Returns the title. */
    public String getTitle() {
        return this.title;
    }

    /** Replaces the title. */
    public void setTitle(String title) {
        this.title = title;
    }

    /** Returns the text. */
    public String getText() {
        return this.text;
    }

    /** Replaces the text. */
    public void setText(String text) {
        this.text = text;
    }

    /** Renders both fields, e.g. {@code IndexData{title='a', text='b'}}. */
    @Override
    public String toString() {
        final Object[] fields = {this.title, this.text};
        return String.format("IndexData{title='%s', text='%s'}", fields);
    }
}

3. 初始化客户端

根据您选择的传输层,使用PolarSearch的连接信息初始化客户端。在实际使用中,您可以根据以下代码配置将PolarSearch的相关信息设置为环境变量,或直接将其赋值为参数的默认值,以便于测试使用。

Apache HttpClient5

以下示例展示了如何初始化一个客户端。当前示例代码通过HTTP(未启用SSL)连接,并关闭了证书与主机名校验,仅适用于测试环境。

import org.apache.hc.client5.http.auth.AuthScope;
import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
import org.apache.hc.client5.http.ssl.NoopHostnameVerifier;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.ssl.SSLContextBuilder;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;

/**
 * Creates an {@link OpenSearchClient} on top of the Apache HttpClient 5 transport.
 *
 * <p>Connection settings come from the {@code HOST}, {@code PORT}, {@code USERNAME} and
 * {@code PASSWORD} environment variables. The {@code <polarsearch_*>} defaults are
 * placeholders: replace them with real values (the port placeholder is not a number, so
 * {@link Integer#parseInt(String)} fails until it is replaced or {@code PORT} is set).
 */
public class CreateClient {
  public static void main(String[] args) throws Exception {
    var env = System.getenv();
    var hostname = env.getOrDefault("HOST", "<polarsearch_host>");
    var port = Integer.parseInt(env.getOrDefault("PORT", "<polarsearch_port>"));
    var user = env.getOrDefault("USERNAME", "<polarsearch_username>");
    var pass = env.getOrDefault("PASSWORD", "<polarsearch_password>");

    // Plain HTTP endpoint of the PolarSearch node.
    final var hosts = new HttpHost[] { new HttpHost("http", hostname, port) };

    // Trust-all SSL context: accepts every certificate chain. For test clusters only.
    final var sslContext = SSLContextBuilder.create().loadTrustMaterial(null, (chains, authType) -> true).build();

    // The transport owns the underlying HTTP client, so close it when done
    // (try-with-resources) instead of leaking its connections.
    try (var transport = ApacheHttpClient5TransportBuilder.builder(hosts)
            .setMapper(new JacksonJsonpMapper())
            .setHttpClientConfigCallback(httpClientBuilder -> {
                // Register the same basic-auth credentials for every configured host.
                final var credentialsProvider = new BasicCredentialsProvider();
                for (final var host : hosts) {
                    credentialsProvider.setCredentials(new AuthScope(host), new UsernamePasswordCredentials(user, pass.toCharArray()));
                }

                // Disable SSL/TLS verification as our local testing clusters use self-signed certificates
                final var tlsStrategy = ClientTlsStrategyBuilder.create()
                        .setSslContext(sslContext)
                        .setHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                        .build();

                final var connectionManager = PoolingAsyncClientConnectionManagerBuilder.create().setTlsStrategy(tlsStrategy).build();

                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider).setConnectionManager(connectionManager);
            })
            .build()) {
      OpenSearchClient client = new OpenSearchClient(transport);
      // Use `client` here: create indices, index documents, search, delete, ...
    }
  }
}

RestClient

以下示例展示了如何初始化一个客户端。当前示例代码通过HTTP(未启用SSL)连接,并关闭了证书与主机名校验,仅适用于测试环境。

import org.apache.hc.client5.http.ssl.NoopHostnameVerifier;
import org.apache.hc.core5.ssl.SSLContextBuilder;
import org.opensearch.client.RestClient;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.transport.rest_client.RestClientTransport;

/**
 * Creates an {@link OpenSearchClient} on top of the {@link RestClient} transport.
 *
 * <p>Connection settings come from the {@code HOST}, {@code PORT}, {@code USERNAME} and
 * {@code PASSWORD} environment variables. The {@code <polarsearch_*>} defaults are
 * placeholders: replace them with real values (the port placeholder is not a number, so
 * {@link Integer#parseInt(String)} fails until it is replaced or {@code PORT} is set).
 */
public class CreateRestClient {
  public static void main(String[] args) throws Exception {
    var env = System.getenv();
    var hostname = env.getOrDefault("HOST", "<polarsearch_host>");
    var port = Integer.parseInt(env.getOrDefault("PORT", "<polarsearch_port>"));
    var user = env.getOrDefault("USERNAME", "<polarsearch_username>");
    var pass = env.getOrDefault("PASSWORD", "<polarsearch_password>");

    // Plain HTTP endpoint of the PolarSearch node.
    final org.apache.http.HttpHost[] restHosts = new org.apache.http.HttpHost[] {new org.apache.http.HttpHost(hostname, port, "http")};

    // Trust-all SSL context: accepts every certificate chain. For test clusters only.
    // Built up front so a failure surfaces directly instead of inside the callback.
    final var sslContext = SSLContextBuilder.create().loadTrustMaterial(null, (chains, authType) -> true).build();

    RestClient restClient = RestClient.builder(restHosts)
            .setHttpClientConfigCallback(httpClientBuilder -> {
                // AuthScope.ANY already covers every host, so one registration is enough.
                final org.apache.http.impl.client.BasicCredentialsProvider credentialsProvider = new org.apache.http.impl.client.BasicCredentialsProvider();
                credentialsProvider.setCredentials(org.apache.http.auth.AuthScope.ANY, new org.apache.http.auth.UsernamePasswordCredentials(user, pass));

                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
                        // disable the certificate since our testing cluster just uses the default security configuration
                        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                        .setSSLContext(sslContext);
            }).build();

    // Closing the transport releases the client's connections when the block exits.
    try (var transport = new RestClientTransport(restClient, new JacksonJsonpMapper())) {
      final OpenSearchClient client = new OpenSearchClient(transport);
      // Use `client` here: create indices, index documents, search, delete, ...
    }
  }
}

4. 执行基本操作

客户端初始化后,您可以执行创建索引、索引文档、搜索和删除等操作。

创建索引

// Name of the index used by this and the following snippets.
final String index = "sample-index";
// Build the create-index request for that name and send it through the indices API.
final CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder()
        .index(index)
        .build();
client.indices().create(createIndexRequest);

索引文档

// Document to store, written to the index under id "1".
final IndexData indexData = new IndexData("first_name", "Bruce");
final IndexRequest<IndexData> indexRequest = new IndexRequest.Builder<IndexData>()
        .index(index)
        .id("1")
        .document(indexData)
        .build();
client.index(indexRequest);

搜索文档

// Search the index without an explicit query and map each hit's source to IndexData.
SearchResponse<IndexData> searchResponse = client.search(s -> s.index(index), IndexData.class);
// Print the source document of every returned hit, in response order.
for (var hit : searchResponse.hits().hits()) {
  System.out.println(hit.source());
}

删除文档

// Delete the document with id "1" from the index.
client.delete(request -> request
        .index(index)
        .id("1"));

删除索引

// Build the delete-index request and keep the response for inspection.
final DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder()
        .index(index)
        .build();
final DeleteIndexResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest);

Python

建议您使用Low-level Python客户端,因为High-level Python客户端在OpenSearch 2.1.0版本后已被废弃。

1. 配置环境

  1. 根据实际业务需求,进入指定的项目目录。此处以/home/PolarSearchTestPython为例。

    # -p: create missing parent directories and do not fail if the directory already exists
    mkdir -p /home/PolarSearchTestPython
    cd /home/PolarSearchTestPython
  2. 在/home/PolarSearchTestPython目录中,创建虚拟环境(venv)以隔离项目依赖,避免污染全局环境。

    python3 -m venv myenv
  3. 激活虚拟环境

    source myenv/bin/activate
  4. 安装所需的Python依赖库。

    pip3 install opensearch-py

2. 连接PolarSearch

在您的Python代码中导入OpenSearch类,并使用PolarSearch的连接信息创建客户端实例。在实际使用中,您可以根据以下代码配置将PolarSearch的相关信息设置为环境变量,或直接将其赋值为参数的默认值,以便于测试使用。

import os

from opensearchpy import OpenSearch

# Connection settings: read from environment variables, falling back to the
# <polarsearch_*> placeholders. Replace the placeholders with real values
# (the port placeholder is not a number, so int() fails until it is replaced
# or PORT is set).
host = os.getenv("HOST", "<polarsearch_host>")
port = int(os.getenv("PORT", "<polarsearch_port>"))
auth = (os.getenv("USERNAME", "<polarsearch_username>"), os.getenv("PASSWORD", "<polarsearch_password>"))

# Plain-HTTP client: SSL and certificate verification are switched off.
client = OpenSearch(
    hosts=[{"host": host, "port": port}],
    http_auth=auth,
    use_ssl=False,
    verify_certs=False,
    ssl_show_warn=False,
)

3. 操作示例

以下代码片段展示了如何执行常见的索引和文档操作。

创建索引

使用client.indices.create()方法创建一个新索引。

# Index to create, with a non-default setting of 4 shards.
index_name = "python-test-index"
index_body = {"settings": {"index": {"number_of_shards": 4}}}

response = client.indices.create(body=index_body, index=index_name)

索引文档

使用client.index()方法向指定索引中添加一个文档。

# Document to store under id '1'.
document = {"title": "Moneyball", "director": "Bennett Miller", "year": "2011"}

# refresh=True is passed through to the index call.
response = client.index(index="python-test-index", id="1", body=document, refresh=True)

执行批量操作

使用client.bulk()方法可以一次性执行多个操作,操作之间用\n分隔。

# Newline-delimited bulk payload: each action line is followed by its document
# or partial-update line. All actions target the index used by the other
# snippets, so deleting that index later also removes these documents.
movies = (
    '{ "index" : { "_index" : "python-test-index", "_id" : "2" } } \n'
    ' { "title" : "Interstellar", "director" : "Christopher Nolan", "year" : "2014"} \n'
    ' { "create" : { "_index" : "python-test-index", "_id" : "3" } } \n'
    ' { "title" : "Star Trek Beyond", "director" : "Justin Lin", "year" : "2015"} \n'
    ' { "update" : {"_id" : "3", "_index" : "python-test-index" } } \n'
    ' { "doc" : {"year" : "2016"} }'
)

# Keep the response so the result can be inspected, as in the other snippets.
response = client.bulk(body=movies)

搜索文档

使用client.search()方法根据查询条件搜索文档。

# Search term, matched against title (boosted with ^2) and director.
q = "miller"
multi_match = {"query": q, "fields": ["title^2", "director"]}
# Return at most 5 hits.
query = {"size": 5, "query": {"multi_match": multi_match}}

response = client.search(index="python-test-index", body=query)

删除文档

使用client.delete()方法删除指定ID的文档。

# Remove the document stored under id '1'.
response = client.delete(id="1", index="python-test-index")

删除索引

使用client.indices.delete()方法删除整个索引。

# Drop the whole index.
response = client.indices.delete(index="python-test-index")

完整示例代码

下面是一个完整的示例,演示了从创建索引到最终删除索引的全过程。

JAVA

示例代码

依赖配置(pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Maven build file for the OpenSearch Java client sample program. -->
    <modelVersion>4.0.0</modelVersion>

    <!-- Coordinates of the sample project. -->
    <groupId>org.example</groupId>
    <artifactId>OpenSearchJavaClientSample</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Compile for Java 21 and read sources as UTF-8. -->
    <properties>
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <!-- RestClient transport layer (org.opensearch.client.RestClient). -->
        <dependency>
            <groupId>org.opensearch.client</groupId>
            <artifactId>opensearch-rest-client</artifactId>
            <version>2.19.0</version>
        </dependency>
        <!-- OpenSearch Java client core library (OpenSearchClient and request/response types). -->
        <dependency>
            <groupId>org.opensearch.client</groupId>
            <artifactId>opensearch-java</artifactId>
            <version>2.19.0</version>
        </dependency>
        <!-- Apache HttpClient 5 transport layer (org.apache.hc.client5.*). -->
        <dependency>
            <groupId>org.apache.httpcomponents.client5</groupId>
            <artifactId>httpclient5</artifactId>
            <version>5.2.1</version>
        </dependency>
        <!-- Jackson data binding; presumably backs JacksonJsonpMapper - verify whether it is already pulled in transitively. -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.15.2</version>
        </dependency>
        <!-- Apache HttpCore 5 (org.apache.hc.core5.*, e.g. HttpHost and SSLContextBuilder). -->
        <dependency>
            <groupId>org.apache.httpcomponents.core5</groupId>
            <artifactId>httpcore5</artifactId>
            <version>5.2</version>
        </dependency>

        <!-- Lombok. NOTE(review): the sample program does not appear to reference it - confirm before removing. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
        </dependency>
        <!-- Log4j 2 core, the logging backend for org.apache.logging.log4j.Logger. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.21.0</version>
        </dependency>
    </dependencies>
</project>

示例程序(OpenSearchClientExample.java)

package samples;

import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;

import org.apache.hc.client5.http.auth.AuthScope;
import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
import org.apache.hc.client5.http.ssl.NoopHostnameVerifier;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.ssl.SSLContextBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.RestClient;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.core.IndexRequest;
import org.opensearch.client.opensearch.core.SearchResponse;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.opensearch.indices.DeleteIndexRequest;
import org.opensearch.client.opensearch.indices.DeleteIndexResponse;
import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
import org.opensearch.client.transport.rest_client.RestClientTransport;


/**
 * End-to-end sample for the OpenSearch Java client: creates an index, indexes one document,
 * searches it, deletes the document and finally deletes the index.
 *
 * <p>Connection settings are read from the {@code HOST}, {@code PORT}, {@code USERNAME} and
 * {@code PASSWORD} environment variables, falling back to {@code localhost}, {@code 9200},
 * {@code admin} and {@code admin}.
 */
public class OpenSearchClientExample {
    /**
     * Simple test document: a title and a text value, with a no-argument constructor and
     * getter/setter pairs (presumably what the JSON mapper relies on - confirm).
     */
    static class IndexData {
        private String title;
        private String text;

        /** Creates an empty document; both fields stay {@code null} until set. */
        public IndexData() {}

        /** Creates a document holding the given title and text. */
        public IndexData(String title, String text) {
            this.title = title;
            this.text = text;
        }

        public String getTitle() {
            return title;
        }

        public void setTitle(String title) {
            this.title = title;
        }

        public String getText() {
            return text;
        }

        public void setText(String text) {
            this.text = text;
        }

        @Override
        public String toString() {
            return String.format("IndexData{title='%s', text='%s'}", title, text);
        }
    }

    private static final Logger LOGGER = LogManager.getLogger(OpenSearchClientExample.class);

    /**
     * Creates a client that uses the Apache HttpClient 5 transport.
     *
     * <p>The caller owns the returned client and should close its transport when done.
     *
     * @return a client connected over plain HTTP with basic authentication
     * @throws NoSuchAlgorithmException if the trust-all SSL context cannot be built
     * @throws KeyStoreException if the trust-all SSL context cannot be built
     * @throws KeyManagementException if the trust-all SSL context cannot be built
     */
    public static OpenSearchClient createClient() throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        var env = System.getenv();
        var hostname = env.getOrDefault("HOST", "localhost");
        var port = Integer.parseInt(env.getOrDefault("PORT", "9200"));
        var user = env.getOrDefault("USERNAME", "admin");
        var pass = env.getOrDefault("PASSWORD", "admin");

        final var hosts = new HttpHost[] { new HttpHost("http", hostname, port) };

        // Trust-all SSL context: accepts every certificate chain. For test clusters only.
        final var sslContext = SSLContextBuilder.create().loadTrustMaterial(null, (chains, authType) -> true).build();

        final var transport = ApacheHttpClient5TransportBuilder.builder(hosts)
                .setMapper(new JacksonJsonpMapper())
                .setHttpClientConfigCallback(httpClientBuilder -> {
                    // Register the same basic-auth credentials for every configured host.
                    final var credentialsProvider = new BasicCredentialsProvider();
                    for (final var host : hosts) {
                        credentialsProvider.setCredentials(new AuthScope(host), new UsernamePasswordCredentials(user, pass.toCharArray()));
                    }

                    // Disable SSL/TLS verification as our local testing clusters use self-signed certificates
                    final var tlsStrategy = ClientTlsStrategyBuilder.create()
                            .setSslContext(sslContext)
                            .setHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                            .build();

                    final var connectionManager = PoolingAsyncClientConnectionManagerBuilder.create().setTlsStrategy(tlsStrategy).build();

                    return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider).setConnectionManager(connectionManager);
                })
                .build();
        return new OpenSearchClient(transport);
    }

    /**
     * Creates a client that uses the {@link RestClient} transport.
     *
     * <p>The caller owns the returned client and should close its transport when done.
     *
     * @return a client connected over plain HTTP with basic authentication
     * @throws IllegalStateException if the trust-all SSL context cannot be built
     */
    public static OpenSearchClient createRestClient()  {
        var env = System.getenv();
        var hostname = env.getOrDefault("HOST", "localhost");
        var port = Integer.parseInt(env.getOrDefault("PORT", "9200"));
        var user = env.getOrDefault("USERNAME", "admin");
        var pass = env.getOrDefault("PASSWORD", "admin");
        final org.apache.http.HttpHost[] restHosts = new org.apache.http.HttpHost[] {new org.apache.http.HttpHost(hostname, port, "http")};

        RestClient restClient = RestClient.builder(restHosts)
                .setHttpClientConfigCallback(httpClientBuilder -> {
                    // AuthScope.ANY already covers every host, so one registration is enough.
                    final org.apache.http.impl.client.BasicCredentialsProvider credentialsProvider = new org.apache.http.impl.client.BasicCredentialsProvider();
                    credentialsProvider.setCredentials(org.apache.http.auth.AuthScope.ANY, new org.apache.http.auth.UsernamePasswordCredentials(user, pass));

                    try {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
                                // disable the certificate since our testing cluster just uses the default security configuration
                                .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                                .setSSLContext(SSLContextBuilder.create().loadTrustMaterial(null, (chains, authType) -> true).build());
                    } catch (Exception e) {
                        // The callback cannot throw checked exceptions; keep the cause attached.
                        throw new IllegalStateException("Failed to build the trust-all SSL context", e);
                    }
                }).build();

        final RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        return new OpenSearchClient(transport);
    }

    /**
     * Runs the sample against the cluster described by the environment variables.
     * Failures are logged with their stack trace, and the client transport is always closed.
     */
    public static void main(String[] args) {
        OpenSearchClient client = null;
        try {
            LOGGER.info("start test...");

            client = createClient();
            LOGGER.info("client create");

            runSample(client, "my-index");

            LOGGER.info("end test...");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            LOGGER.error("Sample was interrupted", e);
        } catch (Exception e) {
            // Pass the throwable itself so the stack trace is not lost.
            LOGGER.error("Sample failed", e);
        } finally {
            closeClient(client);
        }
    }

    /** Executes the five demo steps (create index, index, search, delete document, delete index). */
    private static void runSample(OpenSearchClient client, String index) throws java.io.IOException, InterruptedException {
        // 1. Creating an index (skipped when it already exists)
        LOGGER.info("1. Creating an index {} start", index);
        if (!client.indices().exists(r -> r.index(index)).value()) {
            CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder().index(index)
                    .build();
            client.indices().create(createIndexRequest);
        }
        LOGGER.info("1. Creating an index {} finished", index);

        // 2. Indexing data
        LOGGER.info("2. Indexing data start");
        IndexData indexData = new IndexData("first_name", "Bruce");
        IndexRequest<IndexData> indexRequest = new IndexRequest.Builder<IndexData>().index(index).id("1").document(indexData).build();
        client.index(indexRequest);
        LOGGER.info("2. Indexing data finished");

        // Presumably gives the index time to refresh so the new document is searchable.
        Thread.sleep(1500);

        // 3. Searching for documents
        LOGGER.info("3. Searching for documents start");
        SearchResponse<IndexData> searchResponse = client.search(s -> s.index(index), IndexData.class);
        for (var hit : searchResponse.hits().hits()) {
            LOGGER.info(hit.source());
        }
        LOGGER.info("3. Searching for documents finished");

        // 4. Deleting a document
        LOGGER.info("4. Deleting a document start");
        client.delete(b -> b.index(index).id("1"));
        LOGGER.info("4. Deleting a document finished");

        // 5. Deleting an index
        LOGGER.info("5. Deleting an index {} start", index);
        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder().index(index).build();
        DeleteIndexResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest);
        if (!deleteIndexResponse.acknowledged()) {
            LOGGER.warn("Deleting index {} was not acknowledged", index);
        }
        LOGGER.info("5. Deleting an index {} finished", index);
    }

    /** Closes the client's transport, logging (not propagating) any failure; no-op for {@code null}. */
    private static void closeClient(OpenSearchClient client) {
        if (client == null) {
            return;
        }
        try {
            client._transport().close();
        } catch (Exception e) {
            LOGGER.warn("Failed to close the client transport", e);
        }
    }
}

运行方式

mvn clean package
mvn exec:java -Dexec.mainClass=samples.OpenSearchClientExample

Python

依赖配置

pip3 install opensearch-py

示例程序

import os
from opensearchpy import OpenSearch

# Connection settings: read from environment variables, falling back to the
# <polarsearch_*> placeholders. Replace the placeholders with real values
# (the port placeholder is not a number, so int() fails until it is replaced
# or PORT is set).
host = os.getenv("HOST", "<polarsearch_host>")
port = int(os.getenv("PORT", "<polarsearch_port>"))
auth = (os.getenv("USERNAME", "<polarsearch_username>"), os.getenv("PASSWORD", "<polarsearch_password>"))


# Plain-HTTP client: SSL and certificate verification are switched off.
client = OpenSearch(
    hosts=[{"host": host, "port": port}],
    http_auth=auth,
    use_ssl=False,
    verify_certs=False,
    ssl_show_warn=False,
)

# Create an index with non-default settings.
index_name = 'python-test-index'
index_body = {
  'settings': {
    'index': {
      'number_of_shards': 4
    }
  }
}

# Skip creation when the index is left over from an earlier, interrupted run;
# creating an existing index would raise an error.
if client.indices.exists(index=index_name):
    print('\nIndex already exists, reusing it:')
    print(index_name)
else:
    response = client.indices.create(index=index_name, body=index_body)
    print('\nCreating index:')
    print(response)

# Add a document to the index.
document = {
  'title': 'Moneyball',
  'director': 'Bennett Miller',
  'year': '2011'
}
# Named doc_id so the builtin id() is not shadowed.
doc_id = '1'

response = client.index(
    index = index_name,
    body = document,
    id = doc_id,
    refresh = True
)

print('\nAdding document:')
print(response)

# Perform bulk operations.
# Each action entry is followed by its document / partial-update entry; the
# client serializes the list into a newline-delimited body. All actions target
# index_name, so the final index deletion also removes these documents.
bulk_actions = [
    {"index": {"_index": index_name, "_id": "2"}},
    {"title": "Interstellar", "director": "Christopher Nolan", "year": "2014"},
    {"create": {"_index": index_name, "_id": "3"}},
    {"title": "Star Trek Beyond", "director": "Justin Lin", "year": "2015"},
    {"update": {"_id": "3", "_index": index_name}},
    {"doc": {"year": "2016"}},
]

response = client.bulk(body=bulk_actions)

print('\nBulk operations:')
print(response)

# Search for the document.
q = 'miller'
query = {
  'size': 5,
  'query': {
    'multi_match': {
      'query': q,
      'fields': ['title^2', 'director']
    }
  }
}

response = client.search(
    body = query,
    index = index_name
)
print('\nSearch results:')
print(response)

# Delete the document.
response = client.delete(
    index = index_name,
    id = doc_id
)

print('\nDeleting document:')
print(response)

# Delete the index.
response = client.indices.delete(
    index = index_name
)

print('\nDeleting index:')
print(response)

相关文档