本文向您介绍了在Flink CDC数据摄入作业中与表结构变更(Schema Evolution)相关的配置。
表结构变更同步配置
Flink CDC数据摄入作业支持将数据源的Schema变更同步到下游目标端,例如创建表、添加列、重命名列、更改列类型、删除列和删除表等。
下游目标端可能不支持全部的Schema变更,您可以通过在Pipeline模块添加schema.change.behavior
配置来修改Schema变更发生时目标端的处理方式。
pipeline:
schema.change.behavior: EVOLVE
Schema变更模式
模式 | 说明 |
LENIENT(默认) | Flink CDC数据摄入作业会对Schema变更进行转换成目标端可处理的变更并发送,遵循以下规则:
|
EXCEPTION | 不允许任何Schema变更行为。 当目标端不支持处理Schema变更时,可以使用此模式。收到Schema变更事件时,作业会抛出异常。 |
EVOLVE | Flink CDC数据摄入作业会将所有Schema更改应用于目标端。 如果Schema变更在目标端应用失败,作业会抛出异常并触发故障重启。 重要 在此模式下,若下游无法支持应用所有的表结构变更事件,可能导致作业Failover且无法自愈。 |
TRY_EVOLVE | Flink CDC数据摄入作业会尝试将Schema变更应用到目标端,如果目标端不支持处理发送的Schema变更,作业不会失败重启,尝试通过转换后续数据方式进行处理。 重要 TRY_EVOLVE模式下,如果发生Schema变更应用失败,可能导致上游后续到来的数据出现部分列丢失、被截断以适配下游表结构。 |
IGNORE | 所有Schema变更都不会应用于目标端。 当您的目标端尚未准备好进行任何Schema变更,想要继续从未更改的列中接收数据时,可以使用此模式。 |
目标端Schema变更控制
在数据同步场景中,Flink CDC 提供精细化 Schema 变更管理策略,允许您通过规则配置控制目标端接收的变更类型,避免意外变更导致的数据丢失或服务中断。
您可以通过在sink
模块中设置include.schema.changes
和exclude.schema.changes
选项来控制。
参数 | 说明 | 是否必填 | 数据类型 | 默认值 | 备注 |
include.schema.changes | 支持应用的Schema变更。 | 否 | List<String> | 无 | 默认支持所有变更。 |
exclude.schema.changes | 不支持应用的Schema变更。 | 否 | List<String> | 无 | 优先级高于 |
以下是可配置架构变更事件类型的完整列表:
事件类型 | 说明 |
| 新增列。 |
| 变更列类型。 |
| 创建表。 |
| 删除列。 |
| 删除表。 |
| 修改列名。 |
| 清空数据。 |
Schema变更支持部分匹配。例如,传入drop
相当于同时传入drop.column
和 drop.table
;传入 table
相当于同时传入create.table
、truncate.table
和drop.table
。
代码示例
示例1:Schema变更行为配置为EVOLVE,将上游的表结构变更同步到下游。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
sink:
type: values
name: Values Sink
print.enabled: true
sink.print.logger: true
pipeline:
name: mysql to print job
schema.change.pipeline: EVOLVE
示例2:将上游表的创建表和列相关事件应用到下游,并忽略删除列事件。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
sink:
type: values
name: Values Sink
print.enabled: true
sink.print.logger: true
include.schema.changes: [create.table, column] # 匹配了 CreateTable、AddColumn、AlterColumnType、RenameColumn、和 DropColumn 事件
exclude.schema.changes: [drop.column] # 排除了 DropColumn 事件
pipeline:
name: mysql to print job
schema.change.pipeline: EVOLVE