Logstash消费

日志服务支持通过Logstash消费数据,您可以通过配置日志服务Input插件对接Logstash获取日志服务中的数据并写入到其他系统中,例如Kafka、HDFS等。

功能特性

  • 分布式协同消费:可配置多台服务器同时消费某一个Logstore。

  • 高性能:基于Java ConsumerGroup实现,单核消费速度可达20 MB/s(压缩前)。

  • 高可靠性:消费进度保存到服务端,异常恢复后会从上一次消费的Checkpoint处自动恢复消费。

  • 自动负载均衡:根据消费者数量自动分配Shard,消费者增加或减少后会自动负载均衡。

操作步骤

不同系统的Logstash安装包下载链接:Logstash

本文以Linux系统为例:

  1. 安装Logstash。详情请参见Logstash

    1. 下载并安装公共签名密钥。

      sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
    2. /etc/yum.repos.d/目录下,创建.repo后缀文件,例如创建logstash.repo文件,输入以下内容:

      [logstash-9.x]
      name=Elastic repository for 9.x packages
      baseurl=https://artifacts.elastic.co/packages/9.x/yum
      gpgcheck=1
      gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
      enabled=1
      autorefresh=1
      type=rpm-md
    3. 下载安装Logstash。

      sudo yum install logstash
  2. 安装input插件。

    1. 下载input插件。下载地址为logstash-input-sls

    2. 安装input插件。

      /usr/share/logstash/bin/logstash-plugin install logstash-input-sls.zip
      说明

      插件安装失败原因及解决方案,请参见插件安装配置

  3. 创建logstash用户,Logstash必须以非超级用户(root)运行。

    1. 创建logstash用户。

      sudo adduser --system --no-create-home --group logstash
    2. 设置logstash用户权限。确保 Logstash 相关目录(如 /usr/share/logstash/etc/logstash 和 /var/log/logstash)的所有权归 logstash 用户。

      sudo chown -R logstash:logstash /usr/share/logstash /etc/logstash /var/log/logstash
    3. 验证logstash用户是否创建成功。

      id logstash

      输出应显示 logstash 用户的 UID 和 GID,表示创建成功。

      image

  4. logstash用户启动Logstash。

    1. /etc/logstash目录下,创建.conf后缀文件,本文以该目录下源文件logstash-sample.conf 为例。

    2. logstash-sample.conf 文件中输入示例代码,以logstash用户启动Logstash。

      sudo -u logstash /usr/share/logstash/bin/logstash -f /etc/logstash/logstash-sample.conf

      配置Logstash消费某一个Logstore并将数据打印到标准输出,示例和参数说明如下:

      重要

      强烈建议不要把AccessKey IDAccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。请配置环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见创建RAM用户的AccessKey

      参数说明

      示例代码

      input {
        logservice{
        endpoint => "your project endpoint"
        // 本示例从环境变量中获取AccessKey IDAccessKey Secret。
        access_id => process.env.ALIBABA_CLOUD_ACCESS_KEY_ID
        access_key => process.env.ALIBABA_CLOUD_ACCESS_KEY_SECRET
        project => "your project name"
        logstore => "your logstore name"
        consumer_group => "consumer group name"
        consumer_name => "consumer name"
        position => "end"
        checkpoint_second => 30
        include_meta => true
        consumer_name_with_ip => true
        }
      }
      
      output {
        stdout {}
      }

      endpointstring(必选)

      日志服务Project所在的Endpoint。更多信息,请参见服务接入点

      access_idstring(必选)

      阿里云AccessKey ID,需要具备消费组相关权限。更多信息,请参见指定Logstore的消费权限

      access_keystring(必选)

      阿里云AccessKey Secret,需要具备消费组相关权限。更多信息,请参见指定Logstore的消费权限

      projectstring(必选)

      日志服务Project名称。

      logstorestring(必选)

      日志服务Logstore名称。

      consumer_groupstring(必选)

      消费组名称。

      consumer_namestring(必选)

      消费者名称,同一个消费组中的消费者名称不能重复。

      positionstring(必选)

      消费开始位置。

      • begin:从Logstore写入的第一条数据开始消费。

      • end:从当前时间点开始消费。

      • yyyy-MM-dd HH:mm:ss:从指定时间点开始消费。

      checkpoint_secondnumber(可选)

      每隔几秒Checkpoint一次,建议10秒~60秒,不能低于10秒,默认为30秒。

      include_metaboolean(可选)

      数据是否包含Meta,Meta包括source、time、tag、topic,默认为true。

      consumer_name_with_ipboolean(可选)

      消费者名称是否包含IP地址,默认为true,分布式协同消费下必须设置为true。