文档

Logstash消费

更新时间:

日志服务支持通过Logstash消费数据,您可以通过配置日志服务的Input插件对接Logstash获取日志服务中的数据并写入到其他系统中,例如Kafka、HDFS等。

功能特性

  • 分布式协同消费:可配置多台服务器同时消费某一个Logstore。

  • 高性能:基于Java ConsumerGroup实现,单核消费速度可达20 MB/s(压缩前)。

  • 高可靠性:消费进度保存到服务端,异常恢复后会从上一次消费的Checkpoint处自动恢复消费。

  • 自动负载均衡:根据消费者数量自动分配Shard,消费者增加或减少后会自动负载均衡。

操作步骤

  1. 安装Logstash。

    1. 下载安装包

    2. 解压安装包到指定目录。

  2. 安装input插件。

    1. 下载input插件。下载地址为logstash-input-sls

    2. 安装input插件。

      logstash-plugin install logstash-input-sls
      说明

      插件安装失败原因及解决方案,请参见插件安装配置

  3. 启动Logstash。

    logstash -f logstash.conf

    配置参数如下表所示。

    参数

    类型

    是否必须

    说明

    endpoint

    string

    日志服务Project所在的Endpoint。更多信息,请参见服务接入点

    access_id

    string

    阿里云AccessKey ID,需要具备消费组相关权限。更多信息,请参见指定Logstore的消费权限

    access_key

    string

    阿里云AccessKey Secret,需要具备消费组相关权限。更多信息,请参见指定Logstore的消费权限

    project

    string

    日志服务Project名称。

    logstore

    string

    日志服务Logstore名称。

    consumer_group

    string

    消费组名称。

    consumer_name

    string

    消费者名称,同一个消费组中的消费者名称不能重复。

    position

    string

    消费开始位置。

    • begin:从Logstore写入的第一条数据开始消费。

    • end:从当前时间点开始消费。

    • yyyy-MM-dd HH:mm:ss:从指定时间点开始消费。

    checkpoint_second

    number

    每隔几秒Checkpoint一次,建议10秒~60秒,不能低于10秒,默认为30秒。

    include_meta

    boolean

    数据是否包含Meta,Meta包括source、time、tag、topic,默认为true。

    consumer_name_with_ip

    boolean

    消费者名称是否包含IP地址,默认为true,分布式协同消费下必须设置为true。

示例

配置Logstash消费某一个Logstore并将数据打印到标准输出,示例如下:

重要

强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。请配置环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见配置环境变量

input {
  logservice{
  endpoint => "your project endpoint"
  // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
  access_id => process.env.ALIBABA_CLOUD_ACCESS_KEY_ID
  access_key => process.env.ALIBABA_CLOUD_ACCESS_KEY_SECRET
  project => "your project name"
  logstore => "your logstore name"
  consumer_group => "consumer group name"
  consumer_name => "consumer name"
  position => "end"
  checkpoint_second => 30
  include_meta => true
  consumer_name_with_ip => true
  }
}

output {
  stdout {}
}