基于Logstash迁移OSS数据

重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

当对象存储服务OSS(Object Storage Service)文件发生变更,触发阿里云轻量消息队列(简称SMQ,原MNS)事件通知时,您可以通过阿里云Logstash的logstash-input-oss插件获取OSS变更事件,再通过logstash-output-oss插件将数据同步到OSS Bucket中。本文介绍对应的配置方法。

前提条件

您已完成以下操作:

  • 安装logstash-input-oss和logstash-output-oss插件。

    具体操作请参见安装或卸载插件

  • 开通OSS服务并准备数据。

    具体操作请参见开通OSS服务

  • 开通轻量消息队列SMQ,并确保与OSS服务在同一区域。

    具体操作请参见开通轻量消息队列(原 MNS)并授权

    说明

    本示例设置事件通知类型为PostObject、PutObject,即触发Post、Put事件请求,logstash-output-oss插件同步此数据到对端OSS。

操作步骤

  1. 步骤一:配置事件通知规则

  2. 步骤二:配置Logstash管道

  3. 步骤三:查看数据同步结果

步骤一:配置事件通知规则

  1. 登录OSS管理控制台

  2. 在左侧导航栏,单击Bucket列表,再单击目标Bucket名称

  3. 选择数据处理 > 事件通知

  4. 事件通知区域,单击创建规则

  5. 创建规则面板,配置事件通知规则。

    本示例配置的事件通知如下。事件通知配置

    配置参数详情请参见设置事件通知规则

    说明

    资源描述目录中不能包含特殊字符,更多事件类型请参见通过事件通知实时处理OSS文件变动

  6. 单击确定

  7. 可选:查看事件通知规则。

    1. 进入轻量消息队列SMQ控制台

    2. 在顶部菜单栏选择地域。

    3. 在左侧导航栏,单击事件通知

步骤二:配置Logstash管道

  1. 进入阿里云Elasticsearch控制台的Logstash页面
  2. 进入目标实例。
    1. 在顶部菜单栏处,选择地域。
    2. Logstash实例中单击目标实例ID。
  3. 在左侧导航栏,单击管道管理

  4. 单击创建管道

  5. 创建管道任务页面,输入管道ID并配置管道。

    本文使用的管道配置如下。

    input {
      oss {
        endpoint => "oss-cn-hangzhou-internal.aliyuncs.com"
        bucket => "zl-ossoutoss"
        access_key_id => "LTAIaX42ddd******"
        access_key_secret => "zuyBRUUndddddds3e6i******"
        mns_settings => {
           endpoint => "18185036364****.mns.cn-hangzhou-internal.aliyuncs.com"
           queue => "zl-test"
        }
        codec => json {
          charset => "UTF-8"
        }
      }
    }
    
    output {
      oss {
        endpoint => "http://oss-cn-hangzhou-internal.aliyuncs.com"              
        bucket => "zl-log-output-test"                          
        access_key_id => "LTAIaxxxxx******"                 
        access_key_secret => "zuxxxx8hBpXs3e6i******"         
        prefix => "oss/database"                         
        recover => true                                      
        rotation_strategy => "size_and_time"                  
        time_rotate => 1                                     
        size_rotate => 1000
        temporary_directory => "/ssd/1/ls-cn-0pp1cwec****/logstash/data/22"                            
        encoding => "gzip"                                 
        additional_oss_settings => {
          max_connections_to_oss => 1024                      
          secure_connection_enabled => false                  
        }
    
      }
    }
    重要
  6. 单击下一步,配置管道参数。

    管道参数配置

    参数

    说明

    管道工作线程

    并行执行管道的Filter和Output的工作线程数量。当事件出现积压或CPU未饱和时,请考虑增大线程数,更好地使用CPU处理能力。默认值:实例的CPU核数。

    管道批大小

    单个工作线程在尝试执行Filter和Output前,可以从Input收集的最大事件数目。较大的管道批大小可能会带来较大的内存开销。您可以设置LS_HEAP_SIZE变量,来增大JVM堆大小,从而有效使用该值。默认值:125。

    管道批延迟

    创建管道事件批时,将过小的批分派给管道工作线程之前,要等候每个事件的时长,单位为毫秒。默认值:50ms。

    队列类型

    用于事件缓冲的内部排队模型。可选值:

    • MEMORY:默认值。基于内存的传统队列。

    • PERSISTED:基于磁盘的ACKed队列(持久队列)。

    队列最大字节数

    请确保该值小于您的磁盘总容量。默认值:1024 MB。

    队列检查点写入数

    启用持久性队列时,在强制执行检查点之前已写入事件的最大数目。设置为0,表示无限制。默认值:1024。

    警告

    配置完成后,需要保存并部署才能生效。保存并部署操作会触发实例重启,请在不影响业务的前提下,继续执行以下步骤。

  7. 单击保存或者保存并部署

    • 保存:将管道信息保存在Logstash里并触发实例变更,配置不会生效。保存后,系统会返回管道管理页面。可在管道列表区域,单击操作列下的立即部署,触发实例重启,使配置生效。

    • 保存并部署:保存并且部署后,会触发实例重启,使配置生效。

步骤三:查看数据同步结果

  1. 进入OSS管理控制台

  2. 上传新的文件到SMQ监控的目录中(步骤一:配置事件通知规则中定义的资源描述)。

    上传文件的具体操作请参见控制台上传文件

  3. 进入目标端OSS Bucket,查看同步成功的数据。

    重要

    OSS插件不是以文档或目录形式进行数据同步,而是按照原文档中的数据一条一条进行读写。根据rotate规则,logstash-output-oss插件先将数据存储在本地临时文件中,当达到一定的时间或者大小再进行上传,所以不能保证同一个文档的数据保存到相同的文档下,例如可能会出现多个文档的部分数据同步到一个文档中的情况。