日志服务SLS

本文为您介绍日志服务SLS连接器。

背景信息

日志服务是针对日志类数据的一站式服务。日志服务可以帮助您快捷地完成数据采集、消费、投递以及查询分析,提升运维和运营效率,建立海量日志处理能力。

SLS连接器支持的信息如下。

类别

详情

支持类型

源表和结果表

运行模式

仅支持流模式

特有监控指标

暂不适用

数据格式

暂无

API种类

SQL

是否支持更新或删除结果表数据

不支持更新和删除结果表数据,只支持插入数据。

特色功能

SLS连接器源表支持直接读取消息的属性字段,支持的属性字段如下。

字段名

字段类型

字段说明

__source__

STRING METADATA VIRTUAL

消息源。

__topic__

STRING METADATA VIRTUAL

消息主题。

__timestamp__

BIGINT METADATA VIRTUAL

日志时间。

__tag__

MAP<VARCHAR, VARCHAR> METADATA VIRTUAL

消息TAG。

对于属性"__tag__:__receive_time__":"1616742274"'__receive_time__'和'1616742274'会被作为KV对,记录在Map中,在SQL中通过__tag__['__receive_time__']的方式访问。

前提条件

已创建日志服务Project和Logstore,详情请参见创建Project和Logstore

使用限制

  • 仅实时计算引擎VVR 2.0.0及以上版本支持日志服务SLS连接器。

  • 日志服务SLS连接器不支持作为维表。

  • SLS连接器仅保证At-Least-Once语义。

  • 仅实时计算引擎VVR 4.0.13及以上版本支持exitAfterFinish参数和Shard数目变化触发自动Failover功能。

  • 仅实时计算引擎VVR 6.0.5及以上版本支持consumeFromCheckpointbuckets参数。

语法结构

CREATE TABLE sls_table(
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessKey>'
);

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值sls。

    endPoint

    EndPoint地址。

    String

    详情请参见服务入口

    project

    SLS项目名称。

    String

    无。

    logStore

    SLS LogStore或metricstore名称。

    String

    logStore和metricstore是相同的消费方式。

    accessId

    阿里云账号的AccessKey ID。

    String

    无。

    accessKey

    阿里云账号的AccessKey Secret。

    String

    无。

  • 源表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    startTime

    消费日志的开始时间。

    String

    当前时间

    格式为yyyy-MM-dd hh:mm:ss。

    stopTime

    消费日志的结束时间。

    String

    格式为yyyy-MM-dd hh:mm:ss。

    consumerGroup

    消费组名称。

    String

    您可以自定义消费组名,无固定格式。

    consumeFromCheckpoint

    是否从指定的消费组中保存的Checkpoint开始消费日志。

    String

    false

    参数取值如下:

    • true:必须同时指定消费组,Flink程序会从消费组中保存的Checkpoint开始消费日志,如果该消费组没有对应的Checkpoint,则从startTime配置值开始消费。

    • false(默认值):不从指定的消费组中保存的Checkpoint开始消费日志。

    说明

    仅实时计算引擎VVR 6.0.5及以上版本支持该参数。

    directMode

    是否使用SLS的直连模式。

    String

    false

    参数取值如下:

    • true:使用该功能。

    • false(默认):禁用该功能。

    maxRetries

    读取SLS失败后重试次数。

    String

    3

    无。

    batchGetSize

    单次请求读取logGroup的个数。

    String

    100

    batchGetSize设置不能超过1000,否则会报错。

    exitAfterFinish

    在数据消费完成后,Flink程序是否退出。

    String

    false

    参数取值如下:

    • true:数据消费完后,Flink程序退出。

    • false(默认):数据消费完后,Flink程序不退出。

    说明

    仅实时计算引擎VVR 4.0.13版本和VVR 6.0.0及以上版本支持该参数。

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    topicField

    指定字段名,该字段的值会覆盖__topic__属性字段的值,表示日志的主题。

    String

    该参数值是表中已存在的字段之一。

    timeField

    指定字段名,该字段的值会覆盖__timestamp__属性字段的值,表示日志写入时间。

    String

    当前时间

    该参数值是表中已存在的字段之一,且字段类型必须为INT。如果未指定,则默认填充当前时间。

    sourceField

    指定字段名,该字段的值会覆盖__source__属性字段的值,表示日志的来源地,例如产生该日志机器的IP地址。

    String

    该参数值是表中已存在的字段之一。

    partitionField

    指定字段名,数据写入时会根据该列值计算Hash值,Hash值相同的数据会写入同一个shard。

    String

    如果未指定,则每条数据会随机写入当前可用的Shard中。

    buckets

    当指定partitionField时,根据Hash值重新分组的个数。

    String

    64

    该参数的取值范围是[1, 256],且必须是2的整数次幂。同时,buckets个数应当大于等于Shard个数,否则会出现部分Shard没有数据写入的情况。

    说明

    仅实时计算引擎VVR 6.0.5及以上版本支持该参数。

    flushIntervalMs

    触发数据写入的时间周期。

    String

    2000

    单位毫秒。

类型映射

Flink字段类型

SLS字段类型

VARCHAR

STRING

代码示例

CREATE TEMPORARY TABLE sls_input(
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
   __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime as PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou.log.aliyuncs.com',
  'accessid' ='xx',
  'accesskey' ='xxx',
  'starttime' = '2001-08-01 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);

CREATE TEMPORARY TABLE sls_sink(
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING ,
  `__source__` STRING ,
  `__timestamp__` BIGINT ,
  receive_time BIGINT
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou.log.aliyuncs.com',
  'accessid' ='xx',
  'accesskey' ='xxx',
  'project' ='sls-test',
  'logstore' ='sls-output'
);

INSERT INTO sls_sink
SELECT 
 `time`,
  url,
  dt,
  float_field,
  double_field,
  boolean_field,
  `__topic__` ,
  `__source__` ,
  `__timestamp__` ,
  cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input; 

DataStream API

重要

通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink全托管,DataStream连接器设置方法请参见DataStream连接器使用方法。Maven中央库中已经放置了SLS DataStream连接器

  • 读取SLS

    实时计算引擎VVR提供SourceFunction的实现类SlsSourceFunction,用于读取SLS,读取SLS的示例如下。

    public class SlsDataStreamSource {
    
        public static void main(String[] args) throws Exception {
            // Sets up the streaming execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Creates and adds SLS source and sink.
            env.addSource(createSlsSource())
                    .map(SlsDataStreamSource::convertMessages)
                    .print();
            env.execute("SLS Stream Source");
        }
    
        private static SlsSourceFunction createSlsSource() {
            SLSAccessInfo accessInfo = new SLSAccessInfo();
            accessInfo.setEndpoint("yourEndpoint");
            accessInfo.setProjectName("yourProject");
            accessInfo.setLogstore("yourLogStore");
            accessInfo.setAccessId("yourAccessId");
            accessInfo.setAccessKey("yourAccessKey");
    
            // The batch get size must be given.
            accessInfo.setBatchGetSize(10);
    
            // Optional parameters
            accessInfo.setConsumerGroup("yourConsumerGroup");
            accessInfo.setMaxRetries(3);
    
            // time to start consuming, set to current time.
            int startInSec = (int) (new Date().getTime() / 1000);
    
            // time to stop consuming, -1 means never stop.
            int stopInSec = -1;
    
            return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
        }
    
        private static List<String> convertMessages(SourceRecord input) {
            List<String> res = new ArrayList<>();
            for (FastLogGroup logGroup : input.getLogGroups()) {
                int logsCount = logGroup.getLogsCount();
                for (int i = 0; i < logsCount; i++) {
                    FastLog log = logGroup.getLogs(i);
                    int fieldCount = log.getContentsCount();
                    for (int idx = 0; idx < fieldCount; idx++) {
                        FastLogContent f = log.getContents(idx);
                        res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                    }
                }
            }
            return res;
        }
    }
  • 写入SLS

    提供OutputFormat的实现类SLSOutputFormat,用于写入SLS。写入SLS的示例如下。

    public class SlsDataStreamSink {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromSequence(0, 100)
                    .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                    .addSink(createSlsSink())
                    .name(SlsDataStreamSink.class.getSimpleName());
            env.execute("SLS Stream Sink");
        }
    
        private static OutputFormatSinkFunction createSlsSink() {
            Configuration conf = new Configuration();
            conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
            conf.setString(SLSOptions.PROJECT, "yourProject");
            conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
            conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
            conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
            SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
            return new OutputFormatSinkFunction<>(outputFormat);
        }
    
        private static SinkRecord getSinkRecord(Long seed) {
            SinkRecord record = new SinkRecord();
            LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
            logItem.PushBack("level", "info");
            logItem.PushBack("name", String.valueOf(seed));
            logItem.PushBack("message", "it's a test message for " + seed.toString());
            record.setContent(logItem);
            return record;
        }
    
    }

常见问题

恢复失败的Flink程序时,TaskManager发生OOM,源表报错java.lang.OutOfMemoryError: Java heap space

阿里云首页 实时计算 Flink版 相关技术圈