全部产品
云市场

Fluentd插件

更新时间:2018-04-23 11:04:04

Fluentd采集工具


产品介绍

该插件是基于Fluentd开发的输出插件,主要是将采集到的数据写入DataHub。该插件遵守Fluentd输出插件开发规范,安装方便,可以很方便地将采集得到的数据写到DataHub。

产品安装

通过Ruby gem安装

注意:RubyGem源建议更改为https://gems.ruby-china.org/

  1. gem install fluent-plugin-datahub

本地安装

  1. 当前Fluentd仅支持Linux环境, 同时要求用户安装Ruby。

  2. 目前支持两种安装模式,对于没有安装过fluentd的客户我们提供了一键fluent+datahub全部安装模式,对于曾经安装了fluentd的客户我们提供datahub写入插件单独安装模式

1) 一键安装: 如果之前没安装过fluentd,请点击下载Fluentd完全安装包。注意,完全安装包提供的fluentd是fluentd-0.12.25.gem的版本

  1. $ tar -xzvf fluentd-with-datahub-0.12.25.tar.gz
  2. $ cd fluentd-with-datahub
  3. $ sudo sh install.sh

2) 单独安装 如果之前安装过fluentd,请点此下载fluentd datahub插件包, 使用gem命令安装datahub插件。

  1. $ sudo gem install --local fluent-plugin-datahub-0.12.25.gem

使用案例

案例一: CSV文件上传

下面以增量的CSV文件为例,说明下如何使用Fluentd将增量的CSV文件准实时上传到DataHub数据。CSV文件的格式如下图所示:

  1. 0,qe614c760fuk8judu01tn5x055rpt1,true,100.1,14321111111
  2. 1,znv1py74o8ynn87k66o32ao4x875wi,true,100.1,14321111111
  3. 2,7nm0mtpgo1q0ubuljjjx9b000ybltl,true,100.1,14321111111
  4. 3,10t0n6pvonnan16279w848ukko5f6l,true,100.1,14321111111
  5. 4,0ub584kw88s6dczd0mta7itmta10jo,true,100.1,14321111111
  6. 5,1ltfpf0jt7fhvf0oy4lo8m3z62c940,true,100.1,14321111111
  7. 6,zpqsfxqy9379lmcehd7q8kftntrozb,true,100.1,14321111111
  8. 7,ce1ga9aln346xcj761c3iytshyzuxg,true,100.1,14321111111
  9. 8,k5j2id9a0ko90cykl40s6ojq6gruyi,true,100.1,14321111111
  10. 9,ns2zcx9bdip5y0aqd1tdicf7bkdmsm,true,100.1,14321111111
  11. 10,54rs9cm1xau2fk66pzyz62tf9tsse4,true,100.1,14321111111

上述CSV文件中每行一条Record,按照(,)区分字段。保存在本地路径/temp/test.csv中。DataHub Topic格式如下:

字段名称 字段类型
id BIGINT
name STRING
gender BOOLEAN
salary DOUBLE
my_time TIMESTAMP

使用如下Fluentd的配置,配置文件地址在 ${CONFIG_HOME}/fluentd_test.conf:

  1. <source>
  2. @type tail
  3. path 你的文件路径
  4. tag test1
  5. format csv
  6. keys id,name,gender,salary,my_time
  7. </source>
  8. <match test1>
  9. @type datahub
  10. access_id your_app_id
  11. access_key your_app_key
  12. endpoint http://ip:port
  13. project_name test_project
  14. topic_name fluentd_performance_test_1
  15. column_names ["id", "name", "gender", "salary", "my_time"]
  16. flush_interval 1s
  17. buffer_chunk_limit 3m
  18. buffer_queue_limit 128
  19. dirty_data_continue true
  20. dirty_data_file 脏数据记录文件路径
  21. retry_times 3
  22. put_data_batch_size 1000
  23. </match>

使用如下命令启动Fluentd,即可完成CSV文件数据采集进入DataHub:

  1. ${FLUENTD_HOME}/fluentd-with-dataHub/bin/fluentd -c ${CONFIG_HOME}/fluentd_test.conf

案例二: Log4J日志采集

Log4j的日志格式如下:

  1. 11:48:43.439 [qtp1847995714-17] INFO AuditInterceptor - [c2un5sh7cu52ek6am1ui1m5h] end /web/v1/project/tefe4mfurtix9kwwyrvfqd0m/node/0m0169kapshvgc3ujskwkk8g/health GET, 4061 ms

使用如下Fluentd配置:

  1. <source>
  2. @type tail
  3. path bayes.log
  4. tag test
  5. format /(?<request_time>\d\d:\d\d:\d\d.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class>\w+)\s+-\s+\[(?<request_id>\w+)\]\s+(?<detail>.+)/
  6. </source>
  7. <match test>
  8. @type datahub
  9. access_id your_access_id
  10. access_key your_access_key
  11. endpoint http://ip:port
  12. project_name test_project
  13. topic_name dataHub_fluentd_out_1
  14. column_names ["thread_id", "log_level", "class"]
  15. </match>

使用该配置启动即可完成log4j日志采集进入DataHub的功能。

配置参数

  1. 读插件配置
  2. tag test1 : 指定路由, 和<match>会进行路由正则匹配
  3. format csv : 数据按照csv方式采集
  4. keys id,name,gender,salary,my_time : 指定需要采集的列名,必须和目的DataHub表的列名一致
  1. 写插件配置
  2. shard_id 0 : 指定shard_id写入,默认round-robin方式写入
  3. shard_keys ["id"] : 指定用作分区key,用keyhash后作为写入shard的索引
  4. flush_interval 1 : fluentd 每一秒钟至少写一次, 默认60s
  5. buffer_chunk_limit 3m : 块大小,支持“k”(KB),“m”(MB)单位,建议值3m
  6. buffer_queue_limit 128 : 块队列大小,此值与buffer_chunk_limit共同决定整个缓冲区大小
  7. put_data_batch_size 1000 : 1000record写一次DataHub
  8. retry_times 3 : 重试次数
  9. retry_interval 3 : 重试间隔(单位:s)
  10. dirty_data_continue true : 遇到增数据是否继续,若为true 遇到脏数据会重试,重试次数用完,会将脏数据写入脏数据文件
  11. dirty_data_file /xxx/yyy : 指定脏数据文件的位置
  12. column_names ["id"] : 指定需要采集的列

性能测试

性能测试环境: Fluentd运行环境为2核4G,操作系统为Linux;性能结果如下:

collect-data-fluentd-test

从本次DataHub插件性能测试数据中可以看到:

  1. 针对单条512B的数据,写入速度极本保持在 2800record/s 左右

  2. 随着put_data_batch_size的增加速度略有提升,但效果不大。

  3. 对于单条100K的数据, put_data_batch_size只有100的时候可以正常work,500和1000都不可用: 因为一次写入数据过大,已经大于50m

  4. 总的平均写入速度(MB/S)保持在 3MB/S

FAQ

Q: 关于Fluentd,如何编写Format的正则表达式?

A: format正则表达式编辑器