Flink CDC表结构变更同步配置

本文向您介绍了在Flink CDC数据摄入作业中与表结构变更(Schema Evolution)相关的配置。

表结构变更同步配置

Flink CDC数据摄入作业支持将数据源的Schema变更同步到下游目标端,例如创建表、添加列、重命名列、更改列类型、删除列和删除表等。

下游目标端可能不支持全部的Schema变更,您可以通过在Pipeline模块添加schema.change.behavior配置来修改Schema变更发生时目标端的处理方式。

pipeline:
  schema.change.behavior: EVOLVE

Schema变更模式

模式

说明

LENIENT(默认)

Flink CDC数据摄入作业会对Schema变更进行转换成目标端可处理的变更并发送,遵循以下规则:

  • 不发送Drop tableTruncate table变更。

  • 列重命名时,改为发送更改列类型和新增列两个事件。原有的列不删除,更改列类型为nullable,同时新增一个列名为新名称、数据类型改为nullable的列。

  • 删除列时,改为发送更改列类型事件,将对应字段类型变为nullable。

  • 新增列时仍发送新增列事件,但字段类型会变为nullable。

EXCEPTION

不允许任何Schema变更行为。

当目标端不支持处理Schema变更时,可以使用此模式。收到Schema变更事件时,作业会抛出异常。

EVOLVE

Flink CDC数据摄入作业会将所有Schema更改应用于目标端。

如果Schema变更在目标端应用失败,作业会抛出异常并触发故障重启。

重要

在此模式下,若下游无法支持应用所有的表结构变更事件,可能导致作业Failover且无法自愈。

TRY_EVOLVE

Flink CDC数据摄入作业会尝试将Schema变更应用到目标端,如果目标端不支持处理发送的Schema变更,作业不会失败重启,尝试通过转换后续数据方式进行处理。

重要

TRY_EVOLVE模式下,如果发生Schema变更应用失败,可能导致上游后续到来的数据出现部分列丢失、被截断以适配下游表结构。

IGNORE

所有Schema变更都不会应用于目标端。

当您的目标端尚未准备好进行任何Schema变更,想要继续从未更改的列中接收数据时,可以使用此模式。

目标端Schema变更控制

在数据同步场景中,Flink CDC 提供精细化 Schema 变更管理策略,允许您通过规则配置控制目标端接收的变更类型,避免意外变更导致的数据丢失或服务中断。

您可以通过在sink模块中设置include.schema.changesexclude.schema.changes选项来控制。

参数

说明

是否必填

数据类型

默认值

备注

include.schema.changes

支持应用的Schema变更。

List<String>

默认支持所有变更。

exclude.schema.changes

不支持应用的Schema变更。

List<String>

优先级高于include.schema.changes

以下是可配置架构变更事件类型的完整列表:

事件类型

说明

add.column

新增列。

alter.column.type

变更列类型。

create.table

创建表。

drop.column

删除列。

drop.table

删除表。

rename.column

修改列名。

truncate.table

清空数据。

说明

Schema变更支持部分匹配。例如,传入drop相当于同时传入drop.columndrop.table;传入 table相当于同时传入create.tabletruncate.tabledrop.table

代码示例

  • 示例1:Schema变更行为配置为EVOLVE,将上游的表结构变更同步到下游。

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
  
pipeline:
  name: mysql to print job
  schema.change.pipeline: EVOLVE
  • 示例2:将上游表的创建表和列相关事件应用到下游,并忽略删除列事件。

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
  include.schema.changes: [create.table, column] # 匹配了 CreateTable、AddColumn、AlterColumnType、RenameColumn、和 DropColumn 事件
  exclude.schema.changes: [drop.column] # 排除了 DropColumn 事件
  
pipeline:
  name: mysql to print job
  schema.change.pipeline: EVOLVE