logstash-input-oss插件基于阿里云消息服务(MNS),实现了当关联的OSS文件变化时,触发变更事件,MNS会通知Logstash从OSS文件系统拉取最新数据。您可以在OSS事件页面,完成当文件发生变化时,自动发送消息给MNS的配置。

说明 logstash-input-oss是阿里云维护的开源插件,详情请参见logstash-input-oss

安装logstash-input-oss插件

  1. 进入Logstash实例管理页面
  2. 单击左侧导航栏的插件配置
  3. 系统默认插件列表中,单击logstash-input-oss右侧的安装
    警告 安装logstash-input-oss插件会触发集群重启,请确认后再执行以下步骤。
  4. 操作提示对话框中,认真阅读系统提示,单击确认安装logstash-input-oss插件

    确认后,Logstash实例会进行重启,重启过程中可在任务列表中查看任务进度详情。重启成功后,即可完成logstash-input-oss插件的安装。

    安装成功后,如果不再使用,可单击logstash-input-oss插件右侧的卸载,使用同样的方式进行卸载。
    注意 卸载插件也会触发集群重启,请确认后操作。

使用方式

前提条件

当您将OSS事件通知配置完成后,可以登录Kibnan控制台,创建Pipeline任务。在创建Pipeline任务时,按照以下说明配置Pipeline参数,即可触发Logstash从OSS拉取数据。

以从OSS拉取数据写入到阿里云ES为例。
input {
  oss {
    "endpoint" => "http://oss-cn-region.aliyuncs.com"
    "bucket" => "aliyun-es-sample"
    "access_key_id" => "xxxxxxx"
    "access_key_secret" => "xxxxxxxxxxx"
    "prefix" => "file-sample-prefix"
    "mns_settings" => {
      "endpoint" => "xxxxxx.mns.cn-hangzhou.aliyuncs.com"
      "queue" => "aliyun-es-sample-mns"
    }
  }
}

output {
  elasticsearch {
    hosts => ["http://es-cn-xxx.elasticsearch.aliyuncs.com:9200"]
    index => "aliyun-es-sample"
    user => "elastic"
    password => "changeme"
  }
}

参数说明

logstash-input-oss插件支持的参数如下。
参数 类型 是否必选 说明
endpoint string OSS对外服务的访问域名,详情请参见访问域名和数据中心
bucket string OSS的Bucket名称。
access_key_id string 阿里云账号的AccessKey ID。
access_key_secret string 阿里云账号的Access Key Secret。
prefix string 如果指定了该参数,则Bucket中文件名的前缀必须与之匹配(不是正则表达式)。
additional_oss_settings hash 附加的OSS客户端配置。可选值:secure_connection_enabledmax_connections_to_oss
delete boolean 是否从原始Bucket中删除已处理的文件。
backup_to_bucket string 用来备份已处理过的文件的OSS Bucket名称。
backup_to_dir string 用来备份已经处理过的文件的本地目录路径。
backup_add_prefix string 文件处理后,为key(OSS中包含文件名的完整路径)附加一个前缀。当您将数据备份到另一个(或同一个)Bucket时,这个参数将有效地让您选择一个新的文件夹来放置文件。
include_object_properties boolean 是否在[@metadata][oss]中包含OSS对象的属性(last_modified,content_type,metadata)。如果不设置此参数, [@metadata][oss][key]将始终存在。
exclude_pattern string 要从Bucket中排除的key的ruby正则表达式。
mns_settings hash 消息服务(MNS)配置。
可选值及说明如下:
  • endpoint:MNS端口链接。
  • queue:队列名。
  • poll_interval_seconds:当队列中没有消息时,针对该队列的ReceiveMessage请求最长的等待时间,默认为10秒。
  • wait_seconds:本次ReceiveMessage请求最长的Polling等待时间,单位为秒。

ReceiveMessage的详细信息请参见ReceiveMessage

注意事项

  • logstash-input-oss插件接收到MNS通知消息后,会全量拉取您关联的文件。
  • 如果OSS存储的是.gz或.gzip结尾的文本文件,Logstash会以.gzip的文件格式对其进行处理,其它格式的文件以文本文件进行处理。
  • 文件是以文本文件的方式读取,如果您的文件是不可解析的格式,有可能读取出来是乱码。例如,当您的文件是jar、bin等格式时,读取出来会是乱码。

常见问题

  • Q:为什么基于MNS设计logstash-input-oss插件?

    A:因为OSS文件的变更需要有一种机制通知客户端,目前OSS文件事件变更可以无缝的写入到MNS中。

  • Q:为什么不使用OSS ListObjects API去获取变更的文件?

    A:OSS在记录未处理的文件及已经处理的文件时会增加本地存储,当本地存储较大时,ListObjects API性能会降低。目前其他文件存储系统,如S3开源社区,也将ListObjects API改为了消息通知机制。