本文将向您介绍如何在Flink CDC数据摄入作业中使用脏数据收集器。
功能概述
在实时数据同步场景中,源端数据可能因格式错误、编码异常或 schema 不兼容等问题导致解析失败。这类无法正常处理的数据称为 脏数据(Dirty Data)。
数据摄入自VVR 11.5版本开始支持脏数据收集,数据摄入的Kafka数据源已经支持该功能。当连接器遇到无法解析的数据时,系统会将原始数据和异常信息写入收集器。结合连接器配置项,您可以将作业配置为忽略错误、记录详情、同时保持正常运行。
当连接器遇到无法解析的数据时,系统将自动捕获原始消息和异常信息,并将其写入指定的收集器。结合配置策略,可以实现:
容忍少量脏数据,避免整条链路中断。
记录完整上下文,便于后续定位与修复。
设置阈值控制,防止异常泛滥。
典型使用场景
使用场景 | 目标说明 |
日志采集管道 (如App 日志等非结构化数据源) | 数据质量参差不齐,允许跳过少量坏数据,保障主流程持续运行 |
核心业务表同步 (如订单、账户变更等关键系统) | 对一致性要求高,发现即告警,及时介入处理 |
数据探查与调研阶段 | 快速跑通全量数据,先了解整体分布,再回头治理脏数据 |
使用限制与注意事项
在使用前,请务必了解当前功能的能力边界和潜在风险:
Connector 支持范围:当前仅 Kafka 数据源已接入该能力,其他 Source 正在逐步覆盖。
支持的收集器类型:当前仅支持
logger类型,即将脏数据写入日志文件。
此功能适合用于调试期和生产初期;若长期出现大量脏数据,建议推动上游系统进行数据治理。
语法结构
启用脏数据收集器
脏数据收集器在Pipeline模块中定义。语法如下所示:
pipeline:
dirty-data.collector:
name: Logger Dirty Data Collector
type: logger参数 | 说明 |
| 收集器名称,建议命名有意义(如 |
| 收集器类型,可选值如下:
|
未定义此配置项时,即使开启容错,脏数据也不会被记录。
在数据源中配置容错策略
配置脏数据收集并无法让解析报错被跳过,建议结合Kafka的容错策略配合使用,详情可见Kafka连接器文档。配置示例如下:
source:
type: kafka
# 跳过前 100 次出现的解析异常;若超过 100 次则作业失败。
ingestion.ignore-errors: true
ingestion.error-tolerance.max-count: 100参数 | 默认值 | 说明 |
|
| 是否忽略解析错误。 设置为 |
|
| 最大容忍脏数据条数。 在 |
Logger脏数据收集器
Logger脏数据收集器会将脏数据存储到单独的日志文件中。您可以按照下面的方法查看对应的脏数据日志文件:
进入作业运维页面,点击作业日志选项卡;
点击运行日志,选择运行Task Managers二级选项卡并选择对应算子的TM节点;
点击日志列表,并在下方列表中点击名为
yaml-dirty-data.out的日志文件即可查询、保存收集到的脏数据记录。

目前会为脏数据记录以下元数据信息:
处理该条脏数据的时间戳
发出脏数据记录的算子及Subtask Index
原始脏数据内容
造成处理失败的异常信息

脏数据记录格式示例
每条记录包含以下元数据信息:
text[2025-04-05 10:23:45] [Operator: SourceKafka -> Subtask: 2]
Raw Data: {"id": "abc", "ts": "invalid-timestamp"}
Exception: java.time.format.DateTimeParseException: Text 'invalid-timestamp' could not be parsed at index 0
---字段 | 说明 |
时间戳 | 脏数据被捕获的时间 |
Operator & Subtask | 出错的具体算子及并行实例编号 |
Raw Data | 原始未解析的消息内容(Base64 或字符串形式) |
Exception | 解析失败的异常类型与堆栈摘要 |
常见问题
脏数据会影响 Checkpoint 吗?
不会。脏数据在进入状态更新前就被拦截,因此不影响 Checkpoint 的成功与否。
和 Flink SQL 的侧输出流有什么区别?
脏数据收集器:用于处理“无法反序列化或解析失败”的数据;
侧输出流(Side Output):用于处理“能解析但不符合业务规则”的数据。