The logstash-output-maxcompute plugin lets you stream log data collected by Logstash directly into MaxCompute tables through the MaxCompute Streaming Tunnel (ST). Unlike batch imports, this approach avoids concurrency and small file issues and supports dynamic partitioning—automatically creating partitions based on parsed log field values.
This guide uses NGINX access log collection as an example.
Prerequisites
Before you begin, ensure that you have:
- Installed Logstash 7.8.0 and set up a Logstash instance for log collection. See Getting started with Logstash.
How it works
The logstash-output-maxcompute plugin is an output plugin based on Logstash v7.8.0. It writes data to MaxCompute through the Streaming Tunnel and supports the following data types: STRING, BIGINT, DOUBLE, DATETIME, and BOOLEAN.
DATETIME values are parsed automatically by the Ruby Time.parse function. For BOOLEAN fields, any string value equal to "true" (case-insensitive) is parsed as true; all other values are parsed as false.
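The coercion rules above can be illustrated with a minimal Ruby sketch. The helper names parse_boolean and parse_datetime are hypothetical and are not part of the plugin; they only mirror the behavior described in the text, assuming the plugin passes the raw field string to Time.parse unchanged.

```ruby
require 'time'

# Hypothetical helper: mirrors the BOOLEAN rule described above.
# Only the string "true" (in any letter case) yields true.
def parse_boolean(value)
  value.to_s.downcase == 'true'
end

# Hypothetical helper: mirrors the DATETIME rule described above,
# assuming the raw string is handed to Ruby's Time.parse as-is.
def parse_datetime(value)
  Time.parse(value.to_s)
end

puts parse_boolean('TRUE')  # true
puts parse_boolean('yes')   # false
puts parse_datetime('2020-07-09 01:02:03 +0800').hour  # 1
```

Note that values such as "yes" or "1" become false under this rule, so normalize such fields in a filter before they reach the output.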
When to use this plugin:
- The log format of your source application is supported by a Logstash input plugin or is easy to parse, for example NGINX access logs.
- You need to auto-create and populate partitions based on log content.
Step 1: Install the plugin
Option A (recommended): Use a pre-packaged Logstash bundle
Download the Logstash 7.8.0 bundle with the plugin pre-installed and skip to Step 2.
Option B: Install the plugin into an existing Logstash instance
- Download the logstash-output-maxcompute 1.1.0 gem and place it in the Logstash root directory (%logstash%).
- In %logstash%/Gemfile, replace source "https://rubygems.org" with:
      source 'https://gems.ruby-china.com'
- From the Logstash root directory, run the install command.
  On Windows:
      bin\logstash-plugin install logstash-output-maxcompute-1.1.0.gem
  On Linux:
      bin/logstash-plugin install logstash-output-maxcompute-1.1.0.gem
  A successful installation returns:
      Installation successful
- (Optional) Verify the installation.
  On Windows:
      bin\logstash-plugin list maxcompute
  On Linux:
      bin/logstash-plugin list maxcompute
  If the plugin is installed, the output is:
      logstash-output-maxcompute
Step 2: Create a destination table
Run the following SQL statement in the MaxCompute client or any tool that supports MaxCompute SQL. This creates the destination table logstash_test_groknginx, partitioned by date.
create table logstash_test_groknginx(
    clientip     string,
    remote_user  string,
    time         datetime,
    verb         string,
    uri          string,
    version      string,
    response     string,
    body_bytes   bigint,
    referrer     string,
    agent        string
) partitioned by (pt string);
Step 3: Create the pipeline configuration file
In the Logstash root directory (%logstash%), create a file named pipeline.conf with the following content:
input { stdin {} }

filter {
    grok {
        match => {
            "message" => "%{IP:clientip} - (%{USER:remote_user}|-) \[%{HTTPDATE:httptimestamp}\] \"%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response} %{NUMBER:body_bytes} %{QS:referrer} %{QS:agent}"
        }
    }
    date {
        match => [ "httptimestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
        target => "timestamp"
    }
}

output {
    maxctunnel {
        aliyun_access_id => "<your_accesskey_id>"
        aliyun_access_key => "<your_accesskey_secret>"
        aliyun_mc_endpoint => "<your_project_endpoint>"
        project => "<your_project_name>"
        table => "<table_name>"
        partition => "pt=$<timestamp.strftime('%F')>"
        value_fields => ["clientip", "remote_user", "timestamp", "verb", "request", "httpversion", "response", "body_bytes", "referrer", "agent"]
    }
}
In the example above, log input is set to standard input (input { stdin {} }). In a production environment, replace this with the Logstash File input plugin to read NGINX logs from a local file automatically. See the Logstash documentation.
Replace the following placeholders in the output block:
| Placeholder | Description |
|---|---|
| <your_accesskey_id> | AccessKey ID used to access the target MaxCompute project. |
| <your_accesskey_secret> | AccessKey secret that corresponds to the AccessKey ID. |
| <your_project_endpoint> | Endpoint for the region where the target MaxCompute project is located. See Endpoints. |
| <your_project_name> | Name of the target MaxCompute project. |
| <table_name> | Name of the destination table created in Step 2. |
Plugin parameters
The following table lists all parameters for the maxctunnel output block.
| Parameter | Required | Default | Description |
|---|---|---|---|
| aliyun_access_id | Yes | - | AccessKey ID for the target MaxCompute project. |
| aliyun_access_key | Yes | - | AccessKey secret that corresponds to the AccessKey ID. |
| aliyun_mc_endpoint | Yes | - | Endpoint for the region where the target project is located. |
| project | Yes | - | Name of the target MaxCompute project. |
| table | Yes | - | Name of the destination table. |
| partition | Yes | - | Partition configuration string. See Partition syntax below. |
| value_fields | Yes | - | Log fields mapped to table columns, in the same order as the table definition. |
| aliyun_mc_tunnel_endpoint | No | - | Forces a specific Tunnel Endpoint, overriding automatic routing. |
| partition_time_format | No | - | Source format string for DATETIME string fields referenced in the partition configuration. Needed only when automatic detection fails and you have not used the date filter to convert the field. |
| retry_time | No | 3 | Number of retries after a failed write. |
| retry_interval | No | 1 | Minimum seconds between retries. |
| batch_size | No | 100 | Maximum number of log entries to process per batch. |
| batch_timeout | No | 5 | Timeout in seconds for write operations. |
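Because value_fields is positional, the field names do not need to match the column names; only the order matters. The following Ruby sketch illustrates that idea under stated assumptions: map_to_columns is a hypothetical helper (not the plugin's code), the column list is taken from the Step 2 table, and the field list uses the names captured by the grok and date filters in Step 3.

```ruby
# Columns of logstash_test_groknginx, in table-definition order.
TABLE_COLUMNS = %w[clientip remote_user time verb uri version
                   response body_bytes referrer agent].freeze

# Event fields in the same positional order (assumed from the Step 3 filters).
VALUE_FIELDS = %w[clientip remote_user timestamp verb request httpversion
                  response body_bytes referrer agent].freeze

# Hypothetical helper: pairs the Nth field with the Nth column.
# Fields missing from the event map to nil in this sketch; how the
# plugin itself treats missing fields is not covered here.
def map_to_columns(event, value_fields, columns)
  unless value_fields.length == columns.length
    raise ArgumentError, 'value_fields and columns differ in length'
  end
  columns.zip(value_fields).map { |column, field| [column, event[field]] }.to_h
end

event = { 'clientip' => '1.1.1.1', 'request' => '/masked/request/uri/1',
          'httpversion' => '1.1' }
row = map_to_columns(event, VALUE_FIELDS, TABLE_COLUMNS)
puts row['uri']      # /masked/request/uri/1
puts row['version']  # 1.1
```

A field listed under the wrong name or in the wrong position silently lands in the wrong column or stays empty, so compare the list against the table definition whenever either changes.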
Partition syntax
The partition parameter maps partition keys to values. Separate multiple partition levels with commas, in the same order as defined in the table schema.
Constant value
Use when the partition value is fixed:
partition => "pt=2024-01-01"
Log field value
Use to set the partition value from a parsed log field:
partition => "pt=$<clientip>"
Datetime field with reformatting
Use to extract and reformat a date/time field. The strftime format string controls the output. %F produces YYYY-MM-DD:
partition => "pt=$<timestamp.strftime('%F')>"
Multiple partition levels
Use when the table has multiple partition columns. List levels in the order they were defined in the table schema:
partition => "date=$<timestamp.strftime('%F')>,hour=$<timestamp.strftime('%H')>"
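To make the placeholder forms above concrete, here is a minimal Ruby sketch of how such a partition string could be expanded against one event. The expand_partition helper and its regular expression are assumptions for illustration, not the plugin's implementation; the sketch also assumes the referenced time field is already a Ruby Time object.

```ruby
# Matches $<field> and $<field.strftime('format')> placeholders.
PLACEHOLDER = /\$<(\w+)(?:\.strftime\('([^']*)'\))?>/

# Hypothetical helper: replaces each placeholder with the event's value.
# Constant text such as "pt=2024-01-01" passes through unchanged.
def expand_partition(spec, event)
  spec.gsub(PLACEHOLDER) do
    field  = Regexp.last_match(1)
    format = Regexp.last_match(2)
    value  = event.fetch(field)
    format ? value.strftime(format) : value.to_s
  end
end

event = {
  'clientip'  => '1.1.1.1',
  'timestamp' => Time.new(2020, 7, 9, 1, 2, 3, '+08:00')
}

puts expand_partition('pt=2024-01-01', event)
# pt=2024-01-01
puts expand_partition('pt=$<clientip>', event)
# pt=1.1.1.1
puts expand_partition("date=$<timestamp.strftime('%F')>,hour=$<timestamp.strftime('%H')>", event)
# date=2020-07-09,hour=01
```

In this sketch the date and hour are derived from the timestamp's own UTC offset. Whether the plugin uses the event's offset, UTC, or the local time zone is not stated here, so verify the resulting partition values on a test table.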
In the example configuration, the date filter plugin converts httptimestamp to a time-typed field called timestamp, so partition_time_format is not needed. If you skip the date filter and reference httptimestamp directly, set the following instead:
partition_time_format => "%d/%b/%Y:%H:%M:%S %z"
partition => "pt=$<httptimestamp.strftime('%F')>"
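The format string above uses strptime-style directives. As a rough illustration, the following Ruby sketch parses the raw NGINX timestamp with that source format and then reformats it with %F. The partition_value helper is hypothetical, and the sketch assumes the plugin applies the source format in a way equivalent to Ruby's Time.strptime.

```ruby
require 'time'

# Source format from the partition_time_format example above.
PARTITION_TIME_FORMAT = '%d/%b/%Y:%H:%M:%S %z'

# Hypothetical helper: parse the raw string with the source format,
# then render it with the strftime format used in the partition spec.
def partition_value(raw, source_format, target_format)
  Time.strptime(raw, source_format).strftime(target_format)
end

puts partition_value('09/Jul/2020:01:02:03 +0800', PARTITION_TIME_FORMAT, '%F')
# 2020-07-09
```

If the source format does not match the raw string, Time.strptime raises ArgumentError, which makes a quick check like this useful before starting the pipeline.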
Step 4: Run and test
- Start Logstash with the pipeline configuration.
  On Windows:
      bin\logstash -f pipeline.conf
  On Linux:
      bin/logstash -f pipeline.conf
  Logstash is ready when the following message appears:
      Successfully started Logstash API endpoint
- Paste the following sample NGINX log lines into the command-line window and press Enter.
      1.1.1.1 - - [09/Jul/2020:01:02:03 +0800] "GET /masked/request/uri/1 HTTP/1.1" 200 143363 "-" "Masked UserAgent" - 0.095 0.071
      2.2.2.2 - - [09/Jul/2020:04:05:06 +0800] "GET /masked/request/uri/2 HTTP/1.1" 200 143388 "-" "Masked UserAgent 2" - 0.095 0.072
  A successful write returns a message similar to:
      write .. records on partition .. completed
- Query the written data in the MaxCompute client or any tool that supports MaxCompute SQL.
      set odps.sql.allow.fullscan=true;
      select * from logstash_test_groknginx;
  The query returns:
      +------------+-------------+---------------------+------+-----------------------+---------+----------+------------+----------+----------------------+------------+
      | clientip   | remote_user | time                | verb | uri                   | version | response | body_bytes | referrer | agent                | pt         |
      +------------+-------------+---------------------+------+-----------------------+---------+----------+------------+----------+----------------------+------------+
      | 1.1.1.1    | -           | 2020-07-09 01:02:03 | GET  | /masked/request/uri/1 | 1.1     | 200      | 0          | "-"      | "Masked UserAgent"   | 2020-02-10 |
      | 2.2.2.2    | -           | 2020-07-09 04:05:06 | GET  | /masked/request/uri/2 | 1.1     | 200      | 0          | "-"      | "Masked UserAgent 2" | 2020-02-10 |
      +------------+-------------+---------------------+------+-----------------------+---------+----------+------------+----------+----------------------+------------+
      2 records (at most 10000 supported) fetched by instance tunnel.
What's next
- To read NGINX logs from a file automatically, configure the Logstash File input plugin.
- To learn more about the Logstash pipeline, see the Logstash documentation.
- To learn more about the MaxCompute Streaming Tunnel, see Streaming Tunnel.