开发运行中的其他问题

本文介绍开发运行中的其他问题。

报错:Caused by: java.lang.NoSuchMethodError: org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getUpsertKeysInKeyGroupRange(Lorg/apache/calcite/rel/RelNode;[I)Ljava/util/Set;

  • 报错原因

    如果您依赖了社区的internal API,而这个internal API阿里云上的版本做了一些优化,可能会导致包冲突等异常。

  • 解决方案

    Flink源代码中只有明确标注了@Public或者@PublicEvolving的才是公开供用户调用的方法,阿里云只对这些方法的兼容性做出产品保证。

报错:Task did not exit gracefully within 180 + seconds.

  • 报错详情

    Task did not exit gracefully within 180 + seconds.
    2022-04-22T17:32:25.852861506+08:00 stdout F org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
    2022-04-22T17:32:25.852865065+08:00 stdout F at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1709) [flink-dist_2.11-1.12-vvr-3.0.4-SNAPSHOT.jar:1.12-vvr-3.0.4-SNAPSHOT]
    2022-04-22T17:32:25.852867996+08:00 stdout F at java.lang.Thread.run(Thread.java:834) [?:1.8.0_102]
    log_level:ERROR
  • 报错原因

    该报错不是作业异常的根因。因为Task退出的超时task.cancellation.timeout参数的默认值为180s,当作业Failover或退出过程中,可能会因某种原因阻塞Task的退出。当阻塞时间达到超时时间后,Flink会判定该Task已卡死无法恢复,会主动停止该Task所在的TaskManager,让Failover或退出流程继续下去,所以在日志中会出现这样的报错。

    真正的原因可能是您自定义函数的实现有问题,例如close方法的实现中长时间阻塞或者计算方法长时间未返回等。

  • 解决方案

    设置Task退出的超时时间参数task.cancellation.timeout取值为0,配置方法请参见如何配置作业运行参数?配置为0时,Task退出阻塞将不会超时,该task会持续等待退出完成。重启作业后再次发现作业在Failover或退出过程中长时间阻塞时,需要找到处于Cancelling状态的Task,查看该Task的栈,排查问题的根因,然后根据排查到的根因再针对性解决问题。

    重要

    task.cancellation.timeout参数用于作业调试,请不要在生产作业上配置该参数值为0。

报错: Can not retract a non-existent record. This should never happen.

  • 报错详情

    java.lang.RuntimeException: Can not retract a non-existent record. This should never happen.
        at org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.processElement(RetractableTopNFunction.java:196)
        at org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.processElement(RetractableTopNFunction.java:55)
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:135)
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:424)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:685)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:640)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:651)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:624)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:799)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:586)
        at java.lang.Thread.run(Thread.java:877)
                        
  • 报错原因及解决方案

    场景

    原因

    解决方案

    场景1

    由代码中now()导致。

    因为TopN不支持非确定性的字段作为排序字段(ORDER BY)或分组字段(PARTITION BY),now()每次输出的值不同,所以导致Retraction无法找到之前的值。

    使用源表中定义的只会产生确定性值的字段作为排序字段(ORDER BY)和分组字段(PARTITION BY)。

    场景2

    table.exec.state.ttl参数值设置过小,State因过期被清理,retract时找不到对应keystate。

    调大table.exec.state.ttl参数值。配置方法请参见如何配置作业运行参数?

    场景3

    VVR 4.0.15以下版本的已知缺陷。

    上游使用CDC Connector,Hologres全增量一体源表并不是一致性的读取,全量数据和增量数据之前是有overlap的,导致在处理binlog一开始的update_before时,会发现该数据在State中不存在。

    如果您使用的是CDCHologres,建议您升级作业版本至VVR 4.0.16VVR 6.0.3及以上版本。