本文介绍如何使用Elasticsearch SDK和Elasticsearch兼容接口分析日志服务中的数据。
本文档为阿里云原创文档,知识产权归阿里云所有,由于本文档旨在介绍阿里云与第三方产品交互的服务能力,因此可能会提及第三方公司或产品等名称。
前提条件
已创建Project、标准型Logstore并完成日志采集。具体操作,请参见创建项目Project、创建Logstore和数据采集概述。
查询日志前,已配置索引。具体操作,请参见创建索引。
已创建RAM用户的AccessKey,并且为RAM用户授予Logstore的查询权限。具体操作,请参见RAM授权。
注意事项
仅支持Elasticsearch 7.x版本的SDK访问Elasticsearch兼容接口。
在查询时不指定
@timestamp
,则默认查询24小时内的数据。
参数说明
参数名称 | 说明 |
| 数据访问地址。格式为 重要 只支持使用HTTPS协议。 |
| 输入 |
| |
|
|
示例
下面以 project 为etl-dev、logstore为accesslog、slsEndpoint为cn-huhehaote.log.aliyuncs.com 为例,演示如何使用Elasticsearch SDK和Elasticsearch兼容接口分析日志服务中的数据。
cURL访问示例
curl -u ${ALIYUN_ACCESS_KEY_ID}:${ALIYUN_ACCESS_KEY_SECRET} "https://etl-dev.cn-huhehaote.log.aliyuncs.com/es/etl-dev.accesslog/_search?q=status:200"
Python SDK访问示例
安装依赖。
pip install elasticsearch==7.10.0
示例。
#!/bin/env python3 import os import json import time from elasticsearch import Elasticsearch, helpers slsProject = "etl-dev" slsEndpoint = "cn-huhehaote.log.aliyuncs.com" slsLogstore = "accesslog" esHost = "https://%s.%s/es/" % (slsProject, slsEndpoint) esIndex = "%s.%s" % (slsProject, slsLogstore) # 从环境变量中获取到ak信息 accessKeyId = os.environ['ALIYUN_ACCESS_KEY_ID'] accessKeySecret = os.environ['ALIYUN_ACCESS_KEY_SECRET'] esClient = Elasticsearch(hosts=esHost, http_auth=(accessKeyId, accessKeySecret), verify_certs=True, timeout=300) endTime = int(time.time()*1000) startTime = endTime - 3600*1000 r = esClient.search( index=esIndex, body= { "query": { "bool": { "filter": [ { "range": { "@timestamp": { "gte": startTime, "lte": endTime, "format": "epoch_millis" } } } ] } } } ) print(json.dumps(r, indent=4))
ElasticSearch-DSL方式访问示例
ElasticSearch-DSL(ElasticSearch Domain Specific language)是ElasticSearch搜索数据的语法,简称DSL。为了避免手动组装ElasticSearch-DSL,可以用如下方式访问。
安装依赖。
pip install elasticsearch-dsl==7.4.1
elasticsearch-dsl方式访问示例。
#!/bin/env python3 import os import json import time from elasticsearch import Elasticsearch, helpers from elasticsearch_dsl import Search, Q slsProject = "etl-dev" slsEndpoint = "cn-huhehaote.log.aliyuncs.com" slsLogstore = "accesslog" esHost = "https://%s.%s/es/" % (slsProject, slsEndpoint) esIndex = "%s.%s" % (slsProject, slsLogstore) # 从环境变量中获取到ak信息 accessKeyId = os.environ['ALIYUN_ACCESS_KEY_ID'] accessKeySecret = os.environ['ALIYUN_ACCESS_KEY_SECRET'] esClient = Elasticsearch(hosts=esHost, http_auth=(accessKeyId, accessKeySecret), verify_certs=True, timeout=300) endTime = int(time.time()*1000) startTime = endTime - 3600*1000 s = Search(using=esClient, index=esIndex) \ .filter(Q("range", **{"@timestamp": {"gte": startTime, "lt": endTime, "format": "epoch_millis"}})) \ .query("match", request_method="GET") \ response = s.execute() for hit in response: # request_method, host, client_ip 是sls日志中的字段 print(hit.request_method, hit.host, hit.client_ip)
Golang SDK访问示例
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/olivere/elastic/v7"
)
func main() {
// 下面是一个es sdk访问sls es 兼容接口的样例
slsProject := "etl-dev"
slsLogstore := "accesslog"
slsEndpoint := "cn-huhehaote.log.aliyuncs.com"
accessKeyID := os.Getenv("ALIYUN_ACCESS_KEY_ID")
accessKeySecret := os.Getenv("ALIYUN_ACCESS_KEY_SECRET")
esHost := fmt.Sprintf("https://%s.%s:443/es", slsProject, slsEndpoint)
esIndex := fmt.Sprintf("%s.%s", slsProject, slsLogstore)
esClient, err := elastic.NewClient(
elastic.SetURL(esHost),
elastic.SetSniff(false),
elastic.SetBasicAuth(accessKeyID, accessKeySecret), // 设置基本认证的用户名和密码
elastic.SetHealthcheck(false), // 关闭健康检查
)
if err != nil {
panic(err)
}
termQuery := elastic.NewTermQuery("request_method", "GET")
endTime := time.Now().Unix()
startTime := endTime - 3600
timeRangeQuery := elastic.NewRangeQuery("@timestamp").Gte(startTime).Lte(endTime)
boolQuery := elastic.NewBoolQuery()
boolQuery = boolQuery.Must(timeRangeQuery, termQuery)
searchResult, err := esClient.Search().
Index(esIndex).
Query(boolQuery).
From(0).Size(10).
Pretty(true).
Do(context.Background())
if err != nil {
panic(err)
}
// 输出结果
for _, hit := range searchResult.Hits.Hits {
fmt.Println(string(hit.Source))
}
}
Java SDK访问示例
pom.xml引入依赖。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>estest</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.10.1</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-client</artifactId> <version>7.10.1</version> </dependency> </dependencies> </project>
示例。
package org.example; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.MatchQueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import java.io.IOException; public class Main { public static void main(String[] args) throws IOException { String slsProject = "etl-dev"; String slsLogstore = "accesslog"; String slsEndpoint = "cn-huhehaote.log.aliyuncs.com"; String schema = "https"; String esHost = slsProject + "." + slsEndpoint; // ${project}.${endpoint} int port = 443; String esIndex = slsProject + "." + slsLogstore; // ${project}.${logstore} String esPrefix = "/es/"; String accessKeyId = System.getenv("ALIYUN_ACCESS_KEY_ID"); String accessKeySecret = System.getenv("ALIYUN_ACCESS_KEY_SECRET"); final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(accessKeyId, accessKeySecret)); RestClientBuilder builder = RestClient.builder(new HttpHost(esHost, port, schema)).setHttpClientConfigCallback( httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); // Set /es/ prefix builder.setPathPrefix(esPrefix); RestHighLevelClient client = new RestHighLevelClient(builder); // Query BoolQueryBuilder boolExpr= new BoolQueryBuilder(); long endTime = System.currentTimeMillis(); long startTime = endTime - 3600 * 1000; boolExpr.filter().add(new MatchQueryBuilder("request_method", "GET")); boolExpr.filter().add(new RangeQueryBuilder("@timestamp").gte(startTime).lte(endTime).format("epoch_millis")); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(boolExpr); SearchRequest searchRequest = new SearchRequest(esIndex); searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); System.out.println(searchResponse.toString()); client.close(); } }
PHP SDK访问示例
使用composer安装PHP插件。
composer require elasticsearch/elasticsearch
示例。
<?php require 'vendor/autoload.php'; use Elasticsearch\ClientBuilder; $slsProject = 'etl-dev'; $slsLogstore = 'accesslog'; $slsEndpoint = 'cn-huhehaote.log.aliyuncs.com'; $esHost = $slsProject . '.' . $slsEndpoint; $esIndex = $slsProject . '.' . $slsLogstore; $accessKeyId = getenv('ALIYUN_ACCESS_KEY_ID'); $accessKeySecret = getenv('ALIYUN_ACCESS_KEY_SECRET'); $hosts = [ [ 'host' => $esHost, 'port' => '443', 'scheme' => 'https', 'path' => '/es', 'user' => $accessKeyId, 'pass' => $accessKeySecret, ] ]; $client = ClientBuilder::create() ->setHosts($hosts) ->build(); $endTime = round(microtime(true) * 1000); // 毫秒 $startTime = $endTime - (3600 * 1000); $params = [ 'index' => $esIndex, 'body' => [ 'query' => [ 'bool' => [ 'must' => [ 'match' => [ 'request_method' => 'GET' ] ], 'filter' => [ 'range' => [ '@timestamp' => [ 'gte' => $startTime, 'lte' => $endTime ] ] ] ] ] ] ]; $response = $client->search($params); print_r($response);