Flink Log Connector是日志服务提供的用于对接Flink的工具,支持对接开源Flink和实时计算Flink版。本文介绍如何对接Flink消费日志数据。
前提条件
已开通日志服务。更多信息,请参见开通日志服务。
已创建RAM用户并完成授权。具体操作,请参见创建RAM用户并完成授权。
已配置环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见在Linux、macOS和Windows系统配置环境变量。
重要阿里云账号的AccessKey拥有所有API的访问权限,建议您使用RAM用户的AccessKey进行API访问或日常运维。
强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
已创建Project和Logstore。具体操作,请参见创建项目Project和创建Logstore。
背景信息
Flink Log Connector包括两部分,消费者(Flink Log Consumer)和生产者(Flink Log Producer),两者用途区别如下:
消费者用于从日志服务中读取数据,支持exactly once语义,支持Shard负载均衡。
生产者用于将数据写入日志服务。
使用Flink Log Connector时,需要在项目中添加Maven依赖,示例如下:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>flink-log-connector</artifactId>
<version>0.1.38</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
除此之外的代码编写,请您参考GitHub上源码进行编写。更多信息,请参见aliyun-log-flink-connector。
Flink Log Consumer
在Flink Log Connector中,Flink Log Consumer提供了订阅日志服务中某一个Logstore的能力,实现了exactly once语义,在使用时您无需关心Logstore中Shard数量的变化,Flink Log Consumer会自动感知。
Flink中每一个子任务负责消费Logstore中的部分Shard,如果Logstore中Shard发生分裂或合并,子任务消费的Shard也会随之改变。
Flink Log Consumer用到的日志服务API接口如下:
GetCursorOrData
用于从Shard中获取数据,注意频繁的调用该接口可能会导致数据超过日志服务的Shard限额,可以通过ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS和ConfigConstants.LOG_MAX_NUMBER_PER_FETCH控制接口调用的时间间隔和每次调用获取的日志数量。Shard的限额请参见分区(Shard)。
示例如下:
configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100"); configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");
ListShards
用于获取Logstore中所有的Shard列表及Shard状态等。如果您的Shard经常发生分裂合并,可以通过调整接口的调用周期来及时发现Shard的变化。示例如下:
// 设置每30s调用一次ListShards接口。 configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000");
CreateConsumerGroup
当设置消费进度监控时调用该接口创建ConsumerGroup,用于同步Checkpoint。
UpdateCheckPoint
该接口将Flink的snapshot同步到日志服务的ConsumerGroup中。
配置启动参数。
以下是一个简单的消费示例,使用java.util.Properties作为配置工具,所有Flink Log Consumer的配置均在ConfigConstants中。
Properties configProps = new Properties(); // 设置访问日志服务的域名。 configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com"); // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。 String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); configProps.put(ConfigConstants.LOG_ACCESSKEYID,accessKeyId); configProps.put(ConfigConstants.LOG_ACCESSKEY,accessKeySecret); // 设置日志服务的project。 String project = "your-project"; // 设置日志服务的Logstore。 String logstore = "your-logstore"; // 设置消费日志服务起始位置。 configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR); // 设置日志服务的消息反序列化方法。 FastLogGroupDeserializer deserializer = new FastLogGroupDeserializer(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<FastLogGroupList> dataStream = env.addSource( new FlinkLogConsumer<FastLogGroupList>(project, logstore, deserializer, configProps) ); dataStream.addSink(new SinkFunction<FastLogGroupList>() { @Override public void invoke(FastLogGroupList logGroupList, Context context) throws Exception { for (FastLogGroup logGroup : logGroupList.getLogGroups()) { int logsCount = logGroup.getLogsCount(); String topic = logGroup.getTopic(); String source = logGroup.getSource(); for (int i = 0; i < logsCount; ++i) { FastLog row = logGroup.getLogs(i); for (int j = 0; j < row.getContentsCount(); ++j) { FastLogContent column = row.getContents(j); // 处理日志 System.out.println(column.getKey()); System.out.println(column.getValue()); } } } } }); // 或者使用RawLogGroupListDeserializer RawLogGroupListDeserializer rawLogGroupListDeserializer = new RawLogGroupListDeserializer(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<RawLogGroupList> rawLogGroupListDataStream = env.addSource( new FlinkLogConsumer<RawLogGroupList>(project, logstore, rawLogGroupListDeserializer, configProps) ); rawLogGroupListDataStream.addSink(new SinkFunction<RawLogGroupList>() { @Override public void invoke(RawLogGroupList logGroupList, Context context) throws Exception { for (RawLogGroup logGroup : logGroupList.getRawLogGroups()) { String topic = logGroup.getTopic(); String source = logGroup.getSource(); for (RawLog row : logGroup.getLogs()) { // 处理日志 } } } });
说明Flink的子任务数量和日志服务Logstore中的Shard数量是独立的,如果Shard数量多于子任务数量,每个子任务不重复的消费Shard,如果少于子任务数量,那么部分子任务就会空闲,直到新的Shard产生。
设置消费起始位置。
Flink Log Consumer支持设置Shard的消费起始位置,通过设置属性ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,就可以定制从Shard的头、尾或者某个特定时间开始消费。另外,Flink Log Connector也支持从某个具体的消费组中恢复消费。属性的具体取值如下:
Consts.LOG_BEGIN_CURSOR:表示从Shard的头开始消费,也就是从Shard中最旧的数据开始消费。
Consts.LOG_END_CURSOR:表示从Shard的尾部开始,也就是从Shard中最新的数据开始消费。
Consts.LOG_FROM_CHECKPOINT:表示从某个特定的消费组中保存的Checkpoint开始消费,通过ConfigConstants.LOG_CONSUMERGROUP指定具体的消费组。
UnixTimestamp:一个整型数值的字符串,用1970-01-01到现在的秒数表示,含义是消费Shard中这个时间点之后的数据。
示例如下:
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR); configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR); configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000"); configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
说明如果在启动Flink任务时,设置了从Flink自身的StateBackend中恢复,那么Flink Log Connector会忽略上面的配置,使用StateBackend中保存的Checkpoint。
可选:设置消费进度监控。
Flink Log Consumer支持设置消费进度监控,获取每一个Shard的实时消费位置,使用时间戳表示。更多信息,请参见步骤二:查看消费组状态和消费组监控与告警。
示例如下:
configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name");
说明该项为可选配置项,设置后Flink Log Consumer会首先创建消费组,如果消费组已经存在,则不执行任何操作,Flink Log Consumer中的snapshot会自动同步到日志服务的消费组中,您可以通过日志服务的控制台查看Flink Log Consumer的消费进度。
设置容灾和exactly once语义支持。
当打开Flink的Checkpointing功能时,Flink Log Consumer会周期性地将每个Shard的消费进度保存,当任务失败时,Flink会恢复消费任务,并从保存的最新的Checkpoint开始消费。
Checkpoint的周期定义了当任务失败时,最多多少的数据会被回溯,即重新消费,使用代码如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 开启Flink exactly once语义。 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 每5s保存一次Checkpoint。 env.enableCheckpointing(5000);
更多Flink Checkpoint的信息请参见Checkpoints。
Flink Log Producer
Flink Log Producer用于将数据写入日志服务。
Flink Log Producer只支持Flink at least once语义,在任务失败时,写入日志服务中的数据有可能会重复,但不会丢失。
Flink Log Producer用到的日志服务API接口如下:
PutLogs
ListShards
初始化Flink Log Producer。
初始化配置参数Properties。
Flink Log Producer初始化步骤与Flink Log Consumer类似。Flink Log Producer初始化配置包含以下参数,一般情况下使用默认值即可,如有需要可以自定义配置。示例如下:
// 用于发送数据的I/O线程的数量,默认为核心数。 ConfigConstants.IO_THREAD_NUM // 日志发送前被缓存的最大允许时间,默认为2000毫秒。 ConfigConstants.FLUSH_INTERVAL_MS // 任务可以使用的内存总的大小,默认为100 MB。 ConfigConstants.TOTAL_SIZE_IN_BYTES // 内存达到上限时,发送日志的最大阻塞时间,单位为毫秒,默认为 60s。 ConfigConstants.MAX_BLOCK_TIME_MS // 最大重试次数,默认为 10 次。 ConfigConstants.MAX_RETRIES
重载LogSerializationSchema,定义将数据序列化成RawLogGroup的方法。
RawLogGroup是日志的集合,各字段含义请参见日志(Log)。
如果您需要指定数据写到某一个Shard中,可以使用LogPartitioner产生数据的HashKey,LogPartitioner为可选项,如果您没有配置,数据会随机写入某一个Shard。
示例如下:
FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps); logProducer.setCustomPartitioner(new LogPartitioner<String>() { // 生成32位Hash值。 public String getHashKey(String element) { try { MessageDigest md = MessageDigest.getInstance("MD5"); md.update(element.getBytes()); String hash = new BigInteger(1, md.digest()).toString(16); while(hash.length() < 32) hash = "0" + hash; return hash; } catch (NoSuchAlgorithmException e) { } return "0000000000000000000000000000000000000000000000000000000000000000"; } });
将模拟产生的字符串写入日志服务,示例如下:
// 将数据序列化成日志服务的数据格式。 class SimpleLogSerializer implements LogSerializationSchema<String> { public RawLogGroup serialize(String element) { RawLogGroup rlg = new RawLogGroup(); RawLog rl = new RawLog(); rl.setTime((int)(System.currentTimeMillis() / 1000)); rl.addContent("message", element); rlg.addLog(rl); return rlg; } } public class ProducerSample { public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com"; //本示例从环境变量中获取AccessKey ID和AccessKey Secret。 public static String sAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); public static String sAccessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); public static String sProject = "ali-cn-hangzhou-sls-admin"; public static String sLogstore = "test-flink-producer"; private static final Logger LOG = LoggerFactory.getLogger(ConsumerSample.class); public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); env.setParallelism(3); DataStream<String> simpleStringStream = env.addSource(new EventsGenerator()); Properties configProps = new Properties(); // 设置访问日志服务的域名。 configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint); // 设置用户AK。 configProps.put(ConfigConstants.LOG_ACCESSKEYID, sAccessKeyId); configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey); // 设置日志写入的日志服务project。 configProps.put(ConfigConstants.LOG_PROJECT, sProject); // 设置日志写入的日志服务Logstore。 configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore); FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps); simpleStringStream.addSink(logProducer); env.execute("flink log producer"); } // 模拟产生日志。 public static class EventsGenerator implements SourceFunction<String> { private boolean running = true; @Override public void run(SourceContext<String> ctx) throws Exception { long seq = 0; while (running) { Thread.sleep(10); ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12)); } } @Override public void cancel() { running = false; } } }
消费示例
本示例中,Flink Log Consumer将读取到的数据以FastLogGroupList形式存储到数据流中,接着使用flatMap函数将FastLogGroupList转换为JSON字符串并输出到命令行或写入文本文件。
package com.aliyun.openservices.log.flink.sample;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.flink.ConfigConstants;
import com.aliyun.openservices.log.flink.FlinkLogConsumer;
import com.aliyun.openservices.log.flink.data.FastLogGroupDeserializer;
import com.aliyun.openservices.log.flink.data.FastLogGroupList;
import com.aliyun.openservices.log.flink.model.CheckpointMode;
import com.aliyun.openservices.log.flink.util.Consts;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
public class FlinkConsumerSample {
private static final String SLS_ENDPOINT = "your-endpoint";
//本示例从环境变量中获取AccessKey ID和AccessKey Secret。
private static final String ACCESS_KEY_ID = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static final String ACCESS_KEY_SECRET = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
private static final String SLS_PROJECT = "your-project";
private static final String SLS_LOGSTORE = "your-logstore";
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
Configuration conf = new Configuration();
// Checkpoint dir like "file:///tmp/flink"
conf.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "your-checkpoint-dir");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, conf);
env.getConfig().setGlobalJobParameters(params);
env.setParallelism(1);
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new FsStateBackend("file:///tmp/flinkstate"));
Properties configProps = new Properties();
configProps.put(ConfigConstants.LOG_ENDPOINT, SLS_ENDPOINT);
configProps.put(ConfigConstants.LOG_ACCESSKEYID, ACCESS_KEY_ID);
configProps.put(ConfigConstants.LOG_ACCESSKEY, ACCESS_KEY_SECRET);
configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "10");
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your-consumer-group");
configProps.put(ConfigConstants.LOG_CHECKPOINT_MODE, CheckpointMode.ON_CHECKPOINTS.name());
configProps.put(ConfigConstants.LOG_COMMIT_INTERVAL_MILLIS, "10000");
FastLogGroupDeserializer deserializer = new FastLogGroupDeserializer();
DataStream<FastLogGroupList> stream = env.addSource(
new FlinkLogConsumer<>(SLS_PROJECT, SLS_LOGSTORE, deserializer, configProps));
stream.flatMap((FlatMapFunction<FastLogGroupList, String>) (value, out) -> {
for (FastLogGroup logGroup : value.getLogGroups()) {
int logCount = logGroup.getLogsCount();
for (int i = 0; i < logCount; i++) {
FastLog log = logGroup.getLogs(i);
JSONObject jsonObject = new JSONObject();
jsonObject.put("topic", logGroup.getTopic());
jsonObject.put("source", logGroup.getSource());
for (int j = 0; j < log.getContentsCount(); j++) {
jsonObject.put(log.getContents(j).getKey(), log.getContents(j).getValue());
}
out.collect(jsonObject.toJSONString());
}
}
}).returns(String.class);
stream.writeAsText("log-" + System.nanoTime());
env.execute("Flink consumer");
}
}