
Logstash plugin

Last updated: 2020-08-03 17:14:27

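Logstash collection tool

Logstash is a distributed log collection framework that is both simple and powerful. It is often combined with Elasticsearch and Kibana to form the well-known ELK stack, which is well suited to analyzing log data. To make it easier to bring more data into DataHub, Alibaba Cloud Realtime Compute provides DataHub Output and Input plugins for Logstash. With Logstash you get the 30+ data sources supported by the open-source community (file, syslog, redis, log4j, apache log, nginx log, and others), and Logstash filters let you apply custom processing to the fields being transferred. The sections below use log collection as an example to show how to get log data into DataHub quickly with Logstash.

Note: The Logstash DataHub Output/Input plugins are released under the Apache License 2.0.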

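Installation

Requirements

Logstash requires JRE 7 or later; otherwise some features will not work.

How to install

Alibaba Cloud provides the following ways to install Logstash: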

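  • One-click installation

    The one-click package installs Logstash and the DataHub plugins in one step. Click here to download.

    The package currently provided needs no installation; extract it and it is ready to use:

    $ tar -xzvf logstash-with-datahub-6.4.0-1.0.8.tar.gz
    $ cd logstash-with-datahub-6.4.0

    Once these commands finish, Logstash is installed.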

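  • Standalone installation

    Install Logstash: follow the [installation guide] on the official Logstash website. Note that recent versions of Logstash require Java 7 or later.

    Install the DataHub plugins: download the plugins you need.

    To upload data to DataHub, use the DataHub Logstash Output plugin.

    To download data from DataHub, use the DataHub Logstash Input plugin.

    Install them with the following commands, respectively:

    $ ${LOG_STASH_HOME}/bin/logstash-plugin install --local logstash-output-datahub-1.0.8.gem
    $ ${LOG_STASH_HOME}/bin/logstash-plugin install --local logstash-input-datahub-1.0.8.gem

    If installation fails with an error like the following:

    WARNING: can not set Session#timeout=(0) no session context

    first check whether the machine can reach the internet. If the network is reachable, try switching to a gem mirror in mainland China, for example https://gems.ruby-china.com/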

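  • Offline installation

    If the production machine cannot reach the internet, the methods above will not work. You need to build an offline installation package yourself, because an offline package is tied to a specific Logstash version. First install the plugins successfully on a test machine using one of the methods above, then build the offline package by following the official documentation, upload it to the production machine, and install it offline.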

Uploading data with the DataHub Logstash Output plugin

Collecting Log4j logs into DataHub

The following uses Log4j output as an example of how to collect and structure log data with Logstash. A sample Log4j log line:

  20:04:30.359 [qtp1453606810-20] INFO AuditInterceptor - [13pn9kdr5tl84stzkmaa8vmg] end /web/v1/project/fhp4clxfbu0w3ym2n7ee6ynh/statistics?executionName=bayes_poc_test GET, 187 ms

We want to structure the data in this Log4j file and collect it into DataHub. The DataHub Topic has the following schema:

Field name      Field type
request_time    STRING
thread_id       STRING
log_level       STRING
class_name      STRING
request_id      STRING
detail          STRING

The Logstash job is configured as follows:

  input {
    file {
      # On Windows, also use "/" rather than "\" in the path
      path => "${APP_HOME}/log/bayes.log"
      start_position => "beginning"
    }
  }
  filter {
    grok {
      match => {
        "message" => "(?<request_time>\d\d:\d\d:\d\d\.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class_name>\w+)\s+\-\s+\[(?<request_id>\w+)\]\s+(?<detail>.+)"
      }
    }
  }
  output {
    datahub {
      access_id => "Your accessId"
      access_key => "Your accessKey"
      endpoint => "Endpoint"
      project_name => "project"
      topic_name => "topic"
      #shard_id => "0"
      #shard_keys => ["thread_id"]
      dirty_data_continue => true
      dirty_data_file => "/Users/ph0ly/trash/dirty.data"
      dirty_data_file_max_size => 1000
    }
  }
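Before starting Logstash, it can help to try the expression against the sample line on its own. The following is a minimal sketch in Python, not part of the plugin. It assumes Python's `re` module as a stand-in for grok's regex engine, so the named groups are written as `(?P<name>...)` instead of `(?<name>...)`. The helper name `parse_log_line` is made up for illustration.

```python
import re

# Same expression as in the grok filter above, with the named groups
# rewritten in Python syntax: (?P<name>...) instead of (?<name>...).
LOG_PATTERN = re.compile(
    r"(?P<request_time>\d\d:\d\d:\d\d\.\d+)\s+"
    r"\[(?P<thread_id>[\w\-]+)\]\s+"
    r"(?P<log_level>\w+)\s+"
    r"(?P<class_name>\w+)\s+\-\s+"
    r"\[(?P<request_id>\w+)\]\s+"
    r"(?P<detail>.+)"
)

SAMPLE_LINE = (
    "20:04:30.359 [qtp1453606810-20] INFO AuditInterceptor - "
    "[13pn9kdr5tl84stzkmaa8vmg] end /web/v1/project/fhp4clxfbu0w3ym2n7ee6ynh/"
    "statistics?executionName=bayes_poc_test GET, 187 ms"
)


def parse_log_line(line):
    """Return a dict with the six Topic fields, or None if the line does not match."""
    match = LOG_PATTERN.search(line)
    return match.groupdict() if match else None


fields = parse_log_line(SAMPLE_LINE)
for name, value in fields.items():
    print(f"{name}: {value}")
```

A line for which `parse_log_line` returns None would not produce the six fields in Logstash either, so this is a quick way to find log lines that need a different expression.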

Collecting a CSV file into DataHub

The following uses a standard CSV file as an example of how to collect CSV data with Logstash. A sample CSV file:

  1111,1.23456789012E9,true,14321111111000000,string_dataxxx0,
  2222,2.23456789012E9,false,14321111111000000,string_dataxxx1

For this CSV file, the DataHub Topic has the following schema:

Field name    Field type
col1          BIGINT
col2          DOUBLE
col3          BOOLEAN
col4          TIMESTAMP
col5          STRING

The Logstash job is configured as follows:

  input {
    file {
      path => "${APP_HOME}/data.csv"
      start_position => "beginning"
    }
  }
  filter {
    csv {
      columns => ['col1', 'col2', 'col3', 'col4', 'col5']
    }
  }
  output {
    datahub {
      access_id => "Your accessId"
      access_key => "Your accessKey"
      endpoint => "Endpoint"
      project_name => "project"
      topic_name => "topic"
      #shard_id => "0"
      #shard_keys => ["thread_id"]
      dirty_data_continue => true
      dirty_data_file => "/Users/ph0ly/trash/dirty.data"
      dirty_data_file_max_size => 1000
    }
  }
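To see how the sample rows line up with the Topic schema, the mapping can be sketched outside Logstash. The Python snippet below is an illustration only. How the plugin itself converts values is not described here, so the converters are assumptions, and TIMESTAMP is simply kept as an integer. The names `CONVERTERS` and `parse_rows` are hypothetical.

```python
import csv
import io

COLUMNS = ["col1", "col2", "col3", "col4", "col5"]

# Assumed Python equivalents of the Topic field types.
CONVERTERS = {
    "col1": int,                             # BIGINT
    "col2": float,                           # DOUBLE
    "col3": lambda s: s.lower() == "true",   # BOOLEAN
    "col4": int,                             # TIMESTAMP, kept as a plain integer
    "col5": str,                             # STRING
}

# The two sample rows from the CSV file above.
SAMPLE = (
    "1111,1.23456789012E9,true,14321111111000000,string_dataxxx0,\n"
    "2222,2.23456789012E9,false,14321111111000000,string_dataxxx1\n"
)


def parse_rows(text):
    """Parse CSV text into a list of dicts keyed by the Topic field names."""
    records = []
    for row in csv.reader(io.StringIO(text)):
        # zip stops at the shorter sequence, so the empty sixth field
        # produced by a trailing comma is ignored.
        records.append(
            {name: CONVERTERS[name](value) for name, value in zip(COLUMNS, row)}
        )
    return records


records = parse_rows(SAMPLE)
for record in records:
    print(record)
```

A row whose value cannot be converted (for example, text in col1) raises a `ValueError` in this sketch, which gives a rough idea of the kind of row that may be treated as dirty data.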

Collecting JSON files into DataHub

One-to-one synchronization (one JSON file to one topic)

Sample JSON file (placed at /home/software/data/22/test_topic_22.json)

  [{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]
  [{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]

Configuration file

  input {
    file {
      path => "/home/software/data/22/test_topic_22.json"
      start_position => "beginning"
      # Where the read position (sincedb) is stored
      sincedb_path => "/home/software/sincedb/filedatahub"
      codec => json {
        charset => "UTF-8"
      }
    }
  }
  filter {
    mutate {
      rename => {
        "col1" => "col1"
        "col2" => "col2"
      }
    }
  }
  output {
    datahub {
      access_id => "xxxx"
      access_key => "xxxx"
      endpoint => "http://dh-cn-shanghai-int-vpc.aliyuncs.com"
      project_name => "test_dh1"
      topic_name => "topic_test_2"
      #shard_id => "0"
      #shard_keys => ["thread_id"]
      dirty_data_continue => true
      dirty_data_file => "/home/software/dirty_vehIdInfo.data"
      dirty_data_file_max_size => 1000
    }
  }

Run command and result

  bash bin/logstash -f config/logstash.conf

[Screenshot: run result]

Many-to-many synchronization (multiple JSON files to multiple topics)

Sample JSON file (placed at /home/software/data/22/test_topic_22.json)

  [{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]
  [{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]

Sample JSON file (placed at /home/software/data/11/test_topic.json)

  [{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"}]
  [{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"}]
  [{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"}]

Configuration file

  input {
    file {
      path => "/home/software/data/**/*.json"
      start_position => "beginning"
      # Where the read position (sincedb) is stored
      sincedb_path => "/home/software/sincedb/filedatahub"
      codec => json {
        charset => "UTF-8"
      }
    }
  }
  filter {
    # Distinguish the sources by file path
    if "22" in [path] {
      mutate {
        rename => {
          "col1" => "col1"
          "col2" => "col2"
        }
      }
    }
    if "11" in [path] {
      mutate {
        rename => {
          "col1" => "col1"
          "col2" => "col2"
        }
      }
    }
  }
  output {
    if "22" in [path] {
      datahub {
        access_id => "xxxxx"
        access_key => "xxxxx"
        endpoint => "http://dh-cn-shanghai-int-vpc.aliyuncs.com"
        project_name => "test_dh1"
        topic_name => "topic_test_2"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/home/software/dirty_vehIdInfo.data"
        dirty_data_file_max_size => 1000
      }
    }
    if "11" in [path] {
      datahub {
        access_id => "xxxxx"
        access_key => "xxxxx"
        endpoint => "http://dh-cn-shanghai-int-vpc.aliyuncs.com"
        project_name => "test_dh1"
        topic_name => "topic_test"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/home/software/dirty.data"
        dirty_data_file_max_size => 1000
      }
    }
  }

Run command and result

  bash bin/logstash -f config/logstash.conf

[Screenshot: run result]

One-to-many synchronization (one JSON file routed to different topics by the value of a key)

  [{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]
  [{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]
  [{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"}]
  [{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"}]

The configuration file is as follows:

  input {
    file {
      path => "/home/software/data/22/test_topic_22.json"
      start_position => "beginning"
      sincedb_path => "/home/software/sincedb/filedatahub"
      codec => json {
        charset => "UTF-8"
      }
    }
  }
  filter {
    mutate {
      rename => {
        "col1" => "col1"
        "col2" => "col2"
      }
    }
  }
  output {
    # Route to a different topic depending on the value of col1
    if [col1] == "aaaaa" {
      datahub {
        access_id => "xxxxx"
        access_key => "xxxxx"
        endpoint => "http://dh-cn-shanghai-int-vpc.aliyuncs.com"
        project_name => "test_dh1"
        topic_name => "topic_test_2"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/home/software/dirty_vehIdInfo.data"
        dirty_data_file_max_size => 1000
      }
    } else {
      datahub {
        access_id => "xxxxx"
        access_key => "xxxxx"
        endpoint => "http://dh-cn-shanghai-int-vpc.aliyuncs.com"
        project_name => "test_dh1"
        topic_name => "topic_test"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/home/software/dirty_vehIdInfo.data"
        dirty_data_file_max_size => 1000
      }
    }
  }
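The routing rule in the output section can be dry-run on sample data before any credentials are configured. The Python sketch below is an illustration under stated assumptions: it assumes that each element of a JSON array on a line becomes one event, and it uses a shortened, hypothetical version of the sample file. The function names `route` and `plan` are made up for this example.

```python
import json

# Shortened sample: two lines, each a JSON array of objects
# (hypothetical data modelled on the sample file above).
SAMPLE_LINES = [
    '[{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]',
    '[{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"}]',
]


def route(event):
    """Mirror the if/else in the output section: pick a topic from col1."""
    return "topic_test_2" if event.get("col1") == "aaaaa" else "topic_test"


def plan(lines):
    """Group events by destination topic, one event per array element."""
    by_topic = {}
    for line in lines:
        for event in json.loads(line):
            by_topic.setdefault(route(event), []).append(event)
    return by_topic


routed = plan(SAMPLE_LINES)
for topic, events in routed.items():
    print(topic, len(events))
```

Because the rule ends in a plain else branch, any event whose col1 is missing or has an unexpected value also goes to topic_test, which is worth keeping in mind when the two topics have different schemas.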

Run command and result

  bash bin/logstash -f config/logstash.conf

[Screenshot: run result]

Synchronizing MySQL data to DataHub through Logstash

MySQL table data

The configuration file is as follows:

  input {
    jdbc {
      jdbc_driver_library => "mysql-connector-java-5.1.47.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_connection_string => "jdbc:mysql://xxxx:3306/database"
      jdbc_user => "username"
      jdbc_password => "password"
      # The tracked column value is only used if the statement references :sql_last_value
      use_column_value => true
      tracking_column => "modifytime"
      # Cron-style schedule: run the query every minute
      schedule => "* * * * *"
      statement => "select * from suyang_test"
    }
  }
  output {
    datahub {
      access_id => "Your accessId"
      access_key => "Your accessKey"
      endpoint => "Endpoint"
      project_name => "project"
      topic_name => "topic"
      #shard_id => "0"
      #shard_keys => ["thread_id"]
      dirty_data_continue => true
      dirty_data_file => "/Users/ph0ly/trash/dirty.data"
      dirty_data_file_max_size => 1000
    }
  }

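Running Logstash

Start Logstash with the following command:

  logstash -f <path to the configuration file above>

batch_size is the number of records sent to DataHub in each request and defaults to 125. To start Logstash with a specific batch_size:

  logstash -f <path to the configuration file above> -b 256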

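Parameters

The parameters are described below:

  access_id (Required): Alibaba Cloud access ID
  access_key (Required): Alibaba Cloud access key
  endpoint (Required): DataHub service endpoint
  project_name (Required): DataHub project name
  topic_name (Required): DataHub topic name
  retry_times (Optional): number of retries. -1 retries indefinitely, 0 disables retries, and a value greater than 0 allows that many retries. Default: -1
  retry_interval (Optional): interval before the next retry, in seconds. Default: 5
  skip_after_retry (Optional): whether to skip the current batch of data once retries caused by DataHub exceptions exceed retry_times. Default: false
  approximate_request_bytes (Optional): approximate limit on the number of bytes sent per request, which prevents a request from being rejected because its body is too large. Default: 2048576 (about 2 MB)
  shard_keys (Optional): array of field names used to choose the shard. The plugin hashes the values of these fields to assign each record to a shard. If neither shard_keys nor shard_id is specified, records are written to the shards in round-robin order
  shard_id (Optional): writes all data to the specified shard. If neither shard_keys nor shard_id is specified, records are written to the shards in round-robin order
  dirty_data_continue (Optional): whether to keep running when dirty data is encountered. Default: false. If set to true, dirty records are skipped and processing continues. When this switch is on, dirty_data_file must be specified
  dirty_data_file (Optional): name of the dirty data file. It must be specified when dirty_data_continue is enabled. Note that the dirty data file is split into two parts, .part1 and .part2: part1 holds the older dirty data and part2 the newer
  dirty_data_file_max_size (Optional): maximum size of the dirty data file. The plugin aims to keep the file below this size, but the value is currently only a reference
  enable_pb (Optional): pb (protobuf) transport is used by default. If your private cloud deployment does not support pb, set this to false manually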
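The three shard placement cases described for shard_id and shard_keys can be sketched as follows. This Python snippet is an illustration, not the plugin's implementation. The MD5-based hash is an assumption, since the plugin's real hash function is not documented here, and letting shard_id win when both options are set is an arbitrary choice made for this sketch. The names `shard_by_keys` and `make_selector` are hypothetical.

```python
import hashlib
import itertools


def shard_by_keys(record, shard_keys, shard_count):
    """Pick a shard index from a hash of the shard_keys field values.

    MD5 is an assumption made for this sketch; the plugin's actual
    hash function may differ.
    """
    joined = "\x1f".join(str(record.get(key, "")) for key in shard_keys)
    digest = hashlib.md5(joined.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % shard_count


def make_selector(shard_count, shard_id=None, shard_keys=None):
    """Return a function that maps a record to a shard index."""
    if shard_id is not None:
        # Fixed shard: every record goes to the same place.
        return lambda record: int(shard_id)
    if shard_keys:
        # Hash of the key fields: equal keys always land on the same shard.
        return lambda record: shard_by_keys(record, shard_keys, shard_count)
    # Neither option set: rotate through the shards in order.
    counter = itertools.cycle(range(shard_count))
    return lambda record: next(counter)


round_robin = make_selector(3)
print([round_robin({}) for _ in range(5)])

keyed = make_selector(4, shard_keys=["thread_id"])
print(keyed({"thread_id": "qtp1453606810-20"}))
```

Hashing on a field such as thread_id keeps all records with the same value on one shard, which preserves their relative order at the cost of a possibly uneven load. Round-robin spreads load evenly but gives no such grouping.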

Reading data from DataHub with the DataHub Logstash Input plugin

Consuming DataHub data and writing it to a file

The Logstash configuration is as follows:

  input {
    datahub {
      access_id => "Your accessId"
      access_key => "Your accessKey"
      endpoint => "Endpoint"
      project_name => "test_project"
      topic_name => "test_topic"
      interval => 5
      #cursor => {
      #  "0"=>"20000000000000000000000003110091"
      #  "2"=>"20000000000000000000000003110091"
      #  "1"=>"20000000000000000000000003110091"
      #  "4"=>"20000000000000000000000003110091"
      #  "3"=>"20000000000000000000000003110000"
      #}
      shard_ids => []
      pos_file => "/home/admin/logstash/pos_file"
    }
  }
  output {
    file {
      path => "/home/admin/logstash/output"
    }
  }

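Parameters

  access_id (Required): Alibaba Cloud access ID
  access_key (Required): Alibaba Cloud access key
  endpoint (Required): DataHub service endpoint
  project_name (Required): DataHub project name
  topic_name (Required): DataHub topic name
  retry_times (Optional): number of retries. -1 retries indefinitely, 0 disables retries, and a value greater than 0 allows that many retries
  retry_interval (Optional): interval before the next retry, in seconds
  shard_ids (Optional): array of shards to consume. An empty list means all shards are consumed
  cursor (Optional): starting point for consumption. Empty by default, which means consumption starts from the beginning
  pos_file (Required): checkpoint file. It must be configured, and the checkpoint takes priority when restoring the consumption offset
  enable_pb (Optional): pb (protobuf) transport is used by default. If your private cloud deployment does not support pb, set this to false manually
  compress_method (Optional): compression algorithm for network transfer. No compression by default. Available options are "lz4" and "deflate"
  print_debug_info (Optional): prints DataHub debug information. Default: false (off). When enabled, a large amount of information is printed, so use it only for debugging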
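The retry_times and retry_interval semantics in the list above (-1 for unlimited retries, 0 for none, a positive value for a limited number) can be expressed as a small loop. The following Python sketch only illustrates those semantics and is not the plugin's code. `call_with_retry` and `make_flaky` are hypothetical helpers, and the failing operation is simulated.

```python
import time


def call_with_retry(operation, retry_times=-1, retry_interval=5, sleep=time.sleep):
    """Run operation(), retrying on failure.

    retry_times: -1 retries forever, 0 never retries,
                 N > 0 retries at most N times.
    retry_interval: seconds to wait before the next attempt.
    """
    retries_done = 0
    while True:
        try:
            return operation()
        except Exception:
            if retry_times >= 0 and retries_done >= retry_times:
                raise  # retry budget exhausted, surface the error
            retries_done += 1
            sleep(retry_interval)


def make_flaky(failures):
    """Hypothetical operation that fails `failures` times, then succeeds."""
    state = {"calls": 0}

    def operation():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("simulated DataHub error")
        return "ok"

    return operation, state


operation, state = make_flaky(failures=2)
result = call_with_retry(operation, retry_times=2, retry_interval=0)
print(result, state["calls"])
```

With retry_times set to 2, an operation that fails twice still succeeds on the third call. With -1, the loop never gives up, so a permanently failing request blocks processing until the underlying problem is fixed.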

More

For more configuration parameters, see the official Logstash website and the ELK Stack Chinese guide.