Upload data to MaxCompute with Logstash (streaming)

The logstash-output-maxcompute plugin streams log data collected by Logstash directly into MaxCompute tables through the MaxCompute Streaming Tunnel (ST). Unlike batch imports, this approach avoids concurrency and small-file issues. It also supports dynamic partitioning: the plugin creates partitions automatically based on parsed log field values.

This guide uses NGINX access log collection as an example.

Prerequisites

Before you begin, ensure that you have:

How it works

The logstash-output-maxcompute plugin is an output plugin based on Logstash v7.8.0. It writes data to MaxCompute through the Streaming Tunnel and supports the following data types: STRING, BIGINT, DOUBLE, DATETIME, and BOOLEAN.

DATETIME values are parsed automatically by Ruby's Time.parse method. For BOOLEAN fields, a string value equal to "true" (case-insensitive) is parsed as true; every other value is parsed as false.
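
The conversion rules above can be sketched in Python. This illustrates the described behavior only; it is not the plugin's Ruby source. The coerce function is a hypothetical helper, and datetime.fromisoformat is a stand-in for Ruby's Time.parse, so this sketch accepts only ISO 8601 style date strings.

```python
from datetime import datetime


def coerce(value: str, column_type: str):
    """Convert a raw log field (a string) to a value for the given column type."""
    kind = column_type.upper()
    if kind == "STRING":
        return value
    if kind == "BIGINT":
        return int(value)
    if kind == "DOUBLE":
        return float(value)
    if kind == "BOOLEAN":
        # Rule from the text: only "true", in any letter case, is true.
        return value.lower() == "true"
    if kind == "DATETIME":
        # Assumption: stand-in for Ruby's Time.parse; handles ISO 8601 only.
        return datetime.fromisoformat(value)
    raise ValueError(f"unsupported column type: {column_type}")


print(coerce("TRUE", "BOOLEAN"))
print(coerce("yes", "BOOLEAN"))
print(coerce("143363", "BIGINT"))
```

Note that a value such as "yes" or "1" becomes false under this rule, so it is worth normalizing BOOLEAN fields in a filter before they reach the output.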

When to use this plugin:

  • The log format of your source application is supported by a Logstash input plugin or is easy to parse, for example, NGINX access logs.

  • You need to auto-create and populate partitions based on log content.

Step 1: Install the plugin

Option A (recommended): Use a pre-packaged Logstash bundle

Download the Logstash 7.8.0 bundle with the plugin pre-installed and skip to Step 2.

Option B: Install the plugin into an existing Logstash instance

  1. Download the logstash-output-maxcompute 1.1.0 gem and place it in the Logstash root directory (%logstash%).

  2. In %logstash%/Gemfile, replace:

    source "https://rubygems.org"

    with:

    source 'https://gems.ruby-china.com'

  3. From the Logstash root directory, run the install command.

    On Windows:

    bin\logstash-plugin install logstash-output-maxcompute-1.1.0.gem

    On Linux:

    bin/logstash-plugin install logstash-output-maxcompute-1.1.0.gem

    A successful installation returns:

    Installation successful

  4. (Optional) Verify the installation.

    On Windows:

    bin\logstash-plugin list maxcompute

    On Linux:

    bin/logstash-plugin list maxcompute

    If the plugin is installed, the output is:

    logstash-output-maxcompute

Step 2: Create a destination table

Run the following SQL statement in the MaxCompute client or any tool that supports MaxCompute SQL. The statement creates the destination table logstash_test_groknginx, which is partitioned by the string column pt (the log date).

create table logstash_test_groknginx(
  clientip string,
  remote_user string,
  time datetime,
  verb string,
  uri string,
  version string,
  response string,
  body_bytes bigint,
  referrer string,
  agent string
) partitioned by (pt string);
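
Because the value_fields parameter in Step 3 maps log fields to these columns by position rather than by name, it can help to write the pairing down before configuring the pipeline. The following is a minimal sketch; column_mapping is a hypothetical helper, and the field names are the ones captured by the grok and date filters in Step 3. The partition column pt is not listed because it is set through the partition parameter.

```python
# Column names from the CREATE TABLE statement, in definition order.
TABLE_COLUMNS = ["clientip", "remote_user", "time", "verb", "uri",
                 "version", "response", "body_bytes", "referrer", "agent"]

# Event field names produced by the Step 3 filters, in the same order.
VALUE_FIELDS = ["clientip", "remote_user", "timestamp", "verb", "request",
                "httpversion", "response", "body_bytes", "referrer", "agent"]


def column_mapping(columns, fields):
    """Pair each table column with the event field at the same position."""
    if len(columns) != len(fields):
        raise ValueError("value_fields must list one field per table column")
    return dict(zip(columns, fields))


# Show only the pairs where the column name and the field name differ.
for column, field in column_mapping(TABLE_COLUMNS, VALUE_FIELDS).items():
    if column != field:
        print(f"{column} <- {field}")
```

Only the order matters: time, uri, and version are filled from timestamp, request, and httpversion even though the names differ.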

Step 3: Create the pipeline configuration file

In the Logstash root directory (%logstash%), create a file named pipeline.conf with the following content:

input { stdin {} }

filter {
  grok {
    match => {
      "message" => "%{IP:clientip} - (%{USER:remote_user}|-) \[%{HTTPDATE:httptimestamp}\] \"%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response} %{NUMBER:body_bytes} %{QS:referrer} %{QS:agent}"
    }
  }
  date {
    match => [ "httptimestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    target => "timestamp"
  }
}

output {
  maxctunnel {
    aliyun_access_id     => "<your_accesskey_id>"
    aliyun_access_key    => "<your_accesskey_secret>"
    aliyun_mc_endpoint   => "<your_project_endpoint>"
    project              => "<your_project_name>"
    table                => "<table_name>"
    partition            => "pt=$<timestamp.strftime('%F')>"
    value_fields         => ["clientip", "remote_user", "timestamp", "verb", "request", "httpversion", "response", "body_bytes", "referrer", "agent"]
  }
}

In the example above, log input is set to standard input (input { stdin {} }). In a production environment, replace this with the Logstash File input plugin to read NGINX logs from a local file automatically. See the Logstash documentation.

Replace the following placeholders in the output block:

| Placeholder | Description |
| --- | --- |
| <your_accesskey_id> | AccessKey ID used to access the target MaxCompute project. |
| <your_accesskey_secret> | AccessKey secret that corresponds to the AccessKey ID. |
| <your_project_endpoint> | Endpoint for the region where the target MaxCompute project is located. See Endpoints. |
| <your_project_name> | Name of the target MaxCompute project. |
| <table_name> | Name of the destination table created in Step 2. |
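
Every name in value_fields must match a field that the filters actually produce. The sketch below approximates the grok pattern with a Python regular expression and checks a field list against one parsed sample line. It is a simplified stand-in, not grok itself: real grok patterns such as IP and USER are more permissive, and parse_line and missing_fields are hypothetical helpers.

```python
import re
from datetime import datetime

# Simplified stand-in for the grok pattern in the filter block.
NGINX_LINE = re.compile(
    r'(?P<clientip>\d{1,3}(?:\.\d{1,3}){3}) - (?P<remote_user>[\w.@-]+|-) '
    r'\[(?P<httptimestamp>[^\]]+)\] '
    r'"(?P<verb>\w+) (?P<request>\S+) HTTP/(?P<httpversion>[\d.]+)" '
    r'(?P<response>\d+) (?P<body_bytes>\d+) '
    r'(?P<referrer>"[^"]*") (?P<agent>"[^"]*")'
)


def parse_line(line):
    """Return the event fields for one access-log line, or None if it does not match."""
    match = NGINX_LINE.match(line)
    if match is None:
        return None
    event = match.groupdict()
    # Counterpart of the date filter: httptimestamp becomes a time-typed field.
    event["timestamp"] = datetime.strptime(
        event["httptimestamp"], "%d/%b/%Y:%H:%M:%S %z")
    return event


def missing_fields(value_fields, event):
    """List the value_fields names that the parsed event does not contain."""
    return [name for name in value_fields if name not in event]


sample = ('1.1.1.1 - - [09/Jul/2020:01:02:03 +0800] '
          '"GET /masked/request/uri/1 HTTP/1.1" 200 143363 "-" '
          '"Masked UserAgent" - 0.095 0.071')
event = parse_line(sample)
print(event["body_bytes"])
# "bytes" is a deliberately mismatched name used to show the check.
print(missing_fields(["clientip", "bytes"], event))
```

A name that no filter produces, such as bytes in this sketch, has no value to write, so a check like this is a cheap way to catch naming mismatches before starting Logstash.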

Plugin parameters

The following table lists all parameters for the maxctunnel output block.

| Parameter | Required | Default | Description |
| --- | --- | --- | --- |
| aliyun_access_id | Yes | - | AccessKey ID for the target MaxCompute project. |
| aliyun_access_key | Yes | - | AccessKey secret that corresponds to the AccessKey ID. |
| aliyun_mc_endpoint | Yes | - | Endpoint for the region where the target project is located. |
| project | Yes | - | Name of the target MaxCompute project. |
| table | Yes | - | Name of the destination table. |
| partition | Yes | - | Partition configuration string. See Partition syntax below. |
| value_fields | Yes | - | Log fields mapped to table columns, in the same order as the table definition. |
| aliyun_mc_tunnel_endpoint | No | - | Forces a specific Tunnel Endpoint, overriding automatic routing. |
| partition_time_format | No | - | Source format string for DATETIME string fields referenced in the partition configuration. Needed only when automatic detection fails and you have not used the date filter to convert the field. |
| retry_time | No | 3 | Number of retries after a failed write. |
| retry_interval | No | 1 | Minimum seconds between retries. |
| batch_size | No | 100 | Maximum number of log entries to process per batch. |
| batch_timeout | No | 5 | Timeout in seconds for write operations. |
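
To make the batching and retry parameters concrete, here is a minimal Python sketch of how such settings are commonly applied. It is an assumption about the general technique, not the plugin's implementation: write_with_retry, chunk, and the write_batch callback are hypothetical, and the plugin may count attempts or wait between retries differently.

```python
import time


def chunk(events, batch_size=100):
    """Split events into batches of at most batch_size entries."""
    return [events[i:i + batch_size] for i in range(0, len(events), batch_size)]


def write_with_retry(write_batch, batch, retry_time=3, retry_interval=1,
                     sleep=time.sleep):
    """Call write_batch(batch); after a failure, retry up to retry_time times."""
    retries = 0
    while True:
        try:
            return write_batch(batch)
        except Exception:
            if retries >= retry_time:
                # All retries are used up: surface the last error.
                raise
            retries += 1
            # Wait at least retry_interval seconds before the next attempt.
            sleep(retry_interval)


# 250 events with the default batch_size produce batches of 100, 100, and 50.
print([len(batch) for batch in chunk(list(range(250)))])
```

Under these assumptions, the default values allow one initial write plus three retries per batch, with at least one second between attempts.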

Partition syntax

The partition parameter maps partition keys to values. Separate multiple partition levels with commas, in the same order as defined in the table schema.

Constant value

Use when the partition value is fixed:

partition => "pt=2024-01-01"

Log field value

Use to set the partition value from a parsed log field:

partition => "pt=$<clientip>"

Datetime field with reformatting

Use to extract and reformat a date/time field. The strftime format string controls the output. %F produces YYYY-MM-DD:

partition => "pt=$<timestamp.strftime('%F')>"

Multiple partition levels

Use when the table has multiple partition columns. List levels in the order they were defined in the table schema:

partition => "date=$<timestamp.strftime('%F')>,hour=$<timestamp.strftime('%H')>"

In the example configuration, the date filter plugin converts httptimestamp to a time-typed field called timestamp, so partition_time_format is not needed. If you skip the date filter and reference httptimestamp directly, set the following instead:

partition_time_format => "%d/%b/%Y:%H:%M:%S %z"
partition             => "pt=$<httptimestamp.strftime('%F')>"
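
The partition forms above can be emulated in a few lines of Python to preview the partition value that a given event would produce. This is a sketch of the documented syntax only; render_partition is a hypothetical helper, the plugin itself evaluates these expressions in Ruby, and time zone handling is not modeled here.

```python
import re
from datetime import datetime

# Matches $<field> and $<field.strftime('format')>.
PLACEHOLDER = re.compile(r"\$<(\w+)(?:\.strftime\('([^']*)'\))?>")


def render_partition(template, event, time_format=None):
    """Expand the $<...> placeholders in a partition string for one event."""
    def substitute(match):
        name, out_format = match.group(1), match.group(2)
        value = event[name]
        if out_format is None:
            # Plain field reference: use the field value as is.
            return str(value)
        if isinstance(value, str):
            # String fields need a source format (the role of partition_time_format).
            if time_format is None:
                raise ValueError(f"{name} is a string; a time format is required")
            value = datetime.strptime(value, time_format)
        # %F is expanded by hand because not every platform's strftime supports it.
        return value.strftime(out_format.replace("%F", "%Y-%m-%d"))
    return PLACEHOLDER.sub(substitute, template)


event = {
    "clientip": "1.1.1.1",
    "timestamp": datetime(2020, 7, 9, 1, 2, 3),
    "httptimestamp": "09/Jul/2020:01:02:03 +0800",
}
print(render_partition("pt=$<clientip>", event))
print(render_partition(
    "date=$<timestamp.strftime('%F')>,hour=$<timestamp.strftime('%H')>", event))
```

Passing "%d/%b/%Y:%H:%M:%S %z" as time_format reproduces the partition_time_format case, where the string field httptimestamp is parsed before it is reformatted.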

Step 4: Run and test

  1. Start Logstash with the pipeline configuration.

    On Windows:

    bin\logstash -f pipeline.conf

    On Linux:

    bin/logstash -f pipeline.conf

    Logstash is ready when the following message appears:

    Successfully started Logstash API endpoint

  2. Paste the following sample NGINX log lines into the command-line window and press Enter.

    1.1.1.1 - - [09/Jul/2020:01:02:03 +0800] "GET /masked/request/uri/1 HTTP/1.1" 200 143363 "-" "Masked UserAgent" - 0.095 0.071
    2.2.2.2 - - [09/Jul/2020:04:05:06 +0800] "GET /masked/request/uri/2 HTTP/1.1" 200 143388 "-" "Masked UserAgent 2" - 0.095 0.072

    A successful write returns a message similar to:

    write .. records on partition .. completed

  3. Query the written data in the MaxCompute client or any tool that supports MaxCompute SQL.

    set odps.sql.allow.fullscan=true;
    select * from logstash_test_groknginx;

    The query returns a result similar to the following:

    +------------+-------------+---------------------+------+-----------------------+---------+----------+------------+----------+----------------------+------------+
    | clientip   | remote_user | time                | verb | uri                   | version | response | body_bytes | referrer | agent                | pt         |
    +------------+-------------+---------------------+------+-----------------------+---------+----------+------------+----------+----------------------+------------+
    | 1.1.1.1    | -           | 2020-07-09 01:02:03 | GET  | /masked/request/uri/1 | 1.1     | 200      | 0          | "-"      | "Masked UserAgent"   | 2020-02-10 |
    | 2.2.2.2    | -           | 2020-07-09 04:05:06 | GET  | /masked/request/uri/2 | 1.1     | 200      | 0          | "-"      | "Masked UserAgent 2" | 2020-02-10 |
    +------------+-------------+---------------------+------+-----------------------+---------+----------+------------+----------+----------------------+------------+
    2 records (at most 10000 supported) fetched by instance tunnel.

What's next