本文向您介绍了在Flink CDC数据摄入作业中与表结构变更(Schema Evolution)相关的配置。
表结构变更同步配置
Flink CDC数据摄入作业支持将数据源的Schema变更同步到下游目标端,例如创建表、添加列、重命名列、更改列类型、删除列和删除表等。
下游目标端可能不支持全部的Schema变更,您可以通过在Pipeline模块添加schema.change.behavior配置来修改Schema变更发生时目标端的处理方式。
pipeline:
schema.change.behavior: EVOLVE目前,框架只支持同步以下表结构变更类型。未列出的表结构变更可能导致作业异常,需要无状态重启恢复。
Create Table(建表事件,
CREATE TABLE ...)Add Column(加列事件,
ALTER TABLE ... ADD COLUMN ...)Alter Column Type(修改列类型事件,
ALTER TABLE ... MODIFY COLUMN ...)Drop Column(删除列事件,
ALTER TABLE ... DROP COLUMN ...)Rename Column(重命名列事件,
ALTER TABLE ... RENAME COLUMN ...)Truncate Table(清空表事件,
TRUNCATE TABLE ...)Drop Table(删除表事件,
DROP TABLE ...)
Schema变更模式
模式 | 说明 |
LENIENT(默认) | Flink CDC数据摄入作业会对Schema变更进行转换成目标端可处理的变更并发送,遵循以下规则:
|
EXCEPTION | 不允许任何Schema变更行为。 当目标端不支持处理Schema变更时,可以使用此模式。收到Schema变更事件时,作业会抛出异常。 |
EVOLVE | Flink CDC数据摄入作业会将所有Schema更改应用于目标端。 如果Schema变更在目标端应用失败,作业会抛出异常并触发故障重启。 重要 在此模式下,若下游无法支持应用所有的表结构变更事件,可能导致作业Failover且无法自愈。 |
TRY_EVOLVE | Flink CDC数据摄入作业会尝试将Schema变更应用到目标端,如果目标端不支持处理发送的Schema变更,作业不会失败重启,尝试通过转换后续数据方式进行处理。 重要 TRY_EVOLVE模式下,如果发生Schema变更应用失败,可能导致上游后续到来的数据出现部分列丢失、被截断以适配下游表结构。 |
IGNORE | 所有Schema变更都不会应用于目标端。 当您的目标端尚未准备好进行任何Schema变更,想要继续从未更改的列中接收数据时,可以使用此模式。 |
目标端Schema变更控制
在数据同步场景中,Flink CDC 提供精细化 Schema 变更管理策略,允许您通过规则配置控制目标端接收的变更类型,避免意外变更导致的数据丢失或服务中断。
您可以通过在sink模块中设置include.schema.changes和exclude.schema.changes选项来控制。
参数 | 说明 | 是否必填 | 数据类型 | 默认值 | 备注 |
include.schema.changes | 支持应用的Schema变更。 | 否 | List<String> | 无 | 默认支持所有变更。 |
exclude.schema.changes | 不支持应用的Schema变更。 | 否 | List<String> | 无 | 优先级高于 |
以下是可配置架构变更事件类型的完整列表:
事件类型 | 说明 |
| 新增列。 |
| 变更列类型。 |
| 创建表。 |
| 删除列。 |
| 删除表。 |
| 修改列名。 |
| 清空数据。 |
Schema变更支持部分匹配。例如,传入drop相当于同时传入drop.column和 drop.table;传入 table相当于同时传入create.table、truncate.table和drop.table。
代码示例
示例1:Schema变更行为配置为EVOLVE,将上游的表结构变更同步到下游。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
sink:
type: values
name: Values Sink
print.enabled: true
sink.print.logger: true
pipeline:
name: mysql to print job
schema.change.behavior: EVOLVE示例2:将上游表的创建表和列相关事件应用到下游,并忽略删除列事件。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
sink:
type: values
name: Values Sink
print.enabled: true
sink.print.logger: true
include.schema.changes: [create.table, column] # 匹配了 CreateTable、AddColumn、AlterColumnType、RenameColumn、和 DropColumn 事件
exclude.schema.changes: [drop.column] # 排除了 DropColumn 事件
pipeline:
name: mysql to print job
schema.change.behavior: EVOLVE