Flink消费

更新时间:
复制 MD 格式

Flink Log Connector 是日志服务提供的 Flink 对接工具,支持开源 Flink 和实时计算 Flink 版。本文介绍如何通过 Flink Log Connector 消费和写入日志数据。

前提条件

您已完成以下操作:

背景信息

Flink Log Connector 包含消费者(Consumer)和生产者(Producer)两部分:

  • 消费者(Consumer):从日志服务读取数据,支持 exactly once 语义和 Shard 负载均衡。

  • 生产者(Producer):将数据写入日志服务。

使用 Flink Log Connector 时,需在项目中添加以下 Maven 依赖:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>flink-log-connector</artifactId>
    <version>0.1.46</version>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
</dependency>

更多代码示例,请参见aliyun-log-flink-connector

说明

Flink Log Connector 0.1.46 版本引入了基于 FLIP-27(Flink 改进提案 27)规范实现的新版接口 AliyunLogSourceAliyunLogSink,同时提供 SQL Connector。新作业建议使用新版接口。旧接口 FlinkLogConsumerFlinkLogProducer 后续计划删除。

AliyunLogSource(DataStream Source)

AliyunLogSource 基于 FLIP-27 规范实现,通过 env.fromSource(...) 接入。Source 的 split/cursor 状态参与 Flink checkpoint,用于作业 failover 恢复。如果设置了 ConsumerGroup,还可以将 checkpoint 提交到日志服务服务端,便于监控消费进度。

基本用法:

Properties properties = new Properties();
properties.setProperty(ConfigConstants.LOG_CHECKPOINT_MODE, CheckpointMode.ON_CHECKPOINTS.name());
properties.setProperty(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");

AliyunLogSource<MyRecord> source = AliyunLogSource.<MyRecord>builder()
        .setProject("your-project")
        .setLogStore("your-logstore")
        .setEndpoint("cn-hangzhou.log.aliyuncs.com")
        .setCredentials(accessKeyId, accessKeySecret)
        .setConsumerGroup("flink-source-consumer")
        .setStartingPosition(StartingPosition.EARLIEST)
        .setProperties(properties)
        .setDeserializer(new MyDeserializer())
        .build();

DataStream<MyRecord> stream = env.fromSource(
        source,
        WatermarkStrategy.noWatermarks(),
        "aliyun-log-source");

Source 参数

参数 / Builder 方法

是否必填

默认值

含义

setProject(String project)

要消费的日志服务 Project。

setLogStore(String logstore)

要消费的 Logstore。

setEndpoint(String endpoint)

日志服务 Endpoint,例如 cn-hangzhou.log.aliyuncs.com

setCredentials(String accessKeyId, String accessKey)

访问日志服务的 AccessKey ID 和 AccessKey Secret。

setDeserializer(AliyunLogDeserializationSchema<T> deserializer)

将 SLS 拉取结果转换为 Flink 记录的反序列化器。

setConsumerGroup(String consumerGroup)

日志服务 ConsumerGroup 名称,用于读取或提交服务端 checkpoint。

setStartingPosition(StartingPosition)

earliest

消费起始位置。支持 earliestlatestcheckpoint 或 Unix 秒级时间戳。

setFallbackPosition(StartingPosition)

earliest

起始位置为 checkpoint 且服务端没有 checkpoint 时使用的兜底位置。

ConfigConstants.LOG_MAX_NUMBER_PER_FETCH

100

单次从单个 Shard 拉取的最大 LogGroup 数量。

ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS

100

一次拉取没有返回数据时,下次拉取前的等待间隔(单位:毫秒)。

ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS

60000

发现 Shard 分裂或合并的轮询周期(单位:毫秒)。

ConfigConstants.LOG_CHECKPOINT_MODE

ON_CHECKPOINTS

服务端 checkpoint 提交模式。ON_CHECKPOINTS:Flink checkpoint 完成时提交。PERIODIC:独立定时提交。DISABLED:不提交到服务端。

ConfigConstants.STOP_TIME

停止消费的 Unix 秒级时间戳,读取到该时间点后停止消费对应 Shard,适合离线补数据场景。

ConfigConstants.MAX_RETRIES

5

普通错误的最大重试次数。

ConfigConstants.SIGNATURE_VERSION

v1

请求签名版本,支持 v1v4。使用 v4 时需同时设置 REGION_ID

setSplitAssigner(AliyunLogSplitAssigner)

ModuloSplitAssigner

Shard 到 source reader 的分配策略。默认按 shard id 和并行度取模,也可以使用 RoundRobinSplitAssigner 或自定义实现。

ConfigConstants.LOG_COMMIT_INTERVAL_MILLIS

10000

PERIODIC 模式下提交服务端 checkpoint 的间隔(单位:毫秒)。

ConfigConstants.SOURCE_MEMORY_LIMIT

0

拉取结果的内存限制(单位:字节),0 表示不启用。

ConfigConstants.LOG_USER_AGENT

connector 默认 UA

自定义 User-Agent。

ConfigConstants.REGION_ID

使用 v4 时必填

V4 签名使用的地域 ID,例如 cn-hangzhou

ConfigConstants.MAX_RETRIES_FOR_RETRYABLE_ERROR

60

可重试错误的最大重试次数。

ConfigConstants.BASE_RETRY_BACK_OFF_TIME_MS

200

初始重试退避时间(单位:毫秒)。

ConfigConstants.MAX_RETRY_BACK_OFF_TIME_MS

5000

最大重试退避时间(单位:毫秒)。

ConfigConstants.PROXY_HOST

HTTP 代理地址。

ConfigConstants.PROXY_PORT

-1

HTTP 代理端口。

ConfigConstants.PROXY_USERNAME

HTTP 代理用户名。

自定义 Deserializer

实现 AliyunLogDeserializationSchema<T> 接口,在 deserialize 方法中完成日志展开和字段转换。一个 PullLogsResult 可能包含多个 LogGroup,每个 LogGroup 可能包含多条日志,反序列化器可以向 Collector 输出零条、一条或多条 Flink 记录。

以下示例将每条 SLS 日志展开为包含元数据和 content map 的 POJO:

public class ContentMapDeserializer implements AliyunLogDeserializationSchema<SlsLogRecord> {
    @Override
    public TypeInformation<SlsLogRecord> getProducedType() {
        return TypeInformation.of(SlsLogRecord.class);
    }

    @Override
    public void deserialize(PullLogsResult record, Collector<SlsLogRecord> out) {
        for (LogGroupData logGroupData : record.getLogGroupList()) {
            FastLogGroup logGroup = logGroupData.GetFastLogGroup();
            for (int logIndex = 0; logIndex < logGroup.getLogsCount(); logIndex++) {
                FastLog log = logGroup.getLogs(logIndex);
                Map<String, String> fields = new LinkedHashMap<>();
                for (int contentIndex = 0; contentIndex < log.getContentsCount(); contentIndex++) {
                    FastLogContent content = log.getContents(contentIndex);
                    fields.put(content.getKey(), content.getValue());
                }
                out.collect(new SlsLogRecord(
                        log.getTime(),
                        logGroup.getTopic(),
                        logGroup.getSource(),
                        record.getShard(),
                        record.getCursor(),
                        fields));
            }
        }
    }
}

完整消费示例

以下示例从环境变量获取访问凭证,从 ConsumerGroup checkpoint 继续消费。服务端无 checkpoint 时,从最早位置开始。

package com.aliyun.openservices.log.flink.sample;

import com.aliyun.openservices.log.flink.ConfigConstants;
import com.aliyun.openservices.log.flink.model.CheckpointMode;
import com.aliyun.openservices.log.flink.source.AliyunLogSource;
import com.aliyun.openservices.log.flink.source.StartingPosition;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class AliyunLogConsumerSample {
    private static final String SLS_ENDPOINT = "cn-hangzhou.log.aliyuncs.com";
    private static final String SLS_PROJECT = "your-project";
    private static final String SLS_LOGSTORE = "your-logstore";
    private static final String CONSUMER_GROUP = "your-consumer-group";

    public static void main(String[] args) throws Exception {
        String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

        Configuration configuration = new Configuration();
        configuration.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///tmp/flink-checkpoints");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setParallelism(2);
        env.enableCheckpointing(60000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        Properties sourceProperties = new Properties();
        sourceProperties.setProperty(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");
        sourceProperties.setProperty(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100");
        sourceProperties.setProperty(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "60000");
        sourceProperties.setProperty(ConfigConstants.LOG_CHECKPOINT_MODE, CheckpointMode.ON_CHECKPOINTS.name());

        AliyunLogSource<SlsLogRecord> source = AliyunLogSource.<SlsLogRecord>builder()
                .setEndpoint(SLS_ENDPOINT)
                .setProject(SLS_PROJECT)
                .setLogStore(SLS_LOGSTORE)
                .setCredentials(accessKeyId, accessKeySecret)
                .setConsumerGroup(CONSUMER_GROUP)
                .setStartingPosition(StartingPosition.CHECKPOINT)
                .setFallbackPosition(StartingPosition.EARLIEST)
                .setProperties(sourceProperties)
                .setDeserializer(new ContentMapDeserializer())
                .build();

        DataStream<SlsLogRecord> stream = env.fromSource(
                source,
                WatermarkStrategy.noWatermarks(),
                "aliyun-log-source");

        stream.print();
        env.execute("aliyun log consumer");
    }
}

AliyunLogSink(DataStream Sink)

AliyunLogSink 通过 stream.sinkTo(...) 接入。Sink 基于日志服务 Producer SDK 异步发送数据,并在 Flink checkpoint 或作业结束时等待已提交请求完成,提供 at-least-once 语义。

自定义序列化器需实现 AliyunLogSerializationSchema<T> 接口。每个输入元素可通过 Collector<SinkRecord> 输出零条、一条或多条 SLS 记录。

基本用法:

class MySerializationSchema implements AliyunLogSerializationSchema<String> {
    @Override
    public void serialize(String element, Collector<SinkRecord> output) {
        LogItem item = new LogItem((int) (System.currentTimeMillis() / 1000L));
        item.PushBack("message", element);

        SinkRecord record = new SinkRecord();
        record.setTopic("flink");
        record.setSource("flink-job");
        record.setLogItem(item);
        output.collect(record);
    }
}

AliyunLogSink<String> sink = AliyunLogSink.<String>builder()
        .setProject("your-project")
        .setLogStore("your-logstore")
        .setEndpoint("cn-hangzhou.log.aliyuncs.com")
        .setCredentials(accessKeyId, accessKeySecret)
        .setSerializer(new MySerializationSchema())
        .setProperty(ConfigConstants.FLUSH_INTERVAL_MS, "100")
        .build();

stream.sinkTo(sink).name("aliyun-log-sink");

Sink 参数

参数 / Builder 方法

是否必填

默认值

含义

setProject(String project)

写入目标 Project。

setLogStore(String logstore)

默认写入目标 Logstore。单条 SinkRecord 设置了 logstore 时会覆盖该值。

setEndpoint(String endpoint)

日志服务 Endpoint。

setCredentials(String accessKeyId, String accessKey)

访问日志服务的 AccessKey ID 和 AccessKey Secret。

setSerializer(AliyunLogSerializationSchema<T> serializer)

将 Flink 记录转换为 SinkRecord 的序列化器。

ConfigConstants.FLUSH_INTERVAL_MS

Producer SDK 默认值

日志在客户端缓存后等待发送的最长时间(单位:毫秒)。

ConfigConstants.MAX_RETRIES

Producer SDK 默认值

普通发送失败的最大重试次数。

ConfigConstants.IO_THREAD_NUM

Producer SDK 默认值

发送日志的 IO 线程数量。

ConfigConstants.TOTAL_SIZE_IN_BYTES

Producer SDK 默认值

Producer 客户端可使用的总缓存大小。

ConfigConstants.MAX_BLOCK_TIME_MS

Producer SDK 默认值

缓存满或资源不足时,发送调用最多阻塞等待的时间(单位:毫秒)。

ConfigConstants.SIGNATURE_VERSION

v1

请求签名版本,支持 v1v4。使用 v4 时需同时设置 REGION_ID

ConfigConstants.BUCKETS

Producer SDK 默认值

Producer 内部分桶数量,用于并发和批量聚合。

ConfigConstants.PRODUCER_ADJUST_SHARD_HASH

true

是否由 Producer 自动调整 shard hash。

ConfigConstants.REGION_ID

使用 v4 时必填

V4 签名使用的地域 ID。

如果需要控制写入 Shard,可以在 SinkRecord 上调用 record.setHashKey(...) 设置 hash key。如果需要动态写入不同 Logstore,可以调用 record.setLogstore(...) 覆盖 Sink 默认 Logstore。

完整写入示例

以下示例使用 env.fromSequence(...) 生成测试数据,并通过 AliyunLogSerializationSchema 写入日志服务。

package com.aliyun.openservices.log.flink.sample;

import com.aliyun.openservices.log.common.LogItem;
import com.aliyun.openservices.log.flink.ConfigConstants;
import com.aliyun.openservices.log.flink.data.SinkRecord;
import com.aliyun.openservices.log.flink.model.AliyunLogSerializationSchema;
import com.aliyun.openservices.log.flink.sink.AliyunLogSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class AliyunLogProducerSample {
    private static final String SLS_ENDPOINT = "cn-hangzhou.log.aliyuncs.com";
    private static final String SLS_PROJECT = "your-project";
    private static final String SLS_LOGSTORE = "your-logstore";

    public static void main(String[] args) throws Exception {
        String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        DataStream<Long> events = env.fromSequence(1, 1000);

        AliyunLogSink<Long> sink = AliyunLogSink.<Long>builder()
                .setEndpoint(SLS_ENDPOINT)
                .setProject(SLS_PROJECT)
                .setLogStore(SLS_LOGSTORE)
                .setCredentials(accessKeyId, accessKeySecret)
                .setSerializer(new LongSerializer())
                .setProperty(ConfigConstants.FLUSH_INTERVAL_MS, "100")
                .setProperty(ConfigConstants.MAX_RETRIES, "10")
                .build();

        events.sinkTo(sink).name("aliyun-log-sink");
        env.execute("aliyun log producer");
    }

    public static class LongSerializer implements AliyunLogSerializationSchema<Long> {
        @Override
        public void serialize(Long element, Collector<SinkRecord> output) {
            LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000L));
            logItem.PushBack("id", String.valueOf(element));
            logItem.PushBack("message", "message-" + element);

            SinkRecord record = new SinkRecord();
            record.setTopic("flink");
            record.setSource("flink-job");
            record.setLogItem(logItem);
            output.collect(record);
        }
    }
}

SQL Connector

SQL Connector 标识为 aliyun-log,同一 connector 同时支持 SQL Source 和 SQL Sink。Source 普通列按同名 SLS log content 读取,Sink 普通列按列名写入 SLS log content。

SQL Source 示例

CREATE TABLE sls_logs (
  `__time__` TIMESTAMP(3),
  `__topic__` STRING,
  `__source__` STRING,
  level STRING,
  message STRING,
  status_code INT
) WITH (
  'connector' = 'aliyun-log',
  'endpoint' = 'cn-hangzhou.log.aliyuncs.com',
  'project' = 'your-project',
  'logstore' = 'your-logstore',
  'access.key.id' = '${ACCESS_KEY_ID}',
  'access.key.secret' = '${ACCESS_KEY_SECRET}',
  'consumer-group' = 'flink-sql-consumer',
  'scan.startup.mode' = 'checkpoint',
  'scan.startup.default-position' = 'earliest',
  'checkpoint.mode' = 'on-checkpoints',
  'max.number.per.fetch' = '100',
  'shards.discovery.interval.ms' = '60000',
  'ignore-parse-errors' = 'true'
);

SQL Sink 示例

CREATE TABLE sls_sink (
  `__time__` TIMESTAMP(3),
  `__topic__` STRING,
  `__source__` STRING,
  level STRING,
  message STRING,
  status_code INT
) WITH (
  'connector' = 'aliyun-log',
  'endpoint' = 'cn-hangzhou.log.aliyuncs.com',
  'project' = 'your-project',
  'logstore' = 'your-logstore',
  'access.key.id' = '${ACCESS_KEY_ID}',
  'access.key.secret' = '${ACCESS_KEY_SECRET}',
  'sink.topic' = 'flink-sql',
  'sink.source' = 'flink-job',
  'flush.interval.ms' = '100',
  'max.retries' = '5'
);

SQL WITH 参数

SQL 参数

适用方向

是否必填

默认值

含义

connector

Source / Sink

固定为 aliyun-log

endpoint

Source / Sink

日志服务 Endpoint。

project

Source / Sink

日志服务 Project。

logstore

Source / Sink

Source 读取或 Sink 默认写入的 Logstore。

access.key.id

Source / Sink

访问日志服务的 AccessKey ID。

access.key.secret

Source / Sink

访问日志服务的 AccessKey Secret。

consumer-group

Source

ConsumerGroup 名称,用于读取或提交服务端 checkpoint。

scan.startup.mode

Source

earliest

消费起始位置。支持 earliestlatestcheckpoint 或 Unix 秒级时间戳。

checkpoint.mode

Source

on-checkpoints

服务端 checkpoint 提交模式。支持 on-checkpointsperiodicdisabled

max.number.per.fetch

Source

100

单次从单个 Shard 拉取的最大 LogGroup 数量。

ignore-parse-errors

Source

false

字段类型转换失败时是否输出 NULL。false 表示抛出异常。

sink.topic

Sink

""

写入时默认使用的 LogGroup topic,可被 __topic__ 列覆盖。

sink.source

Sink

写入时默认使用的 LogGroup source,可被 __source__ 列覆盖。

flush.interval.ms

Sink

Producer SDK 默认值

日志在客户端缓存后等待发送的最长时间。

signature.version

Source / Sink

v1

请求签名版本,支持 v1v4

SQL Source 元数据列

以下列为内置读取元数据列,声明后从 SLS log 或 Shard 元信息中读取,不从 log content 中取同名字段:

元数据列

推荐类型

含义

__time__

TIMESTAMP(3)

SLS log 时间。

__topic__

STRING

LogGroup topic。

__source__

STRING

LogGroup source。

__shard__

INT

当前记录所在 Shard ID。

__cursor__

STRING

当前拉取批次对应的 cursor。

SQL Sink 元数据列

以下列为内置写入元数据列,声明后不作为普通 content 写入:

元数据列

推荐类型

含义

__time__

TIMESTAMP(3)

写入 SLS log time,时间戳类型按秒写入。

__topic__

STRING

覆盖 sink.topic,设置当前记录的 topic。

__source__

STRING

覆盖 sink.source,设置当前记录的 source。

__logstore__

STRING

覆盖表参数 logstore,将当前记录写入指定 Logstore。

__hash_key__

STRING

设置当前记录的 Shard hash key。

RAM 权限

使用 Flink Log Connector 访问日志服务时,需为 RAM 用户或角色授予以下 API 权限。

Source 读取所需权限

API

Resource

log:GetCursorOrData

acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}

log:ListShards

acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}

log:CreateConsumerGroup

acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*

log:ConsumerGroupUpdateCheckPoint

acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

Sink 写入所需权限

API

Resource

log:PostLogStoreLogs

acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}

Flink Log Consumer

重要

该接口后续计划删除。新作业请使用 AliyunLogSource

Flink Log Consumer 用于订阅 Logstore 中的日志数据,支持 exactly once 语义。Flink Log Consumer 自动感知 Shard 分裂和合并,无需手动处理 Shard 变化。

Flink 的每个子任务负责消费 Logstore 中的部分 Shard。当 Shard 发生分裂或合并时,子任务消费的 Shard 会自动调整。

Flink Log Consumer 使用的日志服务 API 接口:

  • GetCursorOrData

    用于从Shard中获取数据,注意频繁的调用该接口可能会导致数据超过日志服务的Shard限额,可以通过ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLISConfigConstants.LOG_MAX_NUMBER_PER_FETCH控制接口调用的时间间隔和每次调用获取的日志数量。Shard的限额请参见分区(Shard)

    示例如下:

    configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100");
    configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");
  • ListShards

    获取 Logstore 中所有 Shard 列表及状态。Shard 经常分裂合并时,可缩短轮询周期及时发现变化。示例:

    // 设置每60s调用一次ListShards接口。
    configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "60000");
  • CreateConsumerGroup

    设置消费进度监控时调用,创建 ConsumerGroup 用于同步 Checkpoint。

  • UpdateCheckPoint

    将 Flink 的 snapshot 同步到日志服务 ConsumerGroup 中。

  1. 设置启动参数。

    以下是一个简单的消费示例,使用java.util.Properties作为配置工具,所有Flink Log Consumer的配置均在ConfigConstants中。

    Properties configProps = new Properties();
    // 设置访问日志服务的域名。
    configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com");
    // 本示例从环境变量中获取AccessKey IDAccessKey Secret。
    String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    configProps.put(ConfigConstants.LOG_ACCESSKEYID,accessKeyId);
    configProps.put(ConfigConstants.LOG_ACCESSKEY,accessKeySecret);
    // 设置日志服务的project。
    String project = "your-project";
    // 设置日志服务的LogStore。
    String logstore = "your-logstore";
    // 设置消费日志服务起始位置。
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
    // 设置日志服务的消息反序列化方法。
    FastLogGroupDeserializer deserializer = new FastLogGroupDeserializer();
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<FastLogGroupList> dataStream = env.addSource(
            new FlinkLogConsumer<FastLogGroupList>(project, logstore, deserializer, configProps)
    );
    dataStream.addSink(new SinkFunction<FastLogGroupList>() {
        @Override
        public void invoke(FastLogGroupList logGroupList, Context context) throws Exception {
            for (FastLogGroup logGroup : logGroupList.getLogGroups()) {
                int logsCount = logGroup.getLogsCount();
                String topic = logGroup.getTopic();
                String source = logGroup.getSource();
                for (int i = 0; i < logsCount; ++i) {
                    FastLog row = logGroup.getLogs(i);
                    for (int j = 0; j < row.getContentsCount(); ++j) {
                        FastLogContent column = row.getContents(j);
                        // 处理日志
                        System.out.println(column.getKey());
                        System.out.println(column.getValue());
                    }
                }
            }
        }
    });
    // 或者使用RawLogGroupListDeserializer
    RawLogGroupListDeserializer rawLogGroupListDeserializer = new RawLogGroupListDeserializer();
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<RawLogGroupList> rawLogGroupListDataStream = env.addSource(
            new FlinkLogConsumer<RawLogGroupList>(project, logstore, rawLogGroupListDeserializer, configProps)
    );
    rawLogGroupListDataStream.addSink(new SinkFunction<RawLogGroupList>() {
        @Override
        public void invoke(RawLogGroupList logGroupList, Context context) throws Exception {
            for (RawLogGroup logGroup : logGroupList.getRawLogGroups()) {
                String topic = logGroup.getTopic();
                String source = logGroup.getSource();
                for (RawLog row : logGroup.getLogs()) {
                    // 处理日志
                }
            }
        }
    });
    说明

    Flink 子任务数量和 Logstore 的 Shard 数量相互独立。Shard 多于子任务时,每个子任务不重复地消费 Shard;Shard 少于子任务时,部分子任务空闲,直到新 Shard 产生。

  2. 设置消费起始位置。

    Flink Log Consumer支持设置Shard的消费起始位置,通过设置属性ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,就可以定制从Shard的头、尾或者某个特定时间开始消费。另外,Flink Log Connector也支持从某个具体的消费组中恢复消费。属性的具体取值如下:

    • Consts.LOG_BEGIN_CURSOR:从 Shard 头部(最旧数据)开始消费。

    • Consts.LOG_END_CURSOR:从 Shard 尾部(最新数据)开始消费。

    • Consts.LOG_FROM_CHECKPOINT:表示从某个特定的消费组中保存的Checkpoint开始消费,通过ConfigConstants.LOG_CONSUMERGROUP指定具体的消费组。

    • UnixTimestamp:Unix 秒级时间戳(整型数值字符串),从该时间点之后开始消费。

    示例如下:

    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR);
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000");
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
    说明

    从 Flink StateBackend 恢复时,Flink Log Connector 忽略上述设置,使用 StateBackend 中保存的 Checkpoint。

  3. 可选:设置消费进度监控。

    Flink Log Consumer 支持消费进度监控,获取每个 Shard 的实时消费位置。更多信息,请参见步骤二:查看消费组状态消费组监控与告警

    示例如下:

    configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name");
    说明

    设置后,Flink Log Consumer 自动创建消费组(已存在则跳过),并将 snapshot 同步到日志服务消费组中。可通过日志服务控制台查看消费进度。

  4. 设置容灾和exactly once语义支持。

    开启 Flink Checkpointing 后,Flink Log Consumer 周期性保存每个 Shard 的消费进度。任务失败时,从最新 Checkpoint 恢复消费。

    Checkpoint 周期决定任务失败时最多回溯的数据量。配置示例:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 开启Flink exactly once语义。
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // 每5s保存一次Checkpoint。
    env.enableCheckpointing(5000);

Flink Log Producer

重要

该接口后续计划删除。新作业请使用 AliyunLogSink

Flink Log Producer 将数据写入日志服务。

说明

Flink Log Producer 仅支持 at least once 语义,任务失败时写入的数据可能重复但不会丢失。

Flink Log Producer 使用的日志服务 API 接口:

  • PutLogs

  • ListShards

  1. 初始化Flink Log Producer。

    1. 初始化 Properties 设置参数。

      Flink Log Producer 初始化方式与 Consumer 类似,以下参数使用默认值即可,按需自定义:

      // 用于发送数据的I/O线程的数量,默认为核心数。
      ConfigConstants.IO_THREAD_NUM
      // 日志发送前被缓存的最大允许时间,默认为2000毫秒。
      ConfigConstants.FLUSH_INTERVAL_MS
      // 任务可以使用的内存总的大小,默认为100 MB。
      ConfigConstants.TOTAL_SIZE_IN_BYTES
      // 内存达到上限时,发送日志的最大阻塞时间,单位为毫秒,默认为 60s。
      ConfigConstants.MAX_BLOCK_TIME_MS
      // 最大重试次数,默认为 10 次。
      ConfigConstants.MAX_RETRIES
    2. 重载LogSerializationSchema,定义将数据序列化成RawLogGroup的方法。

      RawLogGroup 是日志集合,字段含义请参见日志(Log)

      如需将数据写入指定 Shard,使用 LogPartitioner 生成 HashKey。未设置 LogPartitioner 时,数据随机写入 Shard。

      示例如下:

      FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
      logProducer.setCustomPartitioner(new LogPartitioner<String>() {
            // 生成32Hash值。
            public String getHashKey(String element) {
                try {
                    MessageDigest md = MessageDigest.getInstance("MD5");
                    md.update(element.getBytes());
                    String hash = new BigInteger(1, md.digest()).toString(16);
                    while(hash.length() < 32) hash = "0" + hash;
                    return hash;
                } catch (NoSuchAlgorithmException e) {
                }
                return  "0000000000000000000000000000000000000000000000000000000000000000";
            }
        });
  2. 将模拟产生的字符串写入日志服务,示例如下:

    // 将数据序列化成日志服务的数据格式。
    class SimpleLogSerializer implements LogSerializationSchema<String> {
        public RawLogGroup serialize(String element) {
            RawLogGroup rlg = new RawLogGroup();
            RawLog rl = new RawLog();
            rl.setTime((int)(System.currentTimeMillis() / 1000));
            rl.addContent("message", element);
            rlg.addLog(rl);
            return rlg;
        }
    }
    public class ProducerSample {
        public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
        //本示例从环境变量中获取AccessKey IDAccessKey Secret。
        public static String sAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        public static String sAccessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        public static String sProject = "ali-cn-hangzhou-sls-admin";
        public static String sLogstore = "test-flink-producer";
        private static final Logger LOG = LoggerFactory.getLogger(ConsumerSample.class);
        public static void main(String[] args) throws Exception {
            final ParameterTool params = ParameterTool.fromArgs(args);
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setGlobalJobParameters(params);
            env.setParallelism(3);
            DataStream<String> simpleStringStream = env.addSource(new EventsGenerator());
            Properties configProps = new Properties();
            // 设置访问日志服务的域名。
            configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint);
            // 设置用户AK。
            configProps.put(ConfigConstants.LOG_ACCESSKEYID, sAccessKeyId);
            configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey);
            // 设置日志写入的日志服务project。
            configProps.put(ConfigConstants.LOG_PROJECT, sProject);
            // 设置日志写入的日志服务Logstore。
            configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore);
            FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
            simpleStringStream.addSink(logProducer);
            env.execute("flink log producer");
        }
        // 模拟产生日志。
        public static class EventsGenerator implements SourceFunction<String> {
            private boolean running = true;
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                long seq = 0;
                while (running) {
                    Thread.sleep(10);
                    ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
                }
            }
            @Override
            public void cancel() {
                running = false;
            }
        }
    }

消费示例

以下示例使用 Flink Log Consumer 读取数据,通过 flatMap 将 FastLogGroupList 转换为 JSON 字符串,输出到命令行或写入文本文件。

package com.aliyun.openservices.log.flink.sample;

import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.flink.ConfigConstants;
import com.aliyun.openservices.log.flink.FlinkLogConsumer;
import com.aliyun.openservices.log.flink.data.FastLogGroupDeserializer;
import com.aliyun.openservices.log.flink.data.FastLogGroupList;
import com.aliyun.openservices.log.flink.model.CheckpointMode;
import com.aliyun.openservices.log.flink.util.Consts;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class FlinkConsumerSample {
    private static final String SLS_ENDPOINT = "your-endpoint";
    //本示例从环境变量中获取AccessKey ID和AccessKey Secret。
    private static final String ACCESS_KEY_ID = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static final String ACCESS_KEY_SECRET = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    private static final String SLS_PROJECT = "your-project";
    private static final String SLS_LOGSTORE = "your-logstore";

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);

        Configuration conf = new Configuration();
        // Checkpoint dir like "file:///tmp/flink"
        conf.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "your-checkpoint-dir");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, conf);
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(1);
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setStateBackend(new FsStateBackend("file:///tmp/flinkstate"));
        Properties configProps = new Properties();
        configProps.put(ConfigConstants.LOG_ENDPOINT, SLS_ENDPOINT);
        configProps.put(ConfigConstants.LOG_ACCESSKEYID, ACCESS_KEY_ID);
        configProps.put(ConfigConstants.LOG_ACCESSKEY, ACCESS_KEY_SECRET);
        configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "10");
        configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
        configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your-consumer-group");
        configProps.put(ConfigConstants.LOG_CHECKPOINT_MODE, CheckpointMode.ON_CHECKPOINTS.name());
        configProps.put(ConfigConstants.LOG_COMMIT_INTERVAL_MILLIS, "10000");

        FastLogGroupDeserializer deserializer = new FastLogGroupDeserializer();
        DataStream<FastLogGroupList> stream = env.addSource(
                new FlinkLogConsumer<>(SLS_PROJECT, SLS_LOGSTORE, deserializer, configProps));

        stream.flatMap((FlatMapFunction<FastLogGroupList, String>) (value, out) -> {
            for (FastLogGroup logGroup : value.getLogGroups()) {
                int logCount = logGroup.getLogsCount();
                for (int i = 0; i < logCount; i++) {
                    FastLog log = logGroup.getLogs(i);
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("topic", logGroup.getTopic());
                    jsonObject.put("source", logGroup.getSource());
                    for (int j = 0; j < log.getContentsCount(); j++) {
                        jsonObject.put(log.getContents(j).getKey(), log.getContents(j).getValue());
                    }
                    out.collect(jsonObject.toJSONString());
                }
            }
        }).returns(String.class);

        stream.writeAsText("log-" + System.nanoTime());
        env.execute("Flink consumer");
    }
}