日志服务支持通过Logstash消费数据,您可以通过配置日志服务的Input插件对接Logstash获取日志服务中的数据并写入到其他系统中,例如Kafka、HDFS等。
功能特性
分布式协同消费:可配置多台服务器同时消费某一个Logstore。
高性能:基于Java ConsumerGroup实现,单核消费速度可达20 MB/s(压缩前)。
高可靠性:消费进度保存到服务端,异常恢复后会从上一次消费的Checkpoint处自动恢复消费。
自动负载均衡:根据消费者数量自动分配Shard,消费者增加或减少后会自动负载均衡。
操作步骤
不同系统的Logstash安装包下载链接:Logstash
本文以Linux系统为例:
安装Logstash。详情请参见Logstash。
下载并安装公共签名密钥。
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
在
/etc/yum.repos.d/
目录下,创建.repo
后缀文件,例如创建logstash.repo
文件,输入以下内容:[logstash-9.x] name=Elastic repository for 9.x packages baseurl=https://artifacts.elastic.co/packages/9.x/yum gpgcheck=1 gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md
下载安装Logstash。
sudo yum install logstash
安装input插件。
下载input插件。下载地址为logstash-input-sls。
安装input插件。
/usr/share/logstash/bin/logstash-plugin install logstash-input-sls.zip
说明插件安装失败原因及解决方案,请参见插件安装配置。
创建
logstash
用户,Logstash必须以非超级用户(root)运行。创建
logstash
用户。sudo adduser --system --no-create-home --group logstash
设置logstash用户权限。确保 Logstash 相关目录(如
/usr/share/logstash
、/etc/logstash
和/var/log/logstash
)的所有权归logstash
用户。sudo chown -R logstash:logstash /usr/share/logstash /etc/logstash /var/log/logstash
验证
logstash
用户是否创建成功。id logstash
输出应显示
logstash
用户的 UID 和 GID,表示创建成功。
以
logstash
用户启动Logstash。在
/etc/logstash
目录下,创建.conf后缀文件,本文以该目录下源文件logstash-sample.conf
为例。在
logstash-sample.conf
文件中输入示例代码,以logstash
用户启动Logstash。sudo -u logstash /usr/share/logstash/bin/logstash -f /etc/logstash/logstash-sample.conf
配置Logstash消费某一个Logstore并将数据打印到标准输出,示例和参数说明如下:
重要强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。请配置环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见创建RAM用户的AccessKey。
参数说明
示例代码
input { logservice{ endpoint => "your project endpoint" // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。 access_id => process.env.ALIBABA_CLOUD_ACCESS_KEY_ID access_key => process.env.ALIBABA_CLOUD_ACCESS_KEY_SECRET project => "your project name" logstore => "your logstore name" consumer_group => "consumer group name" consumer_name => "consumer name" position => "end" checkpoint_second => 30 include_meta => true consumer_name_with_ip => true } } output { stdout {} }
endpoint
string
(必选)日志服务Project所在的Endpoint。更多信息,请参见服务接入点。
access_id
string
(必选)阿里云AccessKey ID,需要具备消费组相关权限。更多信息,请参见指定Logstore的消费权限。
access_key
string
(必选)阿里云AccessKey Secret,需要具备消费组相关权限。更多信息,请参见指定Logstore的消费权限。
project
string
(必选)日志服务Project名称。
logstore
string
(必选)日志服务Logstore名称。
consumer_group
string
(必选)消费组名称。
consumer_name
string
(必选)消费者名称,同一个消费组中的消费者名称不能重复。
position
string
(必选)消费开始位置。
begin:从Logstore写入的第一条数据开始消费。
end:从当前时间点开始消费。
yyyy-MM-dd HH:mm:ss:从指定时间点开始消费。
checkpoint_second
number
(可选)每隔几秒Checkpoint一次,建议10秒~60秒,不能低于10秒,默认为30秒。
include_meta
boolean
(可选)数据是否包含Meta,Meta包括source、time、tag、topic,默认为true。
consumer_name_with_ip
boolean
(可选)消费者名称是否包含IP地址,默认为true,分布式协同消费下必须设置为true。