日志服务支持通过aliyun-log-flume插件与Flume进行对接,实现日志数据的写入和消费。

背景信息

aliyun-log-flume是一个实现日志服务与Flume对接的插件,与Flume对接后,日志服务可以通过Flume与其它数据系统如HDFS、Kafka等对接。aliyun-log-flume提供Sink和Source实现日志服务与Flume的对接。
  • Sink:Flume读取其他数据源的数据然后写入日志服务。
  • Source:Flume消费日志服务的日志数据然后写入其他系统。
更多信息,请参见aliyun-log-flume

操作步骤

  1. 下载并安装Flume。
    更多信息,请参见Flume
  2. 下载aliyun-log-flume插件,并将插件存放于cd/***/flume/lib目录下。
    更多信息,请参见aliyun-log-flume-1.3.jar
  3. cd/***/flume/conf目录下,创建配置文件flumejob.conf。
    • Sink配置及示例请参见Sink
    • Source配置及示例请参见Source
  4. 启动Flume。

Sink

通过Sink将其他数据源的数据通过Flume写入日志服务。目前支持两种解析格式:
  • SIMPLE:将整个Flume Event作为一个字段写入日志服务。
  • DELIMITED:将整个Flume Event作为被分隔符分隔的数据,根据配置的列名解析成对应的字段写入日志服务。
Sink的配置如下:
参数 是否必须 说明
type 默认配置为com.aliyun.Loghub.flume.sink.LoghubSink。
endpoint Project的服务入口,例如http://cn-qingdao.log.aliyuncs.com。请根据实际情况替换服务入口。更多信息,请参见服务入口
project Project名称。
logstore Logstore名称。
accessKeyId 阿里云AccessKey ID,用于标识用户。为保证账号安全,建议您使用RAM用户的AccessKey。如何获取AccessKey,请参见访问密钥
accessKey 阿里云AccessKey Secret,用于验证用户的密钥。为保证账号安全,建议您使用RAM用户的AccessKey。如何获取AccessKey,请参见访问密钥
batchSize 每批次写入日志服务的数据条数。默认为1000条。
maxBufferSize 缓存队列的大小。默认为1000条。
serializer Event序列化格式。支持的模式如下:
  • DELIMITED:设置解析格式为分隔符模式。
  • SIMPLE:设置解析格式为单行模式。默认为该模式。
  • 自定义serializer:设置解析格式为自定义的序列化模式,设置为该模式时需要填写完整列名称。
columns serializerDELIMITED时,必须指定该字段列表,用半角逗号(,)分隔,顺序与实际数据中的字段顺序一致。
separatorChar serializerDELIMITED时,用于指定数据的分隔符,必须为单个字符。默认为英文逗号(,)。
quoteChar serializerDELIMITED时,用于指定引用符。默认为半角双引号(")。
escapeChar serializerDELIMITED时,用于指定转义字符。默认为半角双引号(")。
useRecordTime 用于设置是否使用数据中的timestamp字段作为日志时间。默认为false表示使用当前时间。
Sink配置示例请参见GitHub

Source

通过Source将日志服务的日志数据通过Flume投递到其他的数据源。目前支持两种输出格式。
  • DELIMITED:数据以分隔符日志的形式写入Flume。
  • JSON:数据以JSON日志的形式写入Flume。
Source配置如下:
参数 是否必须 说明
type 默认配置为com.aliyun.Loghub.flume.source.LoghubSource。
endpoint Project的服务入口,例如http://cn-qingdao.log.aliyuncs.com。请根据实际情况替换服务入口。更多信息,请参见服务入口
project Project名称。
logstore Logstore名称。
accessKeyId 阿里云AccessKey ID,用于标识用户。为保证账号安全,建议您使用RAM用户的AccessKey。如何获取AccessKey,请参见访问密钥
accessKey 阿里云AccessKey Secret,用于验证用户的密钥。为保证账号安全,建议您使用RAM用户的AccessKey。如何获取AccessKey,请参见访问密钥
heartbeatIntervalMs 客户端和日志服务的心跳间隔,默认为30000毫秒。
fetchIntervalMs 数据拉取间隔,默认为100毫秒。
fetchInOrder 是否按顺序消费。默认为false。
batchSize 每批次读取的数据条数,默认为100条。
consumerGroup 读取的消费组名称。
initialPosition 读取数据的起点位置,支持beginendtimestamp。默认为begin
说明 如果服务端已经存在Checkpoint,会优先使用服务端的Checkpoint。
timestamp initialPositiontimestamp时,必须指定时间戳,为Unix时间戳格式。
deserializer Event反序列化格式,支持的模式如下:
  • DELIMITED:设置解析格式为分隔符模式。默认为该模式。
  • JSON:设置解析格式为JSON模式。
  • 自定义deserializer:设置解析格式为自定义的反序列化模式,设置为该模式时需要填写完整列名称。
columns deserializerDELIMITED时,必须指定字段列表,用半角逗号(,)分隔,顺序与实际数据中的字段顺序一致。
separatorChar deserializerDELIMITED时,用于指定数据的分隔符,必须为单个字符。默认为英文逗号(,)。
quoteChar deserializerDELIMITED时,用于指定引用符。默认为半角双引号(")。
escapeChar deserializerDELIMITED时,用于指定转义字符。默认为半角双引号(")。
appendTimestamp deserializerDELIMITED时,用于设置是否将时间戳作为一个字段自动添加到每行末尾。默认为false。
sourceAsField deserializerJSON时,用于设置是否将日志Source作为一个字段,字段名称为__source__。默认为false。
tagAsField deserializerJSON时,用于设置是否将日志Tag作为字段,字段名称为__tag__:{tag名称}。默认为false。
timeAsField deserializerJSON时,用于设置是否将日志时间作为一个字段,字段名称为__time__。默认为false。
useRecordTime 用于设置是否使用日志的时间,如果为false则使用当前时间。默认为false。
Source配置示例请参见GitHub