Log Service (SLS)


Learn how to use the Log Service (SLS) connector.

Background

Simple Log Service is an end-to-end service for log data. It helps you collect, consume, ship, query, and analyze log data efficiently. This improves O&M efficiency and enables you to process massive amounts of log data.

The following table lists the capabilities of the SLS connector.

Category                                    Description
Supported types                             Source table and sink table
Running mode                                Streaming mode only
Connector-specific metrics                  N/A
Data format                                 N/A
API type                                    SQL, DataStream API, and data ingestion YAML API
Data update or deletion in the sink table   Sink tables are append-only; you cannot update or delete data.

Features

The SLS source connector directly reads message attribute fields. The following table lists the supported fields.

Parameter       Type                                     Description
__source__      STRING METADATA VIRTUAL                  Message source.
__topic__       STRING METADATA VIRTUAL                  Message topic.
__timestamp__   BIGINT METADATA VIRTUAL                  Log time.
__tag__         MAP<VARCHAR, VARCHAR> METADATA VIRTUAL   Message tag.

For example, for the attribute "__tag__:__receive_time__":"1616742274", '__receive_time__' and '1616742274' are stored as a key-value pair in the map. To access the value in SQL, use __tag__['__receive_time__'].

Prerequisites

Ensure you have created a Log Service Project and a Logstore. For more information, see Create a Project and a Logstore.

Limitations

  • Only Ververica Runtime (VVR) 11.1 and later supports SLS as a source for data ingestion jobs defined in YAML.

  • The SLS connector guarantees only at-least-once semantics.

  • Do not set the source parallelism higher than the number of shards, as this wastes resources. Additionally, for Ververica Runtime (VVR) 8.0.5 and earlier, a change in the number of shards can cause the automatic failover feature to fail, preventing some shards from being consumed.
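
The parallelism guidance can be illustrated with a small sketch. It assumes that shards are handed to subtasks round-robin by shard index. This is an illustrative assumption, not necessarily how the connector assigns shards, and the names ShardAssignmentSketch, assign, and idleSubtasks are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class ShardAssignmentSketch {

    // Hypothetical round-robin assignment: shard i goes to subtask (i % parallelism).
    public static List<List<Integer>> assign(int shardCount, int parallelism) {
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int shard = 0; shard < shardCount; shard++) {
            bySubtask.get(shard % parallelism).add(shard);
        }
        return bySubtask;
    }

    // Counts the subtasks that received no shard and therefore have nothing to read.
    public static long idleSubtasks(int shardCount, int parallelism) {
        return assign(shardCount, parallelism).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        // 4 shards, parallelism 6: two subtasks stay idle.
        System.out.println(assign(4, 6));
        System.out.println(idleSubtasks(4, 6));
    }
}
```

Under this assumption, any parallelism above the shard count only adds idle subtasks, which is the resource waste the limitation describes.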

SQL

Syntax

CREATE TABLE sls_table(
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

With options

  • General

    Parameter

    Description

    Type

    Required

    Default

    Remarks

    connector

    The connector to use.

    String

    Yes

    None

    Set this to sls.

    endPoint

    The endpoint of Log Service (SLS).

    String

    Yes

    None

    Specify the VPC access address of Log Service (SLS). For more information, see Service endpoints.

    Note
    • By default, Realtime Compute for Apache Flink cannot access the internet. To enable internet access from your Virtual Private Cloud (VPC), use a NAT Gateway. For more information, see How do I access the Internet?.

    • We recommend that you do not access SLS over the internet. If you must do so, use HTTPS and enable transfer acceleration. For more information, see Manage transfer acceleration.

    project

    The name of the SLS project.

    String

    Yes

    None

    N/A

    logStore

    The name of the Logstore or MetricStore.

    String

    Yes

    None

    Data in a Logstore is consumed in the same way as data in a MetricStore.

    accessId

    The AccessKey ID of your Alibaba Cloud account.

    String

    Yes

    None

    For more information, see Obtain an AccessKey pair.

    Important

    To prevent your AccessKey pair from being exposed, we recommend that you use variables to specify the AccessKey ID and AccessKey secret. For more information, see Project variables.

    accessKey

    The AccessKey secret of your Alibaba Cloud account.

    String

    Yes

    None

  • Source-specific

    Parameter

    Description

    Type

    Required

    Default

    Remarks

    enableNewSource

    Specifies whether to use the new data source that implements the FLIP-27 interface.

    Boolean

    No

    false

    The new source can automatically adapt to shard changes and distribute shards as evenly as possible across all source subtasks.

    Important
    • This option is supported only in VVR 8.0.9 and later. The default value is true for VVR 11.1 and later.

    • If you change this option's value, you cannot restore jobs from a saved state. To resume consumption from a historical offset, first start the job with the consumerGroup option to record the consumption progress in the SLS consumer group. Then, set the consumeFromCheckpoint option to true and restart the job without a state.

    • If a Logstore has read-only shards, some subtasks might continue requesting data from other shards after finishing their own. This can cause an imbalanced workload, affecting performance. To mitigate this issue, you can adjust the parallelism, optimize the scheduling strategy, or merge small shards to reduce the number of shards and simplify task allocation.

    shardDiscoveryIntervalMs

    The interval for dynamic shard discovery.

    Long

    No

    60000

    Set this option to a negative value to disable dynamic discovery. Unit: milliseconds.

    Note
    • When dynamic discovery is enabled, the value must be greater than or equal to 60,000 milliseconds (1 minute).

    • This option takes effect only when enableNewSource is set to true.

    • This option is supported only in VVR 8.0.9 and later.

    startupMode

    The startup mode for the source table.

    String

    No

    timestamp

    • timestamp (default): Starts consuming logs from the specified start time.

    • latest: Starts consuming logs from the latest offset.

    • earliest: Starts consuming logs from the earliest offset.

    • consumer_group: Starts consuming logs from the offset recorded by the consumer group. If the consumer group has not recorded an offset for a shard, consumption starts from the earliest offset.

    Important
    • VVR versions earlier than 11.1 do not support the consumer_group value. Instead, set consumeFromCheckpoint to true. Log consumption then starts from the offset recorded by the specified consumer group, and the startupMode setting is ignored.

    startTime

    The start time for log consumption.

    String

    No

    Current time

    The format is yyyy-MM-dd HH:mm:ss.

    This takes effect only when startupMode is set to timestamp.

    Note

    The startTime and stopTime options are based on the __receive_time__ attribute in SLS, rather than the __timestamp__ attribute.

    stopTime

    The end time for log consumption.

    String

    No

    None

    The format is yyyy-MM-dd HH:mm:ss.

    Note
    • This option is used only for consuming historical logs and must be set to a time in the past. If you set it to a future time, consumption may stop prematurely if no new logs are written, resulting in a data flow interruption without any error messages.

    • If you want the Flink job to exit after all logs are consumed, you must also set exitAfterFinish to true.

    consumerGroup

    The name of the consumer group.

    String

    No

    None

    A consumer group is used to record consumption progress. You can specify a custom name without a fixed format.

    Note

    Different Flink jobs must use different consumer groups. If multiple Flink jobs use the same consumer group, they do not coordinate and each job consumes all the data. This is because Flink does not use the SLS consumer group for partition assignment when it consumes data from SLS. As a result, each consumer consumes messages independently, even if they share the same consumer group.

    consumeFromCheckpoint

    Specifies whether to consume from a consumer group checkpoint.

    String

    No

    false

    • true: You must also specify a consumer group. The Flink program starts consuming logs from the checkpoint saved in the consumer group. If the consumer group does not have a corresponding checkpoint, consumption starts from the startTime configuration value.

    • false (default value): Does not start consuming logs from the checkpoint saved for the specified consumer group.

    Important

    This parameter is no longer supported in VVR 11.1 and later. For these versions, you must set the startupMode option to consumer_group.

    maxRetries

    The number of retries after a failed attempt to read from SLS.

    String

    No

    3

    N/A

    batchGetSize

    The number of log groups to read in a single request.

    String

    No

    100

    The batchGetSize setting cannot exceed 1000, or an error is reported.

    exitAfterFinish

    Specifies whether the Flink job exits after all data is consumed.

    String

    No

    false

    • true: The Flink program exits after all data is consumed.

    • false (default): The Flink program does not exit after data consumption is complete.

    query

    Important

    This option was deprecated in VVR 11.3, but later versions remain compatible.

    The query statement for preprocessing data before consumption.

    String

    No

    None

    Use this option to filter SLS data before Flink consumes it. This reduces costs and improves processing speed.

    For example, 'query' = '*| where request_method = ''GET''' indicates that before Flink reads data from SLS, it first matches data where the value of the request_method field is 'GET'.

    Note

    This option uses the SPL language of Log Service (SLS). For more information, see SPL syntax.

    Important
    • This option is supported only in VVR 8.0.1 and later.

    • This feature incurs Log Service (SLS) fees. For more information, see Billing of Log Service.

    processor

    The name of the SLS processor for data preprocessing. If both query and processor are specified, query takes precedence and processor is ignored.

    String

    No

    None

    This option filters SLS data before Flink consumes it, which reduces costs and improves processing speed. We recommend that you use processor instead of query.

    For example, 'processor' = 'test-filter-processor' indicates that the SLS processor filters data before Flink reads the data from SLS.

    Note

    This option uses the SPL language of Log Service (SLS). For more information, see SPL syntax. For information about how to create or update an SLS processor, see Manage processors.

    Important

    This option is supported only in VVR 11.3 and later.

    This feature incurs Log Service (SLS) fees. For more information, see Billing of Log Service.

  • Sink-specific

    Parameter

    Description

    Type

    Required

    Default

    Remarks

    topicField

    Specifies a field whose value overwrites the __topic__ attribute, which indicates the log topic.

    String

    No

    None

    The value of this option must be an existing field in the table.

    timeField

    Specifies a field whose value overwrites the __timestamp__ attribute, which indicates the log write time.

    String

    No

    Current time

    The value of this option must be an existing INT field in the table. If this option is not specified, the current time is used.

    sourceField

    Specifies a field whose value overwrites the __source__ attribute, which indicates the log source, such as the IP address of the machine that generated the log.

    String

    No

    None

    The value of this option must be an existing field in the table.

    partitionField

    Specifies a field for partitioning. A hash of this field's value determines which shard receives the data, ensuring records with the same hash go to the same shard.

    String

    No

    None

    If this option is not specified, each record is randomly written to an available shard.

    buckets

    If partitionField is specified, this option defines the number of buckets for mapping hash values.

    String

    No

    64

    The value must be a power of 2 in the range [1, 256]. The number of buckets must be greater than or equal to the number of shards. Otherwise, some shards may not receive any data.

    flushIntervalMs

    The data write interval.

    String

    No

    2000

    Unit: milliseconds.

    writeNullProperties

    Specifies whether to write null values as empty strings to SLS.

    Boolean

    No

    true

    • true (default value): Writes null values to the log as empty strings.

    • false: Fields that evaluate to null are not written to the log.

    Note

    This option is supported only in VVR 8.0.6 and later.
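
The interaction between partitionField, buckets, and shards can be sketched as follows. This is a minimal illustration under stated assumptions: the hash function (String.hashCode) and the bucket-to-shard mapping (contiguous slices of the shard range) are hypothetical stand-ins, not the connector's actual implementation, and the class and method names are invented for the example. Only the constraint check (a power of 2 in the range [1, 256]) comes directly from the option description.

```java
import java.util.Set;
import java.util.TreeSet;

public class BucketSketch {

    // Documented constraint: the bucket count is a power of 2 in the range [1, 256].
    public static boolean isValidBucketCount(int buckets) {
        return buckets >= 1 && buckets <= 256 && (buckets & (buckets - 1)) == 0;
    }

    // Hypothetical hashing step: equal partition values always land in the same bucket.
    public static int bucketOf(String partitionValue, int buckets) {
        return Math.floorMod(partitionValue.hashCode(), buckets);
    }

    // Hypothetical mapping step: each bucket points into a contiguous slice of the shard range.
    public static int shardOf(int bucket, int buckets, int shardCount) {
        return (int) ((long) bucket * shardCount / buckets);
    }

    // The set of shards that can receive data at all for a given configuration.
    public static Set<Integer> reachableShards(int buckets, int shardCount) {
        Set<Integer> shards = new TreeSet<>();
        for (int bucket = 0; bucket < buckets; bucket++) {
            shards.add(shardOf(bucket, buckets, shardCount));
        }
        return shards;
    }

    public static void main(String[] args) {
        // Fewer buckets than shards: only shards 0 and 2 are reachable.
        System.out.println(reachableShards(2, 4));
        // At least as many buckets as shards: every shard is reachable.
        System.out.println(reachableShards(64, 4));
    }
}
```

With 2 buckets and 4 shards, the sketch can reach only two shards, which matches the remark that the number of buckets should be greater than or equal to the number of shards.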

Type mappings

Flink type   SLS type
BOOLEAN      STRING
VARBINARY    STRING
VARCHAR      STRING
TINYINT      STRING
INTEGER      STRING
BIGINT       STRING
FLOAT        STRING
DOUBLE       STRING
DECIMAL      STRING

Data ingestion (Beta)

Limitations

This feature is supported only in Ververica Runtime (VVR) 11.1 and later.

Syntax

source:
  type: sls
  name: SLS Source
  endpoint: <endpoint>
  project: <project>
  logstore: <logstore>
  accessId: <accessId>
  accessKey: <accessKey>

Parameters

Parameter

Description

Type

Required

Default

Remarks

type

The type of the data source.

String

Yes

None

The value must be sls.

endpoint

The endpoint of Log Service (SLS).

String

Yes

None

The VPC access address of Log Service (SLS). For more information, see Service Endpoints.

Note
  • By default, Realtime Compute for Apache Flink cannot access the internet. You can use a NAT Gateway to enable communication between your Virtual Private Cloud (VPC) and the internet. For more information, see How can I access the internet?.

  • We do not recommend accessing Log Service (SLS) over the internet. If you must do so, use HTTPS and enable transfer acceleration for SLS.

accessId

The AccessKey ID for your Alibaba Cloud account.

String

Yes

None

For more information, see How do I view the AccessKey ID and AccessKey secret information?.

Important

To prevent your AccessKey information from being exposed, we recommend that you use a project variable to specify the AccessKey value. For more information, see Project variables.

accessKey

The AccessKey secret for your Alibaba Cloud account.

String

Yes

None

project

The name of the Log Service (SLS) project.

String

Yes

None

None

logStore

The name of the SLS Logstore or Metricstore.

String

Yes

None

Data in a Logstore is consumed in the same way as data in a Metricstore.

schema.inference.strategy

The schema inference strategy.

String

No

continuous

  • continuous: Performs schema inference for each data record. If schemas are incompatible, a wider schema is inferred and a schema change event is generated.

  • static: Performs schema inference only once when the job starts. Subsequent data is parsed based on the initial schema, and no schema change events are generated.

maxPreFetchLogGroups

The maximum number of log groups to read from each shard for the initial schema inference.

Integer

No

50

Before the job reads and processes data, the connector pre-consumes a specified number of log groups from each shard to initialize schema information.

shardDiscoveryIntervalMs

The interval, in milliseconds, for dynamically discovering shard changes.

Long

No

60000

Set this parameter to a negative value to disable dynamic discovery.

Note

When dynamic discovery is enabled, the value must be greater than or equal to 60,000 milliseconds (1 minute).

startupMode

The startup mode.

String

No

timestamp

  • timestamp (default): Consumes logs from a specific timestamp.

  • latest: Consumes logs from the latest offset.

  • earliest: Consumes logs from the earliest offset.

  • consumer_group: Consumes logs from the offset recorded in the consumer group. If no offset is recorded for a shard, consumption starts from the earliest offset.

startTime

The start time for log consumption.

String

No

Current time

The format is yyyy-MM-dd HH:mm:ss.

This parameter takes effect only when startupMode is set to timestamp.

Note

The startTime and stopTime parameters are based on the __receive_time__ attribute in Log Service (SLS), not the __timestamp__ attribute.

stopTime

The end time for log consumption.

String

No

None

The format is yyyy-MM-dd HH:mm:ss.

Note

If you want the Flink job to exit after all logs are consumed, you must also set exitAfterFinish=true.

consumerGroup

The name of the consumer group.

String

No

None

A consumer group records consumption progress. You can specify any custom name.

batchGetSize

The number of log groups to read per request.

Integer

No

100

The value of batchGetSize cannot exceed 1,000. Otherwise, an error occurs.

maxRetries

The number of retries if reading from Log Service (SLS) fails.

Integer

No

3

None

exitAfterFinish

Specifies whether the Flink job exits after all data is consumed.

Boolean

No

false

  • true: The Flink job exits after all data is consumed.

  • false (default): The Flink job does not exit after all data is consumed.

query

The preprocessing statement for consuming data from Log Service (SLS).

String

No

None

Use this parameter to filter data in Log Service (SLS) before consumption to save costs and improve processing speed.

For example, 'query' = '*| where request_method = ''GET''' filters for data where the request_method field is 'GET' before the data is read by Flink.

Note

The query must use the SPL syntax of Log Service. For more information, see SPL syntax.

Important
  • For information about the regions where this feature is available in Log Service (SLS), see Consume logs based on rules.

  • This feature is in Beta and is free of charge. You may be charged for this feature in the future. For more information, see Pricing.

compressType

The compression type for Log Service (SLS).

String

No

None

Supported compression types include:

  • lz4

  • deflate

  • zstd

timeZone

The time zone for startTime and stopTime.

String

No

None

By default, no offset is added.

regionId

The region where Log Service (SLS) is deployed.

String

No

None

For more information, see Supported regions.

signVersion

The request signature version for Log Service (SLS).

String

No

None

For more information, see Request signatures.

shardModDivisor

The divisor used when reading from Logstore shards.

Int

No

-1

For more information, see Shards.

shardModRemainder

The remainder used when reading from Logstore shards.

Int

No

-1

For more information, see Shards.

metadata.list

The metadata columns to pass to downstream jobs.

String

No

None

Available metadata fields include __source__, __topic__, __timestamp__, and __tag__. Separate multiple fields with a comma.

decode.table-id.fields

Specifies fields whose values are used to generate a Table ID when parsing log data from Log Service (SLS).

String

No

None

Separate multiple fields with a comma (,). For example, if the upstream SLS log record is {"col0":"a", "col1":"b", "col2":"c"}, different configurations produce the following table IDs:

Configuration     Table ID
None              <project>.<logstore> for all messages
col0              a
col0,col1         a.b
col0,col1,col2    a.b.c

Note

This parameter is supported in Realtime Compute for Apache Flink versions 11.6 and later.

fixed-types

Specifies the data types for specific fields when parsing log data from Log Service (SLS).

String

No

None

Use a comma (,) to separate multiple field definitions. For example, id BIGINT, name VARCHAR(10) sets the type of the id field to BIGINT and the type of the name field to VARCHAR(10).

Note

This parameter is supported in Realtime Compute for Apache Flink versions 11.6 and later.

timestamp-format.standard

The format for timestamp fields in the log data from Log Service (SLS).

String

No

SQL

Valid values:

  • SQL: Parses input timestamps in the yyyy-MM-dd HH:mm:ss.s{precision} format (for example, 2020-12-30 12:13:14.123) and outputs them in the same format.

  • ISO-8601: Parses input timestamps in the yyyy-MM-ddTHH:mm:ss.s{precision} format (for example, 2020-12-30T12:13:14.123) and outputs them in the same format.

Note

This parameter is supported in Realtime Compute for Apache Flink versions 11.6 and later.

ingestion.ignore-errors

Specifies whether to ignore errors that occur during data parsing.

Boolean

No

false

Note

This parameter is supported in Realtime Compute for Apache Flink versions 11.6 and later.

ingestion.error-tolerance.max-count

If ingestion.ignore-errors is enabled, the job fails when the cumulative number of errors exceeds this value.

Integer

No

-1

This parameter takes effect only when ingestion.ignore-errors is enabled. The default value of -1 means that the job ignores all parsing exceptions.

Note

This parameter is supported in Realtime Compute for Apache Flink versions 11.6 and later.
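
The table ID examples given for decode.table-id.fields can be reproduced with a short sketch. The class TableIdSketch and its tableId method are hypothetical names used only for illustration. The sketch assumes every configured field is present in the log record, because the behavior for missing fields is not described here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TableIdSketch {

    // Builds a table ID from the configured fields, or falls back to project.logstore
    // when no fields are configured. Assumes all configured fields exist in the record.
    public static String tableId(Map<String, String> log, String fieldsOption,
                                 String project, String logstore) {
        if (fieldsOption == null || fieldsOption.trim().isEmpty()) {
            return project + "." + logstore;
        }
        List<String> parts = new ArrayList<>();
        for (String field : fieldsOption.split(",")) {
            parts.add(log.get(field.trim()));
        }
        return String.join(".", parts);
    }
}
```

For the record {"col0":"a", "col1":"b", "col2":"c"}, calling tableId with "col0,col1" yields a.b, and calling it with no fields yields the project and Logstore names joined by a period.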

Reuse an existing catalog

Starting from Realtime Compute for Apache Flink version 11.5, you can reference a built-in SLS Catalog created on the Data Management page in Flink CDC data ingestion jobs. This avoids the need to manually specify connection properties.

source:
  type: sls
  using.built-in-catalog: sls_catalog

Currently, data ingestion jobs can automatically reuse the following parameters from the built-in SLS Catalog:

  • endpoint

  • project

  • accessId

  • accessKey

To override these automatically reused parameters, you can explicitly define them in the YAML configuration. Parameters defined in the YAML file take precedence.

Data type mapping

If fixed-types is not configured, the following data type mapping applies:

SLS type   CDC type
STRING     STRING

If fixed-types is configured, the system parses data by using the specified types.

Schema inference and evolution

  • Shard data pre-consumption and schema initialization

    The SLS connector maintains the schema of the Logstore it is reading from. Before reading data from the Logstore, the connector pre-consumes up to maxPreFetchLogGroups log groups from each shard. It parses the schema of each log entry and merges them to initialize the schema for the table. A table creation event is then generated based on this initial schema before data consumption begins.

    Note

    For each shard, the connector attempts to consume data starting from one hour before the current time to parse the log schema.

  • Primary key information

    Log Service (SLS) logs do not contain primary key information. You can manually add a primary key to the table by using transform rules:

    transform:
      - source-table: <project>.<logstore>
        projection: \*
        primary-keys: key1, key2

  • Schema inference and schema changes

    After the schema is initialized, if schema.inference.strategy is set to static, the SLS connector parses each log entry based on the initial schema and does not generate schema change events. If schema.inference.strategy is set to continuous, the connector parses each log entry, infers the physical columns, and compares them with the current schema. If the inferred schema is inconsistent with the current schema, the schemas are merged according to the following rules:

    • If the inferred physical columns contain fields that are not in the current schema, the connector adds these fields to the schema and generates an event to add nullable columns.

    • If the inferred physical columns are missing fields that exist in the current schema, the connector retains these fields, populates their data with NULL, and does not generate a column deletion event.

    The SLS connector infers the data type of all fields in each log entry as String. Currently, only adding new columns is supported. The connector appends new columns to the end of the schema and sets them to nullable.
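
The merge rules for the continuous strategy can be sketched in a few lines. This is a simplified model, not the connector's code: SchemaMergeSketch, evolve, and toRow are hypothetical names, every column is treated as a nullable String, and the returned list of added columns stands in for the add-column events.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SchemaMergeSketch {

    // Insertion-ordered column names; new columns are appended at the end.
    private final Set<String> columns = new LinkedHashSet<>();

    public SchemaMergeSketch(List<String> initialColumns) {
        columns.addAll(initialColumns);
    }

    // Adds fields that are not yet in the schema and returns them.
    // Fields missing from the log entry are kept; no column is ever dropped.
    public List<String> evolve(Map<String, String> log) {
        List<String> added = new ArrayList<>();
        for (String field : log.keySet()) {
            if (columns.add(field)) {
                added.add(field);
            }
        }
        return added;
    }

    // Projects a log entry onto the current schema; absent fields become null.
    public List<String> toRow(Map<String, String> log) {
        List<String> row = new ArrayList<>();
        for (String column : columns) {
            row.add(log.get(column));
        }
        return row;
    }

    public List<String> columns() {
        return new ArrayList<>(columns);
    }
}
```

Starting from the schema [a, b], a log entry that contains only a and c adds column c at the end, and the resulting row carries NULL for b.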

Code examples

  • SQL for source and sink tables

    CREATE TEMPORARY TABLE sls_input(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` BIGINT METADATA VIRTUAL,
      `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime AS PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endPoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'startTime' = '2023-08-30 00:00:00',
      'project' = 'sls-test',
      'logStore' = 'sls-input'
    );
    );
    
    CREATE TEMPORARY TABLE sls_sink(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING,
      `__source__` STRING,
      `__timestamp__` BIGINT ,
      receive_time BIGINT
    ) WITH (
      'connector' = 'sls',
      'endPoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'project' = 'sls-test',
      'logStore' = 'sls-output'
    );
    
    INSERT INTO sls_sink
    SELECT 
     `time`,
      url,
      dt,
      float_field,
      double_field,
      boolean_field,
      `__topic__` ,
      `__source__` ,
      `__timestamp__` ,
      cast(__tag__['__receive_time__'] as bigint) as receive_time
    FROM sls_input;

  • Data ingestion with an SLS data source

    Use SLS as a data source to ingest data in real time into supported downstream systems. For example, the following configuration defines a data ingestion job that writes data from a logstore to a Paimon-formatted data lake in Data Lake Formation (DLF). The job automatically infers the schema of the sink table and supports schema evolution at runtime.

source:
  type: sls
  name: SLS Source
  endpoint: ${endpoint}
  project: test_project
  logstore: test_log
  accessId: ${accessId}
  accessKey: ${accessKey}
   
# Add a primary key to the table.
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
    
# Route all data from test_project.test_log to the test_database.inventory table.
route:
  - source-table: test_project.test_log
    sink-table: test_database.inventory

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

DataStream API

Important

To read or write data with the DataStream API, use a DataStream connector. For more information, see Usage of DataStream connectors.

If you use a VVR version earlier than 8.0.10, your job might fail to start due to missing dependencies. To resolve this issue, add the corresponding uber-JAR as an additional dependency.

Read from SLS

Realtime Compute for Apache Flink provides the SlsSourceFunction class, an implementation of SourceFunction, for reading data from Simple Log Service (SLS). The following example reads data from SLS.

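import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

// SlsSourceFunction, SLSAccessInfo, SourceRecord, and the FastLog* classes are provided by
// the ververica-connector-sls dependency and its bundled SLS SDK; import them from there.

public class SlsDataStreamSource {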

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Creates an SLS source and prints the data to the console.
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");

        // The batch get size is required.
        accessInfo.setBatchGetSize(10);

        // Optional parameters
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);

        // Start time for consumption, set to the current time.
        int startInSec = (int) (new Date().getTime() / 1000);

        // Stop time for consumption, where -1 means never stop.
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}
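
The example above starts from the current time. To start from a fixed point instead, a value in the yyyy-MM-dd HH:mm:ss format used by the startTime option can be converted to epoch seconds first. The following is a minimal sketch using only the JDK; StartTimeSketch and toEpochSeconds are hypothetical names, and the choice of time zone is an assumption that you must match to your own setup.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class StartTimeSketch {

    // Same pattern as the startTime option: 24-hour clock, second precision.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Interprets the text in the given zone and returns epoch seconds as an int.
    // Math.toIntExact throws if the value does not fit into an int.
    public static int toEpochSeconds(String text, ZoneId zone) {
        long seconds = LocalDateTime.parse(text, FORMAT).atZone(zone).toEpochSecond();
        return Math.toIntExact(seconds);
    }

    public static void main(String[] args) {
        // Prints 1693353600 for midnight UTC on 2023-08-30.
        System.out.println(toEpochSeconds("2023-08-30 00:00:00", ZoneId.of("UTC")));
    }
}
```

The result can then be passed as startInSec in place of the value derived from new Date().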

Write to SLS

Realtime Compute for Apache Flink provides the SLSOutputFormat class, an implementation of OutputFormat, for writing data to SLS. The following example writes data to SLS.

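import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;

// SLSOutputFormat, SLSOptions, SinkRecord, and LogItem are provided by the
// ververica-connector-sls dependency and its bundled SLS SDK; import them from there.

public class SlsDataStreamSink {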

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }

}

XML

The SLS DataStream connector is available in the Maven central repository.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-format-common</artifactId>
        </exclusion>
    </exclusions>
</dependency>

FAQ

How do I resolve a TaskManager OOM error (java.lang.OutOfMemoryError: Java heap space) when a failed Flink job is restored?