全部产品
云市场

使用Logstash将Kafka数据写入AnalyticDB for MySQL

更新时间:2019-11-04 18:19:34

步骤一:安装插件

logstash从1.5版本开始集成Kafka,logstash 1.5及以上版本中所有插件的目录和命名都发生了改变,插件发布地址为logstash-plugins,您可以通过以下命令安装和更新插件。

  1. $bin/plugin install
  2. $bin/plugin update

步骤二:配置插件

Input配置示例

以下配置可以实现对kafka读取端(consumer)的基本使用。

  1. input {
  2. kafka {
  3. zk_connect => "localhost:2181"
  4. group_id => "logstash"
  5. topic_id => "test"
  6. codec => plain
  7. reset_beginning => false # boolean (optional), default: false
  8. consumer_threads => 5 # number (optional), default: 1
  9. decorate_events => true # boolean (optional), default: false
  10. }
  11. }

参数说明:

  • group_id:消费者分组,可以通过组ID来指定,不同组之间的消费互不影响,相互隔离。

  • topic_id:指定消费话题(topic),也可以理解为先订阅某个话题,然后消费。

  • reset_beginning:指定logstash启动后从哪个位置开始读取数据,默认是结束位置,即logstash进程会从上次读取结束时的偏移量开始继续读取数据;如果之前没有消费过,则从头读取数据。

    如果您要导入原有数据,需将reset_beginning值改为true, logstash进程将从头开始读取数据,作用类似于cat ,但是logstash读到最后一行时不会终止,而是变成tail -F,继续监听相应数据。

  • decorate_events:指定输出消息时会输出自身信息,包括消费消息的大小、topic来源以及consumer的group信息。

  • rebalance_max_retries:当有新的consumer(logstash)加入到同一个group时,将会reblance ,此后将会有partitions的消费端迁移到新的consumer上。如果一个consumer获得了某个partition的消费权限,那么它将会向zookeeper注册, Partition Owner registry节点信息,但是有可能此时旧的consumer尚没有释放此节点,此值用于控制,注册节点的重试次数。

  • consumer_timeout_ms:在指定时间内没有消息到达将抛出异常,该参数一般无需修改。

更多Input参数配置请参见Input

注意事项:如果需要多个logstash端协同消费同一个topic,需要先把相应的topic分多个partitions(区),此时多个消费者消费将无法保证消息的消费顺序性,然后把两个或多个logstash消费端配置成相同的group_idtopic_id

Output配置示例

  1. output {
  2. jdbc {
  3. driver_class => "com.mysql.jdbc.Driver"
  4. connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
  5. statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
  6. }
  7. }

参数说明:

  • connection_string:AnalyticDB for MySQL的连接地址。

  • statement:INSERT SQL的声明数组。

更多Output参数配置请参见Output

步骤三:启动插件

在logstash安装目录中执行bin/logstash -f config/xxxx.conf命令启动任务,将Kafka数据写入AnalyticDB for MySQL。