通过logstash-output-datahub插件,您可以将数据传输到DataHub中。

前提条件

您已完成以下操作:

  • 安装logstash-output-datahub插件。

    详情请参见安装Logstash插件

  • 开通DataHub产品,并完成创建项目和创建Topic。

    详情请参见DataHub官方文档的用户指南章节。

  • 准备输入数据源。

    输入数据源可以为input支持的所有输入源插件中的数据,详情请参见input插件

使用logstash-output-datahub插件

满足以上前提条件后,您可以通过配置文件管理管道的方式创建管道任务。在创建管道任务时,按照以下说明配置Pipeline参数,保存并部署后,即可触发阿里云Logstash向DataHub传送数据。

以将CSV文件中的数据传送到DataHub为例。CSV文件示例如下。

1111,1.23456789012E9,true,14321111111000000,string_dataxxx0,
2222,2.23456789012E9,false,14321111111000000,string_dataxxx1

与以上CSV文件对应的DataHub的Topic格式如下。

字段名称 字段类型
col1 BIGINT
col2 DOUBLE
col3 BOOLEAN
col4 TIMESTAMP
col5 STRING

Logstash的Pipeline配置如下,相关参数说明请参见参数说明

input {
    file {
        path => "${APP_HOME}/data.csv"
        start_position => "beginning"
    }
}
filter{
    csv {
        columns => ['col1', 'col2', 'col3', 'col4', 'col5']
    }
}
output {
    datahub {
        access_id => "Your accessId"
        access_key => "Your accessKey"
        endpoint => "Endpoint"
        project_name => "project"
        topic_name => "topic"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/Users/ph0ly/trash/dirty.data"
        dirty_data_file_max_size => 1000
    }
}

Log4j日志数据和JSON文件的传输示例请参见LogStash插件

说明 阿里云Logstash目前只支持在同一专有网络VPC(Virtual Private Cloud)下进行数据传输,如果源端数据在公网下,请参见配置NAT公网数据传输,在公网环境下进行数据传输。

参数说明

logstash-output-datahub插件支持的参数如下。
参数 类型 是否必选 说明
endpoint string DataHub对外服务的访问域名,详情请参见DataHub域名列表
access_id string 阿里云账号的AccessKey ID。
access_key string 阿里云账号的Access Key Secret。
project_name string DataHub的项目名称。
topic_name string DataHub的Topic名称。
retry_times number 重试次数。-1表示无限重试(默认)、0表示不重试、大于0表示按照设置的次数重试。
retry_interval number 重试的间隔,单位为秒,默认为5。
skip_after_retry boolean 当由DataHub异常导致的重试次数超过retry_times设置的值,是否跳过这一轮上传的数据。默认为false。
approximate_request_bytes number 用于限制每次发送请求的字节数,是一个近似值,防止因Request body过大而被拒绝接收,默认为2048576(2MB)。
shard_keys array 数据的字段名称,插件会根据这些字段的值计算Hash值,将每条数据写入到某个shard。
注意 shard_keysshard_ids都未指定,默认轮询写入各shard。
shard_ids array 所有数据写入指定的shard。
注意 shard_keysshard_ids都未指定,默认轮询写入各shard。
dirty_data_continue string 处理数据时遇到脏数据是否继续运行,默认为false。设置为true时,必须指定dirty_data_file文件,表示处理数据时忽略脏数据。
dirty_data_file string 脏数据文件名称。当dirty_data_continue为true时,必须指定该参数值。
注意 处理数据时,脏数据文件会被分割成两个部分part1和part2,part1为原脏数据,part2为替换后的脏数据。
dirty_data_file_max_size number 脏数据文件大小的最大值。
enable_pb boolean 是否使用pb传输,默认为true。如果不支持pb传输,请将该参数值设置为false。