更新时间:2020-03-11 10:15
Logstash是一种分布式日志收集框架,非常简洁强大,经常与ElasticSearch,Kibana配置,组成著名的ELK技术栈,非常适合用来做日志数据的分析。 阿里云实时计算为了方便用户将更多数据采集进入DataHub,提供了针对Logstash的DataHub Output/Input插件。使用Logstash,您可以轻松享受到Logstash开源社区多达30+种数据源支持(file,syslog,redis,log4j,apache log或nginx log),同时Logstash还支持filter对传输字段自定义加工等功能。下面我们就以Logstash日志采集为例介绍如何使用Logstash快速将日志数据采集进入DataHub。
注: Logstash DataHub Output/Input插件遵循Apache License 2.0开源协议。
Logstash安装要求JRE 7版本及以上,否则部分功能无法使用。
阿里云提供两种LogStash的安装方式:
一键安装
一键安装包,支持将Logstash和DataHub插件一键安装,点此下载
当前我们提供了免安装的版本,解压即可使用:
$ tar -xzvf logstash-with-datahub-6.4.0-1.0.7.tar.gz
$ cd logstash-with-datahub-6.4.0
命令执行完成后,logstash即安装成功。
单独安装
安装Logstash: 参看Logstash官网提供的[安装教程]进行安装工作。特别注意的是,最新的Logstash需要Java 7及以上版本。
安装DataHub插件: 下载所需要的插件。
上传至DataHub请使用: DataHub Logstash Output插件
下载DataHub中数据请使用: DataHub Logstash Input插件
分别使用如下命令进行安装:
$ {LOG_STASH_HOME}/bin/logstash-plugin install --local logstash-output-datahub-1.0.7.gem
$ {LOG_STASH_HOME}/bin/logstash-plugin install --local logstash-input-datahub-1.0.7.gem
下面以Log4j日志产出为例,讲解下如何使用Logstash采集并结构化日志数据。Log4j的日志样例如下:
20:04:30.359 [qtp1453606810-20] INFO AuditInterceptor - [13pn9kdr5tl84stzkmaa8vmg] end /web/v1/project/fhp4clxfbu0w3ym2n7ee6ynh/statistics?executionName=bayes_poc_test GET, 187 ms
针对上述Log4j文件,我们希望将数据结构化并采集进入DataHub,其中DataHub的Topic格式如下:
字段名称 | 字段类型 |
---|---|
request_time | STRING |
thread_id | STRING |
log_level | STRING |
class_name | STRING |
request_id | STRING |
detail | STRING |
Logstash任务配置如下:
input {
file {
# windows 中也使用"/", 而非"\"
path => "${APP_HOME}/log/bayes.log"
start_position => "beginning"
}
}
filter{
grok {
match => {
"message" => "(?<request_time>\d\d:\d\d:\d\d\.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class_name>\w+)\s+\-\s+\[(?<request_id>\w+)\]\s+(?<detail>.+)"
}
}
}
output {
datahub {
access_id => "Your accessId"
access_key => "Your accessKey"
endpoint => "Endpoint"
project_name => "project"
topic_name => "topic"
#shard_id => "0"
#shard_keys => ["thread_id"]
dirty_data_continue => true
dirty_data_file => "/Users/ph0ly/trash/dirty.data"
dirty_data_file_max_size => 1000
}
}
下面以标准csv文件为例,讲解下如何使用Logstash采集csv文件数据。csv文件样例如下:
1111,1.23456789012E9,true,14321111111000000,string_dataxxx0,
2222,2.23456789012E9,false,14321111111000000,string_dataxxx1
针对上述csv文件,DataHub的Topic格式如下:
字段名称 | 字段类型 |
---|---|
col1 | BIGINT |
col2 | DOUBLE |
col3 | BOOLEAN |
col4 | TIMESTAMP |
col5 | STRING |
Logstash任务配置如下:
input {
file {
path => "${APP_HOME}/data.csv"
start_position => "beginning"
}
}
filter{
csv {
columns => ['col1', 'col2', 'col3', 'col4', 'col5']
}
}
output {
datahub {
access_id => "Your accessId"
access_key => "Your accessKey"
endpoint => "Endpoint"
project_name => "project"
topic_name => "topic"
#shard_id => "0"
#shard_keys => ["thread_id"]
dirty_data_continue => true
dirty_data_file => "/Users/ph0ly/trash/dirty.data"
dirty_data_file_max_size => 1000
}
}
下面以json格式文件为例,讲解下如果使用Logstash采集json文件数据。json文件样例如下:
{"field1":"string val","field2":2.22,"field3":{"field4":true,"field5":123456}}
针对上述json文件,DataHub的Topic格式如下
字段名称 | 字段类型 |
---|---|
field1 | String |
field2 | DOUBLE |
field4 | BOOLEAN |
field5 | BIGINT |
这里因为field4和filed5中的数据嵌套在field3下,所以无法直接和DataHub中的schema对应,需要对在配置文件中添加filter对field3多进行一次解析。更复杂的json数据解析方式可以参考Logstash官方文档。
Logstash任务配置如下:
input {
file {
path => "${APP_HOME}/data.csv"
start_position => "beginning"
codec => json {
charset => "UTF-8"
}
}
}
#添加临时字段对field3的json数据再次进行解析
filter{
mutate {
add_field => { "temp_field" => "%{field3}" }
}
json {
source => "temp_field"
remove_field => ["temp_field", "field3"]
}
}
output {
datahub {
access_id => "Your accessId"
access_key => "Your accessKey"
endpoint => "Endpoint"
project_name => "project"
topic_name => "topic"
#shard_id => "0"
#shard_keys => ["thread_id"]
dirty_data_continue => true
dirty_data_file => "/Users/ph0ly/trash/dirty.data"
dirty_data_file_max_size => 1000
}
}
使用如下命令启动Logstash:
logstash -f <上述配置文件地址>
batch_size是每次向DataHub发送的记录条数,默认为125,指定batch_size启动Logstash:
logstash -f <上述配置文件地址> -b 256
参数说明如下:
access_id(Required): 阿里云access id
access_key(Required): 阿里云access key
endpoint(Required): 阿里云datahub的服务地址
project_name(Required): datahub项目名称
topic_name(Required): datahub topic名称
retry_times(Optional): 重试次数,-1为无限重试、0为不重试、>0表示需要有限次数, 默认值为-1
retry_interval(Optional): 下一次重试的间隔,单位为秒,默认值为5
skip_after_retry(Optional): 当由Datahub异常导致的重试超过重试次数,是否跳过这一轮上传的数据,默认为false
approximate_request_bytes(Optional): 用于限制每次发送请求的字节数,是一个近似值,防止因request body过大而被拒绝接收,默认为2048576(2MB)
shard_keys(Optional):数组类型,数据落shard的字段名称,插件会根据这些字段的值计算hash将每条数据落某个shard, 注意shard_keys和shard_id都未指定,默认轮询落shard
shard_id(Optional): 所有数据落指定的shard,注意shard_keys和shard_id都未指定,默认轮询落shard
dirty_data_continue(Optional): 脏数据是否继续运行,默认为false,如果指定true,则遇到脏数据直接无视,继续处理数据。当开启该开关,必须指定@dirty_data_file文件
dirty_data_file(Optional): 脏数据文件名称,当数据文件名称,在@dirty_data_continue开启的情况下,需要指定该值。特别注意:脏数据文件将被分割成两个部分.part1和.part2,part1作为更早的脏数据,part2作为更新的数据
dirty_data_file_max_size(Optional): 脏数据文件的最大大小,该值保证脏数据文件最大大小不超过这个值,目前该值仅是一个参考值
enable_pb(Optional): 默认使用pb传输,专有云如不支持pb,手动设置为false
logstash的配置如下:
input {
datahub {
access_id => "Your accessId"
access_key => "Your accessKey"
endpoint => "Endpoint"
project_name => "test_project"
topic_name => "test_topic"
interval=> 5
#cursor => {
# "0"=>"20000000000000000000000003110091"
# "2"=>"20000000000000000000000003110091"
# "1"=>"20000000000000000000000003110091"
# "4"=>"20000000000000000000000003110091"
# "3"=>"20000000000000000000000003110000"
#}
shard_ids => []
pos_file => "/home/admin/logstash/pos_file"
}
}
output {
file {
path => "/home/admin/logstash/output"
}
}
access_id(Required): 阿里云access id
access_key(Required): 阿里云access key
endpoint(Required): 阿里云datahub的服务地址
project_name(Required): datahub项目名称
topic_name(Required): datahub topic名称
retry_times(Optional): 重试次数,-1为无限重试、0为不重试、>0表示需要有限次数
retry_interval(Optional): 下一次重试的间隔,单位为秒
shard_ids(Optional):数组类型,需要消费的shard列表,空列表默认全部消费
cursor(Optional):消费起点,默认为空,表示从头开始消费
pos_file(Required):checkpoint记录文件,必须配置,优先使用checkpoint恢复消费offset
enable_pb(Optional): 默认使用pb传输,专有云如不支持pb,手动设置为false
compress_method(Optional): 网络传输的压缩算法,默认不压缩,可选项有"lz4", "deflate"
print_debug_info(Optional): 打印datahub的debug信息,默认为false关闭,打开后会打印大量信息,仅作debug使用
更多的配置参数请参考logstash官方网站,以及ELK stack中文指南
在文档使用中是否遇到以下问题
更多建议
匿名提交