Flume消费

日志服务支持通过aliyun-log-flume插件与Flume进行对接,实现日志数据的写入和消费。

背景信息

aliyun-log-flume是一个实现日志服务与Flume对接的插件,与Flume对接后,日志服务可以通过Flume与其它数据系统如HDFS、Kafka等对接。aliyun-log-flume提供Sink和Source实现日志服务与Flume的对接。

  • Sink:Flume读取其他数据源的数据然后写入日志服务。

  • Source:Flume消费日志服务的日志数据然后写入其他系统。

更多信息,请参见aliyun-log-flume

操作步骤

  1. 下载并安装Flume。

    更多信息,请参见Flume

  2. 下载aliyun-log-flume插件,并将插件存放于***/flume/lib目录下。

    更多信息,请参见aliyun-log-flume-1.3.jar

  3. ***/flume/conf目录下,创建配置文件flumejob.conf。

    • Sink配置及示例请参见Sink

    • Source配置及示例请参见Source

  4. 启动Flume。

Sink

通过Sink将其他数据源的数据通过Flume写入日志服务。目前支持两种解析格式:

  • SIMPLE:将整个Flume Event作为一个字段写入日志服务。

  • DELIMITED:将整个Flume Event作为被分隔符分隔的数据,根据配置的列名解析成对应的字段写入日志服务。

Sink的配置如下:

参数

是否必须

说明

type

默认配置为com.aliyun.Loghub.flume.sink.LoghubSink。

endpoint

Project的服务入口,例如http://cn-qingdao.log.aliyuncs.com。请根据实际情况替换服务入口。更多信息,请参见服务接入点

project

Project名称。

logstore

Logstore名称。

accessKeyId

阿里云AccessKey ID,用于标识用户。为保证账号安全,建议您使用RAM用户的AccessKey。如何获取AccessKey,请参见访问密钥

accessKey

阿里云AccessKey Secret,用于验证用户的密钥。为保证账号安全,建议您使用RAM用户的AccessKey。如何获取AccessKey,请参见访问密钥

batchSize

每批次写入日志服务的数据条数。默认为1000条。

maxBufferSize

缓存队列的大小。默认为1000条。

serializer

Event序列化格式。支持的模式如下:

  • DELIMITED:设置解析格式为分隔符模式。

  • SIMPLE:设置解析格式为单行模式。默认为该模式。

  • JSON:设置解析格式为JSON模式。

  • 自定义serializer:设置解析格式为自定义的序列化模式,设置为该模式时需要填写完整列名称。

columns

serializerDELIMITED时,必须指定该字段列表,用半角逗号(,)分隔,顺序与实际数据中的字段顺序一致。

separatorChar

serializerDELIMITED时,用于指定数据的分隔符,必须为单个字符。默认为英文逗号(,)。

quoteChar

serializerDELIMITED时,用于指定引用符。默认为半角双引号(")。

escapeChar

serializerDELIMITED时,用于指定转义字符。默认为半角双引号(")。

useRecordTime

用于设置是否使用数据中的timestamp字段作为日志时间。默认为false表示使用当前时间。

Sink配置示例请参见GitHub

Source

通过Source将日志服务的日志数据通过Flume投递到其他的数据源。目前支持两种输出格式。

  • DELIMITED:数据以分隔符日志的形式写入Flume。

  • JSON:数据以JSON日志的形式写入Flume。

Source配置如下:

参数

是否必须

说明

type

默认配置为com.aliyun.Loghub.flume.source.LoghubSource。

endpoint

Project的服务入口,例如http://cn-qingdao.log.aliyuncs.com。请根据实际情况替换服务入口。更多信息,请参见服务接入点

project

Project名称。

logstore

Logstore名称。

accessKeyId

阿里云AccessKey ID,用于标识用户。为保证账号安全,建议您使用RAM用户的AccessKey。如何获取AccessKey,请参见访问密钥

accessKey

阿里云AccessKey Secret,用于验证用户的密钥。为保证账号安全,建议您使用RAM用户的AccessKey。如何获取AccessKey,请参见访问密钥

heartbeatIntervalMs

客户端和日志服务的心跳间隔,默认为30000毫秒。

fetchIntervalMs

数据拉取间隔,默认为100毫秒。

fetchInOrder

是否按顺序消费。默认为false。

batchSize

每批次读取的数据条数,默认为100条。

consumerGroup

读取的消费组名称。

initialPosition

读取数据的起点位置,支持beginendtimestamp。默认为begin

说明

如果服务端已经存在Checkpoint,会优先使用服务端的Checkpoint。

timestamp

initialPositiontimestamp时,必须指定时间戳,为Unix时间戳格式。

deserializer

Event反序列化格式,支持的模式如下:

  • DELIMITED:设置解析格式为分隔符模式。默认为该模式。

  • JSON:设置解析格式为JSON模式。

  • 自定义deserializer:设置解析格式为自定义的反序列化模式,设置为该模式时需要填写完整列名称。

columns

deserializerDELIMITED时,必须指定字段列表,用半角逗号(,)分隔,顺序与实际数据中的字段顺序一致。

separatorChar

deserializerDELIMITED时,用于指定数据的分隔符,必须为单个字符。默认为英文逗号(,)。

quoteChar

deserializerDELIMITED时,用于指定引用符。默认为半角双引号(")。

escapeChar

deserializerDELIMITED时,用于指定转义字符。默认为半角双引号(")。

appendTimestamp

deserializerDELIMITED时,用于设置是否将时间戳作为一个字段自动添加到每行末尾。默认为false。

sourceAsField

deserializerJSON时,用于设置是否将日志Source作为一个字段,字段名称为__source__。默认为false。

tagAsField

deserializerJSON时,用于设置是否将日志Tag作为字段,字段名称为__tag__:{tag名称}。默认为false。

timeAsField

deserializerJSON时,用于设置是否将日志时间作为一个字段,字段名称为__time__。默认为false。

useRecordTime

用于设置是否使用日志的时间,如果为false则使用当前时间。默认为false。

Source配置示例请参见GitHub