本文为您介绍数据正确性有关的常见问题。
为什么作业没有输出?
场景描述
上线运行作业后,下游结果表中没有数据。
排错流程图
检查作业中是否存在Failover
排错指引
查看Failover报错信息,分析作业运行异常原因。
解决方案
解决Failover问题,使作业正常运行。
检查源表数据是否进入实时计算Flink版
排错指引
这种情况下没有Failover,但数据延时会很大,请查看监控告警页面numRecordsInOfSource,检查各Source输入是否有数据。
解决方案
检查源表,保证上游有数据进入实时计算Flink版。
检查数据是否被某个节点过滤
在其他配置中添加
pipeline.operator-chaining: 'false'
,具体操作请参见如何配置自定义的作业运行参数?。将节点拆分,然后观察每个节点的Bytes Received(输入)和Bytes Sent(输出),确定数据在哪个节点被过滤,如果某个节点输出为0,输入不为0,说明数据被这个节点过滤了。常见的导致数据无输出的算子包括join、window或where。检查下游是否由于默认缓存机制缓存了数据
解决方案:排除作业的业务逻辑异常后,调整下游存储的batchsize的大小。
如果batchsize参数设置得过小,则可能会造成下游数据库I/O压力过大、存在性能瓶颈的风险。例如,如果将batchsize设置为1,说明处理完一条数据,就会请求一次数据库,大数据场景下会导致数据库压力增大。
检查下游RDS,是否存在死锁
您可以使用print结果表,将计算结果打印到日志中,对日志进行分析,判断无输出结果的原因。详情请参见如何在控制台查看print数据结果?
如何定位Flink无法读取源数据的问题?
当Flink无法读取源数据时,建议从以下几个方面进行排查并处理:
检查上游存储和实时计算Flink版之间网络是否连通。
实时计算Flink版仅支持访问相同地域、相同VPC下的存储。如果您有访问跨VPC存储资源或者通过公网访问实时计算Flink版的特殊需求,请查看以下文档:
如果您需要跨VPC访问存储资源,则可以通过5种方式解决,详情请参见如何访问跨VPC的其他服务?。
如果您需要通过公网访问实时计算Flink版,则可以使用阿里云提供的NAT网关实现VPC网络和公网的连通。详情请参见实时计算Flink版如何访问公网?
检查上游存储中是否已配置了白名单。
上游存储中需要配置的产品有Kafka和ES。您可以按照以下步骤配置白名单:
获取实时计算Flink版虚拟交换机的网段。
获取方法请参见设置白名单。
在上游存储中配置实时计算Flink版白名单。
上游存储中配置白名单的方法,请参见对应DDL文档的前提条件中的文档链接,例如Kafka源表前提条件。
检查DDL中定义的字段类型、字段顺序和字段大小写是否和物理表一致。
为了确保一致性,您可以按照物理表的字段类型和顺序,以及使用相同的大小写规范来编写DDL。上游存储支持的字段类型和实时计算Flink版支持的字段类型可能不完全一致,但存在一定的映射关系。您需要按照DDL定义的字段类型映射关系一对一匹配,详情请参见对应DDL文档类型映射文档,例如日志服务SLS源表类型映射。
查看源表Taskmanager.log日志中是否有异常信息。
如果有异常报错,请先按照报错提示处理问题。查看源表Taskmanager.log日志的操作如下:
在
页面,单击目标作业名称。在状态总览页签,单击Source节点。
在SubTasks页签操作列,单击Open TaskManager Log Page。
在logs页签,查看日志信息。
在当前页面查找最后一个Caused by信息,即第一个Failover中的Caused by信息,往往是导致作业异常的根因,根据该根因的提示信息,可以快速定位作业异常的原因。
如何定位Flink无法将数据写入到结果表的问题?
当Flink无法将数据写入到结果表时,建议从以下几个方面进行排查并处理:
确认下游存储和实时计算Flink版之间网络是否连通。
实时计算Flink版仅支持访问相同地域、相同VPC下的存储。如果您有访问跨VPC存储资源或者通过公网访问实时计算Flink版的特殊需求,请查看以下文档:
如果您需要跨VPC访问存储资源,则可以通过5种方式解决,详情请参见如何访问跨VPC的其他服务?。
如果您需要通过公网访问实时计算Flink版,则可以使用阿里云提供的NAT网关实现VPC网络和公网的连通。详情请参见实时计算Flink版如何访问公网?
确认下游存储中是否已配置了白名单。
下游存储中需要配置白名单的产品包括RDS MySQL、Kafka、ES、云原生数据仓库AnalyticDB MySQL版3.0、HBase、Redis和ClickHouse。您可以按照以下步骤配置白名单:
获取实时计算Flink版虚拟交换机的网段。
获取方法请参见设置白名单。
在下游存储中配置实时计算Flink版白名单。
下游存储中配置白名单的方法,请参见对应DDL文档的前提条件中的文档链接,例如RDS MySQL结果表前提条件。
确认DDL中定义的字段类型、字段顺序和字段大小写是否和物理表一致。
为了确保一致性,您可以按照物理表的字段类型和顺序,以及使用相同的大小写规范来编写DDL。下游存储支持的字段类型和实时计算Flink版支持的字段类型可能不完全一致,但存在一定的映射关系。您需要按照DDL定义的字段类型映射关系一对一匹配,详情请参见对应DDL文档类型映射,例如日志服务SLS结果表类型映射。
确认数据是否被中间节点过滤了,例如WHERE、JOIN和窗口等。
具体请查看Vertex拓扑图上每个计算节点数据输入和输出情况。例如WHERE节点输入为5,输出为0,则代表被WHERE节点过滤了,因此下游存储中无数据写入。
确认下游存储中设置的输出条件相关参数的默认值是否合适。
如果您的数据源的数据量较小,但结果表DDL定制中设置的输出条件的默认值较大,会导致一直达不到输出条件,而无法下发数据至下游存储。此时,您需要将输出条件相关参数的默认值改小。常见的下游存储中的输出条件参数情况如下表所示。
输出条件
参数
涉及的下游存储
输出条件
参数
涉及的下游存储
一次批量写入的条数。
batchSize
每次批量写入数据的最大数据条数。
batchCount
Odps tunnel writer缓冲区Flush间隔。
flushIntervalMs
写入HBase前,内存中缓存的数据量(字节)大小。
sink.buffer-flush.max-size
写入HBase前,内存中缓存的数据条数。
sink.buffer-flush.max-rows
将缓存数据周期性写入到HBase的间隔,可以控制写入HBase的延迟。
sink.buffer-flush.interval
Hologres Sink节点数据攒批的最大值。
jdbcWriteBatchSize
确认窗口是否因为乱序而导致数据无法输出。
假如,实时计算Flink版一开始就流入一条2100年的未来数据,它的Watermark为2100年,系统会默认2100年前的数据已被处理完,只会处理比2100年大的数据。而后续流入的2021年的正常数据因为Watermark小于2100年而被丢弃。直到出现大于2100年的数据流入实时计算Flink版,则会触发窗口关闭而输出数据,否则就会导致结果表一直没有数据输出。
您可以通过Print Sink或者Log4j的方式确认数据源中是否存在乱序的数据,详情请参见print结果表和配置作业日志输出。找到乱序数据后,您可以过滤或者采取延迟触发窗口计算的方式处理乱序的数据。
确认是否因为个别并发没有数据而导致数据无法输出。
如果作业为多并发,但个别并发没有数据流入实时计算Flink版,则它的Watermark就为1970年0点0分,而多个并发的Watermark取最小值,因此就永远没有满足窗口结束的Watermark,无法触发窗口结束而输出数据。
此时,您需要检查您上游的Vertex拓扑图的Subtask每个并发是不是都有数据流入。如果有个别并发无数据,建议调整作业并发数小于等于源表Shard数,从而保证所有并发都有数据。
确认Kafka的某个分区是否无数据,从而导致数据无法输出。
如果Kafka某个分区没有数据,则会影响Watermark的产生,从而导致Kafka源表数据基于Event Time的窗口后,不能输出数据。解决方案请参见为什么Kafka源表数据基于Event Time的窗口后,不能输出数据?。
如何定位数据丢失的问题?
数据经过JOIN、WHERE或窗口等节点时,数据量减少是正常现象,这是因条件限制被过滤或JOIN不上。但如果您的数据丢失异常,建议从以下几个方面进行排查并处理:
确认维表Cache缓存策略是否有问题。
如果维表DDL中Cache缓存策略设置的有问题,则会导致维表的数据没有被拉取到,从而导致数据丢失。此时建议检查并修改作业Cache策略。作业Cache策略详情请参见各维表的Cache策略,例如HBase维表Cache参数。
确认函数使用方法是否不正确。
如果您在作业中使用了to_timestamp_tz、date_format等函数,而函数的使用方法不正确,导致数据转换出问题,数据被丢失。
此时,您可以通过Print Sink或者Log4j的方式,单独将使用的函数的信息打印到日志中,确认函数的使用方法是否正确。详情请参见print结果表或配置作业日志输出。
确认数据是否乱序。
如果作业中存在乱序的数据,这些乱序的数据的Watermark不在新窗口的开窗和关窗时间范围内,导致这些数据被丢弃。例如下图中11秒的数据在16秒进入15~20秒的窗口,而它的Watermark为11,会被系统认为是迟到数据,从而导致被丢弃。
通常丢失的数据都是一个窗口的,您可以通过Print Sink或者Log4j的方式确认数据源中是否存在乱序的数据。详情请参见print结果表或配置作业日志输出。
找到乱序数据后,可以根据乱序的程度,合理地设置Watermark,采取延迟触发窗口计算的方式处理乱序的数据。例如该示例中,可以定义Watermark生成策略为Watermark = Event time -5s,从而让乱序的数据可以被正确地处理。建议以整天整时整分开窗口求聚合,否则数据乱序严重,增加offset后还是会有数据丢失问题。
报错:doesn't support consuming update and delete changes which is produced by node TableSourceScan
报错详情
Table sink 'vvp.default.***' doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[vvp, default, ***]], fields=[id,b, content]) at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.wrapExecutor(DelegateOperationExecutor.java:286) at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.validate(DelegateOperationExecutor.java:211) at org.apache.flink.table.sqlserver.FlinkSqlServiceImpl.validate(FlinkSqlServiceImpl.java:741) at org.apache.flink.table.sqlserver.proto.FlinkSqlServiceGrpc$MethodHandlers.invoke(FlinkSqlServiceGrpc.java:2522) at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172) at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331) at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:820) at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) at java.lang.Thread.run(Thread.java:834)
报错原因
Append类型Sink无法接收上游update记录。
解决方案
使用支持写入update记录的Sink,例如Upsert Kafka等。详情请参见Upsert Kafka结果表。
- 本页导读 (1)
- 为什么作业没有输出?
- 如何定位Flink无法读取源数据的问题?
- 如何定位Flink无法将数据写入到结果表的问题?
- 如何定位数据丢失的问题?
- 报错:doesn't support consuming update and delete changes which is produced by node TableSourceScan