本文为您介绍Blink 3.4.3版本的重大功能变更和主要修复的缺陷。

版本重大功能简介

GeminiStateBackend是Blink开发的新一代后台,它使用自研的存储引擎GeminiDB。通过线上部分作业的测试显示,GeminiStateBackend的性能可以达到NiagaraStateBackend性能的1.5倍。GeminiStateBackend的主要特性如下:
  • 采用LSM + Hash索引的存储模型。其中LSM提高写性能,Hash索引存放在内存中以弥补LSM读放大的缺点。简单来说,GeminiDB将文件划分为更小粒度的单元(Page),以Page为粒度进行冲洗和压缩处理,Hash索引根据键快速定位其所在的单元,从而减少I/O次数,提高读性能。
  • 优化Cache策略。如果新插入的数据或压缩后的数据存在热点,GeminiDB会将其缓存在内存中。而传统的LSM实现会首先将这些数据刷到磁盘上,导致新增数据需要至少经过一次读I/O才能进入缓存,因此缓存命中率下降。
  • 优化数据冲洗到磁盘的策略。GeminiDB只有在内存的缓存填满后才会将部分数据冲洗到磁盘,因此,在内存充足并且压缩及时的情况下,不会产生数据文件。而传统LSM的实现中,会将数据冲洗到磁盘以便进行持久化处理。这在Blink的应用场景中是没有必要的,Blink具有原生的Checkpoint机制以保证数据的一致性,因此只需在Checkpoint时进行持久化处理。
  • 支持In-memory Compaction。对于内存中数据及时进行压缩,减少写放大以及压缩的读I/O。
  • 使用Java消除RocksDB/Niagara中的JNI开销。
  • 支持增量Checkpoint。
  • 支持Local Recovery,使作业失效后快速恢复。
  • 支持存储计算分离,使作业重启或者Rescale后的快速恢复,功能还在完善中。
GeminiStateBackend针对DataStream和SQL的作业配置如下:
  • DataStream作业
    • API配置方式
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      GeminiStateBackend stateBackend = new GeminiStateBackend(checkpointDir);
      // Configuration for gemini
      Configuration config = new Configuration();
      config.setString("state.backend.gemini.heap.size", "1024mb");
      // set configuration to backend
      stateBackend.setConfiguration(conf);
      // use gemini as state backend
      env.setStateBackend(new GeminiStateBackend(checkpointDir));
    • 主要配置项
      配置项 类型 单位 默认值 说明
      state.backend.gemini.ttl.ms LONG ms -1(未开启) 可选。数据存活时间。
      state.backend.gemini.heap.size STRING 支持以下字节单位:
      • 1024
      • 1024kb
      • 1024mb
      • 1024gb
      可选。单个GeminiDB能够使用的内存大小。
      说明 推荐进行配置,否则backend会根据JVM和TaskManager计算其默认值。
  • SQL作业
    # 使用Gemini。
    state.backend.type=gemini
    # State的TTL的大小。
    state.backend.gemini.ttl.ms=129600000
    # GeminiDB允许使用的内存大小,单位MB。注意:Operator配置的内存资源要包含这部分,默认为512MB。
    state.backend.gemini.heap.size.mb=512
    # 推荐的TaskManager的JVM参数。
    blink.job.option=-yD env.java.opts.taskmanager='-XX:NewRatio=3 -XX:SurvivorRatio=3 -XX:ParallelGCThreads=8 -XX:+UnlockDiagnosticVMOptions -XX:ParGCCardsPerStrideChunk=4096 -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=75 -Djdk.nio.maxCachedBufferSize=10240'

主要缺陷修复

  • 修复Calc的codegen代码可能会出现NPE的缺陷。
  • 修复按照proctime读取FirstRow,状态存储不需要存储完整行的缺陷。
  • 修复code split缺陷:JavaCodeSplitter在把local variable抽取成member field时,没有替换foreach control语句中的local variable。在distinct with filter场景中可能出现这种缺陷。