通过客户端访问阿里云Elasticsearch

本文介绍如何使用PHP、Python、Java和Go等语言访问Elasticsearch实例或Serverless应用,并为您提供示例代码和注意事项。

准备工作

  • 创建Elasticsearch实例或Serverless应用。 具体操作,请参见创建阿里云Elasticsearch实例创建Serverless应用

  • 安装对应语言的Elasticsearch客户端。

    建议创建与Elasticsearch相同版本的客户端,避免出现兼容性问题。Elasticsearch与客户端版本兼容性的详细信息,请参见Compatibility

    • Elasticsearch GO客户端:Elasticsearch Go Client

      说明

      使用Go语言连接阿里云Elasticsearch前,您需要先安装Go编译环境,详细信息请参见The Go Programming Language。本文示例使用Go 1.19.1版本。

    • Elasticsearch Java客户端:Elasticsearch Java API Client

      说明
      • Java客户端类型包括:Transport Client、Low Level REST Client、High Level REST Client和Java API Client,各类型的代码示例,请参见概述。本文以6.7版本的High Level REST Client为例。

      • 由于Java Transport Client通过TCP与Elasticsearch进行通信,当客户端与不同版本的Elasticsearch通信时,会存在兼容性问题,所以官方在高版本集群中已弃用Transport Client。如果您已经创建了,使用Transport Client 5.5或5.6版本与5.5或5.6版本的Elasticsearch集群建立连接时会提示NoNodeAvailableException的错误,推荐您使用Transport Client 5.3.3或Java Low Level REST Client访问Elasticsearch集群,以保障版本的兼容性。

    • Elasticsearch PHP客户端:Elasticsearch PHP Client

      说明

      Elasticsearch的PHP客户端提供的默认连接池并不适合云上环境。阿里云Elasticsearch提供了负载均衡的域名服务,因此PHP客户端访问程序必须使用SimpleConnectionPool作为连接池,否则在触发阿里云Elasticsearch重启操作时会出现访问连接异常的问题。同时,PHP客户端访问程序必须具备访问连接失败重连的机制。因为在PHP客户端访问程序使用SimpleConnectionPool作为连接池后,不排除在触发阿里云Elasticsearch重启操作时还会出现访问连接异常问题(例如,提示No enabled connection)。

    • Elasticsearch Python客户端:Elasticsearch Python Client

    • 更多Elasticsearch客户端:Elasticsearch Clients

  • 开启Elasticsearch实例的自动创建索引功能。具体操作请参见配置YML参数

  • 配置Elasticsearch实例或Serverless应用白名单,确保网络互通。

    • 如果运行代码的服务器与Elasticsearch实例在同一专有网络VPC(Virtual Private Cloud)中,可通过阿里云Elasticsearch实例的私网地址进行连通。连通前,需要确保VPC私网访问白名单(默认为0.0.0.0/0)中已添加了服务器的私网IP地址。

    • 如果运行代码的服务器在公网环境下,可通过Elasticsearch的公网地址进行连通。开启Elasticsearch的公网地址,并将服务器的公网IP地址添加到Elasticsearch实例或Serverless应用公网地址访问白名单中。具体操作,请参见配置Elasticsearch实例公网或私网访问白名单配置Serverless应用公网访问白名单

      重要
      • 如果您使用的是WIFI、宽带等网络,需要将公网出口的跳板机IP地址配置进去。建议您通过浏览器访问cip.cc查询。

      • 您也可以将白名单配置为0.0.0.0/0,允许所有IPv4地址访问Elasticsearch。此配置会导致实例完全暴露在公网中,增加安全风险,配置前请确认您是否可以接受这个风险。

      • 如果未配置白名单或白名单配置错误,系统会提示连接超时Timeout connecting

      • 如果您需要通过客户端访问Kibana节点,还需要配置Kibana的访问白名单。详细信息,请参见配置Kibana公网或私网访问白名单配置Serverless应用公网访问白名单

示例代码

常见客户端访问阿里云Elasticsearch实例的代码示例。

//以Go 1.19.1版本为例。
package main

import (
  "log"
  "github.com/elastic/go-elasticsearch/v7"
)

func main() {
  cfg := elasticsearch.Config {
    Addresses: []string{
      "<YourEsHost>",
    },
    Username: "<UserName>",
    Password: "<YourPassword>",
  }

  es, err := elasticsearch.NewClient(cfg)
  if err != nil {
    log.Fatalf("Error creating the client: %s", err)
  }

  res, err := es.Info()
  if err != nil {
    log.Fatalf("Error getting response: %s", err)
  }

  defer res.Body.Close()
  log.Println(res)
}
//以6.7版本的High Level REST Client为例。
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;

import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class RestClientTest67 {

    private static final RequestOptions COMMON_OPTIONS;

    static {
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();

        // 默认缓存限制为100MB,此处修改为30MB。
        builder.setHttpAsyncResponseConsumerFactory(
                new HttpAsyncResponseConsumerFactory
                        .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
        COMMON_OPTIONS = builder.build();
    }

    public static void main(String[] args) {
        // 阿里云Elasticsearch集群需要basic auth验证。
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
       //访问用户名和密码为您创建阿里云Elasticsearch实例时设置的用户名和密码,也是Kibana控制台的登录用户名和密码。
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("<UserName>", "<YourPassword>"));

        // 通过builder创建rest client,配置http client的HttpClientConfigCallback。
       // 单击所创建的Elasticsearch实例ID,在基本信息页面获取公网地址,即为ES集群地址。
        RestClientBuilder builder = RestClient.builder(new HttpHost("<YourEsHost>", 9200, "http"))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                });

        // RestHighLevelClient实例通过REST low-level client builder进行构造。
        RestHighLevelClient highClient = new RestHighLevelClient(builder);

        try {
            // 创建request。
            Map<String, Object> jsonMap = new HashMap<>();
           jsonMap.put("<YourEsField1>", "<YourEsFieldValue1>");
           jsonMap.put("<YourEsField2>", "<YourEsFieldValue2>");
           IndexRequest indexRequest = new IndexRequest("<YourEsIndex>", "<YourEsType>", "<YourEsId>").source(jsonMap);

            // 同步执行,并使用自定义RequestOptions(COMMON_OPTIONS)。
            IndexResponse indexResponse = highClient.index(indexRequest, COMMON_OPTIONS);

            long version = indexResponse.getVersion();

            System.out.println("Index document successfully! " + version);

            highClient.close();

        } catch (IOException ioException) {
            // 异常处理。
        }
    }
}
<?php
require 'vendor/autoload.php';
use Elasticsearch\ClientBuilder;

$client = ClientBuilder::create()->setHosts([
  [
    'host'   => '<YourEsHost>',
    'port'   => '9200',
    'scheme' => 'http',
    'user'   => '<UserName>',
    'pass'   => '<YourPassword>'
  ]
])->setConnectionPool('\Elasticsearch\ConnectionPool\SimpleConnectionPool', [])
  ->setRetries(10)->build();

$indexParams = [
  'index'  => '<YourEsIndex>',
  'type'   => '<YourEsType>',
  'id'     => '<YourEsId>',
  'body'   => ['<YourEsField>' => '<YourEsFieldValue>'],
  'client' => [
    'timeout'         => 10,
    'connect_timeout' => 10
  ]
];
$indexResponse = $client->index($indexParams);
print_r($indexResponse);

$searchParams = [
  'index'  => '<YourEsIndex>',
  'type'   => '<YourEsType>',
  'body'   => [
    'query' => [
      'match' => [
        '<YourEsField>' => '<YourEsFieldValue>'
      ]
    ]
  ],
  'client' => [
    'timeout'         => 10,
    'connect_timeout' => 10
  ]
];
$searchResponse = $client->search($searchParams);
print_r($searchResponse);
?>
from elasticsearch import Elasticsearch, RequestsHttpConnection
import certifi
es = Elasticsearch(
    ['<YourEsHost>'],
    http_auth=('<UserName>', '<YourPassword>'),
    port=9200,
    use_ssl=False
)
res = es.index(index="<YourEsIndex>", doc_type="<YourEsType>", id=<YourEsId>, body={"<YourEsField1>": "<YourEsFieldValue1>", "<YourEsField2>": "<YourEsFieldValue2>"})
res = es.get(index="<YourEsIndex>", doc_type="<YourEsType>", id=<YourEsId>)
print(res['_source'])

以上代码示例只展示了HTTP连接方式,如果ES集群开启了HTTPS协议,代码示例中需要将use_ssl改为True,并加上verify_certs=True,其余不变。即:

es = Elasticsearch(
['<YourEsHost>'],
http_auth=('<UserName>', '<YourPassword>'),
port=9200,
use_ssl=True,
verify_certs=True
)

常见客户端访问Serverless应用的代码示例。

说明
  • Elasticsearch Serverless仅支持索引相关的API,其他API无权限操作。

  • Elasticsearch Serverless的Python API兼容社区版ES 7.10 Python索引级别操作。更多信息,请参见 Elasticsearch API

//以下代码示例为如何使用BulkRequest进行批量写入和SearchRequest。
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class RestClientTest710 {
    public static void main(String[] args) {
        // 阿里云Elasticsearch Serverless集群需要basic auth验证。
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        //访问用户名和密码为您创建阿里云Elasticsearch Serverless实例时设置的用户名和密码,也是Kibana控制台的登录用户名和密码。
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("<UserName>", "<YourPassword>"));

        // 通过builder创建rest client,配置http client的HttpClientConfigCallback。
        // 单击所创建的Elasticsearch Serverless实例ID,在基本信息页面获取公网地址,即为ES集群地址。
        RestClientBuilder builder = RestClient.builder(new HttpHost("<YourEsHost>", 9200, "http"))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                });

        // RestHighLevelClient实例通过REST low-level client builder进行构造。
        RestHighLevelClient highClient = new RestHighLevelClient(builder);

        try {
            //批量写入
            //创建BulkRequest 对象,并使用 timeout() 方法设置了请求的超时时间为 "10s"
            BulkRequest bulkRequest = new BulkRequest();
            bulkRequest.timeout("10s");

            // 创建request。
            Map<String, Object> jsonMap = new HashMap<>();
            // field_01、field_02为字段名,value_01、value_02为对应的值。
            jsonMap.put("{field_01}", "{value_01}");
            jsonMap.put("{field_02}", "{value_02}");
            IndexRequest indexRequest = new IndexRequest("{index_name}").id("{doc_id}").source(jsonMap);
            bulkRequest.add(indexRequest);

            Map<String, Object> jsonMap2 = new HashMap<>();
            jsonMap2.put("{field_01}", "{value_01}");
            jsonMap2.put("{field_02}", "{value_02}");
            IndexRequest indexRequest2 = new IndexRequest("{index_name}").id("{doc_id}").source(jsonMap2);
            bulkRequest.add(indexRequest2);

            BulkResponse bulkResponse = highClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            System.out.println(bulkResponse.hasFailures());


            //查询所有文档
            //指定搜索的索引
            SearchRequest searchRequest = new SearchRequest("{index_name}");
            //构建了一个查询对象
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            //创建一个匹配所有文档的查询
            searchSourceBuilder.query(QueryBuilders.matchAllQuery());
            //将查询对象设置为 SearchRequest 的源
            searchRequest.source(searchSourceBuilder);

            SearchResponse searchResponse = highClient.search(searchRequest, RequestOptions.DEFAULT);
            //从 SearchResponse 中获取搜索结果的数组 SearchHit[] searchHits。
            SearchHit[] searchHits = searchResponse.getHits().getHits();
            
            //遍历 searchHits 数组,对每一个搜索命中的文档进行处理。在此示例中,使用 getSourceAsString() 方法将文档的源数据以字符串形式打印出来。
            for (SearchHit hit : searchHits) {
                System.out.println(hit.getSourceAsString());
            }


            //查询单个文档
            GetRequest getRequest = new GetRequest("{index_name}", "{doc_id}");
            GetResponse getResponse = highClient.get(getRequest, RequestOptions.DEFAULT);
            //打印文档的内容
            System.out.println(getResponse.getSourceAsString());
            //返回的全部内容和命令式一样的
            System.out.println(getResponse);

            highClient.close();

        } catch (IOException ioException) {
            // 异常处理。
        }
    }
}
# coding="utf-8"
from elasticsearch import Elasticsearch
import time
import string
import random
def stru_doc(index_name,doc_type="_doc",count=100):
    doc = []
    for i in range(count):
        _id = ''.join([random.choice(letters) for _ in range(17)])

        i_doc = [{"create": {"_index": index_name, "_id": _id, "_type": doc_type}},
                 {"field1": "测试创建文档dd汽车时代,如果你万一开车撞了什么东西,千万千万要下车查看检查,否则如果撞到的是人,就算是逃逸,只要构成重伤,负主责以上,就构成交通肇事罪哦。",
                  "filed2": "2017年12月01日之后使用公共镜像创建的ECS实例,默认预装云助手客户端。如果您的实例是2017年12月01日之前购买的,若需要使用云助手相关功能,请自行安装云助手客户端。"
                  "不同操作系统可选择的安装方式如下表所示。",
                  "field3": "如果您选择默认安装路径,客户端在Windows实例中的安装目录为 C:\ProgramData\aliyun\assist\。",
                  "filed4":"通过阅读本文,您可以快速了解云服务器ECS的计费项及其计费方式、计费组成、定价等主要计费信息。",
                  "filed5":"说明 ECS资源的基础计费方式为包年包月和按量付费,针对不同的ECS资源,您可以根据需要结合其它优惠的计费方式来降低使用成本。更多信息请参见 计费方式概述。",
                  "filed6":"猫不在袋子里了!我们很自豪地说我们参与了 Ai.com 域名的销售。非常期待看到他们会用该域名做什么!IT之家 2 月 16 日消息,人工智能聊天机器人 ChatGPT 近日火爆全球,该技术的开发商 OpenAI 豪掷千金,将超优质域名 AI.com 链接跳转到了 ChatGPT。众所周知,OpenAI 背后的金主是科技巨头微软,目前必应等产品已开始测试 ChatGPT,资金支持自然也不在话下。微软此前还宣布,将扩大与 OpenAI 的合作关系,后者将获得微软“多年、数十亿美元”的投资,具体数额没有披露,有媒体报道称是 100 亿美元。",
                  "@timestamp": int(time.time())}]
        doc += i_doc
    return doc

def bulk_request():
    """
    持续写入 不创建索引
   
    :return:
    """
    index_name = "ali_push_doc_%s" % time.strftime('%H%M%S')

    body = {
        "settings": {"number_of_replicas": 0,
                     "number_of_shards": 1}
    }

    doc_type = "_doc"
    try:
        create_index = es.indices.create(index=index_name, body=body)
        print(create_index)
    except Exception as index:
        print(time.strftime('%H:%M:%s') + str(index))
        time.sleep(20)
    bulk_body = stru_doc(index_name)
    try:
        bulk_request = es.bulk(body=bulk_body, timeout='10s')
    # print(time.strftime('%H:%M:%S') + 'push_test' + str(bulk_request))
    except Exception as u:
        print(u)
        with open('push_doc_exception.txt', 'w') as f:
            f.write(time.strftime('%H:%M:%S') + url + '\n' + str(u) + '\n' + str(bulk_body) + '\n')
    es.indices.refresh()
    doc_account = es.count(index=index_name)
    print("indexcount:",doc_account)
    if doc_account["count"] == 100:
        for times in range(100):
            try:
                query_body = {
                    "query": {
                        "match_all": {}
                    }
                }
                search_request = es.search(query_body, index_name)
                print(search_request)
            except Exception as e:
                with open('asyn_error.txt', 'a') as f:
                    f.write(time.strftime('%H:%M:%S') + str(e) + '\n' + str(e) + '\n')
    else:
        with open('asyn_error.txt', 'a') as f:
            f.write(time.strftime('%H:%M:%S') + str(doc_account) + '\n')

if __name__ == '__main__':
    url = <YourEsHost>
    es = Elasticsearch(url, http_auth=(<UserName>, <YourPassword>))
    letters = string.ascii_lowercase + string.digits
    bulk_request()

使用代码时,您需要将以下字段替换为对应的值。

参数

说明

<YourEsHost>

Elasticsearch实例的私网、公网地址,或Serverless应用的公网访问地址。可在Elasticsearch实例或Serverless应用的基本信息区域获取。

<UserName>

输入Elasticsearch实例的用户名elastic,或Serverless应用的用户名称,在应用详情页获取

<YourPassword>

输入Elasticsearch实例或Serverless应用的用户密码。

如果忘记密码,可在Elasticsearch实例详情页的安全配置中重置,或在Serverless应用详情页的基本信息区域重置。更多信息,请参见重置Elasticsearch实例访问密码

<YourEsIndex>

索引名称。

<YourEsType>

文档类型。

重要

Elasticsearch 7.0以下版本可自定义文档类型,7.0及以上版本固定为_doc

<YourEsId>

文档ID。

<YourEsField>

字段名称。

<YourEsFieldValue>

<YourEsField>的值。