Filebeat+Kafka+Logstash+Elasticsearch Serverless构建日志分析系统
当您需要查看并分析庞杂的日志数据时,可通过Filebeat+Kafka+Logstash+Elasticsearch Serverless采集日志数据到阿里云Elasticsearch Serverless中,进行可视化展示与分析。本文介绍具体的实现方法。
背景信息
Kafka是一种分布式、高吞吐、可扩展的消息队列服务,广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。本文使用自建消息队列Kafka。
在实际应用场景中,为了满足大数据实时检索的需求,您可以使用Filebeat采集日志数据,并输出到Kafka中。Kafka实时接收Filebeat采集的数据,并输出到Logstash中。输出到Logstash中的数据在格式或内容上可能不能满足您的需求,此时可以通过Logstash的filter插件过滤数据。最后将满足需求的数据输出到Elasticsearch Serverless中进行分布式检索,并进行数据分析与展示。
操作流程
步骤一:环境准备
开通阿里云Elasticsearch Serverless服务,并创建应用和指标类型的数据流。
创建阿里云ECS实例,并准备Java环境。
创建阿里云ECS实例的具体操作请参见自定义购买实例。
步骤二:安装并配置Kafka
本文的配置仅供参考测试,实际配置时需要根据您的业务进行调整,详细信息请参见Kafka Quick Start。
连接ECS服务器。
具体操作步骤请参见连接实例。
说明本文档以普通用户权限为例。
安装并配置zookeeper。
说明本文以zookeeper 3.4.12版本为例。
下载zookeeper安装包。
sudo wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz
解压。
sudo tar -zxf zookeeper-3.4.12.tar.gz -C /usr/local/
配置环境变量。
打开环境变量配置文件profile。
sudo vim /etc/profile
在环境变量配置文件profile最后面添加如下配置。
sudo export PATH=$PATH:/usr/local/zookeeper-3.4.12/bin/
使环境变量生效。
sudo source /etc/profile
修改zookeeper配置,如果没有特殊配置,保持默认即可。
进入zookeeper配置文件目录。
cd /usr/local/zookeeper-3.4.12/conf
将zoo_sample.cfg文件内容拷贝到配置文件zoo.cfg中。
sudo cp zoo_sample.cfg zoo.cfg
查看配置文件。
sudo vim zoo.cfg
启动zookeeper。
sudo zkServer.sh start
查看zookeeper状态。
sudo zkServer.sh status
安装并配置Kafka。
说明本文以Kafka 2.1.1版本为例。
返回根目录。
cd ~
下载Kafka安装包。
sudo wget https://archive.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz
解压。
sudo tar -zxf kafka_2.11-2.1.1.tgz -C /usr/local/
配置环境变量。
打开环境变量配置文件profile。
sudo vim /etc/profile
在环境变量配置文件profile最后面添加如下配置。
sudo export PATH=$PATH:/usr/local/kafka_2.11-2.1.1/bin/
使环境变量生效。
sudo source /etc/profile
修改Kafka配置。
打开Kafka配置文件。
sudo vim /usr/local/kafka_2.11-2.1.1/config/server.properties
修改配置文件中的listeners配置,并保存。
本文以单机测试环境为例,其他参数保持默认。
listeners=PLAINTEXT://localhost:9092
启动Kafka,启动之前需要确保zookeeper正常运行。
sudo nohup kafka-server-start.sh /usr/local/kafka_2.11-2.1.1/config/server.properties &
创建测试topic(serverless)。
sudo kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic serverless
步骤三:安装并配置Filebeat
连接ECS服务器。
具体操作步骤请参见连接实例。
说明本文档以普通用户权限为例。
安装Filebeat。
本文以8.5.3版本为例,安装命令如下,详细信息请参见Install Filebeat。
sudo wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.5.3-linux-x86_64.tar.gz sudo tar -zxf filebeat-8.5.3-linux-x86_64.tar.gz -C /usr/local/
执行以下命令,进入Filebeat安装目录,创建并配置filebeat.kafka.yml文件。
cd /usr/local/filebeat-8.5.3-linux-x86_64 sudo vi filebeat.kafka.yml
filebeat.kafka.yml配置如下。
filebeat.inputs: - type: log paths: - /var/log/*.log output.kafka: hosts: ["localhost:9092"] topic: serverless
参数
说明
type
输入类型。设置为log,表示输入源为日志。
paths
需要监控的日志文件的路径。多个日志可在当前路径下另起一行写入日志文件路径。
hosts
消息队列Kafka实例的单个接入点。本文使用的是本地自建单节点实例,因此使用localhost。
topic
日志输出到消息队列Kafka的Topic,请指定为您已创建的Topic。
启动Filebeat。
sudo nohup ./filebeat -e -c filebeat-kafka.yml > fb-kafka.log 2>&1 &
步骤四:安装并配置Logstash管道
连接ECS服务器。
具体操作步骤请参见连接实例。
说明本文档以普通用户权限为例。
安装Logstash。
本文以7.10.2为例,安装命令如下。
cd ~ sudo wget https://artifacts.elastic.co/downloads/logstash/logstash-7.10.2-linux-x86_64.tar.gz sudo tar -zxf logstash-7.10.2-linux-x86_64.tar.gz -C /usr/local/
创建Logstash配置文件ls2serverless.yml,并进行配置。
进入Logstash安装目录下的config目录中。
cd /usr/local/logstash-7.10.2/config
创建并打开ls2serverless.yml配置文件。
sudo vim ls2serverless.yml
在ls2serverless.yml配置文件中,参考以下脚本进行配置。
input { kafka { bootstrap_servers => ["localhost:9092"] topics => ["serverless"] codec => json } } output { elasticsearch { hosts => ["https://es-serverless-cn-hangzhou.aliyuncs.com:443"] custom_headers => { "X-Api-Key" => "7*" "X-Xops-App-Name" => "zjy-test" "X-Xops-Data-Stream-Name" => "kafka_data" "X-Xops-Data-Type" => "logs" } } }
表 1. input参数说明 参数
说明
bootstrap_servers
消息队列Kafka实例的接入点。本文使用的是本地自建单节点实例,因此使用localhost。
topics
指定为您已创建的Topic的名称,需要与Filebeat中配置的Topic名称保持一致。
codec
设置为json,表示解析JSON格式的字段,便于在Kibana中分析。
表 2. output参数说明 参数
说明
hosts
配置为Elasticsearch Serverless服务的访问地址,获取方式请参见查看应用的基本信息。
custom_headers
Serverless数据流的配置:
保存并退出。
启动Logstash。
sudo nohup ./bin/logstash -f config/lg2serverless.yml > ls2serverless.log 2>&1 &
步骤五:在Elasticsearch Serverless服务控制台查看日志信息
在左侧导航栏,单击应用管理。
在应用列表中,单击目标应用名称。
在左侧导航栏,选择 。
在日志查询页面,选择目标数据流,查看该数据流的日志。
更多详细信息,请参见日志查询。
常见问题
Q:用Filebeat推送数据时,Filebeat的日志里出现一条log.level
报错,是什么原因?
A:日志中出现以下报错信息,您忽略即可,该报错不影响ES推送数据。
{"log.level":"error","@timestamp":"2023-04-28T10:14:23.121+0800","log.logger":"elasticsearch","log.origin":{"file.name":"elasticsearch/client.go","file.line":387},"message":"expected item response object","service.name":"filebeat","ecs.version":"1.6.0"}