全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 更多
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 智能硬件
日志服务

Flink 消费

更新时间:2018-03-29 22:57:11

Flink log connector是阿里云日志服务提供的,用于对接Flink的工具,包括两部分,消费者(Consumer)和生产者(Producer)。

消费者用于从日志服务中读取数据,支持exactly once语义,支持Shard负载均衡。

生产者用于将数据写入日志服务,使用connector时,需要在项目中添加maven依赖:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-streaming-java_2.11</artifactId>
  4. <version>1.3.2</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.aliyun.openservices</groupId>
  8. <artifactId>flink-log-connector</artifactId>
  9. <version>0.1.7</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>com.google.protobuf</groupId>
  13. <artifactId>protobuf-java</artifactId>
  14. <version>2.5.0</version>
  15. </dependency>
  16. <dependency>
  17. <groupId>com.aliyun.openservices</groupId>
  18. <artifactId>aliyun-log</artifactId>
  19. <version>0.6.10</version>
  20. </dependency>
  21. <dependency>
  22. <groupId>com.aliyun.openservices</groupId>
  23. <artifactId>log-loghub-producer</artifactId>
  24. <version>0.1.8</version>
  25. </dependency>

前提条件

  1. 已启用Access Key,并创建了Project和Logstore。详细步骤请参考准备流程
  2. 若您选择使用子账号操作日志服务,请确认已正确设置了Logstore的RAM授权策略。详细内容请参考授权RAM子用户访问日志服务资源

Log Consumer

在Connector中, 类FlinkLogConsumer提供了订阅日志服务中某一个Logstore的能力,实现了exactly once语义,在使用时,用户无需关心Logstore中shard数量的变化,consumer会自动感知。

Flink中每一个子任务负责消费Logstore中部分shard,如果Logstore中shard发生split或者merge,子任务消费的shard也会随之改变。

关联 API

Flink log consumer 会用到的阿里云日志服务接口如下:

  • GetCursorOrData

    用于从shard中拉数据, 注意频繁的调用该接口可能会导致数据超过日志服务的shard quota, 可以通过ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS和ConfigConstants.LOG_MAX_NUMBER_PER_FETCH 控制接口调用的时间间隔和每次调用拉取的日志数量,shard的quota参考文章shard简介.

    1. configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS "100");
    2. configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH "100");
  • ListShards

    用于获取Logstore中所有的shard列表,获取shard状态等.如果您的shard经常发生分裂合并,可以通过调整接口的调用周期来及时发现shard的变化。

    1. // 设置每30s调用一次ListShards
    2. configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS "30000");
  • CreateConsumerGroup

    该接口调用只有当设置消费进度监控时才会发生,功能是创建consumerGroup,用于同步checkpoint。

  • ConsumerGroupUpdateCheckPoint

    该接口用户将flink的snapshot同步到日志服务的consumerGroup中。

子用户权限

子用户使用Flink log consumer需要授权如下几个RAM Policy:

接口 资源
log:GetCursorOrData acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
log:ListShards acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
log:CreateConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:ConsumerGroupUpdateCheckPoint acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

配置步骤

1 配置启动参数

  1. Properties configProps = new Properties();
  2. // 设置访问日志服务的域名
  3. configProps.put(ConfigConstants.LOG_ENDPOINT "cn-hangzhou.log.aliyuncs.com");
  4. // 设置访问ak
  5. configProps.put(ConfigConstants.LOG_ACCESSSKEYID "");
  6. configProps.put(ConfigConstants.LOG_ACCESSKEY "");
  7. // 设置日志服务的project
  8. configProps.put(ConfigConstants.LOG_PROJECT "ali-cn-hangzhou-sls-admin");
  9. // 设置日志服务的Logstore
  10. configProps.put(ConfigConstants.LOG_LOGSTORE "sls_consumergroup_log");
  11. // 设置消费日志服务起始位置
  12. configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION Consts.LOG_END_CURSOR);
  13. // 设置日志服务的消息反序列化方法
  14. RawLogGroupListDeserializer deserializer = new RawLogGroupListDeserializer();
  15. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  16. DataStream<RawLogGroupList> logTestStream = env.addSource(
  17. new FlinkLogConsumer<RawLogGroupList>(deserializer configProps));

上面是一个简单的消费示例,我们使用java.util.Properties作为配置工具,所有Consumer的配置都可以在ConfigConstants中找到。

注意,Flink stream的子任务数量和日志服务Logstore中的shard数量是独立的,如果shard数量多于子任务数量,每个子任务不重复的消费多个shard,如果少于子任务数量,那么部分子任务就会空闲,等到新的shard产生。

2 设置消费起始位置

Flink log consumer支持设置shard的消费起始位置,通过设置属性ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,就可以定制消费从shard的头尾或者某个特定时间开始消费,另外,connector也支持从某个具体的ConsumerGroup中恢复消费。具体取值如下:

  • Consts.LOG_BEGIN_CURSOR: 表示从shard的头开始消费,也就是从shard中最旧的数据开始消费。
  • Consts.LOG_END_CURSOR: 表示从shard的尾开始,也就是从shard中最新的数据开始消费。
  • Consts.LOG_FROM_CHECKPOINT:表示从某个特定的ConsumerGroup中保存的Checkpoint开始消费,通过ConfigConstants.LOG_CONSUMERGROUP指定具体的ConsumerGroup。
  • UnixTimestamp: 一个整型数值的字符串,用1970-01-01到现在的秒数表示, 含义是消费shard中这个时间点之后的数据。

四种取值举例如下:

  1. configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR);
  2. configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
  3. configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000");
  4. configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);

注意:如果在启动Flink任务时,设置了从Flink自身的StateBackend中恢复,那么connector会忽略上面的配置,使用StateBackend中保存的Checkpoint。

3 设置消费进度监控(可选)

Flink log consumer支持设置消费进度监控,所谓消费进度就是获取每一个shard实时的消费位置,这个位置使用时间戳表示,详细概念可以参考文档消费组-查看状态消费组-监控报警

  1. configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name”);

注意:以上配置为可选项,如果设置,consumer会首先创建consumerGroup。如果consumerGroup已经存在,则不执行任何操作,consumer中的snapshot会自动同步到日志服务的consumerGroup中,用户可以在日志服务的控制台查看consumer的消费进度。

4 设置容灾和exactly once语义支持

当打开Flink的checkpointing功能时,Flink log consumer会周期性的将每个shard的消费进度保存起来,当作业失败时,flink会恢复log consumer,并从保存的最新的checkpoint开始消费。

写checkpoint的周期定义了当发生失败时,最多多少的数据会被回溯,即重新消费,使用代码如下:

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. // 开启flink exactly once语义
  3. env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  4. // 每5s保存一次checkpoint
  5. env.enableCheckpointing(5000);

更多Flink checkpoint的细节请参考Flink官方文档Checkpoints

Log Producer

FlinkLogProducer 用于将数据写到阿里云日志服务中。注意:Producer只支持Flink at-least-once语义,在发生作业失败的情况下,写入日志服务中的数据有可能会重复,但是绝对不会丢失。

子用户权限

Producer依赖日志服务的API写数据,例如:

  • log:PostLogStoreLogs
  • log:ListShards

当RAM子用户使用Producer时,需要对以上两个API进行授权:

接口 资源
log:PostLogStoreLogs acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
log:ListShards acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}

配置步骤

  1. 初始化Producer。

    1. 初始化配置参数Properties。

      Producer初始化步骤与Consumer类似。 Producer包含以下参数,一般情况下使用默认值即可,如有需要,可以自定义配置。

      1. // 用于发送数据的io线程的数量,默认是8
      2. ConfigConstants.LOG_SENDER_IO_THREAD_COUNT
      3. // 该值定义日志数据被缓存发送的时间,默认是3000
      4. ConfigConstants.LOG_PACKAGE_TIMEOUT_MILLIS
      5. // 缓存发送的包中日志的数量,默认是4096
      6. ConfigConstants.LOG_LOGS_COUNT_PER_PACKAGE
      7. // 缓存发送的包的大小,默认是3Mb
      8. ConfigConstants.LOG_LOGS_BYTES_PER_PACKAGE
      9. // 作业可以使用的内存总的大小,默认是100Mb
      10. ConfigConstants.LOG_MEM_POOL_BYTES

      上述参数不是必选参数,用户可以不设置,直接使用默认值。

    2. 重载LogSerializationSchema,定义将数据序列化成RawLogGroup的方法。

      RawLogGroup是log的集合,每个字段的含义可以参考文档日志数据模型

      如果用户需要使用日志服务的shardHashKey功能,指定数据写到某一个shard中,可以使用LogPartitioner产生数据的hashKey。

      示例:

      1. FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
      2. logProducer.setCustomPartitioner(new LogPartitioner<String>() {
      3. // 生成32位hash值
      4. public String getHashKey(String element) {
      5. try {
      6. MessageDigest md = MessageDigest.getInstance("MD5");
      7. md.update(element.getBytes());
      8. String hash = new BigInteger(1 md.digest()).toString(16);
      9. while(hash.length() < 32) hash = "0" + hash;
      10. return hash;
      11. } catch (NoSuchAlgorithmException e) {
      12. }
      13. return "0000000000000000000000000000000000000000000000000000000000000000";
      14. }
      15. });

      注意:LogPartitioner为可选项,如您没有配置,数据会随机写入某一个Shard。

  2. 执行以下示例语句,将模拟产生的字符串写入日志服务。

  1. // 将数据序列化成日志服务的数据格式
  2. class SimpleLogSerializer implements LogSerializationSchema<String> {
  3. public RawLogGroup serialize(String element) {
  4. RawLogGroup rlg = new RawLogGroup();
  5. RawLog rl = new RawLog();
  6. rl.setTime((int)(System.currentTimeMillis() / 1000));
  7. rl.addContent("message" element);
  8. rlg.addLog(rl);
  9. return rlg;
  10. }
  11. }
  12. public class ProducerSample {
  13. public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
  14. public static String sAccessKeyId = "";
  15. public static String sAccessKey = "";
  16. public static String sProject = "ali-cn-hangzhou-sls-admin";
  17. public static String sLogstore = "test-flink-producer";
  18. private static final Logger LOG = LoggerFactory.getLogger(ConsumerSample.class);
  19. public static void main(String[] args) throws Exception {
  20. final ParameterTool params = ParameterTool.fromArgs(args);
  21. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  22. env.getConfig().setGlobalJobParameters(params);
  23. env.setParallelism(3);
  24. DataStream<String> simpleStringStream = env.addSource(new EventsGenerator());
  25. Properties configProps = new Properties();
  26. // 设置访问日志服务的域名
  27. configProps.put(ConfigConstants.LOG_ENDPOINT sEndpoint);
  28. // 设置访问日志服务的ak
  29. configProps.put(ConfigConstants.LOG_ACCESSSKEYID sAccessKeyId);
  30. configProps.put(ConfigConstants.LOG_ACCESSKEY sAccessKey);
  31. // 设置日志写入的日志服务project
  32. configProps.put(ConfigConstants.LOG_PROJECT sProject);
  33. // 设置日志写入的日志服务Logstore
  34. configProps.put(ConfigConstants.LOG_LOGSTORE sLogstore);
  35. FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
  36. simpleStringStream.addSink(logProducer);
  37. env.execute("flink log producer");
  38. }
  39. // 模拟产生日志
  40. public static class EventsGenerator implements SourceFunction<String> {
  41. private boolean running = true;
  42. @Override
  43. public void run(SourceContext<String> ctx) throws Exception {
  44. long seq = 0;
  45. while (running) {
  46. Thread.sleep(10);
  47. ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
  48. }
  49. }
  50. @Override
  51. public void cancel() {
  52. running = false;
  53. }
  54. }
  55. }
本文导读目录