Logstash是开源的服务器端数据处理管道,能够同时从多个数据源采集数据,然后对数据进行转换,并将数据写入指定的存储中。AnalyticDB MySQL完全兼容MySQL,您可以将Logstash Input插件支持的任一数据源中的数据写入AnalyticDB MySQL。本文介绍如何使用Logstash将Kafka数据写入AnalyticDB MySQL数仓版(3.0)

Logstash组件介绍

  • 输入-采集各种样式、大小和来源的数据

    在实际业务中,数据往往以各种各样的形式分散或集中地存储在多个系统中,Logstash支持多种数据输入方式,可以在同一时间从多种数据源采集数据。Logstash能够以连续的流式传输方式轻松地从用户的日志、指标、Web应用、数据存储以及AWS服务采集数据。

  • 过滤-实时解析和转换数据

    数据从源传输到目标存储的过程中,Logstash过滤器能够解析各个事件,识别已命名的字段来构建结构,并将它们转换成通用格式,从而更轻松、快速地分析和实现商业价值。

    • 使用Grok从非结构化数据中派生出结构化数据。
    • 从IP地址破译出地理坐标。
    • 将PII数据匿名化,完全排除敏感字段。
    • 简化整体处理,不受数据源、格式或架构的影响
  • 输出-导出数据

    除了AnalyticDB MySQL以外,Logstash提供多种数据输出方向,灵活解锁众多下游用例。

操作步骤

Kafka是一个高吞吐量的分布式发布、订阅日志服务,具有高可用、高性能、分布式、高扩展、持久性等特点。目前Kafka已经被各大公司广泛使用,同时logstash也可以快速接入业务中,免去重复建设的麻烦。

  1. 在Apache Kafka服务器根目录,执行以下命令安装和更新插件。
    $ bin/plugin install 
    $ bin/plugin update

    Logstash从1.5版本开始集成Kafka,Logstash 1.5及以上版本中所有插件的目录和命名都发生了改变,插件发布地址为Logstash-plugins

  2. 配置插件。
    • Input配置示例

      以下配置可以实现对Kafka读取端(consumer)的基本使用。

      input {
          kafka {
              zk_connect => "localhost:2181"
              group_id => "Logstash"
              topic_id => "test"
              codec => plain
              reset_beginning => false # boolean (optional), default: false
              consumer_threads => 5  # number (optional), default: 1
              decorate_events => true # boolean (optional), default: false
          }
      }          
      参数说明:
      • group_id:消费者分组,可以通过组ID来指定,不同组之间的消费互不影响,相互隔离。
      • topic_id:指定消费话题(Topic),也可以理解为先订阅某个话题,然后消费。
      • reset_beginning:指定Logstash启动后从哪个位置开始读取数据,默认是结束位置,即Logstash进程会从上次读取结束时的偏移量开始继续读取数据;如果之前没有消费过,则从头读取数据。

        如果您要导入原数据,需将reset_beginning值改为true, Logstash进程将从头开始读取数据,作用类似于cat ,但是Logstash读到最后一行时不会终止,而是变成tail -F,继续监听相应数据。

      • decorate_events:指定输出消息时会输出自身信息,包括消费消息的大小、Topic来源以及consumer的group信息。
      • rebalance_max_retries:当有新的consumer(Logstash)加入到同一个group时,将会Reblance ,此后将会有Partitions的消费端迁移到新的consumer上。如果一个consumer获得了某个Partition的消费权限,那么它将会向Zookeeper注册Partition Owner registry节点信息,但是有可能此时旧的consumer尚没有释放此节点,此值用于控制注册节点的重试次数。
      • consumer_timeout_ms:在指定时间内没有消息到达将抛出异常,该参数一般无需修改。

      更多Input参数配置请参见Input

      说明 如果需要多个Logstash端协同消费同一个Topic,需要先把相应的Topic分多个Partitions(区),此时多个消费者消费将无法保证消息的消费顺序性,然后把两个或多个Logstash消费端配置成相同的group_idtopic_id
    • Output配置示例
      output {
          jdbc {
              driver_class => "com.mysql.jdbc.Driver"
              connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
              statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
          }
      }         
      参数说明:
      • connection_string:AnalyticDB MySQL的连接地址。
      • statement:INSERT SQL的声明数组。

      更多Output参数配置请参见Output

  3. 在Logstash安装目录中执行bin/Logstash -f config/xxxx.conf命令启动任务,将Kafka数据写入AnalyticDB MySQL。