通过Logstash导入数仓版

更新时间:

Logstash是一个开源的服务器端数据处理管道,起初用于将日志类数据写入ES中。随着开源社区的不断发展,Logstash可以同时从多个数据源获取数据,并对其进行转换,然后将其发送到您需要的“存储端”。

以日志数据为例,由于AnalyticDB for MySQL支持原生JDBC方式访问,您可以通过开源logstash output插件logstash-output-jdbc将日志数据导入AnalyticDB for MySQL中进行进一步分析。但经过测试发现,在日志量非常大的情况下,通过JDBC方式将数据写入AnalyticDB for MySQL数仓版的性能较低,并且非常消耗CPU的资源(JDBC是单条记录写入)。为此,AnalyticDB for MySQL优化了一个基于JDBCLogstash output plugin插件——logstash-ouput-analyticdb,专门用于以聚合方式向AnalyticDB for MySQL中写入日志数据。

通过logstash-output-analyticdb将数据写入AnalyticDB for MySQL时的性能,相较于logstash-output-jdbc5倍提升,并且对CPU的消耗也明显降低。

安装

Logstash的安装流程请参见Installing Logstash

使用方式

config目录下创建一个logstash-analyticdb.conf(名字可以自定义)配置文件,logstash-analyticdb.conf文件的内容如下所示。

input
{
    stdin { }
}
output {
    analyticdb {
        driver_class => "com.mysql.jdbc.Driver"
        connection_string => "jdbc:mysql://HOSTNAME:PORT/DATABASE?user=USER&password=PASSWORD"
        statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
        commit_size => 4194304
    }
}           
  • connection_string:连接AnalyticDB for MySQLJDBC URL。

  • statement:INSERT SQL的声明数组。

更多参数配置:

  • max_flush_exceptions:当写入数据出现异常时,设置最大重试次数,默认值100。

  • skip_exception:设置是否跳过异常,默认为FALSE,表示出现异常时将重试直到到达最大重试次数max_flush_exceptions,如果仍然失败,则同步程序抛异常终止。设置为TRUE时,如果达到重试次数后仍是失败,则跳过异常,将异常写入日志。

  • flush_size:一次最多攒批数量,和commit_size参数搭配使用。

  • commit_size:一次最多攒批数据量大小,和flush_size参数搭配使用,达到限定值即提交写入任务。

上述配置文件只是一个示例,您需要根据实际业务配置logstash-analyticdb.conf文件。与AnalyticDB for MySQL相关的其他配置请参见README。更多logstash配置和使用规则,请参见logstash文档。

至此,配置任务已全部完成,接下来将启动任务。

启动任务

logstash的安装目录执行bin/logstash -f config/logstash-analyticdb.conf启动任务。

注意事项

建议您通过以下命令将logstash升级至最新版本后,再进行数据写入。

bin/logstash-plugin update logstash-output-analyticdb