logstash-input-sls插件是阿里云Logstash自带的默认插件。作为Logstash的input插件,logstash-input-sls插件提供了从日志服务获取日志的功能。

说明 logstash-input-sls是阿里云维护的开源插件,详情请参见logstash-input-logservice

功能特性

  • 支持分布式协同消费:可配置多台服务器同时消费一个Logstore服务。
    说明 多台Logstash服务器进行分布式协同消费时,由于logstash-input-sls插件限制,需保证各个服务器仅部署一个input sls管道。如果单个服务器中存在多个input sls管道,输出端可能会出现数据重复的异常情况。
  • 高性能:基于Java ConsumerGroup实现,单核消费速度可达20 MB/s。
  • 高可靠:消费进度会被保存到服务端,宕机恢复时,会从上一次checkpoint处自动恢复。
  • 自动负载均衡:根据消费者数量自动分配shard,消费者增加或退出后会自动进行负载均衡。

前提条件

您已完成以下操作:

使用logstash-input-sls插件

满足以上前提条件后,您可以通过配置文件管理管道的方式创建管道任务。在创建管道任务时,按照以下说明配置管道参数。配置完成后进行保存与部署,即可触发Logstash从日志服务获取日志。
注意 RAM用户使用logstash-input-sls插件前,还需要在日志服务侧设置消费组相关的权限策略,详细信息请参见RAM用户授权

以使用阿里云Logstash消费某一个Logstore,并将日志输出到阿里云Elasticsearch为例,配置示例如下。

input {
 logservice{
  endpoint => "your project endpoint"
  access_id => "your access id"
  access_key => "your access key"
  project => "your project name"
  logstore => "your logstore name"
  consumer_group => "consumer group name"
  consumer_name => "consumer name"
  position => "end"
  checkpoint_second => 30
  include_meta => true
  consumer_name_with_ip => true
  }
}

output {
  elasticsearch {
    hosts => ["http://es-cn-***.elasticsearch.aliyuncs.com:9200"]
    index => "<your_index>"
    user => "elastic"
    password => "changeme"
  }
}

假设某Logstore有10个shard,每个shard的数据流量1 MB/s;每台阿里云Logstash机器处理的能力为3 MB/s,可分配5台阿里云Logstash服务器,每个服务器创建一个input sls管道;并且每个服务器管道设置相同的consumer_groupconsumer_name,将consumer_name_with_ip字段设置为true。

这种情况每台服务器会分配到2个shard,分别处理2 MB/s的数据。

参数说明

logstash-input-sls支持的参数如下。

参数名 参数类型 是否必填 说明
endpoint String VPC网络下的日志服务项目的Endpoint,详情请参见经典网络及VPC网络服务入口
access_id String 阿里云Access Key ID,需要具备ConsumerGroup相关权限,详情请参见使用消费组消费
access_key String 阿里云Access Key Secret,需要具备ConsumerGroup相关权限,详情请参见使用消费组消费
project String 日志服务项目名。
logstore String 日志服务日志库名。
consumer_group String 自定义消费组名。
consumer_name String 自定义消费者名。同一个消费组内消费者名不能重复,否则会出现未定义行为。
position String 消费位置,可选:
  • begin:从日志库写入的第一条数据开始消费。
  • end:从当前时间点开始消费。
  • yyyy-MM-dd HH:mm:ss:从指定时间点开始消费。
checkpoint_second Number 每隔几秒checkpoint一次,建议10~60秒,不能低于10秒,默认30秒。
include_meta Boolean 传入日志是否包含Meta,Meta包括日志source、time、tag以及topic,默认为true。
consumer_name_with_ip Boolean 消费者名是否包含IP地址,默认为true。分布式协同消费下必须设置为true。

性能基准测试信息

  • 测试环境
    • 处理器:Intel(R) Xeon(R) Platinum 8163 CPU @ 2.50GHz,4 Core
    • 内存:8 GB
    • 环境:Linux
  • 阿里云Logstash配置
    input {
      logservice{
      endpoint => "cn-hangzhou-intranet.log.aliyuncs.com"
      access_id => "***"
      access_key => "***"
      project => "test-project"
      logstore => "logstore1"
      consumer_group => "consumer_group1"
      consumer_name => "consumer1"
      position => "end"
      checkpoint_second => 30
      include_meta => true
      consumer_name_with_ip => true
      }
    }
    output {
      elasticsearch {
        hosts => ["http://es-cn-***.elasticsearch.aliyuncs.com:9200"]
        index => "myindex"
        user => "elastic"
        password => "changeme"
      }
    }
  • 测试过程
    1. 使用Java Producer向Logstore发送数据,每秒分别发送2 MB、4 MB、8 MB、16 MB、32 MB数据。

      每条日志约500字节,包括10个Key和Value对。

    2. 启动阿里云Logstash消费Logstore中的数据,并确保消费延迟没有上涨(消费速度能够跟上生产的速度)。
  • 测试结果
    流量(MB/S) CPU使用率(%) 内存占用量(GB)
    32 170.3 1.3
    16 83.3 1.3
    8 41.5 1.3
    4 21.0 1.3
    2 11.3 1.3