Learn how to use the Simple Log Service (SLS) connector.
Background
Simple Log Service is an end-to-end service for log data. It helps you collect, consume, ship, query, and analyze log data efficiently. This improves O&M efficiency and enables you to process massive amounts of log data.
The following table lists the capabilities of the SLS connector.
| Category | Description |
|---|---|
| Supported types | Source table and sink table |
| Running mode | Streaming mode only |
| Connector-specific metrics | N/A |
| Data format | N/A |
| API type | SQL, DataStream API, and data ingestion YAML API |
| Data update or deletion in the sink table | Sink tables are append-only; you cannot update or delete data. |
Features
The SLS source connector directly reads message attribute fields. The following table lists the supported fields.
| Parameter | Type | Description |
|---|---|---|
| __source__ | STRING METADATA VIRTUAL | Message source. |
| __topic__ | STRING METADATA VIRTUAL | Message topic. |
| __timestamp__ | BIGINT METADATA VIRTUAL | Log time. |
| __tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | Message tag. For example, for the attribute |
Prerequisites
Ensure that you have created an SLS project and a Logstore. For more information, see Create a Project and a Logstore.
Limitations
- Only Ververica Runtime (VVR) 11.1 and later supports using SLS as a data synchronization source for data ingestion defined in YAML.
- The SLS connector guarantees only at-least-once semantics.
- Do not set the source parallelism higher than the number of shards, as this wastes resources. Additionally, for Ververica Runtime (VVR) 8.0.5 and earlier, a change in the number of shards can cause the automatic failover feature to fail, preventing some shards from being consumed.
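The parallelism limitation can be illustrated with a small sketch. It assumes a simple round-robin assignment of shards to source subtasks, which is only an illustration; the connector's actual assignment strategy is not described here, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: round-robin assignment of shard IDs to source subtasks.
// The real connector may assign shards differently.
class ShardAssignmentSketch {

    // Returns, for each subtask index, the list of shard IDs it would read.
    static List<List<Integer>> assign(int shardCount, int parallelism) {
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int shard = 0; shard < shardCount; shard++) {
            bySubtask.get(shard % parallelism).add(shard);
        }
        return bySubtask;
    }

    // Counts subtasks that received no shard and therefore sit idle.
    static long idleSubtasks(int shardCount, int parallelism) {
        return assign(shardCount, parallelism).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        // 4 shards, 6 subtasks: two subtasks have nothing to read.
        System.out.println(assign(4, 6));
        System.out.println("idle subtasks: " + idleSubtasks(4, 6));
    }
}
```

With 4 shards and a parallelism of 6, two subtasks hold no shard, which is the wasted capacity the limitation warns about.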
SQL
Syntax
CREATE TABLE sls_table(
a INT,
b INT,
c VARCHAR
) WITH (
'connector' = 'sls',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'logStore' = '<yourLogStoreName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);
WITH options
- General

| Parameter | Description | Type | Required | Default | Remarks |
|---|---|---|---|---|---|
| connector | The connector to use. | String | Yes | None | Set this to sls. |
| endPoint | The endpoint of Log Service (SLS). | String | Yes | None | Specify the VPC access address of Log Service (SLS). For more information, see Service endpoints. Note: (1) By default, Realtime Compute for Apache Flink cannot access the internet. To enable internet access from your Virtual Private Cloud (VPC), use a NAT Gateway. For more information, see How do I access the Internet?. (2) We recommend that you do not access SLS over the internet. If you must do so, use HTTPS and enable transfer acceleration. For more information, see Manage transfer acceleration. |
| project | The name of the SLS project. | String | Yes | None | N/A |
| logStore | The name of the Logstore or MetricStore. | String | Yes | None | Data in a Logstore is consumed in the same way as data in a MetricStore. |
| accessId | The AccessKey ID of your Alibaba Cloud account. | String | Yes | None | For more information, see Obtain an AccessKey pair. Important: To prevent your AccessKey pair from being exposed, we recommend that you use variables to specify the AccessKey ID and AccessKey secret. For more information, see Project variables. |
| accessKey | The AccessKey secret of your Alibaba Cloud account. | String | Yes | None | See the remarks for accessId. |
- Source-specific

| Parameter | Description | Type | Required | Default | Remarks |
|---|---|---|---|---|---|
| enableNewSource | Specifies whether to use the new data source that implements the FLIP-27 interface. | Boolean | No | false | The new source can automatically adapt to shard changes and distribute shards as evenly as possible across all source subtasks. Important: (1) This option is supported only in VVR 8.0.9 and later. The default value is true for VVR 11.1 and later. (2) If you change this option's value, you cannot restore jobs from a saved state. To resume consumption from a historical offset, first start the job with the consumerGroup option to record the consumption progress in the SLS consumer group. Then, set the consumeFromCheckpoint option to true and restart the job without a state. (3) If a Logstore has read-only shards, some subtasks might continue requesting data from other shards after finishing their own. This can cause an imbalanced workload, affecting performance. To mitigate this issue, you can adjust the parallelism, optimize the scheduling strategy, or merge small shards to reduce the number of shards and simplify task allocation. |
| shardDiscoveryIntervalMs | The interval for dynamic shard discovery. | Long | No | 60000 | Unit: milliseconds. Set this option to a negative value to disable dynamic discovery. Note: (1) The value must be greater than or equal to 60,000 milliseconds (1 minute). (2) This option takes effect only when enableNewSource is set to true. (3) This option is supported only in VVR 8.0.9 and later. |
| startupMode | The startup mode for the source table. | String | No | timestamp | Valid values: timestamp (default): starts consuming logs from the specified start time; latest: starts consuming logs from the latest offset; earliest: starts consuming logs from the earliest offset; consumer_group: starts consuming logs from the offset recorded by the consumer group. If the consumer group has not recorded a consumption offset for a shard, consumption starts from the earliest offset. Important: VVR versions earlier than 11.1 do not support the consumer_group value. Instead, set consumeFromCheckpoint to true. In this case, log consumption starts from the offset recorded by the specified consumer group, and the startup mode setting does not take effect. |
| startTime | The start time for log consumption. | String | No | Current time | The format is yyyy-MM-dd hh:mm:ss. This option takes effect only when startupMode is set to timestamp. Note: The startTime and stopTime options are based on the __receive_time__ attribute in SLS, rather than the __timestamp__ attribute. |
| stopTime | The end time for log consumption. | String | No | None | The format is yyyy-MM-dd hh:mm:ss. Note: (1) This option is used only for consuming historical logs and must be set to a time in the past. If you set it to a future time, consumption may stop prematurely if no new logs are written, resulting in a data flow interruption without any error messages. (2) If you want the Flink job to exit after all logs are consumed, you must also set exitAfterFinish to true. |
| consumerGroup | The name of the consumer group. | String | No | None | A consumer group is used to record consumption progress. You can specify a custom name without a fixed format. Note: Different Flink jobs must use different consumer groups. If multiple Flink jobs use the same consumer group, they do not coordinate and each job consumes all the data. This is because Flink does not use the SLS consumer group for partition assignment when it consumes data from SLS. As a result, each consumer consumes messages independently, even if they share the same consumer group. |
| consumeFromCheckpoint | Specifies whether to consume from a consumer group checkpoint. | String | No | false | true: You must also specify a consumer group. The Flink program starts consuming logs from the checkpoint saved in the consumer group. If the consumer group does not have a corresponding checkpoint, consumption starts from the time specified by startTime. false (default): The Flink program does not start consuming logs from the checkpoint saved for the specified consumer group. Important: This option is no longer supported in VVR 11.1 and later. For these versions, set the startupMode option to consumer_group instead. |
| maxRetries | The number of retries after a failed attempt to read from SLS. | String | No | 3 | N/A |
| batchGetSize | The number of log groups to read in a single request. | String | No | 100 | The value cannot exceed 1000. Otherwise, an error is reported. |
| exitAfterFinish | Specifies whether the Flink job exits after all data is consumed. | String | No | false | true: The Flink program exits after all data is consumed. false (default): The Flink program does not exit after data consumption is complete. |
| query | The query statement for preprocessing data before consumption. | String | No | None | Important: This option was deprecated in VVR 11.3 but remains supported in later versions for compatibility. Use this option to filter SLS data before Flink consumes it. This reduces costs and improves processing speed. For example, 'query' = '*\| where request_method = ''GET''' indicates that before Flink reads data from SLS, it first matches data where the value of the request_method field is 'GET'. Note: This option uses the SPL language of Log Service (SLS). For more information, see SPL syntax. Important: (1) This option is supported only in VVR 8.0.1 and later. (2) This feature incurs Log Service (SLS) fees. For more information, see Billing of Log Service. |
| processor | The name of the SLS processor for data preprocessing. If both query and processor are specified, query takes precedence and processor is ignored. | String | No | None | This option filters SLS data before Flink consumes it, which reduces costs and improves processing speed. We recommend that you use processor instead of query. For example, 'processor' = 'test-filter-processor' indicates that the SLS processor filters data before Flink reads the data from SLS. Note: This option uses the SPL language of Log Service (SLS). For more information, see SPL syntax. For information about how to create or update an SLS processor, see Manage processors. Important: (1) This option is supported only in VVR 11.3 and later. (2) This feature incurs Log Service (SLS) fees. For more information, see Billing of Log Service. |
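The startupMode rules described for the source options can be summarized in code. The following is a minimal sketch of the decision logic only, assuming the documented fallback for consumer_group; the enum, the method, and the returned strings are hypothetical and are not connector APIs.

```java
// Hypothetical sketch of how a start position could be chosen for one shard.
// Names and return values are illustrative, not part of the SLS connector.
class StartPositionSketch {

    enum StartupMode { TIMESTAMP, LATEST, EARLIEST, CONSUMER_GROUP }

    /**
     * @param mode           value of the startupMode option
     * @param startTime      value of the startTime option, used only in TIMESTAMP mode
     * @param recordedCursor offset saved by the consumer group for this shard, or null if none
     */
    static String resolve(StartupMode mode, String startTime, String recordedCursor) {
        switch (mode) {
            case TIMESTAMP:
                return "from-time:" + startTime;
            case LATEST:
                return "latest";
            case EARLIEST:
                return "earliest";
            case CONSUMER_GROUP:
                // Documented fallback: a shard without a recorded offset starts from the earliest offset.
                return recordedCursor != null ? "from-cursor:" + recordedCursor : "earliest";
            default:
                throw new IllegalArgumentException("Unknown startup mode: " + mode);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(StartupMode.CONSUMER_GROUP, null, null));
        System.out.println(resolve(StartupMode.TIMESTAMP, "2023-08-30 00:00:00", null));
    }
}
```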
- Sink-specific

| Parameter | Description | Type | Required | Default | Remarks |
|---|---|---|---|---|---|
| topicField | Specifies a field whose value overwrites the __topic__ attribute, which indicates the log topic. | String | No | None | The value of this option must be an existing field in the table. |
| timeField | Specifies a field whose value overwrites the __timestamp__ attribute, which indicates the log write time. | String | No | Current time | The value of this option must be an existing INT field in the table. If this option is not specified, the current time is used. |
| sourceField | Specifies a field whose value overwrites the __source__ attribute, which indicates the log source, such as the IP address of the machine that generated the log. | String | No | None | The value of this option must be an existing field in the table. |
| partitionField | Specifies a field for partitioning. A hash of this field's value determines which shard receives the data, ensuring records with the same hash go to the same shard. | String | No | None | If this option is not specified, each record is randomly written to an available shard. |
| buckets | If partitionField is specified, this option defines the number of buckets for mapping hash values. | String | No | 64 | The value must be a power of 2 in the range [1, 256]. The number of buckets must be greater than or equal to the number of shards. Otherwise, some shards may not receive any data. |
| flushIntervalMs | The data write interval. | String | No | 2000 | Unit: milliseconds. |
| writeNullProperties | Specifies whether to write null values as empty strings to SLS. | Boolean | No | true | true (default): Writes null values to the log as empty strings. false: Fields that evaluate to null are not written to the log. Note: This option is supported only in VVR 8.0.6 and later. |
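To see why the number of buckets should not be smaller than the number of shards when partitionField is used, consider the following sketch. It assumes a simple scheme in which a hash is masked into a bucket and buckets are spread proportionally over shards. The hash function and the bucket-to-shard mapping are assumptions for illustration; the connector's real algorithm may differ, and all names are hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of hash -> bucket -> shard routing. Not the connector's actual algorithm.
class BucketMappingSketch {

    // Enforces the documented constraint: a power of 2 in the range [1, 256].
    static void checkBuckets(int buckets) {
        boolean powerOfTwo = buckets > 0 && (buckets & (buckets - 1)) == 0;
        if (!powerOfTwo || buckets > 256) {
            throw new IllegalArgumentException("buckets must be a power of 2 in [1, 256]: " + buckets);
        }
    }

    // Masks the hash into [0, buckets); equal partition values always share a bucket.
    static int bucketOf(String partitionValue, int buckets) {
        checkBuckets(buckets);
        return partitionValue.hashCode() & (buckets - 1);
    }

    // Assumed mapping: spreads bucket indexes proportionally over the shards.
    static int shardOf(int bucket, int buckets, int shardCount) {
        return bucket * shardCount / buckets;
    }

    // Collects every shard that at least one bucket maps to.
    static Set<Integer> reachableShards(int buckets, int shardCount) {
        checkBuckets(buckets);
        Set<Integer> shards = new TreeSet<>();
        for (int bucket = 0; bucket < buckets; bucket++) {
            shards.add(shardOf(bucket, buckets, shardCount));
        }
        return shards;
    }

    public static void main(String[] args) {
        // Fewer buckets than shards: only some of the 8 shards can ever receive data.
        System.out.println(reachableShards(4, 8));
        // At least as many buckets as shards: every shard is reachable.
        System.out.println(reachableShards(64, 8));
    }
}
```

Under these assumptions, 4 buckets can address at most 4 of 8 shards, while 64 buckets cover all 8.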
Type mappings
| Flink type | SLS type |
|---|---|
| BOOLEAN | STRING |
| VARBINARY | STRING |
| VARCHAR | STRING |
| TINYINT | STRING |
| INTEGER | STRING |
| BIGINT | STRING |
| FLOAT | STRING |
| DOUBLE | STRING |
| DECIMAL | STRING |
Data ingestion (Beta)
Limitations
This feature is supported only by Realtime Compute for Apache Flink versions 11.1 and later.
Syntax
source:
  type: sls
  name: SLS Source
  endpoint: <endpoint>
  project: <project>
  logstore: <logstore>
  accessId: <accessId>
  accessKey: <accessKey>
Parameters
| Parameter | Description | Type | Required | Default | Remarks |
|---|---|---|---|---|---|
| type | The type of the data source. | String | Yes | None | The value must be |
| endpoint | The endpoint of Log Service (SLS). | String | Yes | None | The VPC access address of Log Service (SLS). For more information, see Service Endpoints. Note |
| accessId | The AccessKey ID for your Alibaba Cloud account. | String | Yes | None | For more information, see How do I view the AccessKey ID and AccessKey secret information?. Important: To prevent your AccessKey information from being exposed, we recommend that you use a project variable to specify the AccessKey value. For more information, see Project variables. |
| accessKey | The AccessKey secret for your Alibaba Cloud account. | String | Yes | None | See the remarks for accessId. |
| project | The name of the Log Service (SLS) project. | String | Yes | None | None |
| logStore | The name of the SLS Logstore or Metricstore. | String | Yes | None | Data in a Logstore is consumed in the same way as data in a Metricstore. |
| schema.inference.strategy | The schema inference strategy. | String | No | continuous | |
| maxPreFetchLogGroups | The maximum number of log groups to read from each shard for the initial schema inference. | Integer | No | 50 | Before the job reads and processes data, the connector pre-consumes a specified number of log groups from each shard to initialize schema information. |
| shardDiscoveryIntervalMs | The interval, in milliseconds, for dynamically discovering shard changes. | Long | No | 60000 | Set this parameter to a negative value to disable dynamic discovery. Note: The value must be greater than or equal to 60,000 milliseconds (1 minute). |
| startupMode | The startup mode. | String | No | None | |
| startTime | The start time for log consumption. | String | No | Current time | The format is This parameter takes effect only when Note The |
| stopTime | The end time for log consumption. | String | No | None | The format is Note If you want the Flink job to exit after all logs are consumed, you must also set |
| consumerGroup | The name of the consumer group. | String | No | None | A consumer group records consumption progress. You can specify any custom name. |
| batchGetSize | The number of log groups to read per request. | Integer | No | 100 | The value of batchGetSize cannot exceed 1,000. Otherwise, an error occurs. |
| maxRetries | The number of retries if reading from Log Service (SLS) fails. | Integer | No | 3 | None |
| exitAfterFinish | Specifies whether the Flink job exits after all data is consumed. | Boolean | No | false | |
| query | The preprocessing statement for consuming data from Log Service (SLS). | String | No | None | Use this parameter to filter data in Log Service (SLS) before consumption to save costs and improve processing speed. For example, Note: The query must use the SPL syntax of Log Service. For more information, see SPL syntax. Important |
| compressType | The compression type for Log Service (SLS). | String | No | None | Supported compression types include: |
| timeZone | The time zone for | String | No | None | By default, no offset is added. |
| regionId | The region where Log Service (SLS) is deployed. | String | No | None | For more information, see Supported regions. |
| signVersion | The request signature version for Log Service (SLS). | String | No | None | For more information, see Request signatures. |
| shardModDivisor | The divisor used when reading from Logstore shards. | Int | No | -1 | For more information, see Shards. |
| shardModRemainder | The remainder used when reading from Logstore shards. | Int | No | -1 | For more information, see Shards. |
| metadata.list | The metadata columns to pass to downstream jobs. | String | No | None | Available metadata fields include |
| decode.table-id.fields | Specifies fields whose values are used to generate a Table ID when parsing log data from Log Service (SLS). | String | No | None | Separate multiple fields with commas. Note: This parameter is supported in Realtime Compute for Apache Flink versions 11.6 and later. |
| fixed-types | Specifies the data types for specific fields when parsing log data from Log Service (SLS). | String | No | None | When parsing data, specify the types for specific fields. Use a comma Note: This parameter is supported in Realtime Compute for Apache Flink versions 11.6 and later. |
| timestamp-format.standard | The format for timestamp fields in the log data from Log Service (SLS). | String | No | SQL | Valid values: Note: This parameter is supported in Realtime Compute for Apache Flink versions 11.6 and later. |
| ingestion.ignore-errors | Specifies whether to ignore errors that occur during data parsing. | Boolean | No | false | Note: This parameter is supported in Realtime Compute for Apache Flink versions 11.6 and later. |
| ingestion.error-tolerance.max-count | If | Integer | No | -1 | This parameter takes effect only when Note: This parameter is supported in Realtime Compute for Apache Flink versions 11.6 and later. |
Reuse an existing catalog
Starting from Realtime Compute for Apache Flink version 11.5, you can reference a built-in SLS Catalog created on the Data Management page in Flink CDC data ingestion jobs. This avoids the need to manually specify connection properties.
source:
  type: sls
  using.built-in-catalog: sls_catalog
Currently, data ingestion jobs can automatically reuse the following parameters from the built-in SLS Catalog:
- endpoint
- project
- accessId
- accessKey
To override these automatically reused parameters, you can explicitly define them in the YAML configuration. Parameters defined in the YAML file take precedence.
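The precedence rule above amounts to a map merge in which explicitly configured values win. The following is a minimal sketch of that idea, not the actual implementation; the class and method names are hypothetical and the property values are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: parameters defined in YAML override those reused from the built-in catalog.
class CatalogOverrideSketch {

    static Map<String, String> effectiveConfig(Map<String, String> fromCatalog,
                                                Map<String, String> fromYaml) {
        // Start with the catalog values, then let explicit YAML entries replace them.
        Map<String, String> merged = new LinkedHashMap<>(fromCatalog);
        merged.putAll(fromYaml);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> catalog = new LinkedHashMap<>();
        catalog.put("endpoint", "catalog-endpoint");
        catalog.put("project", "catalog-project");

        Map<String, String> yaml = new LinkedHashMap<>();
        yaml.put("project", "yaml-project");

        // The project comes from YAML; the endpoint is still reused from the catalog.
        System.out.println(effectiveConfig(catalog, yaml));
    }
}
```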
Data type mapping
If fixed-types is not configured, the following data type mapping applies:
| SLS type | CDC type |
|---|---|
| STRING | STRING |
If fixed-types is configured, the system parses data by using the specified types.
Schema inference and evolution
- Shard data pre-consumption and schema initialization
  The SLS connector maintains the schema of the Logstore it is reading from. Before reading data from the Logstore, the connector pre-consumes up to maxPreFetchLogGroups log groups from each shard. It parses the schema of each log entry and merges them to initialize the schema for the table. A table creation event is then generated based on this initial schema before data consumption begins.
  Note: For each shard, the connector attempts to consume data starting from one hour before the current time to parse the log schema.
- Primary key information
  Log Service (SLS) logs do not contain primary key information. You can manually add a primary key to the table by using transform rules:
  transform:
    - source-table: <project>.<logstore>
      projection: \*
      primary-keys: key1, key2
- Schema inference and schema changes
  After the schema is initialized, if schema.inference.strategy is set to static, the SLS connector parses each log entry based on the initial schema and does not generate schema change events. If schema.inference.strategy is set to continuous, the connector parses each log entry, infers the physical columns, and compares them with the current schema. If the inferred schema is inconsistent with the current schema, the schemas are merged according to the following rules:
  - If the inferred physical columns contain fields that are not in the current schema, the connector adds these fields to the schema and generates an event to add nullable columns.
  - If the inferred physical columns are missing fields that exist in the current schema, the connector retains these fields, populates their data with NULL, and does not generate a column deletion event.
  The SLS connector infers the data type of all fields in each log entry as String. Currently, only adding new columns is supported. The connector appends new columns to the end of the schema and sets them to nullable.
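The merge rules for the continuous strategy can be sketched as follows. This is a simplified model that treats a schema as an ordered list of column names (every column is a nullable String) and a log entry as a key-value map; the class and method names are hypothetical and do not reflect the connector's internal code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the schema merge rules for schema.inference.strategy = continuous.
class SchemaMergeSketch {

    // Appends columns that appear in the log entry but not in the schema.
    // Returns the added column names; each would correspond to an add-column event.
    static List<String> mergeColumns(List<String> schema, Map<String, String> logEntry) {
        List<String> added = new ArrayList<>();
        for (String column : logEntry.keySet()) {
            if (!schema.contains(column)) {
                schema.add(column); // new columns go to the end of the schema
                added.add(column);
            }
        }
        return added;
    }

    // Aligns a log entry to the schema. Columns missing from the entry are kept and filled with null.
    static List<String> toRow(List<String> schema, Map<String, String> logEntry) {
        List<String> row = new ArrayList<>();
        for (String column : schema) {
            row.add(logEntry.get(column));
        }
        return row;
    }

    public static void main(String[] args) {
        List<String> schema = new ArrayList<>(List.of("a", "b"));
        Map<String, String> entry = new LinkedHashMap<>();
        entry.put("b", "2");
        entry.put("c", "3");

        System.out.println("added: " + mergeColumns(schema, entry));
        System.out.println("schema: " + schema);
        System.out.println("row: " + toRow(schema, entry));
    }
}
```

In this model, column a is never dropped even though the entry lacks it, and column c is appended after b.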
Code examples
- SQL for source and sink tables
CREATE TEMPORARY TABLE sls_input(
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
  `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'sls',
  'endPoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'startTime' = '2023-08-30 00:00:00',
  'project' = 'sls-test',
  'logStore' = 'sls-input'
);

CREATE TEMPORARY TABLE sls_sink(
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING,
  `__source__` STRING,
  `__timestamp__` BIGINT,
  receive_time BIGINT
) WITH (
  'connector' = 'sls',
  'endPoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'project' = 'sls-test',
  'logStore' = 'sls-output'
);

INSERT INTO sls_sink
SELECT
  `time`,
  url,
  dt,
  float_field,
  double_field,
  boolean_field,
  `__topic__`,
  `__source__`,
  `__timestamp__`,
  CAST(`__tag__`['__receive_time__'] AS BIGINT) AS receive_time
FROM sls_input;
- Data ingestion with an SLS data source
Use SLS as a data source to ingest data in real time into supported downstream systems. For example, the following configuration defines a data ingestion job that writes data from a logstore to a Paimon-formatted data lake in Data Lake Formation (DLF). The job automatically infers the schema of the sink table and supports schema evolution at runtime.
source:
  type: sls
  name: SLS Source
  endpoint: ${endpoint}
  project: test_project
  logstore: test_log
  accessId: ${accessId}
  accessKey: ${accessKey}

# Add a primary key to the table.
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id

# Route all data from test_project.test_log to the test_database.inventory table.
route:
  - source-table: test_project.test_log
    sink-table: test_database.inventory

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
DataStream API
To read or write data with the DataStream API, use a DataStream connector. For more information, see Usage of DataStream connectors.
If you use a VVR version earlier than 8.0.10, your job might fail to start due to missing dependencies. To resolve this issue, add the corresponding uber-JAR as an additional dependency.
Read from SLS
Realtime Compute for Apache Flink provides the SlsSourceFunction class, an implementation of SourceFunction, for reading data from Simple Log Service (SLS). The following example reads data from SLS.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

// SlsSourceFunction, SLSAccessInfo, SourceRecord, FastLogGroup, FastLog, and FastLogContent
// are provided by the SLS connector dependency; import them from the packages of your connector version.
public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create an SLS source and print the data to the console.
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");

        // The batch get size is required.
        accessInfo.setBatchGetSize(10);

        // Optional parameters.
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);

        // Start time for consumption in seconds, set to the current time.
        int startInSec = (int) (new Date().getTime() / 1000);

        // Stop time for consumption in seconds, where -1 means never stop.
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    // Flattens every field of every log in the record into "key: ..., value: ..." strings.
    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}
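The source example passes the start and stop times to SlsSourceFunction as integer seconds. To start from a fixed point in time rather than the current time, a timestamp string can be converted first. The following is a minimal sketch that assumes a 24-hour yyyy-MM-dd HH:mm:ss pattern and an explicitly chosen time zone; StartTimeSketch and toEpochSeconds are hypothetical helper names, not part of the connector.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: converts a timestamp string to epoch seconds for use as a start or stop time.
class StartTimeSketch {

    // Assumption: the input uses a 24-hour clock, for example "2023-08-30 00:00:00".
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static int toEpochSeconds(String text, ZoneId zone) {
        long seconds = LocalDateTime.parse(text, FORMAT).atZone(zone).toEpochSecond();
        // Fails fast instead of silently overflowing the int expected by the source constructor.
        return Math.toIntExact(seconds);
    }

    public static void main(String[] args) {
        int startInSec = toEpochSeconds("2023-08-30 00:00:00", ZoneId.of("UTC"));
        System.out.println(startInSec);
    }
}
```

The same wall-clock string maps to different epoch values in different zones, so the zone should match the one in which the timestamp was written.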
Write to SLS
Realtime Compute for Apache Flink provides the SLSOutputFormat class, an implementation of OutputFormat, for writing data to SLS. The following example writes data to SLS.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;

// SLSOptions, SLSOutputFormat, SinkRecord, and LogItem are provided by the SLS connector dependency;
// import them from the packages of your connector version.
public class SlsDataStreamSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Generate a bounded sequence, convert each number to a sink record, and write it to SLS.
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    // Builds a log item stamped with the current time in seconds.
    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }
}
XML
The SLS DataStream connector is available in the Maven central repository.
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-format-common</artifactId>
        </exclusion>
    </exclusions>
</dependency>