数据正确性

更新时间:2025-01-26 06:17:38

本文为您介绍数据正确性有关的常见问题。

为什么作业没有输出?

  • 场景描述

    上线运行作业后,下游结果表中没有数据。

  • 排错流程图作业排错流程图

    1. 检查作业中是否存在Failover

      • 排错指引

        查看Failover报错信息,分析作业运行异常原因。

      • 解决方案

        解决Failover问题,使作业正常运行。

    2. 检查源表数据是否进入实时计算Flink

      • 排错指引

        这种情况下没有Failover,但数据延时会很大,请查看监控告警页面numRecordsInOfSource,检查各Source输入是否有数据。

      • 解决方案

        检查源表,保证上游有数据进入实时计算Flink版。

    3. 检查数据是否被某个节点过滤

      其他配置中添加pipeline.operator-chaining: 'false' ,具体操作请参见如何配置自定义的作业运行参数?。将节点拆分,然后观察每个节点的Bytes Received(输入)和Bytes Sent(输出),确定数据在哪个节点被过滤,如果某个节点输出为0,输入不为0,说明数据被这个节点过滤了。常见的导致数据无输出的算子包括join、windowwhere。

    4. 检查下游是否由于默认缓存机制缓存了数据

      解决方案:排除作业的业务逻辑异常后,调整下游存储的batchsize的大小。

      重要

      如果batchsize参数设置得过小,则可能会造成下游数据库I/O压力过大、存在性能瓶颈的风险。例如,如果将batchsize设置为1,说明处理完一条数据,就会请求一次数据库,大数据场景下会导致数据库压力增大。

    5. 检查下游RDS,是否存在死锁

      解决方案:请参见MySQL(TDDL/RDS)时,出现死锁(DeadLock)。

说明

您可以使用print结果表,将计算结果打印到日志中,对日志进行分析,判断无输出结果的原因。详情请参见如何在控制台查看print数据结果?

如何定位Flink无法读取源数据的问题?

Flink无法读取源数据时,建议从以下几个方面进行排查并处理:

  • 检查上游存储和实时计算Flink版之间网络是否连通。

    实时计算Flink版仅支持访问相同地域、相同VPC下的存储。如果您有访问跨VPC存储资源或者通过公网访问实时计算Flink版的特殊需求,请查看以下文档:

  • 检查上游存储中是否已配置了白名单。

    上游存储中需要配置的产品有KafkaES。您可以按照以下步骤配置白名单:

    1. 获取实时计算Flink版虚拟交换机的网段。

      获取方法请参见设置白名单

    2. 在上游存储中配置实时计算Flink版白名单。

      上游存储中配置白名单的方法,请参见对应DDL文档的前提条件中的文档链接,例如Kafka源表前提条件

  • 检查DDL中定义的字段类型、字段顺序和字段大小写是否和物理表一致。

    为了确保一致性,您可以按照物理表的字段类型和顺序,以及使用相同的大小写规范来编写DDL。上游存储支持的字段类型和实时计算Flink版支持的字段类型可能不完全一致,但存在一定的映射关系。您需要按照DDL定义的字段类型映射关系一对一匹配,详情请参见对应DDL文档类型映射文档,例如日志服务SLS源表类型映射

  • 查看源表Taskmanager.log日志中是否有异常信息。

    如果有异常报错,请先按照报错提示处理问题。查看源表Taskmanager.log日志的操作如下:

    1. 运维中心 > 作业运维页面,单击目标作业名称。

    2. 状态总览页签,单击Source节点。

    3. SubTasks页签操作列,单击Open TaskManager Log PageTM日志

    4. logs页签,查看日志信息。

      在当前页面查找最后一个Caused by信息,即第一个Failover中的Caused by信息,往往是导致作业异常的根因,根据该根因的提示信息,可以快速定位作业异常的原因。

    如何定位Flink无法将数据写入到结果表的问题?

    Flink无法将数据写入到结果表时,建议从以下几个方面进行排查并处理:

    • 确认下游存储和实时计算Flink版之间网络是否连通。

      实时计算Flink版仅支持访问相同地域、相同VPC下的存储。如果您有访问跨VPC存储资源或者通过公网访问实时计算Flink版的特殊需求,请查看以下文档:

    • 确认下游存储中是否已配置了白名单。

      下游存储中需要配置白名单的产品包括RDS MySQL、Kafka、ES、云原生数据仓库AnalyticDB MySQL3.0、HBase、RedisClickHouse。您可以按照以下步骤配置白名单:

      1. 获取实时计算Flink版虚拟交换机的网段。

        获取方法请参见设置白名单

      2. 在下游存储中配置实时计算Flink版白名单。

        下游存储中配置白名单的方法,请参见对应DDL文档的前提条件中的文档链接,例如RDS MySQL结果表前提条件

    • 确认DDL中定义的字段类型、字段顺序和字段大小写是否和物理表一致。

      为了确保一致性,您可以按照物理表的字段类型和顺序,以及使用相同的大小写规范来编写DDL。下游存储支持的字段类型和实时计算Flink版支持的字段类型可能不完全一致,但存在一定的映射关系。您需要按照DDL定义的字段类型映射关系一对一匹配,详情请参见对应DDL文档类型映射,例如日志服务SLS结果表类型映射

    • 确认数据是否被中间节点过滤了,例如WHERE、JOIN和窗口等。

      具体请查看Vertex拓扑图上每个计算节点数据输入和输出情况。例如WHERE节点输入为5,输出为0,则代表被WHERE节点过滤了,因此下游存储中无数据写入。

    • 确认下游存储中设置的输出条件相关参数的默认值是否合适。

      如果您的数据源的数据量较小,但结果表DDL定制中设置的输出条件的默认值较大,会导致一直达不到输出条件,而无法下发数据至下游存储。此时,您需要将输出条件相关参数的默认值改小。常见的下游存储中的输出条件参数情况如下表所示。

      输出条件

      参数

      涉及的下游存储

      输出条件

      参数

      涉及的下游存储

      一次批量写入的条数。

      batchSize

      每次批量写入数据的最大数据条数。

      batchCount

      数据总线DataHub

      Odps tunnel writer缓冲区Flush间隔。

      flushIntervalMs

      大数据计算服务MaxCompute

      写入HBase前,内存中缓存的数据量(字节)大小。

      sink.buffer-flush.max-size

      云数据库HBase

      写入HBase前,内存中缓存的数据条数。

      sink.buffer-flush.max-rows

      云数据库HBase

      将缓存数据周期性写入到HBase的间隔,可以控制写入HBase的延迟。

      sink.buffer-flush.interval

      云数据库HBase

      Hologres Sink节点数据攒批的最大值。

      jdbcWriteBatchSize

      实时数仓Hologres

    • 确认窗口是否因为乱序而导致数据无法输出。

      假如,实时计算Flink版一开始就流入一条2100年的未来数据,它的Watermark2100年,系统会默认2100年前的数据已被处理完,只会处理比2100年大的数据。而后续流入的2021年的正常数据因为Watermark小于2100年而被丢弃。直到出现大于2100年的数据流入实时计算Flink版,则会触发窗口关闭而输出数据,否则就会导致结果表一直没有数据输出。

      您可以通过Print Sink或者Log4j的方式确认数据源中是否存在乱序的数据,详情请参见print结果表配置作业日志输出。找到乱序数据后,您可以过滤或者采取延迟触发窗口计算的方式处理乱序的数据。

    • 确认是否因为个别并发没有数据而导致数据无法输出。

      如果作业为多并发,但个别并发没有数据流入实时计算Flink版,则它的Watermark就为197000分,而多个并发的Watermark取最小值,因此就永远没有满足窗口结束的Watermark,无法触发窗口结束而输出数据。

      此时,您需要检查您上游的Vertex拓扑图的Subtask每个并发是不是都有数据流入。如果有个别并发无数据,建议调整作业并发数小于等于源表Shard数,从而保证所有并发都有数据。

    • 确认Kafka的某个分区是否无数据,从而导致数据无法输出。

      如果Kafka某个分区没有数据,则会影响Watermark的产生,从而导致Kafka源表数据基于Event Time的窗口后,不能输出数据。解决方案请参见为什么Kafka源表数据基于Event Time的窗口后,不能输出数据?

如何定位数据丢失的问题?

数据经过JOIN、WHERE或窗口等节点时,数据量减少是正常现象,这是因条件限制被过滤或JOIN不上。但如果您的数据丢失异常,建议从以下几个方面进行排查并处理:

  • 确认维表Cache缓存策略是否有问题。

    如果维表DDLCache缓存策略设置的有问题,则会导致维表的数据没有被拉取到,从而导致数据丢失。此时建议检查并修改作业Cache策略。作业Cache策略详情请参见各维表的Cache策略,例如HBase维表Cache参数

  • 确认函数使用方法是否不正确。

    如果您在作业中使用了to_timestamp_tz、date_format等函数,而函数的使用方法不正确,导致数据转换出问题,数据被丢失。

    此时,您可以通过Print Sink或者Log4j的方式,单独将使用的函数的信息打印到日志中,确认函数的使用方法是否正确。详情请参见print结果表配置作业日志输出

  • 确认数据是否乱序。

    如果作业中存在乱序的数据,这些乱序的数据的Watermark不在新窗口的开窗和关窗时间范围内,导致这些数据被丢弃。例如下图中11秒的数据在16秒进入15~20秒的窗口,而它的Watermark11,会被系统认为是迟到数据,从而导致被丢弃。乱序

    通常丢失的数据都是一个窗口的,您可以通过Print Sink或者Log4j的方式确认数据源中是否存在乱序的数据。详情请参见print结果表配置作业日志输出

    找到乱序数据后,可以根据乱序的程度,合理地设置Watermark,采取延迟触发窗口计算的方式处理乱序的数据。例如该示例中,可以定义Watermark生成策略为Watermark = Event time -5s,从而让乱序的数据可以被正确地处理。建议以整天整时整分开窗口求聚合,否则数据乱序严重,增加offset后还是会有数据丢失问题。

报错:doesn't support consuming update and delete changes which is produced by node TableSourceScan

  • 报错详情

    Table sink 'vvp.default.***' doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[vvp, default, ***]], fields=[id,b, content])
        at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.wrapExecutor(DelegateOperationExecutor.java:286)
        at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.validate(DelegateOperationExecutor.java:211)
        at org.apache.flink.table.sqlserver.FlinkSqlServiceImpl.validate(FlinkSqlServiceImpl.java:741)
        at org.apache.flink.table.sqlserver.proto.FlinkSqlServiceGrpc$MethodHandlers.invoke(FlinkSqlServiceGrpc.java:2522)
        at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
        at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
        at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:820)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
        at java.lang.Thread.run(Thread.java:834)
  • 报错原因

    Append类型Sink无法接收上游update记录。

  • 解决方案

    使用支持写入update记录的Sink,例如Upsert Kafka等。详情请参见Upsert Kafka结果表

  • 本页导读 (1)
  • 为什么作业没有输出?
  • 如何定位Flink无法读取源数据的问题?
  • 如何定位Flink无法将数据写入到结果表的问题?
  • 如何定位数据丢失的问题?
  • 报错:doesn't support consuming update and delete changes which is produced by node TableSourceScan
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等