本文为您介绍如何使用日志服务SLS连接器。
背景信息
日志服务是针对日志类数据的一站式服务。日志服务可以帮助您快捷地完成数据采集、消费、投递以及查询分析,提升运维和运营效率,建立海量日志处理能力。
SLS连接器支持的信息如下。
| 类别 | 详情 | 
| 支持类型 | 源表和结果表 | 
| 运行模式 | 仅支持流模式 | 
| 特有监控指标 | 暂不适用 | 
| 数据格式 | 暂无 | 
| API种类 | SQL,Datastream和数据摄入YAML | 
| 是否支持更新或删除结果表数据 | 不支持更新和删除结果表数据,只支持插入数据。 | 
特色功能
SLS连接器源表支持直接读取消息的属性字段,支持的属性字段如下。
| 字段名 | 字段类型 | 字段说明 | 
| __source__ | STRING METADATA VIRTUAL | 消息源。 | 
| __topic__ | STRING METADATA VIRTUAL | 消息主题。 | 
| __timestamp__ | BIGINT METADATA VIRTUAL | 日志时间。 | 
| __tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | 消息TAG。 对于属性 | 
前提条件
已创建日志服务Project和Logstore,详情请参见创建Project和Logstore。
使用限制
- 仅实时计算引擎VVR 11.1及以上版本支持日志服务SLS作为数据摄入YAML的同步数据源。 
- SLS连接器仅保证At-Least-Once语义。 
- 强烈建议不要设置Source并发度大于Shard个数,不仅会造成资源浪费,且在8.0.5及更低版本中,如果后续Shard数目发生变化,自动Failover功能可能会失效,造成部分Shard不被消费。 
SQL
语法结构
CREATE TABLE sls_table(
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);WITH参数
- 通用 - 参数 - 说明 - 数据类型 - 是否必填 - 默认值 - 备注 - connector - 表类型。 - String - 是 - 无 - 固定值sls。 - endPoint - EndPoint地址。 - String - 是 - 无 - 请填写SLS的私网服务地址,详情请参见服务接入点。 - project - SLS项目名称。 - String - 是 - 无 - 无。 - logStore - SLS LogStore或metricstore名称。 - String - 是 - 无 - logStore和metricstore是相同的消费方式。 - accessId - 阿里云账号的AccessKey ID。 - String - 是 - 无 - 详情请参见如何查看AccessKey ID和AccessKey Secret信息?。 重要- 为了避免您的AK信息泄露,建议您使用变量的方式填写AccessKey取值,详情请参见项目变量。 - accessKey - 阿里云账号的AccessKey Secret。 - String - 是 - 无 
- 源表独有 - 参数 - 说明 - 数据类型 - 是否必填 - 默认值 - 备注 - enableNewSource - 是否启用实现了FLIP-27接口的新数据源。 - Boolean - 否 - false - 新数据源可以自动适应Shard变化,同时尽可能保证Shard在所有的source并发上分布均匀。 重要- 仅实时计算引擎VVR 8.0.9及以上版本支持该参数。从实时计算引擎VVR 11.1版本开始该参数默认为true。 
- 作业在该配置项发生变化后无法从状态恢复。可通过先设置配置项consumerGroup启动作业,将消费进度记录到SLS消费组中,再将配置项consumeFromCheckpoint设为true后无状态启动作业,从而实现从历史进度继续消费。 
- 如果SLS中存在只读Shard,Flink的某些并发任务在完成对只读Shard的消费后会继续请求读取其他未完成的Shard。这可能导致部分并发任务被分配到多个Shard,从而造成不同并发任务之间的Shard分配不均衡。这种不均衡会影响整体的消费效率和系统性能。为缓解这一问题,您可以通过调整并发度、优化任务调度策略、合并小Shard等方法,以减少Shard数量和任务分配复杂度。 
 - shardDiscoveryIntervalMs - 动态检测shard变化时间间隔,单位为毫秒。 - Long - 否 - 60000 - 设置为负值时可以关闭动态检测。 说明- 该参数值不能少于1分钟(60000毫秒)。 
- 仅当配置项enableNewSource为true时生效。 
- 仅实时计算引擎VVR 8.0.9及以上版本支持该参数。 
 - startupMode - 源表启动模式。 - String - 否 - timestamp - timestamp(默认):从指定的起始时间开始消费日志。
- latest:从最新位点开始消费日志。
- earliest:从最早位点开始消费日志。
- consumer_group:从消费组记录位点开始消费日志。若消费组未记录某shard消费位点,则会从最早位点开始消费日志。
 重要- 实时计算引擎VVR 11.1以下版本,不支持取值为consumer_group,需要将 - consumeFromCheckpoint设为- true,此时会从指定消费组记录的位点开始消费日志,此处的启动模式将不会生效。
 - startTime - 消费日志的开始时间。 - String - 否 - 当前时间 - 格式为 - yyyy-MM-dd hh:mm:ss。- 仅当 - startupMode设为- timestamp时生效。说明- startTime和stopTime基于SLS中的__receive_time__属性,而非__timestamp__属性。 - stopTime - 消费日志的结束时间。 - String - 否 - 无 - 格式为 - yyyy-MM-dd hh:mm:ss。说明- 仅用于消费历史日志,应设置为过去时间点。若配置为未来时间,可能因暂无新日志写入而导致消费提前终止,表现为数据流中断且无异常提示。 
- 如期望日志消费到结尾时退出Flink程序,需要同时设置exitAfterFinish=true. 
 - consumerGroup - 消费组名称。 - String - 否 - 无 - 消费组用于记录消费进度。您可以自定义消费组名,无固定格式。 说明- 不支持通过相同的消费组进行多作业的协同消费。不同的Flink作业应该设置不同的消费组。如果不同的Flink作业使用相同的消费组,它们将会消费全部数据。这是因为在Flink消费SLS的数据时,并不会经过SLS消费组进行分区分配,因此导致各个消费者独立消费各自的消息,即使消费组是相同的。 - consumeFromCheckpoint - 是否从指定的消费组中保存的Checkpoint开始消费日志。 - String - 否 - false - true:必须同时指定消费组,Flink程序会从消费组中保存的Checkpoint开始消费日志,如果该消费组没有对应的Checkpoint,则从startTime配置值开始消费。
- false(默认值):不从指定的消费组中保存的Checkpoint开始消费日志。
 重要- 实时计算引擎VVR 11.1版本开始不再支持配置该参数。对于VVR 11.1及更高版本,需要将 - startupMode配置为- consumer_group。- maxRetries - 读取SLS失败后重试次数。 - String - 否 - 3 - 无。 - batchGetSize - 单次请求读取logGroup的个数。 - String - 否 - 100 - batchGetSize设置不能超过1000,否则会报错。- exitAfterFinish - 在数据消费完成后,Flink程序是否退出。 - String - 否 - false - true:数据消费完后,Flink程序退出。
- false(默认):数据消费完后,Flink程序不退出。
 - query 重要- 自VVR 11.3起废弃,后续版本仍兼容。 - SLS消费预处理语句。 - String - 否 - 无 - 通过使用query参数,您可以在消费SLS数据之前对其进行过滤,以避免将所有数据都消费到Flink中,从而实现节约成本和提高处理速度的目的。 - 例如 - 'query' = '*| where request_method = ''GET'''表示在Flink读取SLS数据前,先匹配出request_method字段值等于get的数据。说明- query需使用日志服务SPL语言,请参见SPL语法。 重要- 仅实时计算引擎VVR 8.0.1及以上版本支持该参数。 
- 该功能会产生日志服务SLS费用,详情请参见费用说明。 
 - processor - SLS 消费处理器,与query字段同时存在时,query生效,processor不生效。 - String - 否 - 无 - 通过使用processor参数,您可以在消费SLS数据之前对其进行过滤,以避免将所有数据都消费到Flink中,从而实现节约成本和提高处理速度的目的。推荐使用processor参数而不是query参数。 - 例如 - 'processor' = 'test-filter-processor'表示在Flink读取SLS数据前,先经过SLS消费处理器的过滤。重要- 仅实时计算引擎VVR 11.3及以上版本支持该参数。 - 该功能会产生日志服务SLS费用,详情请参见费用说明。 
- 结果表独有 - 参数 - 说明 - 数据类型 - 是否必填 - 默认值 - 备注 - topicField - 指定字段名,该字段的值会覆盖__topic__属性字段的值,表示日志的主题。 - String - 否 - 无 - 该参数值是表中已存在的字段之一。 - timeField - 指定字段名,该字段的值会覆盖__timestamp__属性字段的值,表示日志写入时间。 - String - 否 - 当前时间 - 该参数值是表中已存在的字段之一,且字段类型必须为INT。如果未指定,则默认填充当前时间。 - sourceField - 指定字段名,该字段的值会覆盖__source__属性字段的值,表示日志的来源地,例如产生该日志机器的IP地址。 - String - 否 - 无 - 该参数值是表中已存在的字段之一。 - partitionField - 指定字段名,数据写入时会根据该列值计算Hash值,Hash值相同的数据会写入同一个shard。 - String - 否 - 无 - 如果未指定,则每条数据会随机写入当前可用的Shard中。 - buckets - 当指定partitionField时,根据Hash值重新分组的个数。 - String - 否 - 64 - 该参数的取值范围是[1, 256],且必须是2的整数次幂。同时,buckets个数应当大于等于Shard个数,否则会出现部分Shard没有数据写入的情况。 - flushIntervalMs - 触发数据写入的时间周期。 - String - 否 - 2000 - 单位为毫秒。 - writeNullProperties - 是否将null值作为空字符串写入SLS。 - Boolean - 否 - true - true(默认值):将null值作为空字符串写入日志。
- false:计算结果为null的字段不会写入到日志中。
 说明- 仅实时计算引擎VVR 8.0.6及以上版本支持该参数。 
类型映射
| Flink字段类型 | SLS字段类型 | 
| BOOLEAN | STRING | 
| VARBINARY | |
| VARCHAR | |
| TINYINT | |
| INTEGER | |
| BIGINT | |
| FLOAT | |
| DOUBLE | |
| DECIMAL | 
数据摄入
使用限制
仅实时计算引擎VVR 11.1及以上版本支持。
语法结构
source:
   type: sls
   name: SLS Source
   endpoint: <endpoint>
   project: <project>
   logstore: <logstore>
   accessId: <accessId>
   accessKey: <accessKey>配置项
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| type | 数据源类型。 | String | 是 | 无 | 固定值sls。 | 
| endpoint | EndPoint地址。 | String | 是 | 无 | 请填写SLS的私网服务地址,详情请参见服务接入点。 | 
| accessId | 阿里云账号的AccessKey ID。 | String | 是 | 无 | 详情请参见如何查看AccessKey ID和AccessKey Secret信息?。 重要  为了避免您的AK信息泄露,建议您使用变量的方式填写AccessKey取值,详情请参见项目变量。 | 
| accessKey | 阿里云账号的AccessKey Secret。 | String | 是 | 无 | |
| project | SLS项目名称。 | String | 是 | 无 | 无。 | 
| logStore | SLS LogStore或metricstore名称。 | String | 是 | 无 | logStore和metricstore是相同的消费方式。 | 
| schema.inference.strategy | Schema推导策略。 | String | 否 | continuous | 
 | 
| maxPreFetchLogGroups | Schema初始推导时,对每个shard最多尝试读取解析的logGroup个数。 | Integer | 否 | 50 | 在作业实际读取并处理数据前,对每个shard尝试提前消费指定数量的logGroup,用于初始化schema信息。 | 
| shardDiscoveryIntervalMs | 动态检测shard变化时间间隔,单位为毫秒。 | Long | 否 | 60000 | 设为负值时可以关闭动态检测。 说明  该参数值不能少于1分钟(60000毫秒)。 | 
| startupMode | 启动模式。 | String | 否 | 无 | 
 | 
| startTime | 消费日志的开始时间。 | String | 否 | 当前时间 | 格式为yyyy-MM-dd hh:mm:ss。 仅当startupMode设为timestamp时生效。 说明  startTime和stopTime基于SLS中的__receive_time__属性,而非__timestamp__属性。 | 
| stopTime | 消费日志的结束时间。 | String | 否 | 无 | 格式为yyyy-MM-dd hh:mm:ss。 说明  如期望日志消费到结尾时退出Flink程序,需要同时设置exitAfterFinish=true。 | 
| consumerGroup | 消费组名称。 | String | 否 | 无 | 消费组用于记录消费进度。您可以自定义消费组名,无固定格式。 | 
| batchGetSize | 单次请求读取logGroup的个数。 | Integer | 否 | 100 | batchGetSize设置不能超过1000,否则会报错。 | 
| maxRetries | 读取SLS失败后重试次数。 | Integer | 否 | 3 | 无。 | 
| exitAfterFinish | 在数据消费完成后,Flink程序是否退出。 | Boolean | 否 | false | 
 | 
| query | SLS消费预处理语句。 | String | 否 | 无 | 通过使用query参数,您可以在消费SLS数据之前对其进行过滤,以避免将所有数据都消费到Flink中,从而实现节约成本和提高处理速度的目的。 例如 说明  query需使用日志服务SPL语言,请参见SPL语法。 | 
| compressType | SLS压缩类型。 | String | 否 | 无 | 支持的压缩类型包括: 
 | 
| timeZone | startTime 和 stopTime 对应的时区。 | String | 否 | 无 | 默认情况下不添加偏移量。 | 
| regionId | SLS开服地域。 | String | 否 | 无 | 参阅开服地域文档配置。 | 
| signVersion | SLS请求签名版本。 | String | 否 | 无 | 参阅请求签名文档配置。 | 
| shardModDivisor | 在读取SLS LogStore分区时的除数。 | Int | 否 | -1 | 参阅分区(Shard)文档配置。 | 
| shardModRemainder | 在读取SLS LogStore分区时的余数。 | Int | 否 | -1 | 参阅分区(Shard)文档配置。 | 
| metadata.list | 需要传递给下游的元数据列。 | String | 否 | 无 | 可用的元数据字段包括 | 
类型映射
数据摄入类型映射如下表所示:
| SLS字段类型 | CDC字段类型 | 
| STRING | STRING | 
表结构推导和变更同步
- Shard数据预消费和表结构初始化 - SLS连接器会维护当前读取logstore的Schema。在读取logstore中的数据前,SLS连接器会预先在每个shard中尝试消费最多maxPreFetchLogGroups个logGroup的数据,解析其中每条日志的Schema,再将这些Schema合并,用于初始化表结构信息。后续在实际消费数据前会根据初始化的Schema产生对应的建表事件。 说明- 对于每个shard,SLS连接器会尝试从当前时间一小时之前的时间开始消费数据并解析日志Schema。 
- 主键信息 - SLS日志中不包含主键信息,可以通过transform规则手动为表添加主键: - transform: - source-table: <project>.<logstore> projection: \* primary-keys: key1, key2
- Schema推导和Schema变更 - 在表结构初始化完成后,若schema.inference.strategy配置为static,SLS连接器会根据初始的表结构解析每条日志数据,不会产生Schema变更事件。若schema.inference.strategy配置为continuous,SLS连接器会解析每条日志的数据,推导出物理列,并与当前记录的Schema比对,若推导出的Schema与当前Schema不一致时,会将Schema合并,合并规则如下: - 如果推导出的物理列中包含当前Schema中没有的字段,则会将这些字段加入到Schema中,同时产生新增可空列事件。 
- 如果推导出的物理列中不包含当前Schema中已有的字段,该字段仍会保留,该列的数据会填充为NULL,不产生删除列事件。 
 - SLS连接器会将每条日志中的字段类型都推导为String类型,目前仅支持新增列,即会在当前Schema末尾添加新列,新增的列会设置为可空列。 
代码示例
- SQL源表和结果表 - CREATE TEMPORARY TABLE sls_input( `time` BIGINT, url STRING, dt STRING, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` STRING METADATA VIRTUAL, __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'starttime' = '2023-08-30 00:00:00', 'project' ='sls-test', 'logstore' ='sls-input' ); CREATE TEMPORARY TABLE sls_sink( `time` BIGINT, url STRING, dt STRING, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN, `__topic__` STRING, `__source__` STRING, `__timestamp__` BIGINT , receive_time BIGINT ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = '${ak_id}', 'accessKey' = '${ak_secret}', 'project' ='sls-test', 'logstore' ='sls-output' ); INSERT INTO sls_sink SELECT `time`, url, dt, float_field, double_field, boolean_field, `__topic__` , `__source__` , `__timestamp__` , cast(__tag__['__receive_time__'] as bigint) as receive_time FROM sls_input;
- 数据摄入数据源 - source: type: sls name: SLS Source endpoint: ${endpoint} project: ${project} logstore: ${logstore} accessId: ${accessId} accessKey: ${accessKey} sink: type: values name: Values Sink print.enabled: true sink.print.logger: true
DataStream API
通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink,DataStream连接器设置方法请参见DataStream连接器使用方法。
如果您使用的实时计算引擎VVR版本低于8.0.10,启动作业可能会存在缺少依赖JAR包的问题,可以在附加依赖文件额外引入对应的-uber包解决。
读取SLS
实时计算引擎VVR提供SourceFunction的实现类SlsSourceFunction,用于读取SLS,读取SLS的示例如下。
public class SlsDataStreamSource {
    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Creates and adds SLS source and sink.
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }
    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");
        // The batch get size must be given.
        accessInfo.setBatchGetSize(10);
        // Optional parameters
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);
        // time to start consuming, set to current time.
        int startInSec = (int) (new Date().getTime() / 1000);
        // time to stop consuming, -1 means never stop.
        int stopInSec = -1;
        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }
    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}写入SLS
提供OutputFormat的实现类SLSOutputFormat,用于写入SLS。写入SLS的示例如下。
public class SlsDataStreamSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }
    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }
    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }
}XML
Maven中央库中已经放置了SLS DataStream连接器。
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
</dependency>常见问题
恢复失败的Flink程序时,TaskManager发生OOM,源表报错java.lang.OutOfMemoryError: Java heap space