
Use the Elasticsearch SDK to access Simple Log Service


This topic describes how to use the Elasticsearch SDK and the Elasticsearch-compatible API to analyze data in Simple Log Service.

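Important

This document is an original Alibaba Cloud document, and its intellectual property rights belong to Alibaba Cloud. Because it describes how Alibaba Cloud services interact with third-party products, it may mention the names of third-party companies or products.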

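Prerequisites

  • Data is collected to a Standard Logstore in Simple Log Service. For more information, see Data collection.

  • At least one field index is created. For more information, see Create indexes.

  • An AccessKey pair is created for your Alibaba Cloud account. For more information, see AccessKey pair.

    We recommend that you use the AccessKey pair of a RAM user that has permission to query data in the Logstore. You can use the permission assistant to configure the permissions. For more information, see Configure the permission assistant.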

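Usage notes

  • Only version 7.x of the Elasticsearch SDK can access the Elasticsearch-compatible API.

  • If a query does not specify @timestamp, data from the last 24 hours is queried by default.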
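Because the default window is 24 hours, a query that needs a different window has to carry an explicit @timestamp range. The following is a minimal sketch of how such a request body could be assembled with the Python standard library only. The function names timestamp_range_filter and search_body are hypothetical helpers introduced for illustration; the body layout mirrors the range filter used in the SDK examples in this topic.

```python
import json
import time


def timestamp_range_filter(window_seconds, now_ms=None):
    """Build a range clause on @timestamp that covers the last window_seconds.

    now_ms is injectable so that the result is reproducible in tests.
    """
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    start_ms = now_ms - window_seconds * 1000
    return {
        "range": {
            "@timestamp": {
                "gte": start_ms,
                "lte": now_ms,
                "format": "epoch_millis",
            }
        }
    }


def search_body(window_seconds, now_ms=None):
    """Wrap the range clause in a bool/filter query (hypothetical helper)."""
    return {
        "query": {
            "bool": {
                "filter": [timestamp_range_filter(window_seconds, now_ms)]
            }
        }
    }


if __name__ == "__main__":
    # Query the last hour instead of relying on the 24-hour default.
    print(json.dumps(search_body(3600), indent=4))
```

The returned dictionary can be passed as the body argument of a search call in the SDK examples.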

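Parameters

  • hosts: The data access address, in the format https://${Project name}.${Project endpoint}/es/. For more information, see Endpoints.

    Important: Only the HTTPS protocol is supported.

  • Username and Password: Set Username to the AccessKey ID and Password to the AccessKey secret of an Alibaba Cloud account or a RAM user.

    We recommend that you use the AccessKey pair of a RAM user that has permission to query data in the Logstore. You can use the permission assistant to configure the permissions. For more information, see Configure the permission assistant. For information about how to obtain an AccessKey pair, see AccessKey pair.

  • Index: ${Project name}.${Logstore name}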
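The hosts and Index formats above can be sketched as two small string builders. This is an illustrative sketch, not part of any SDK: build_es_host, build_es_index, and require_https are hypothetical helper names, and the sample values are the ones used in the examples in this topic.

```python
def build_es_host(project, endpoint):
    """Return the hosts value: https://${Project name}.${Project endpoint}/es/"""
    return "https://%s.%s/es/" % (project, endpoint)


def build_es_index(project, logstore):
    """Return the Index value: ${Project name}.${Logstore name}"""
    return "%s.%s" % (project, logstore)


def require_https(url):
    """Reject any address that does not use HTTPS, the only supported protocol."""
    if not url.startswith("https://"):
        raise ValueError("only HTTPS is supported: %s" % url)
    return url


if __name__ == "__main__":
    host = require_https(build_es_host("etl-dev", "cn-huhehaote.log.aliyuncs.com"))
    index = build_es_index("etl-dev", "accesslog")
    print(host)
    print(index)
```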

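Examples

The following examples use a project named etl-dev, a Logstore named accesslog, and the endpoint (slsEndpoint) cn-huhehaote.log.aliyuncs.com to show how to use the Elasticsearch SDK and the Elasticsearch-compatible API to analyze data in Simple Log Service.

cURL example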

curl -u ${ALIYUN_ACCESS_KEY_ID}:${ALIYUN_ACCESS_KEY_SECRET} "https://etl-dev.cn-huhehaote.log.aliyuncs.com/es/etl-dev.accesslog/_search?q=status:200"
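To make explicit what the cURL command sends, the sketch below builds the same request with the Python standard library: the -u option corresponds to an HTTP Basic Authorization header, and the q parameter is URL-encoded. The function name build_search_request is a hypothetical helper, and the request is only constructed here, not sent.

```python
import base64
import urllib.parse
import urllib.request


def build_search_request(host, index, query, access_key_id, access_key_secret):
    """Build (but do not send) a GET request equivalent to the cURL example.

    host must end with /es/, and index is ${Project name}.${Logstore name}.
    """
    url = "%s%s/_search?%s" % (host, index, urllib.parse.urlencode({"q": query}))
    credentials = "%s:%s" % (access_key_id, access_key_secret)
    token = base64.b64encode(credentials.encode("utf-8")).decode("ascii")
    request = urllib.request.Request(url)
    # curl -u user:password is sent as an HTTP Basic Authorization header.
    request.add_header("Authorization", "Basic " + token)
    return request


if __name__ == "__main__":
    req = build_search_request(
        "https://etl-dev.cn-huhehaote.log.aliyuncs.com/es/",
        "etl-dev.accesslog",
        "status:200",
        "example-id",      # placeholder, not a real AccessKey ID
        "example-secret",  # placeholder, not a real AccessKey secret
    )
    print(req.full_url)
```

Passing the returned object to urllib.request.urlopen would execute the query; the sketch stops before that step so that it runs without credentials.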

Python SDK example

  1. Install the dependency.

    pip install elasticsearch==7.10.0

  2. Example.

    #!/usr/bin/env python3
    import os
    import json
    import time
    from elasticsearch import Elasticsearch
    
    slsProject = "etl-dev"
    slsEndpoint = "cn-huhehaote.log.aliyuncs.com"
    slsLogstore = "accesslog"
    
    esHost = "https://%s.%s/es/" % (slsProject, slsEndpoint)
    esIndex = "%s.%s" % (slsProject, slsLogstore)
    
    # Read the AccessKey pair from environment variables
    accessKeyId = os.environ['ALIYUN_ACCESS_KEY_ID']
    accessKeySecret = os.environ['ALIYUN_ACCESS_KEY_SECRET']
    
    esClient = Elasticsearch(hosts=esHost,
                             http_auth=(accessKeyId, accessKeySecret),
                             verify_certs=True, timeout=300)
    
    endTime = int(time.time()*1000)
    startTime = endTime - 3600*1000
    
    r = esClient.search(
        index=esIndex,
        body=   {
            "query": {
                "bool": {
                    "filter": [
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": startTime,
                                    "lte": endTime,
                                    "format": "epoch_millis"
                                }
                            }
                        }
                    ]
                }
            }
         }
    )
    
    print(json.dumps(r, indent=4))
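The example prints the whole response. When only the log records are needed, they can be pulled out of the response, as in the following minimal sketch. It assumes that the compatible API returns the standard Elasticsearch search response layout (hits.hits[]._source). The function name extract_sources is a hypothetical helper, and the sample response is made-up data for illustration.

```python
def extract_sources(response):
    """Return the _source document of every hit in a search response.

    Assumes the standard Elasticsearch layout: response["hits"]["hits"][i]["_source"].
    Missing keys yield an empty result instead of raising an exception.
    """
    hits = response.get("hits", {}).get("hits", [])
    return [hit.get("_source", {}) for hit in hits]


if __name__ == "__main__":
    # Made-up sample response, used only to exercise the helper.
    sample = {
        "hits": {
            "hits": [
                {"_source": {"request_method": "GET", "status": "200"}},
                {"_source": {"request_method": "POST", "status": "500"}},
            ]
        }
    }
    for record in extract_sources(sample):
        print(record["request_method"], record["status"])
```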

Elasticsearch DSL example

Elasticsearch DSL (Elasticsearch Domain Specific Language) is the query syntax that Elasticsearch uses to search data. To avoid assembling DSL queries by hand, you can use the elasticsearch-dsl library as follows.

  1. Install the dependency.

    pip install elasticsearch-dsl==7.4.1

  2. Example that uses elasticsearch-dsl.

    #!/usr/bin/env python3
    import os
    import time
    from elasticsearch import Elasticsearch
    from elasticsearch_dsl import Search, Q
    
    slsProject = "etl-dev"
    slsEndpoint = "cn-huhehaote.log.aliyuncs.com"
    slsLogstore = "accesslog"
    
    esHost = "https://%s.%s/es/" % (slsProject, slsEndpoint)
    esIndex = "%s.%s" % (slsProject, slsLogstore)
    
    # Read the AccessKey pair from environment variables
    accessKeyId = os.environ['ALIYUN_ACCESS_KEY_ID']
    accessKeySecret = os.environ['ALIYUN_ACCESS_KEY_SECRET']
    
    esClient = Elasticsearch(hosts=esHost,
                             http_auth=(accessKeyId, accessKeySecret),
                             verify_certs=True, timeout=300)
    
    endTime = int(time.time()*1000)
    startTime = endTime - 3600*1000
    
    s = Search(using=esClient, index=esIndex) \
            .filter(Q("range", **{"@timestamp": {"gte": startTime, "lt": endTime, "format": "epoch_millis"}})) \
            .query("match", request_method="GET")
    
    response = s.execute()
    
    for hit in response:
        # request_method, host, and client_ip are fields in the Simple Log Service logs
        print(hit.request_method, hit.host, hit.client_ip)

Go SDK example

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/olivere/elastic/v7"
)

func main() {
	// Example of using an Elasticsearch SDK to access the Elasticsearch-compatible API of Simple Log Service
	slsProject := "etl-dev"
	slsLogstore := "accesslog"
	slsEndpoint := "cn-huhehaote.log.aliyuncs.com"

	accessKeyID := os.Getenv("ALIYUN_ACCESS_KEY_ID")
	accessKeySecret := os.Getenv("ALIYUN_ACCESS_KEY_SECRET")
	esHost := fmt.Sprintf("https://%s.%s:443/es", slsProject, slsEndpoint)
	esIndex := fmt.Sprintf("%s.%s", slsProject, slsLogstore)

	esClient, err := elastic.NewClient(
		elastic.SetURL(esHost),
		elastic.SetSniff(false),
		elastic.SetBasicAuth(accessKeyID, accessKeySecret), // Username and password for basic authentication
		elastic.SetHealthcheck(false),                      // Disable health checks
	)
	if err != nil {
		panic(err)
	}

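	termQuery := elastic.NewTermQuery("request_method", "GET")
	// Use millisecond timestamps with an explicit format, consistent with the other examples
	endTime := time.Now().UnixNano() / int64(time.Millisecond)
	startTime := endTime - 3600*1000
	timeRangeQuery := elastic.NewRangeQuery("@timestamp").Gte(startTime).Lte(endTime).Format("epoch_millis")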

	boolQuery := elastic.NewBoolQuery()
	boolQuery = boolQuery.Must(timeRangeQuery, termQuery)

	searchResult, err := esClient.Search().
		Index(esIndex).
		Query(boolQuery).
		From(0).Size(10).
		Pretty(true).
		Do(context.Background())
	if err != nil {
		panic(err)
	}

	// Print the results
	for _, hit := range searchResult.Hits.Hits {
		fmt.Println(string(hit.Source))
	}
}

Java SDK example

  1. Add the dependencies to pom.xml.

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.example</groupId>
        <artifactId>estest</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-high-level-client</artifactId>
                <version>7.10.1</version>
            </dependency>
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-client</artifactId>
                <version>7.10.1</version>
            </dependency>
        </dependencies>
    </project>

  2. Example.

    package org.example;
    
    import org.apache.http.HttpHost;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.UsernamePasswordCredentials;
    import org.apache.http.client.CredentialsProvider;
    import org.apache.http.impl.client.BasicCredentialsProvider;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.index.query.BoolQueryBuilder;
    import org.elasticsearch.index.query.MatchQueryBuilder;
    import org.elasticsearch.index.query.RangeQueryBuilder;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    
    import java.io.IOException;
    
    public class Main {
        public static void main(String[] args) throws IOException {
    
            String slsProject = "etl-dev";
            String slsLogstore = "accesslog";
            String slsEndpoint = "cn-huhehaote.log.aliyuncs.com";
    
            String schema = "https";
            String esHost = slsProject + "." +  slsEndpoint; // ${project}.${endpoint}
            int port = 443;
            String esIndex = slsProject + "." + slsLogstore; // ${project}.${logstore}
            String esPrefix = "/es/";
            String accessKeyId = System.getenv("ALIYUN_ACCESS_KEY_ID");
            String accessKeySecret = System.getenv("ALIYUN_ACCESS_KEY_SECRET");
    
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                    new UsernamePasswordCredentials(accessKeyId, accessKeySecret));
    
            RestClientBuilder builder = RestClient.builder(new HttpHost(esHost, port, schema)).setHttpClientConfigCallback(
                        httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
    
            // Set /es/ prefix
            builder.setPathPrefix(esPrefix);
            RestHighLevelClient client = new RestHighLevelClient(builder);
    
            // Query
            BoolQueryBuilder boolExpr= new BoolQueryBuilder();
    
            long endTime = System.currentTimeMillis();
            long startTime = endTime - 3600 * 1000;
            boolExpr.filter().add(new MatchQueryBuilder("request_method", "GET"));
            boolExpr.filter().add(new RangeQueryBuilder("@timestamp").gte(startTime).lte(endTime).format("epoch_millis"));
    
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    
            searchSourceBuilder.query(boolExpr);
            SearchRequest searchRequest = new SearchRequest(esIndex);
            searchRequest.source(searchSourceBuilder);
            SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
            System.out.println(searchResponse.toString());
    
            client.close();
        }
    }

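PHP SDK example

  1. Use Composer to install the PHP client. Pin the 7.x release line, because only version 7.x SDKs are supported.

    composer require "elasticsearch/elasticsearch:^7.0"

  2. Example.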

    <?php
    
    require 'vendor/autoload.php';
    
    use Elasticsearch\ClientBuilder;
    
    $slsProject = 'etl-dev';
    $slsLogstore = 'accesslog';
    $slsEndpoint = 'cn-huhehaote.log.aliyuncs.com';
    
    $esHost = $slsProject . '.' . $slsEndpoint;
    $esIndex = $slsProject . '.' . $slsLogstore;
    
    $accessKeyId = getenv('ALIYUN_ACCESS_KEY_ID');
    $accessKeySecret = getenv('ALIYUN_ACCESS_KEY_SECRET');
    
    $hosts = [
        [
            'host' => $esHost,
            'port' => '443',
            'scheme' => 'https',
            'path' => '/es',
            'user' => $accessKeyId,
            'pass' => $accessKeySecret,
        ]
    ];
    
    $client = ClientBuilder::create()
        ->setHosts($hosts)
        ->build();
    
    $endTime = round(microtime(true) * 1000); // milliseconds
    $startTime = $endTime - (3600 * 1000);
    
    
    $params = [
        'index' => $esIndex,
        'body'  => [
            'query' => [
                'bool' => [
                    'must' => [
                        'match' => [
                            'request_method' => 'GET'
                        ]
                    ],
                    'filter' => [
                        'range' => [
                            '@timestamp' => [
                                'gte' => $startTime,
                                'lte' => $endTime
                            ]
                        ]
                    ]
                ]
            ]
        ]
    ];
    
    $response = $client->search($params);
    
    print_r($response);