MaxCompute支持将开源Logstash收集的日志数据写入MaxCompute。您可以通过Logstash的输出插件logstash-output-maxcompute,将Logstash收集的日志数据使用MaxCompute流式数据通道(Streaming Tunnel)功能上传到MaxCompute。
前提条件
在执行操作前请确认您已完成如下操作:
已安装Logstash并创建Logstash日志收集实例。更多信息,请参见Getting Started with Logstash。
背景信息
Logstash是一个开源的服务器端数据处理管道,可以同时从多个数据源获取数据并转换数据,然后将转换后的数据发送到目标“存储端”。您需要通过Logstash的logstash-output-maxcompute插件,将Logstash收集的日志数据使用MaxCompute流式数据通道(Streaming Tunnel)功能上传到MaxCompute。
logstash-output-maxcompute插件基于Logstash v7.8.0版本开发,可以作为输出端口。该插件的特点如下:
使用流式数据通道,避免通过批量数据通道导入产生的并发和小文件问题。
支持动态分区,可以根据Logstash解析的日志字段产生分区字段,能够自动创建不存在的分区。
logstash-output-maxcompute插件应用于如下场景:
需要收集的应用的日志格式在Logstash上有输入插件支持或易于解析,例如NGINX日志。
希望根据日志内容自动创建并导入对应分区。
logstash-output-maxcompute插件支持的数据类型为:STRING、BIGINT、DOUBLE、DATETIME和BOOLEAN。
日志中DATETIME类型的字段的格式将自动使用
ruby Time.parse函数推断。如果日志BOOLEAN字段满足
.to_string().lowercase() == "true",则结果为True。其他任何值为False。
本文将以收集NGINX日志为例,介绍如何配置和使用插件。
步骤一:下载并安装插件
您可以下载已安装logstash-output-maxcompute插件的Logstash实例Logstash实例下载链接,跳过安装步骤执行下一步。如果需要自行安装,请按照如下步骤操作:
下载
logstash-output-maxcompute插件(logstash-output-maxcompute插件下载链接)并放置在Logstash的根目录%logstash%下。修改Logstash根目录
%logstash%下的Gemfile文件,将source "https://rubygems.org"替换为source 'https://gems.ruby-china.com'。以Windows系统为例,在系统的命令行窗口,切换至Logstash的根目录
%logstash%下,执行如下命令安装logstash-output-maxcompute插件。bin\logstash-plugin install logstash-output-maxcompute-1.1.0.gem当返回
Installation successful提示信息时,表示插件安装成功。D:\logstash> D:\logstash>bin\logstash-plugin install logstash-output-maxcompute-1.1.0.gem Validating logstash-output-maxcompute-1.1.0.gem Installing logstash-output-maxcompute Installation successful可选:运行如下命令验证安装结果。
bin\logstash-plugin list maxcompute说明Linux系统需要执行命令
bin/logstash-plugin list maxcompute。如果安装成功,会返回
logstash-output-maxcompute信息。如果安装失败,解决方案请参见RubyGems。D:\logstash>bin\logstash-plugin list maxcompute logstash-output-maxcompute
步骤二:创建目标表
通过MaxCompute客户端或其他可以运行MaxCompute SQL的工具执行如下命令,在目标MaxCompute项目中创建目标表,例如logstash_test_groknginx。后续会将日志信息以日期为分区导入此表中。
create table logstash_test_groknginx(
clientip string,
remote_user string,
time datetime,
verb string,
uri string,
version string,
response string,
body_bytes bigint,
referrer string,
agent string
) partitioned by (pt string);步骤三:编写Logstash Pipeline配置文件
在Logstash的根目录%logstash%下创建配置文件pipeline.conf,并输入如下内容:
input { stdin {} }
filter {
grok {
match => {
"message" => "%{IP:clientip} - (%{USER:remote_user}|-) \[%{HTTPDATE:httptimestamp}\] \"%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response} %{NUMBER:body_bytes} %{QS:referrer} %{QS:agent}"
}
}
date {
match => [ "httptimestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
target => "timestamp"
}
}
output {
maxctunnel {
aliyun_access_id => "<your_accesskey_id>"
aliyun_access_key => "<your_accesskey_secret>"
aliyun_mc_endpoint => "<your_project_endpoint>"
project => "<your_project_name>"
table => "<table_name>"
partition => "pt=$<timestamp.strftime('%F')>"
value_fields => ["clientip", "remote_user", "timestamp", "verb", "request", "httpversion", "response", "bytes", "referrer", "agent"]
}
}参数 | 说明 |
your_accesskey_id | 可以访问目标MaxCompute项目的AccessKey ID。 |
your_accesskey_secret | AccessKey ID对应的AccessKey Secret。 |
your_project_endpoint | 目标MaxCompute项目所在区域的Endpoint信息。更多Endpoint信息,请参见Endpoint。 |
your_project_name | 目标MaxCompute项目的名称。 |
table_name | 目标表的名称,即步骤二中创建的表。 |
partition | 配置插件如何根据日志字段生成对应的分区信息。如果目标表有多个分区,需要指定到最后一级。配置格式如下:
|
partition_time_format | 可选。指定当一个字符串型的日期时间字段被分区信息引用时,该字段的源格式字符串。 在本例中,时间字段 即使未使用 如果不使用
|
value_fields | 指定目标表中的每个字段对应的日志字段,指定顺序与表中字段的顺序一致。 目标表字段的顺序为 |
aliyun_mc_tunnel_endpoint | 可选。您可以通过此配置项强制指定Tunnel Endpoint,覆盖自动路由机制。 |
retry_time | 失败重试次数。当写入MaxCompute失败时,尝试重新写入的次数。默认值为3。 |
retry_interval | 失败重试间隔。在两次尝试之间最少间隔的时间,单位为秒。默认值为1。 |
batch_size | 一次最多处理的日志条数。默认值为100。 |
batch_timeout | 写入MaxCompute的超时时间,单位为秒。默认值为5。 |
在本配置文件中,指定的日志输入为标准输入(input { stdin {} })。在实际应用场景中,您可以使用Logstash File输入插件从本地硬盘中自动读取NGINX日志。更多信息,请参见Logstash文档。
步骤四:运行和测试
以Windows系统为例,在系统的命令行窗口,切换至Logstash的根目录
%logstash%下,执行如下命令启动Logstash。bin\logstash -f pipeline.conf返回
Successfully started Logstash API endpoint信息时,Logstash启动完毕。命令执行成功后,返回如下类似信息,表示 Logstash 成功启动,API 端口为 9600。D:\logstash>bin\logstash -f pipeline.conf Sending Logstash logs to D:/logstash/logs which is now configured via log4j2.properties [2021-01-27T17:46:34,347][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified [2021-01-27T17:46:34,679][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.8.0", "jruby.version"=>"jruby 9.2.11.1 (2.5.7) 2020-03-25 b1f55b1a40 Java HotSpot(TM) 64-Bit Server VM 25.131-b11 on 1.8.0_131-b11 +indy +jit [mswin32-x86_64]"} [2021-01-27T17:46:40,972][INFO ][org.reflections.Reflections] Reflections took 241 ms to scan 1 urls, producing 21 keys and 41 values [2021-01-27T17:46:44,030][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500, "pipeline.sources"=>["D:/logstash/pipeline.conf"], :thread=>"#<Thread:0x37536680 run>"} [2021-01-27T17:46:46,947][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"} The stdin plugin is now waiting for input: [2021-01-27T17:46:47,174][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]} [2021-01-27T17:46:48,185][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}在系统的命令行窗口,粘贴如下日志样例,并按下键盘上的Enter键。
1.1.1.1 - - [09/Jul/2020:01:02:03 +0800] "GET /masked/request/uri/1 HTTP/1.1" 200 143363 "-" "Masked UserAgent" - 0.095 0.071 2.2.2.2 - - [09/Jul/2020:04:05:06 +0800] "GET /masked/request/uri/2 HTTP/1.1" 200 143388 "-" "Masked UserAgent 2" - 0.095 0.072执行成功后,终端返回
write .. records on partition .. completed,表示 Logstash 成功启动并将 Nginx 日志写入 MaxCompute 表。[2021-01-27T18:00:47,196][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"} The stdin plugin is now waiting for input: [2021-01-27T18:00:47,388][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]} [2021-01-27T18:00:48,323][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600} 1.1.1.1 - - [09/Jul/2020:01:02:03 +0800] "GET /masked/request/uri/1 HTTP/1.1" 200 143363 "-" "Masked UserAgent" - 0.095 0.071 2.2.2.2 - - [09/Jul/2020:04:05:06 +0800] "GET /masked/request/uri/2 HTTP/1.1" 200 143388 "-" "Masked UserAgent 2" - 0.095 0.072 [2021-01-27T18:00:56,757][INFO ][logstash.outputs.maxtunnel][main][691d6be762308ae955883c0cf0719e0a71754924lbe162d0d309f79690c9e2448] write 1 records on table doc_test_dev.logstash_test_groknginx partition pt='2020-07-08' completed. TraceId: 20210127180041e230f60b00012894 [2021-01-27T18:01:00,093][INFO ][logstash.outputs.maxtunnel][main] write 1 records on table doc_test_dev.logstash_test_groknginx partition pt='2020-07-08' completed. TraceId: 20210127180044b31f60b0001369e通过MaxCompute客户端或其他可以运行MaxCompute SQL的工具,执行如下命令,查询数据写入结果。
set odps.sql.allow.fullscan=true; select * from logstash_test_groknginx;返回结果如下:
+--------+------------+-------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+ | clientip | remote_user | time | verb | uri | version | response | body_bytes | referrer | agent | pt | +------------+-------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+ | 1.1.1.1 | - | 2020-07-09 01:02:03 | GET | /masked/request/uri/1 | 1.1 | 200 | 0 | "-" | "Masked UserAgent" | 2020-02-10 | | 2.2.2.2 | - | 2020-07-09 04:05:06 | GET | /masked/request/uri/2 | 1.1 | 200 | 0 | "-" | "Masked UserAgent 2" | 2020-02-10 | +------------+-------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+ 2 records (at most 10000 supported) fetched by instance tunnel.