本文为您介绍实时计算Flink版系统检查点或作业快照相关的常见问题。
两次Checkpoint最小间隔时间计算方式
最小间隔时间是按照上次成功的Checkpoint开始计算。配置间隔时间为3,最小间隔时间为5,这种情况下,间隔时间会调整为5。
以两个场景进行说明,两个场景Checkpoint间隔时间为3分钟,超时时间为10分钟,最小间隔时间为5分钟。
场景一:作业正常运行(Checkpoint每次都成功)
12:00第一次开始执行Checkpoint,12:00:02 Checkpoint成功,第二次Checkpoint开始时间为12:05:02。
场景二:作业不正常(Checkpoint因某些原因超时或者失败,本次场景以超时为例)
12:00第一次开始执行Checkpoint,12:00:02 Checkpoint成功,12:05:02第二次开始执行Checkpoint,12:15:02超时失败,第三次Checkpoint开始时间为12:15:02。
Checkpoint最小间隔时间设置详情请参见Tuning Checkpoint。
VVR 8.x和VVR 6.x使用的GeminiStateBackend有什么区别?
实时计算Flink版计算引擎VVR 6.x默认使用V3版本的GeminiStateBackend,VVR 8.x默认使用V4版本的GeminiStateBackend。
分类 | 详情 |
基础能力 |
|
状态懒加载参数 |
|
Managed Memory使用差异 | 仅在RSS(Resident Set Size)指标上有区别:
说明 关于Managed Memory的更多解释请参见TaskManager Memory。 |
关于GeminiStateBackend技术解读详情,请参见阿里云实时计算企业级状态存储引擎 Gemini 技术解读。
从VVR 6.x升级到VVR 8.x时,遇到
You are using the new V4 state engine to restore old state data from a checkpoint
报错,解决方案请参见报错:You are using the new V4 state engine to restore old state data from a checkpoint。
报错:org.apache.flink.util.SerializedThrowable
报错详情
报错原因
使用旧版Gemini在进行快照时,有极小概率会遇到NullPointerException(NPE)报错。该报错通常是由于内部内存结构引用为0,但尚未及时回收导致的。
解决方案
通常,该问题在系统运行一段时间后或进行重启后,可以恢复正常。这个问题不会影响数据的正确性,只会导致Checkpoint失败。您可以适当增加Checkpoint失败时的重启容忍次数。
将VVR版本升级到8.0.1及以上版本,详情请参见作业引擎版本升级。
报错:You are using the new V4 state engine to restore old state data from a checkpoint
报错详情
从VVR 6.x升级到VVR 8.x时,报
You are using the new V4 state engine to restore old state data from a checkpoint
。报错原因
新版Gemini和旧版Gemini的Checkpoint无法兼容。
解决方案
您可以采用以下任何一种方式解决:
无状态重启作业。
(不推荐)继续使用旧版本Gemini。需要配置
state.backend.gemini.engine.type: STREAMING
后重启作业才能生效。参数配置方法请参见如何配置作业运行参数?
报错:No space left on device
报错详情
在作业运行过程中,出现类似以下异常。
java.io.IOException: No space left on device at java.io.FileOutputStream.writeBytes(Native Method) ~[?:1.8.0_102] at java.io.FileOutputStream.write(FileOutputStream.java:326) ~[?:1.8.0_102] at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[?:1.8.0_102] at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) ~[?:1.8.0_102] at java.io.FilterOutputStream.close(FilterOutputStream.java:158) ~[?:1.8.0_102] at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSOutputStream.close(AliyunOSSOutputStream.java:82) ~[?:?] at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) ~[?:?] at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101) ~[?:?] at org.apache.flink.fs.osshadoop.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52) ~[?:?] at com.alibaba.flink.statebackend.FlinkDataOutputStreamWapper.close(FlinkDataOutputStreamWapper.java:31) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT] at com.alibaba.gemini.common.io.GeminiFileOutputViewImpl.close(GeminiFileOutputViewImpl.java:188) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT] at com.alibaba.gemini.engine.filecache.InfiniteFileCache.lambda$flushBatchPages$1(InfiniteFileCache.java:635) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT] at com.alibaba.gemini.engine.handler.GeminiEventExecutor.lambda$execute$1(GeminiEventExecutor.java:137) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT] at com.alibaba.gemini.engine.handler.GeminiEventExecutor.doEventInQueue(GeminiEventExecutor.java:86) [flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT] at com.alibaba.gemini.engine.handler.GeminiEventExecutor.run(GeminiEventExecutor.java:71) [flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [flink-dist_2.11-1.12-vvr-3.0.4-SNAPSHOT.jar:1.12-vvr-3.0.4-SNAPSHOT] at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [flink-dist_2.11-1.12-vvr-3.0.4-SNAPSHOT.jar:1.12-vvr-3.0.4-SNAPSHOT] at java.lang.Thread.run(Thread.java:834) [?:1.8.0_102]
报错原因
本地磁盘空间不足。目前单Pod磁盘大小限制为20 GB,在该限制下,造成本地磁盘空间不足的原因通常有以下几点:
状态数据堆积。
计算节点的非状态数据(例如日志)堆积。
异常导致的旧状态数据堆积。
解决方案
您可以通过快照里的State Size判断状态数据是不是过大。如果确定因为状态数据过大造成该报错,则可以采用以下解决方案:
VVR 4.x和VVR 6.x版本优化方案
您可以采用以下任何一种方案:
启用存算分离功能(在VVR 4.0.12及以上版本已默认启用存算分离功能)
即配置state.backend.gemini.file.cache.type和state.backend.gemini.file.cache.preserved-space参数。详情请参见存算分离配置。
增加并发度。
如果原来并发度是1,只能用一个pod,磁盘总空间是20 GB。如果并发加到4,作业就可以使用4个Pod,磁盘总空间相当于是80 GB。
基于State TTL(Time To Live)进行磁盘清理。
当您设置了State TTL,如果时间超过了State的过期时间,则State数据就过期了,系统就会自动清理掉过期的State数据,即可释放一定的磁盘空间。
VVR 3.x.x版本优化方案
您可以采用以下任何一种方案:
对State进行压缩。
VVR 3.0.x版本配置state.backend.gemini.page.flush.local.compression: Lz4参数,本地State会有压缩,能降低本地磁盘空间,但一定程度上会降低作业性能。
启用存算分离功能。
VVR 3.0.3及以上版本配置state.backend.gemini.file.cache.type: LIMITED。本地盘会有一个18 GB的State限制,超过18 GB的数据会被存储到远程DFS,下次读取该部分数据时,会从DFS读取,相当于把本地盘当作一个本地文件缓存。
报错:java.lang.IllegalArgumentException: Illegal Capacity: -1
报错详情
在作业使用Map State的遍历操作时,在作业运行过程中,有小概率会触发到以下异常。
java.lang.IllegalArgumentException: Illegal Capacity: -1 at java.util.ArrayList.<init>(ArrayList.java:156) at com.alibaba.gemini.engine.pagestore.PageStoreImpl$3.<init>(PageStoreImpl.java:1113) at com.alibaba.gemini.engine.pagestore.PageStoreImpl.prefixIterator(PageStoreImpl.java:1094) at com.alibaba.gemini.engine.pagestore.PageStoreImpl.prefixIterator(PageStoreImpl.java:112) at com.alibaba.gemini.engine.table.BinaryKMapTable.internalEntries(BinaryKMapTable.java:83) at com.alibaba.gemini.engine.table.AbstractBinaryKMapTable.iterator(AbstractBinaryKMapTable.java:282) at com.alibaba.flink.statebackend.gemini.keyed.AbstractGeminiKeyedMapStateImpl.doIterator(AbstractGeminiKeyedMapStateImpl.java:496) at com.alibaba.flink.statebackend.gemini.keyed.AbstractGeminiKeyedMapStateImpl.iteratorWithMetrics(AbstractGeminiKeyedMapStateImpl.java:501) at com.alibaba.flink.statebackend.gemini.keyed.AbstractGeminiKeyedMapStateImpl.iterator(AbstractGeminiKeyedMapStateImpl.java:489) at com.alibaba.flink.statebackend.gemini.context.ContextMapState.entries(ContextMapState.java:97) at org.apache.flink.runtime.state.ttl.TtlMapState.entries(TtlMapState.java:107) at org.apache.flink.runtime.state.ttl.TtlMapState.entries(TtlMapState.java:102) at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77) at org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateViews$InputSideHasNoUniqueKey$1.<init>(OuterJoinRecordStateViews.java:279) at org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateViews$InputSideHasNoUniqueKey.getRecordsAndNumOfAssociations(OuterJoinRecordStateViews.java:276) at org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator$AssociatedRecords.of(AbstractStreamingJoinOperator.java:229) at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:216) at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processRight(StreamingJoinOperator.java:134) at org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator.processElement2(AbstractStreamingJoinOperator.java:136) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord2(StreamTwoInputProcessorFactory.java:221) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$1(StreamTwoInputProcessorFactory.java:190) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:291) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684) at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639) at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at java.lang.Thread.run(Thread.java:834)
报错原因
产品已知缺陷,仅在VVR 4.0.10版本出现。
解决方案
将VVR版本升级到4.0.11及以上。
报错:java.lang.NegativeArraySizeException
报错详情
当作业使用了List State,在作业运行过程中,会出现以下异常。
Caused by: java.lang.NegativeArraySizeException at com.alibaba.gemini.engine.rm.GUnPooledByteBuffer.newTempBuffer(GUnPooledByteBuffer.java:270) at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:85) at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:75) at com.alibaba.gemini.engine.pagestore.PageStoreImpl.internalGet(PageStoreImpl.java:428) at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:271) at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:112) at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:118) at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:57) at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.getOrDefault(GeminiSubKeyedListStateImpl.java:97) at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:88) at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:47) at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:60) at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:44) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:533) at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:289) at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1435)
报错原因
List State中单个key对应的State数据过大,即超过了2 GB。State数据过大产生的过程如下:
在作业正常运行时,List State中单个Key下Append的Value会通过Merge进行组合(例如在包含Window的List State中),State数据不断累积。
在State数据累积到一定程度时,一开始会触发OOM。而在作业从故障中恢复之后,List State的Merge过程会进一步导致StateBackend申请的临时Byte数组的大小超过可用的限制,从而出现该异常。
说明RocksDBStateBackend也会遇到类似的问题并触发ArrayIndexOutOfBoundsException或者Segmentation fault。详情请参见The EmbeddedRocksDBStateBackend。
解决方案
如果是Window算子导致的State数据过大,则建议减小窗口大小。
如果是作业逻辑不合理,则建议调整作业逻辑,例如将Key进行拆分。
全量Checkpoint与增量Checkpoint的大小一致,是否正常?
如果您在使用Flink的情况下,观察到全量Checkpoint与增量Checkpoint的大小一致,您需要:
检查增量快照是否正常配置并生效。
是否为特定情况。在特定情况下,这种现象是正常的,例如:
在数据注入前(18:29之前),作业没有处理任何数据,此时Checkpoint只包含了初始化的源(Source)状态信息。由于没有其他状态数据,此时的Checkpoint实际上是一个全量Checkpoint。
在18:29时注入了100万条数据。假设数据在接下来的Checkpoint间隔时间(3分钟)内被完全处理,并且期间没有其他数据注入,此时发生的第一个增量Checkpoint将会包含这100万条数据产生的所有状态信息。
在这种情况下,全量Checkpoint和增量Checkpoint的大小一致是符合预期的。因为第一个增量Checkpoint需要包含全量数据状态,以确保能够从该点恢复整个状态,导致它实际上也是一个全量Checkpoint。
增量Checkpoint通常是从第二个Checkpoint开始体现出来的,在数据稳定输入且没有大规模的状态变更时,后续的增量Checkpoint应该显示出大小上的差异,表明系统正常地只对状态的增量部分进行快照。如果仍然一致,则需要进一步审查系统状态和行为,确认是否存在问题。