全部产品
云市场

Logstash插件

更新时间:2019-09-19 22:39:51

Logstash采集工具


Logstash是一种分布式日志收集框架,非常简洁强大,经常与ElasticSearch,Kibana配置,组成著名的ELK技术栈,非常适合用来做日志数据的分析。 阿里云实时计算为了方便用户将更多数据采集进入DataHub,提供了针对Logstash的DataHub Output/Input插件。使用Logstash,您可以轻松享受到Logstash开源社区多达30+种数据源支持(file,syslog,redis,log4j,apache log或nginx log),同时Logstash还支持filter对传输字段自定义加工等功能。下面我们就以Logstash日志采集为例介绍如何使用Logstash快速将日志数据采集进入DataHub。

注: Logstash DataHub Output/Input插件遵循Apache License 2.0开源协议。

安装

安装限制

Logstash安装要求JRE 7版本及以上,否则部分功能无法使用。

如何安装

阿里云提供两种LogStash的安装方式:

  • 一键安装

    一键安装包,支持将Logstash和DataHub插件一键安装,点此下载

    当前我们提供了免安装的版本,解压即可使用:

    1. $ tar -xzvf logstash-with-datahub-6.4.0-1.0.6.tar.gz
    2. $ cd logstash-with-datahub-6.4.0

    命令执行完成后,logstash即安装成功。

  • 单独安装

    安装Logstash: 参看Logstash官网提供的[安装教程]进行安装工作。特别注意的是,最新的Logstash需要Java 7及以上版本。

    安装DataHub插件: 下载所需要的插件。

    上传至DataHub请使用: DataHub Logstash Output插件

    下载DataHub中数据请使用: DataHub Logstash Input插件

    分别使用如下命令进行安装:

    1. $ {LOG_STASH_HOME}/bin/logstash-plugin install --local logstash-output-datahub-1.0.3.gem
    2. $ {LOG_STASH_HOME}/bin/logstash-plugin install --local logstash-input-datahub-1.0.3.gem

使用DataHub Logstash Output插件上传数据

配置Log4j日志到DataHub

下面以Log4j日志产出为例,讲解下如何使用Logstash采集并结构化日志数据。Log4j的日志样例如下:

  1. 20:04:30.359 [qtp1453606810-20] INFO AuditInterceptor - [13pn9kdr5tl84stzkmaa8vmg] end /web/v1/project/fhp4clxfbu0w3ym2n7ee6ynh/statistics?executionName=bayes_poc_test GET, 187 ms

针对上述Log4j文件,我们希望将数据结构化并采集进入DataHub,其中DataHub的Topic格式如下:

字段名称 字段类型
request_time STRING
thread_id STRING
log_level STRING
class_name STRING
request_id STRING
detail STRING

Logstash任务配置如下:

  1. input {
  2. file {
  3. path => "${APP_HOME}/log/bayes.log"
  4. start_position => "beginning"
  5. }
  6. }
  7. filter{
  8. grok {
  9. match => {
  10. "message" => "(?<request_time>\d\d:\d\d:\d\d\.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class_name>\w+)\s+\-\s+\[(?<request_id>\w+)\]\s+(?<detail>.+)"
  11. }
  12. }
  13. }
  14. output {
  15. datahub {
  16. access_id => "Your accessId"
  17. access_key => "Your accessKey"
  18. endpoint => "Endpoint"
  19. project_name => "project"
  20. topic_name => "topic"
  21. #shard_id => "0"
  22. #shard_keys => ["thread_id"]
  23. dirty_data_continue => true
  24. dirty_data_file => "/Users/ph0ly/trash/dirty.data"
  25. dirty_data_file_max_size => 1000
  26. }
  27. }

配置csv文件到DataHub:

下面以标准csv文件为例,讲解下如何使用Logstash采集csv文件数据。csv文件样例如下:

  1. 1111,1.23456789012E9,true,14321111111000000,string_dataxxx0,
  2. 2222,2.23456789012E9,false,14321111111000000,string_dataxxx1

针对上述csv文件,DataHub的Topic格式如下:

字段名称 字段类型
col1 BIGINT
col2 DOUBLE
col3 BOOLEAN
col4 TIMESTAMP
col5 STRING

Logstash任务配置如下:

  1. input {
  2. file {
  3. path => "${APP_HOME}/data.csv"
  4. start_position => "beginning"
  5. }
  6. }
  7. filter{
  8. csv {
  9. columns => ['col1', 'col2', 'col3', 'col4', 'col5']
  10. }
  11. }
  12. output {
  13. datahub {
  14. access_id => "Your accessId"
  15. access_key => "Your accessKey"
  16. endpoint => "Endpoint"
  17. project_name => "project"
  18. topic_name => "topic"
  19. #shard_id => "0"
  20. #shard_keys => ["thread_id"]
  21. dirty_data_continue => true
  22. dirty_data_file => "/Users/ph0ly/trash/dirty.data"
  23. dirty_data_file_max_size => 1000
  24. }
  25. }

运行Logstash

使用如下命令启动Logstash:

  1. logstash -f <上述配置文件地址>

batch_size是每次向DataHub发送的记录条数,默认为125,指定batch_size启动Logstash:

  1. logstash -f <上述配置文件地址> -b 256

参数

参数说明如下:

  1. access_id(Required): 阿里云access id
  2. access_key(Required): 阿里云access key
  3. endpoint(Required): 阿里云datahub的服务地址
  4. project_name(Required): datahub项目名称
  5. topic_name(Required): datahub topic名称
  6. retry_times(Optional): 重试次数,-1为无限重试、0为不重试、>0表示需要有限次数, 默认值为-1
  7. retry_interval(Optional): 下一次重试的间隔,单位为秒,默认值为5
  8. skip_after_retry(Optional): 当由Datahub异常导致的重试超过重试次数,是否跳过这一轮上传的数据,默认为false
  9. approximate_request_bytes(Optional): 用于限制每次发送请求的字节数,是一个近似值,防止因request body过大而被拒绝接收,默认为2048576(2MB)
  10. shard_keys(Optional):数组类型,数据落shard的字段名称,插件会根据这些字段的值计算hash将每条数据落某个shard, 注意shard_keysshard_id都未指定,默认轮询落shard
  11. shard_id(Optional): 所有数据落指定的shard,注意shard_keysshard_id都未指定,默认轮询落shard
  12. dirty_data_continue(Optional): 脏数据是否继续运行,默认为false,如果指定true,则遇到脏数据直接无视,继续处理数据。当开启该开关,必须指定@dirty_data_file文件
  13. dirty_data_file(Optional): 脏数据文件名称,当数据文件名称,在@dirty_data_continue开启的情况下,需要指定该值。特别注意:脏数据文件将被分割成两个部分.part1和.part2part1作为更早的脏数据,part2作为更新的数据
  14. dirty_data_file_max_size(Optional): 脏数据文件的最大大小,该值保证脏数据文件最大大小不超过这个值,目前该值仅是一个参考值
  15. enable_pb(Optional): 默认使用pb传输,专有云如不支持pb,手动设置为false

使用DataHub Logstash Input插件读取DataHub中数据

消费DataHub数据写入文件

logstash的配置如下:

  1. input {
  2. datahub {
  3. access_id => "Your accessId"
  4. access_key => "Your accessKey"
  5. endpoint => "Endpoint"
  6. project_name => "test_project"
  7. topic_name => "test_topic"
  8. interval=> 5
  9. #cursor => {
  10. # "0"=>"20000000000000000000000003110091"
  11. # "2"=>"20000000000000000000000003110091"
  12. # "1"=>"20000000000000000000000003110091"
  13. # "4"=>"20000000000000000000000003110091"
  14. # "3"=>"20000000000000000000000003110000"
  15. #}
  16. shard_ids => []
  17. pos_file => "/home/admin/logstash/pos_file"
  18. }
  19. }
  20. output {
  21. file {
  22. path => "/home/admin/logstash/output"
  23. }
  24. }

参数介绍

  1. access_id(Required): 阿里云access id
  2. access_key(Required): 阿里云access key
  3. endpoint(Required): 阿里云datahub的服务地址
  4. project_name(Required): datahub项目名称
  5. topic_name(Required): datahub topic名称
  6. retry_times(Optional): 重试次数,-1为无限重试、0为不重试、>0表示需要有限次数
  7. retry_interval(Optional): 下一次重试的间隔,单位为秒
  8. shard_ids(Optional):数组类型,需要消费的shard列表,空列表默认全部消费
  9. cursor(Optional):消费起点,默认为空,表示从头开始消费
  10. pos_file(Required):checkpoint记录文件,必须配置,优先使用checkpoint恢复消费offset
  11. enable_pb(Optional): 默认使用pb传输,专有云如不支持pb,手动设置为false

更多

更多的配置参数请参考logstash官方网站,以及ELK stack中文指南