本文介绍如何使用Logstash将Kafka数据写入AnalyticDB for MySQL。
操作步骤
- 执行以下命令安装和更新插件。
$bin/plugin install $bin/plugin update
Logstash从1.5版本开始集成Kafka,Logstash 1.5及以上版本中所有插件的目录和命名都发生了改变,插件发布地址为Logstash-plugins。
- 配置插件。
- Input配置示例
以下配置可以实现对Kafka读取端(consumer)的基本使用。
input { kafka { zk_connect => "localhost:2181" group_id => "Logstash" topic_id => "test" codec => plain reset_beginning => false # boolean (optional), default: false consumer_threads => 5 # number (optional), default: 1 decorate_events => true # boolean (optional), default: false } }
参数说明:group_id
:消费者分组,可以通过组ID来指定,不同组之间的消费互不影响,相互隔离。topic_id
:指定消费话题(Topic),也可以理解为先订阅某个话题,然后消费。reset_beginning
:指定Logstash启动后从哪个位置开始读取数据,默认是结束位置,即Logstash进程会从上次读取结束时的偏移量开始继续读取数据;如果之前没有消费过,则从头读取数据。如果您要导入原数据,需将
reset_beginning
值改为true
, Logstash进程将从头开始读取数据,作用类似于cat ,但是Logstash读到最后一行时不会终止,而是变成tail -F
,继续监听相应数据。decorate_events
:指定输出消息时会输出自身信息,包括消费消息的大小、Topic来源以及consumer的group信息。rebalance_max_retries
:当有新的consumer(Logstash)加入到同一个group时,将会Reblance ,此后将会有Partitions的消费端迁移到新的consumer上。如果一个consumer获得了某个Partition的消费权限,那么它将会向Zookeeper注册Partition Owner registry节点信息,但是有可能此时旧的consumer尚没有释放此节点,此值用于控制注册节点的重试次数。consumer_timeout_ms
:在指定时间内没有消息到达将抛出异常,该参数一般无需修改。
更多Input参数配置请参见Input。
说明 如果需要多个Logstash端协同消费同一个Topic,需要先把相应的Topic分多个Partitions(区),此时多个消费者消费将无法保证消息的消费顺序性,然后把两个或多个Logstash消费端配置成相同的group_id
和topic_id
。 - Output配置示例
output { jdbc { driver_class => "com.mysql.jdbc.Driver" connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD" statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ] } }
参数说明:connection_string
:AnalyticDB for MySQL的连接地址。statement
:INSERT SQL的声明数组。
更多Output参数配置请参见Output。
- Input配置示例
- 在Logstash安装目录中执行
bin/Logstash -f config/xxxx.conf
命令启动任务,将Kafka数据写入AnalyticDB for MySQL。