脏数据收集

更新时间:
复制为 MD 格式

本文将向您介绍如何在Flink CDC数据摄入作业中使用脏数据收集器。

功能概述

在实时数据同步场景中,源端数据可能因格式错误、编码异常或 schema 不兼容等问题导致解析失败。这类无法正常处理的数据称为 脏数据(Dirty Data)。

数据摄入自VVR 11.5版本开始支持脏数据收集,数据摄入的Kafka数据源已经支持该功能。当连接器遇到无法解析的数据时,系统会将原始数据和异常信息写入收集器。结合连接器配置项,您可以将作业配置为忽略错误、记录详情、同时保持正常运行。

当连接器遇到无法解析的数据时,系统将自动捕获原始消息和异常信息,并将其写入指定的收集器。结合配置策略,可以实现:

  • 容忍少量脏数据,避免整条链路中断。

  • 记录完整上下文,便于后续定位与修复。

  • 设置阈值控制,防止异常泛滥。

典型使用场景

使用场景

目标说明

日志采集管道

(如App 日志等非结构化数据源)

数据质量参差不齐,允许跳过少量坏数据,保障主流程持续运行

核心业务表同步

(如订单、账户变更等关键系统)

对一致性要求高,发现即告警,及时介入处理

数据探查与调研阶段

快速跑通全量数据,先了解整体分布,再回头治理脏数据

使用限制与注意事项

在使用前,请务必了解当前功能的能力边界和潜在风险:

  • Connector 支持范围:当前仅 Kafka 数据源已接入该能力,其他 Source 正在逐步覆盖。

  • 支持的收集器类型:当前仅支持 logger 类型,即将脏数据写入日志文件。

说明

此功能适合用于调试期和生产初期;若长期出现大量脏数据,建议推动上游系统进行数据治理。

语法结构

启用脏数据收集器

脏数据收集器在Pipeline模块中定义。语法如下所示:

pipeline:
  dirty-data.collector:
    name: Logger Dirty Data Collector
    type: logger

参数

说明

name

收集器名称,建议命名有意义(如 Kafka-DQ-Collector

type

收集器类型,可选值如下:

  • logger:将脏数据写入日志文件。

说明

未定义此配置项时,即使开启容错,脏数据也不会被记录。

在数据源中配置容错策略

配置脏数据收集并无法让解析报错被跳过,建议结合Kafka的容错策略配合使用,详情可见Kafka连接器文档。配置示例如下:

source:
  type: kafka
  # 跳过前 100 次出现的解析异常;若超过 100 次则作业失败。
  ingestion.ignore-errors: true
  ingestion.error-tolerance.max-count: 100

参数

默认值

说明

ingestion.ignore-errors

false

是否忽略解析错误。

设置为 true 时,跳过数据处理;设置为 false 时立即失败。

ingestion.error-tolerance.max-count

-1(不限)

最大容忍脏数据条数。

ingestion.ignore-errorstrue时,如果收集到的脏数据超过此数值则触发Failover停止作业。

Logger脏数据收集器

Logger脏数据收集器会将脏数据存储到单独的日志文件中。您可以按照下面的方法查看对应的脏数据日志文件:

  1. 进入作业运维页面,点击作业日志选项卡;

  2. 点击运行日志,选择运行Task Managers二级选项卡并选择对应算子的TM节点;

  3. 点击日志列表,并在下方列表中点击名为yaml-dirty-data.out的日志文件即可查询、保存收集到的脏数据记录。

截屏2025-12-23 09

目前会为脏数据记录以下元数据信息:

  • 处理该条脏数据的时间戳

  • 发出脏数据记录的算子及Subtask Index

  • 原始脏数据内容

  • 造成处理失败的异常信息

截屏2025-12-23 09

脏数据记录格式示例

每条记录包含以下元数据信息:

text[2025-04-05 10:23:45] [Operator: SourceKafka -> Subtask: 2]
Raw Data: {"id": "abc", "ts": "invalid-timestamp"}
Exception: java.time.format.DateTimeParseException: Text 'invalid-timestamp' could not be parsed at index 0
---

字段

说明

时间戳

脏数据被捕获的时间

Operator & Subtask

出错的具体算子及并行实例编号

Raw Data

原始未解析的消息内容(Base64 或字符串形式)

Exception

解析失败的异常类型与堆栈摘要

常见问题

脏数据会影响 Checkpoint 吗?

不会。脏数据在进入状态更新前就被拦截,因此不影响 Checkpoint 的成功与否。

和 Flink SQL 的侧输出流有什么区别?

  • 脏数据收集器:用于处理“无法反序列化或解析失败”的数据;

  • 侧输出流(Side Output):用于处理“能解析但不符合业务规则”的数据。