通过logstash-input-datahub插件,您可以读取DataHub中的数据到其他数据源中。本文介绍如何使用logstash-input-datahub插件。

前提条件

您已完成以下操作:

  • 安装logstash-input-datahub插件。

    具体操作,请参见安装或卸载插件

  • 开通DataHub产品,并完成创建项目、创建Topic和导入数据。

    具体操作,请参见快速入门

使用logstash-input-datahub插件

参见通过配置文件管理管道,在创建管道任务时,按照以下说明配置Pipeline参数,保存并部署后,即可触发阿里云Logstash读取DataHub的数据到目标数据库中。

Logstash的Pipeline配置如下,相关参数说明请参见参数说明

input {
    datahub {
        access_id => "Your accessId"
        access_key => "Your accessKey"
        endpoint => "Endpoint"
        project_name => "test_project"
        topic_name => "test_topic"
        interval => 5
        #cursor => {
        #    "0"=>"20000000000000000000000003110091"
        #    "2"=>"20000000000000000000000003110091"
        #    "1"=>"20000000000000000000000003110091"
        #    "4"=>"20000000000000000000000003110091"
        #    "3"=>"20000000000000000000000003110000"
        #}
        shard_ids => []
        pos_file => "/ssd/1/<Logstash实例ID>/logstash/data/文件名"
    }
}
output {
    elasticsearch {
      hosts => ["http://es-cn-mp91cbxsm000c****.elasticsearch.aliyuncs.com:9200"]
      user => "elastic"
      password => "your_password"
      index => "datahubtest"
      document_type => "_doc"
  }
}
注意 目前阿里云Logstash只支持同一专有网络下的数据传输,如果源端数据在公网环境下,请参见配置NAT公网数据传输,通过公网访问Logstash。

参数说明

logstash-input-datahub插件支持的参数如下。
参数 类型 是否必选 说明
endpoint string DataHub对外服务的访问域名,详细信息请参见域名列表
access_id string 阿里云账号的AccessKey ID。
access_key string 阿里云账号的Access Key Secret。
project_name string DataHub的项目名称。
topic_name string DataHub的Topic名称。
retry_times number 重试次数。-1表示无限重试(默认)、0表示不重试、大于0表示按照设置的次数重试。
retry_interval number 重试的间隔,单位为秒。
shard_ids array 需要消费的shard列表。默认为空,表示全部消费。
cursor string 消费起点。默认为空,表示从头开始消费。
pos_file string Checkpoint记录文件,必须配置,优先使用checkpoint恢复消费。
enable_pb boolean 是否使用pb传输,默认为true。如果不支持pb传输,请将该参数设置为false。
compress_method string 网络传输的压缩算法,默认不压缩。可选项:lz4deflate
print_debug_info boolean 是否打印DataHub的Debug信息,默认为false。设置为true时,会打印大量信息,这些信息仅用来进行脚本调试。