本文为您介绍如何使用MySQL连接器。
背景信息
MySQL连接器支持所有兼容MySQL协议的数据库,包括RDS MySQL、PolarDB for MySQL、OceanBase(MySQL模式)或者自建MySQL。
使用MySQL连接器读取OceanBase时,请确保OceanBase Binlog已开启且被正确设置,详情请参见Binlog 相关操作。该能力目前处于公测阶段,请在使用前充分评估并谨慎使用。
MySQL连接器支持的信息如下。
| 类别 | 详情 | 
| 支持类型 | 源表、维表和结果表,数据摄入数据源 | 
| 运行模式 | 仅支持流模式 | 
| 数据格式 | 暂不适用 | 
| 特有监控指标 | |
| API种类 | Datastream,SQL和数据摄入YAML | 
| 是否支持更新或删除结果表数据 | 是 | 
特色功能
MySQL的CDC源表,即MySQL的流式源表,会先读取数据库的历史全量数据,并平滑切换到Binlog读取上,保证不多读一条也不少读一条数据。即使发生故障,也能保证通过Exactly Once语义处理数据。MySQL CDC源表支持并发地读取全量数据,通过增量快照算法实现了全程无锁和断点续传,详情可参见关于MySQL CDC源表。
- 流批一体,支持读取全量和增量数据,无需维护两套流程。 
- 支持并发读取全量数据,性能水平扩展。 
- 全量读取无缝切换增量读取,自动缩容,节省计算资源。 
- 全量阶段读取支持断点续传,更加稳定。 
- 无锁读取全量数据,不影响在线业务。 
- 支持读取RDS MySQL的备份日志。 
- 并行解析Binlog文件,读取延迟更低。 
前提条件
在使用MySQL CDC源表前,必须先按照配置MySQL进行操作,这些操作主要为了满足使用MySQL CDC源表的前提条件
RDS MySQL
- 与实时计算Flink版进行网络探测,确保网络连通。 
- MySQL版本要求:5.6,5.7,8.0.x。 
- 需开启Binlog(默认开启)。 
- Binlog格式需要为ROW(默认)。 
- 设置binlog_row_image为FULL(默认)。 
- 关闭Binary Log Transaction Compression。(8.0.20及以上引入,默认关闭)。 
- 已创建MySQL用户,并授予了SELECT、SHOW DATABASES、REPLICATION SLAVE和REPLICATION CLIENT权限。 
- 已创建MySQL数据库和表,详情请参见RDS MySQL创建数据库和账号。(请使用高权限账号来创建MySQL数据库,避免因权限不足而导致操作失败。) 
- 已设置IP白名单,详情请参见RDS MySQL白名单设置。 
PolarDB MySQL
- 与实时计算Flink版进行网络探测,确保网络连通。 
- MySQL版本要求:5.6,5.7,8.0.x。 
- 需开启Binlog(默认关闭)。 
- Binlog格式需要为ROW(默认)。 
- 设置binlog_row_image为FULL(默认)。 
- 关闭Binary Log Transaction Compression。(8.0.20及以上引入,默认关闭)。 
- 已创建MySQL用户,并授予了SELECT、SHOW DATABASES、REPLICATION SLAVE和REPLICATION CLIENT权限。 
- 已创建MySQL数据库和表,详情请参见PolarDB MySQL创建数据库和账号。(请使用高权限账号来创建MySQL数据库,避免因权限不足而导致操作失败。) 
- 已设置IP白名单,详情请参见PolarDB MySQL白名单设置。 
自建MySQL
- 与实时计算Flink版进行网络探测,确保网络连通。 
- MySQL版本要求:5.6,5.7,8.0.x。 
- 需开启Binlog(默认关闭)。 
- Binlog格式需要为ROW(默认为STATEMENT)。 
- 设置binlog_row_image为FULL(默认)。 
- 关闭Binary Log Transaction Compression。(8.0.20及以上引入,默认关闭)。 
- 已创建MySQL用户,并授予了SELECT、SHOW DATABASES、REPLICATION SLAVE和REPLICATION CLIENT权限。 
- 已创建MySQL数据库和表,详情请参见自建MySQL创建数据库和账号。。(请使用高权限账号来创建MySQL数据库,避免因权限不足而导致操作失败。) 
- 已设置IP白名单,详情请参见自建MySQL白名单设置。 
使用限制
通用限制
- MySQL CDC源表暂不支持定义Watermark。如果您需要进行窗口聚合,您可以采用非窗口聚合的方式,详情请参见不支持定义Watermark,那如何进行窗口聚合?。 
- 在CTAS和CDAS作业中,MySQL CDC源表可以同步部分Schema变更,支持的变更类型详情请参见表结构变更同步策略。 
- MySQL CDC连接器目前暂不支持Binary Log Transaction Compression(二进制日志事务压缩) 功能。因此,在使用MySQL CDC连接器消费增量数据时,请务必确保已关闭Binary Log Transaction Compression配置,否则可能导致增量数据无法正常获取。 
RDS MySQL的限制
- 对于RDS MySQL,不建议通过备库或只读从库读取数据。因为RDS MySQL的备库和只读从库Binlog保留时间默认很短,可能由于Binlog过期清理,导致作业无法消费Binlog数据而报错。 
- RDS MySQL默认开启了主从并行同步功能,且不保证主从事务顺序一致,可能导致主从切换后并Checkpoint恢复时漏读部分数据。您可以手动打开RDS MySQL的slave_preserve_commit_order选项来规避此问题。 
PolarDB MySQL的限制
MySQL CDC源表不支持读取PolarDB MySQL版1.0.19及以前版本的多主架构集群(什么是多主集群?)。PolarDB MySQL版1.0.19及更早版本的多主架构集群产生的Binlog可能出现重复Table ID,导致CDC源表Schema映射错误,从而解析Binlog数据报错。
开源MySQL的限制
在默认配置下,MySQL进行主从Binlog复制时,总是保持Transaction顺序。若MySQL副本启用了并行复制(slave_parallel_workers> 1)但未开启 slave_preserve_commit_order=ON,其事务提交顺序可能与主库不一致。Flink CDC 从检查点恢复时会因顺序错乱而漏读数据。推荐在MySQL副本上设置 slave_preserve_commit_order = ON。或设置 slave_parallel_workers = 1(会牺牲复制性能)。
注意事项
- 结果表 - 自增主键,在DDL中不声明。写入数据时,MySQL会自动填写。 
- 必须至少声明一个非主键字段,否则报错。 
- DDL中NOT ENFORCED表示Flink自身对主键不做强制校验,需要您自行保证主键的正确性和完整性。详情请参见Validity Check。 
 
- 维表 - 如果希望使用索引加速查询,JOIN时字段顺序,要和索引定义的顺序一致(最左前缀原则)。比如索引是 (a, b, c),JOIN条件则为 - ON t.a = x AND t.b = y。- Flink 生成的 SQL 可能被优化器改写,导致实际查库时无法命中索引。确认是否使用了索引,去 MySQL 里看执行计划(EXPLAIN)或慢查询日志,查看真实执行的Select语句。 
SQL
MySQL连接器可以在SQL作业中使用,作为源表,维表或者结果表。
语法结构
CREATE TEMPORARY TABLE mysqlcdc_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);- 连接器写入结果表原理:写入结果表时,会将接收到的每条数据拼接成一条SQL去执行。具体执行的SQL情况如下: - 对于没有主键的结果表,会拼接执行 - INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);语句。
- 对于包含主键的结果表,会拼接执行 - INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;语句。请注意:如果物理表存在除主键外的唯一索引约束,当插入两条主键不同但唯一索引相同的记录时,下游数据会因为唯一索引冲突导致数据覆盖引发数据丢失。
 
- 如果在MySQL数据库定义了自增主键,在Flink DDL中不应该声明该自增字段。数据写入过程中,数据库会自动填补该自增字段。连接器仅支持写入和删除带自增字段的数据,不支持更新。 
WITH参数
- 通用 - 参数 - 说明 - 是否必填 - 数据类型 - 默认值 - 备注 - connector - 表类型。 - 是 - STRING - 无 - 作为源表时,可以填写为 - mysql-cdc或者- mysql,二者等价。作为维表或结果表时,固定值为- mysql。- hostname - MySQL数据库的IP地址或者Hostname。 - 是 - STRING - 无 - 建议填写专有网络VPC地址。 说明- 如果MySQL与实时计算Flink版不在同一VPC,需要先打通跨VPC的网络或者使用公网的形式访问,详情请参见空间管理与操作和Flink全托管集群如何访问公网?。 - username - MySQL数据库服务的用户名。 - 是 - STRING - 无 - 无。 - password - MySQL数据库服务的密码。 - 是 - STRING - 无 - 无。 - database-name - MySQL数据库名称。 - 是 - STRING - 无 - 作为源表时,数据库名称支持正则表达式以读取多个数据库的数据。 
- 使用正则表达式时,尽量不要使用^和$符号匹配开头和结尾。具体原因详见table-name备注的说明。 
 - table-name - MySQL表名。 - 是 - STRING - 无 - 作为源表时,表名支持正则表达式以读取多个表的数据。 - 在读取多个MySQL表时,将多个CTAS语句作为一个作业提交,可以避免启用多个Binlog监听,提高性能和效率。详情请参见多CTAS语句:作为一个作业提交。 
- 使用正则表达式时,尽量不要使用^和$符号匹配开头和结尾。具体原因详见以下说明。 
 说明- MySQL CDC源表在正则匹配表名时,会将您填写的 database-name,table-name 通过字符串 \\.(VVR 8.0.1前使用字符.)连接成为一个全路径的正则表达式,然后使用该正则表达式和MySQL数据库中表的全限定名进行正则匹配。 - 例如:当配置'database-name'='db_.*'且'table-name'='tb_.+'时,连接器将会使用正则表达式db_.*\\.tb_.+(8.0.1版本前为db_.*.tb_.+)去匹配表的全限定名来确定需要读取的表。 - port - MySQL数据库服务的端口号。 - 否 - INTEGER - 3306 - 无。 
- 源表独有 - 参数 - 说明 - 是否必填 - 数据类型 - 默认值 - 备注 - server-id - 数据库客户端的一个数字ID。 - 否 - STRING - 默认会随机生成一个5400~6400的值。 - 该ID必须是MySQL集群中全局唯一的。建议针对同一个数据库的每个作业都设置一个不同的ID。 - 该参数也支持ID范围的格式,例如5400-5408。在开启增量读取模式时支持多并发读取,此时推荐设定为ID范围,使得每个并发使用不同的ID。详情请参见Server ID使用。 - scan.incremental.snapshot.enabled - 是否开启增量快照。 - 否 - BOOLEAN - true - 默认开启增量快照。增量快照是一种读取全量数据快照的新机制。与旧的快照读取相比,增量快照有很多优点,包括: - 读取全量数据时,Source可以是并行读取。 
- 读取全量数据时,Source支持chunk粒度的检查点。 
- 读取全量数据时,Source不需要获取全局读锁(FLUSH TABLES WITH read lock)。 
 - 如果您希望Source支持并发读取,每个并发的Reader需要有一个唯一的服务器ID,因此server-id必须是5400-6400这样的范围,并且范围必须大于等于并发数。 说明- Flink计算引擎VVR 11.1及以上版本删除该配置项。 - scan.incremental.snapshot.chunk.size - 每个chunk的大小(包含的行数)。 - 否 - INTEGER - 8096 - 当开启增量快照读取时,表会被切分成多个chunk读取。在读完chunk的数据之前,chunk的数据会先缓存在内存中。 - 每个chunk包含的行数越少,则表中的chunk的总数量越大,尽管这会降低故障恢复的粒度,但可能导致内存OOM和整体的吞吐量降低。因此,您需要进行权衡,并设置合理的chunk大小。 - scan.snapshot.fetch.size - 当读取表的全量数据时,每次最多拉取的记录数。 - 否 - INTEGER - 1024 - 无。 - scan.startup.mode - 消费数据时的启动模式。 - 否 - STRING - initial - 参数取值如下: - initial(默认):在第一次启动时,会先扫描历史全量数据,然后读取最新的Binlog数据。 
- latest-offset:在第一次启动时,不会扫描历史全量数据,直接从Binlog的末尾(最新的Binlog处)开始读取,即只读取该连接器启动以后的最新变更。 
- earliest-offset:不扫描历史全量数据,直接从可读取的最早Binlog开始读取。 
- specific-offset:不扫描历史全量数据,从您指定的Binlog位点启动,位点可通过同时配置scan.startup.specific-offset.file和scan.startup.specific-offset.pos参数来指定从特定Binlog文件名和偏移量启动,也可以只配置scan.startup.specific-offset.gtid-set来指定从某个GTID集合启动。 
- timestamp:不扫描历史全量数据,从指定的时间戳开始读取Binlog。时间戳通过scan.startup.timestamp-millis指定,单位为毫秒。 
 重要- 使用earliest-offset,specific-offset和timestamp启动模式时,确保在指定的Binlog消费位置到作业启动的时间之间,对应表的结构不发生变化,避免因表结构不同而报错。 - scan.startup.specific-offset.file - 使用指定位点模式启动时,启动位点的Binlog文件名。 - 否 - STRING - 无 - 使用该配置时,scan.startup.mode必须配置为specific-offset。文件名格式例如 - mysql-bin.000003。- scan.startup.specific-offset.pos - 使用指定位点模式启动时,启动位点在指定Binlog文件中的偏移量。 - 否 - INTEGER - 无 - 使用该配置时,scan.startup.mode必须配置为specific-offset。 - scan.startup.specific-offset.gtid-set - 使用指定位点模式启动时,启动位点的GTID集合。 - 否 - STRING - 无 - 使用该配置时,scan.startup.mode必须配置为specific-offset。GTID集合格式例如 - 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19。- scan.startup.timestamp-millis - 使用指定时间模式启动时,启动位点的毫秒时间戳。 - 否 - LONG - 无 - 使用该配置时,scan.startup.mode必须配置为timestamp。时间戳单位为毫秒。 重要- 在使用指定时间时,MySQL CDC会尝试读取每个Binlog文件的初始事件以确定其时间戳,最终定位至指定时间对应的Binlog文件。请保证指定的时间戳对应的Binlog文件在数据库上没有被清理且可以被读取到。 - server-time-zone - 数据库在使用的会话时区。 - 否 - STRING - 如果您没有指定该参数,则系统默认使用Flink作业运行时的环境时区作为数据库服务器时区,即您选择的可用区所在的时区。 - 例如Asia/Shanghai,该参数控制了MySQL中的TIMESTAMP类型如何转成STRING类型。更多信息请参见Debezium时间类型。 - debezium.min.row.count.to.stream.results - 当表的条数大于该值时,会使用分批读取模式。 - 否 - INTEGER - 1000 - Flink采用以下方式读取MySQL源表数据: - 全量读取:直接将整个表的数据读取到内存里。优点是速度快,缺点是会消耗对应大小的内存,如果源表数据量非常大,可能会有OOM风险。 
- 分批读取:分多次读取,每次读取一定数量的行数,直到读取完所有数据。优点是读取数据量比较大的表没有OOM风险,缺点是读取速度相对较慢。 
 - connect.timeout - 连接MySQL数据库服务器超时时,重试连接之前等待超时的最长时间。 - 否 - DURATION - 30s - 无。 - connect.max-retries - 连接MySQL数据库服务时,连接失败后重试的最大次数。 - 否 - INTEGER - 3 - 无。 - connection.pool.size - 数据库连接池大小。 - 否 - INTEGER - 20 - 数据库连接池用于复用连接,可以降低数据库连接数量。 - jdbc.properties.* - JDBC URL中的自定义连接参数。 - 否 - STRING - 无 - 您可以传递自定义的连接参数,例如不使用SSL协议,则可配置为'jdbc.properties.useSSL' = 'false'。 - 支持的连接参数请参见MySQL Configuration Properties。 - debezium.* - Debezium读取Binlog的自定义参数。 - 否 - STRING - 无 - 您可以传递自定义的Debezium参数,例如使用'debezium.event.deserialization.failure.handling.mode'='ignore'来指定解析错误时的处理逻辑。 - heartbeat.interval - Source通过心跳事件推动Binlog位点前进的时间间隔。 - 否 - DURATION - 30s - 心跳事件用于推动Source中的Binlog位点前进,这对MySQL中更新缓慢的表非常有用。对于更新缓慢的表,Binlog位点无法自动前进,通过够心跳事件可以推到Binlog位点前进,可以避免Binlog位点不前进引起Binlog位点过期问题,Binlog位点过期会导致作业失败无法恢复,只能无状态重启。 - scan.incremental.snapshot.chunk.key-column - 可以指定某一列作为快照阶段切分分片的切分列。 - 见备注列。 - STRING - 无 - 无主键表必填,选择的列必须是非空类型(NOT NULL)。 
- 有主键的表为选填,仅支持从主键中选择一列。 
 - rds.region-id - 阿里云RDS MySQL实例所在的地域ID。 - 使用读取OSS归档日志功能时必填。 - STRING - 无 - 地域ID请参见地域和可用区。 - rds.access-key-id - 阿里云RDS MySQL账号Access Key ID。 - 使用读取OSS归档日志功能时必填。 - STRING - 无 - 详情请参见如何查看AccessKey ID和AccessKey Secret信息?。 重要- 为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见变量管理。 - rds.access-key-secret - 阿里云RDS MySQL账号Access Key Secret。 - 使用读取OSS归档日志功能时必填。 - STRING - 无 - 详情请参见如何查看AccessKey ID和AccessKey Secret信息? 重要- 为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见变量管理。 - rds.db-instance-id - 阿里云RDS MySQL实例ID。 - 使用读取OSS归档日志功能时必填。 - STRING - 无 - 无。 - rds.main-db-id - 阿里云RDS MySQL实例主库编号。 - 否 - STRING - 无 - 获取主库编号详情请参见RDS MySQL日志备份。 
- 仅Flink计算引擎VVR 8.0.7及以上版本支持。 
 - rds.download.timeout - 从OSS下载单个归档日志的超时时间。 - 否 - DURATION - 60s - 无。 - rds.endpoint - 获取OSS Binlog信息的服务接入点。 - 否 - STRING - 无 - 可选值详情请参见服务接入点。 
- 仅Flink计算引擎VVR 8.0.8及以上版本支持。 
 - scan.incremental.close-idle-reader.enabled - 是否在快照结束后关闭空闲的 Reader。 - 否 - BOOLEAN - false - 仅Flink计算引擎VVR 8.0.1及以上版本支持。 
- 该配置生效需要设置execution.checkpointing.checkpoints-after-tasks-finish.enabled为true。 
 - scan.read-changelog-as-append-only.enabled - 是否将changelog数据流转换为append-only数据流。 - 否 - BOOLEAN - false - 参数取值如下: - true:所有类型的消息(包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER)都会转换成INSERT类型的消息。仅在需要保存上游表删除消息等特殊场景下开启使用。 
- false(默认):所有类型的消息都保持原样下发。 
 说明- 仅Flink计算引擎VVR 8.0.8及以上版本支持。 - scan.only.deserialize.captured.tables.changelog.enabled - 在增量阶段,是否仅对指定表的变更事件进行反序列化。 - 否 - BOOLEAN - VVR 8.x版本中默认值为false。 
- VVR 11.1及以上版本默认值为true。 
 - 参数取值如下: - true:仅对目标表的变更数据进行反序列化,加快Binlog读取速度。 
- false(默认):对所有表的变更数据进行反序列化。 
 说明- 仅Flink计算引擎VVR 8.0.7及以上版本支持。 
- 在Flink计算引擎VVR 8.0.8及以下版本使用时,参数名需要修改为debezium.scan.only.deserialize.captured.tables.changelog.enable。 
 - scan.parse.online.schema.changes.enabled - 在增量阶段,是否尝试解析 RDS 无锁变更 DDL 事件。 - 否 - BOOLEAN - false - 参数取值如下: - true:解析 RDS 无锁变更 DDL 事件。 
- false(默认):不解析 RDS 无锁变更 DDL 事件。 
 - 实验性功能。建议在执行线上无锁变更前,先对Flink作业执行一次快照以便恢复。 说明- 仅Flink计算引擎VVR 11.1及以上版本支持。 - scan.incremental.snapshot.backfill.skip - 是否在快照读取阶段跳过backfill。 - 否 - BOOLEAN - false - 参数取值如下: - true:快照读取阶段跳过backfill。 
- false(默认):快照读取阶段不跳过backfill。 
 - 如果跳过backfill,快照阶段表的更改将在稍后的增量阶段读取,而不是合并到快照中。 重要- 跳过backfill可能导致数据不一致,因为快照阶段发生的变更可能会被重放,仅保证at-least-once语义。 说明- 仅Flink计算引擎VVR 11.1及以上版本支持。 - scan.incremental.snapshot.unbounded-chunk-first.enabled - 快照读取阶段是否先分发无界的分片。 - 否 - BOOELEAN - false - 参数取值如下: - true:快照读取阶段优先分发无界的分片。 
- false(默认):快照读取阶段不优先分发无界的分片。 
 - 实验性功能。开启后能够降低TaskManager在快照阶段同步最后一个分片时遇到内存溢出 (OOM) 的风险,建议在作业第一次启动前添加。 说明- 仅Flink计算引擎VVR 11.1及以上版本支持。 
- 维表独有 - 参数 - 说明 - 是否必填 - 数据类型 - 默认值 - 备注 - url - MySQL JDBC URL - 否 - STRING - 无 - URL的格式为: - jdbc:mysql://<连接地址>:<端口号>/<数据库名称>。- lookup.max-retries - 读取数据失败后,重试读取的最大次数。 - 否 - INTEGER - 3 - 仅Flink计算引擎VVR 6.0.7及以上版本支持。 - lookup.cache.strategy - 缓存策略。 - 否 - STRING - None - 支持None、LRU和ALL三种缓存策略,取值含义详情请参见背景信息。 说明- 使用LRU缓存策略时,还必须配置lookup.cache.max-rows参数。 - lookup.cache.max-rows - 最大缓存条数。 - 否 - INTEGER - 100000 - 当选择LRU缓存策略后,必须设置缓存大小。 
- 当选择ALL缓存策略后,可以不设置缓存大小。 
 - lookup.cache.ttl - 缓存超时时间。 - 否 - DURATION - 10 s - lookup.cache.ttl的配置和lookup.cache.strategy有关,详情如下: - 如果lookup.cache.strategy配置为None,则lookup.cache.ttl可以不配置,表示缓存不超时。 
- 如果lookup.cache.strategy配置为LRU,则lookup.cache.ttl为缓存超时时间。默认不过期。 
- 如果lookup.cache.strategy配置为ALL,则lookup.cache.ttl为缓存加载时间。默认不重新加载。 
 - 填写时请使用时间格式,例如1min或10s。 - lookup.max-join-rows - 主表中每一条数据查询维表时,匹配后最多返回的结果数。 - 否 - INTEGER - 1024 - 无。 - lookup.filter-push-down.enabled - 是否开启维表Filter下推。 - 否 - BOOLEAN - false - 参数取值如下: - true:开启维表Filter下推,在加载MySQL数据库表的数据时,维表会根据SQL作业中设置的条件提前过滤数据。 
- false(默认):不开启维表Filter下推,在加载MySQL数据库表的数据时,维表会加载全量数据。 
 说明- 仅Flink计算引擎VVR 8.0.7及以上版本支持。 重要- 维表下推应该仅在Flink表用作维表时开启。MySQL源表暂不支持开启Filter下推,如果一张Flink表同时被作为源表和维表,且维表开启了Filter下推,则在使用源表时需要通过SQL Hints的方式将该配置项显式设为false,否则可能导致作业运行异常。 
- 结果表独有 - 参数 - 说明 - 是否必填 - 数据类型 - 默认值 - 备注 - url - MySQL JDBC URL - 否 - STRING - 无 - URL的格式为: - jdbc:mysql://<连接地址>:<端口号>/<数据库名称>。- sink.max-retries - 写入数据失败后,重试写入的最大次数。 - 否 - INTEGER - 3 - 无。 - sink.buffer-flush.batch-size - 一次批量写入的条数。 - 否 - INTEGER - 4096 - 无。 - sink.buffer-flush.max-rows - 内存中缓存的数据条数。 - 否 - INTEGER - 10000 - 需指定主键后,该参数才生效。 - sink.buffer-flush.interval - 清空缓存的时间间隔。表示如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。 - 否 - DURATION - 1s - 无。 - sink.ignore-delete - 是否忽略数据Delete操作。 - 否 - BOOLEAN - false - 当 Flink SQL 生成的流中包含删除或更新前记录时,若多个输出任务同时更新同一张表的不同字段,可能导致数据不一致。 - 例如:一条记录被删除后,另一个任务仅更新部分字段,未更新字段将变为 null 或默认值,造成数据错误。 - 通过设置sink.ignore-delete为true,可忽略上游的DELETE和 UPDATE_BEFORE操作,避免此类问题。 说明- UPDATE_BEFORE 是Flink的回撤机制的一部分,用于在更新操作中“撤回”旧值。 
- 当ignoreDelete = true 时,会跳过所有 DELETE和UPDATE_BEFORE 类型的记录,仅处理 INSERT和UPDATE_AFTER。 
 - sink.ignore-null-when-update - 更新数据时,如果传入的数据字段值为null,是更新对应字段为null,还是跳过该字段的更新。 - 否 - BOOLEAN - false - 参数取值如下: - true:不更新该字段。但是当Flink表设置主键时,才支持配置该参数为true。配置为true时: - 如果是8.0.6及以下的版本,结果表写入数据不支持攒批执行。 
- 如果是8.0.7及以上的版本,结果表写入数据支持攒批执行。 - 攒批写入虽然可以明显增强写入效率和整体吞吐量,但是会带来数据延迟问题和内存溢出风险。因此请您根据实际业务场景做好权衡。 
 
- false:更新该字段为null。 
 说明- 仅实时计算引擎VVR 8.0.5及以上版本支持该参数。 
类型映射
- CDC源表 - MySQL CDC字段类型 - Flink字段类型 - TINYINT - TINYINT - SMALLINT - SMALLINT - TINYINT UNSIGNED - TINYINT UNSIGNED ZEROFILL - INT - INT - MEDIUMINT - SMALLINT UNSIGNED - SMALLINT UNSIGNED ZEROFILL - BIGINT - BIGINT - INT UNSIGNED - INT UNSIGNED ZEROFILL - MEDIUMINT UNSIGNED - MEDIUMINT UNSIGNED ZEROFILL - BIGINT UNSIGNED - DECIMAL(20, 0) - BIGINT UNSIGNED ZEROFILL - SERIAL - FLOAT [UNSIGNED] [ZEROFILL] - FLOAT - DOUBLE [UNSIGNED] [ZEROFILL] - DOUBLE - DOUBLE PRECISION [UNSIGNED] [ZEROFILL] - REAL [UNSIGNED] [ZEROFILL] - NUMERIC(p, s) [UNSIGNED] [ZEROFILL] - DECIMAL(p, s) - DECIMAL(p, s) [UNSIGNED] [ZEROFILL] - BOOLEAN - BOOLEAN - TINYINT(1) - DATE - DATE - TIME [(p)] - TIME [(p)] [WITHOUT TIME ZONE] - DATETIME [(p)] - TIMESTAMP [(p)] [WITHOUT TIME ZONE] - TIMESTAMP [(p)] - TIMESTAMP [(p)] - TIMESTAMP [(p)] WITH LOCAL TIME ZONE - CHAR(n) - STRING - VARCHAR(n) - TEXT - BINARY - BYTES - VARBINARY - BLOB 重要- 建议MySQL不要使用TINYINT(1)类型存储0和1以外的数值,当property-version=0时,默认MySQL CDC源表会将TINYINT(1)映射到Flink的BOOLEAN上,造成数据不准确。如果需要使用TINYINT(1)类型存储0和1以外的数值,请参见配置参数catalog.table.treat-tinyint1-as-boolean。 
- 维表和结果表 - MySQL字段类型 - Flink字段类型 - TINYINT - TINYINT - SMALLINT - SMALLINT - TINYINT UNSIGNED - INT - INT - MEDIUMINT - SMALLINT UNSIGNED - BIGINT - BIGINT - INT UNSIGNED - BIGINT UNSIGNED - DECIMAL(20, 0) - FLOAT - FLOAT - DOUBLE - DOUBLE - DOUBLE PRECISION - NUMERIC(p, s) - DECIMAL(p, s) 说明- 其中p <= 38。 - DECIMAL(p, s) - BOOLEAN - BOOLEAN - TINYINT(1) - DATE - DATE - TIME [(p)] - TIME [(p)] [WITHOUT TIME ZONE] - DATETIME [(p)] - TIMESTAMP [(p)] [WITHOUT TIME ZONE] - TIMESTAMP [(p)] - CHAR(n) - CHAR(n) - VARCHAR(n) - VARCHAR(n) - BIT(n) - BINARY(⌈n/8⌉) - BINARY(n) - BINARY(n) - VARBINARY(N) - VARBINARY(N) - TINYTEXT - STRING - TEXT - MEDIUMTEXT - LONGTEXT - TINYBLOB - BYTES 重要- Flink仅支持小于等于2,147,483,647(2^31 - 1)的MySQL BLOB类型的记录。 - BLOB - MEDIUMBLOB - LONGBLOB 
数据摄入
MySQL连接器作为数据源可以在数据摄入YAML作业中使用。
语法结构
source:
   type: mysql
   name: MySQL Source
   hostname: localhost
   port: 3306
   username: <username>
   password: <password>
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404
sink:
  type: xxx配置项
| 参数 | 说明 | 是否必填 | 数据类型 | 默认值 | 备注 | 
| type | 数据源类型。 | 是 | STRING | 无 | 固定值为mysql。 | 
| name | 数据源名称。 | 否 | STRING | 无 | 无。 | 
| hostname | MySQL数据库的IP地址或者Hostname。 | 是 | STRING | 无 | 建议填写专有网络VPC地址。 说明  如果MySQL与实时Flink版不在同一VPC,需要先打通跨VPC的网络或者使用公网的形式访问,详情请参见空间管理与操作和Flink全托管集群如何访问公网?。 | 
| username | MySQL数据库服务的用户名。 | 是 | STRING | 无 | 无。 | 
| password | MySQL数据库服务的密码。 | 是 | STRING | 无 | 无。 | 
| tables | 需要同步的MySQL数据表。 | 是 | STRING | 无 | 
 说明  
 | 
| tables.exclude | 需要在同步的表中排除的表。 | 否 | STRING | 无 | 
 说明  点号用于分割数据库名和表名,如果需要用点号匹配任意字符,需要对点号使用反斜杠进行转译。如:db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*。 | 
| port | MySQL数据库服务的端口号。 | 否 | INTEGER | 3306 | 无。 | 
| schema-change.enabled | 是否发送Schame变更事件。 | 否 | BOOLEAN | true | 无。 | 
| server-id | 数据库客户端的用于同步的数字ID或范围。 | 否 | STRING | 默认会随机生成一个5400~6400的值。 | 该ID必须是MySQL集群中全局唯一的。建议针对同一个数据库的每个作业都设置一个不同的ID。 该参数也支持ID范围的格式,例如5400-5408。在开启增量读取模式时支持多并发读取,此时推荐设定为ID范围,使得每个并发使用不同的ID。 | 
| jdbc.properties.* | JDBC URL中的自定义连接参数。 | 否 | STRING | 无 | 您可以传递自定义的连接参数,例如不使用SSL协议,则可配置为'jdbc.properties.useSSL' = 'false'。 支持的连接参数请参见MySQL Configuration Properties。 | 
| debezium.* | Debezium读取Binlog的自定义参数。 | 否 | STRING | 无 | 您可以传递自定义的Debezium参数,例如使用'debezium.event.deserialization.failure.handling.mode'='ignore'来指定解析错误时的处理逻辑。 | 
| scan.incremental.snapshot.chunk.size | 每个chunk的大小(包含的行数)。 | 否 | INTEGER | 8096 | MySQL表会被切分成多个chunk读取。在读完chunk的数据之前,chunk的数据会先缓存在内存中。 每个chunk包含的行数越少,则表中的chunk的总数量越大,尽管这会降低故障恢复的粒度,但可能导致内存OOM和整体的吞吐量降低。因此,您需要进行权衡,并设置合理的chunk大小。 | 
| scan.snapshot.fetch.size | 当读取表的全量数据时,每次最多拉取的记录数。 | 否 | INTEGER | 1024 | 无。 | 
| scan.startup.mode | 消费数据时的启动模式。 | 否 | STRING | initial | 参数取值如下: 
 重要  对于earliest-offset,specific-offset和timestamp启动模式,如果启动时刻和指定的启动位点时刻的表结构不同,作业会因为表结构不同而报错。换一句话说,使用这三种启动模式,需要保证在指定的Binlog消费位置到作业启动的时间之间,对应表不能发生表结构变更。 | 
| scan.startup.specific-offset.file | 使用指定位点模式启动时,启动位点的Binlog文件名。 | 否 | STRING | 无 | 使用该配置时,scan.startup.mode必须配置为specific-offset。文件名格式例如 | 
| scan.startup.specific-offset.pos | 使用指定位点模式启动时,启动位点在指定Binlog文件中的偏移量。 | 否 | INTEGER | 无 | 使用该配置时,scan.startup.mode必须配置为specific-offset。 | 
| scan.startup.specific-offset.gtid-set | 使用指定位点模式启动时,启动位点的GTID集合。 | 否 | STRING | 无 | 使用该配置时,scan.startup.mode必须配置为specific-offset。GTID集合格式例如 | 
| scan.startup.timestamp-millis | 使用指定时间模式启动时,启动位点的毫秒时间戳。 | 否 | LONG | 无 | 使用该配置时,scan.startup.mode必须配置为timestamp。时间戳单位为毫秒。 重要  在使用指定时间时,MySQL CDC会尝试读取每个Binlog文件的初始事件以确定其时间戳,最终定位至指定时间对应的Binlog文件。请保证指定的时间戳对应的Binlog文件在数据库中没有被清理且可以被读取到。 | 
| server-time-zone | 数据库在使用的会话时区。 | 否 | STRING | 如果您没有指定该参数,则系统默认使用Flink作业运行时的环境时区作为数据库服务器时区,即您选择的可用区所在的时区。 | 例如Asia/Shanghai,该参数控制了MySQL中的TIMESTAMP类型如何转成STRING类型。更多信息请参见Debezium时间类型。 | 
| scan.startup.specific-offset.skip-events | 从指定的位点读取时,跳过多少Binlog事件。 | 否 | INTEGER | 无 | 使用该配置时,scan.startup.mode必须配置为specific-offset。 | 
| scan.startup.specific-offset.skip-rows | 从指定的位点读取时,跳过多少行变更(一个Binlog事件可能对应多行变更)。 | 否 | INTEGER | 无 | 使用该配置时,scan.startup.mode必须配置为specific-offset。 | 
| connect.timeout | 连接MySQL数据库服务器超时时,重试连接之前等待超时的最长时间。 | 否 | DURATION | 30s | 无。 | 
| connect.max-retries | 连接MySQL数据库服务时,连接失败后重试的最大次数。 | 否 | INTEGER | 3 | 无。 | 
| connection.pool.size | 数据库连接池大小。 | 否 | INTEGER | 20 | 数据库连接池用于复用连接,可以降低数据库连接数量。 | 
| heartbeat.interval | Source通过心跳事件推动Binlog位点前进的时间间隔。 | 否 | DURATION | 30s | 心跳事件用于推动Source中的Binlog位点前进,这对MySQL中更新缓慢的表非常有用。对于更新缓慢的表,Binlog位点无法自动前进,通过够心跳事件可以推到Binlog位点前进,可以避免Binlog位点不前进引起Binlog位点过期问题,Binlog位点过期会导致作业失败无法恢复,只能无状态重启。 | 
| scan.incremental.snapshot.chunk.key-column | 可以指定某一列作为快照阶段切分分片的切分列。 | 否。 | STRING | 无 | 仅支持从主键中选择一列。 | 
| rds.region-id | 阿里云RDS MySQL实例所在的地域ID。 | 使用读取OSS归档日志功能时必填。 | STRING | 无 | 地域ID请参见地域和可用区。 | 
| rds.access-key-id | 阿里云RDS MySQL账号Access Key ID。 | 使用读取OSS归档日志功能时必填。 | STRING | 无 | 详情请参见如何查看AccessKey ID和AccessKey Secret信息? 重要  为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见变量管理。 | 
| rds.access-key-secret | 阿里云RDS MySQL账号Access Key Secret。 | 使用读取OSS归档日志功能时必填。 | STRING | 无 | 详情请参见如何查看AccessKey ID和AccessKey Secret信息? 重要  为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见变量管理。 | 
| rds.db-instance-id | 阿里云RDS MySQL实例ID。 | 使用读取OSS归档日志功能时必填。 | STRING | 无 | 无。 | 
| rds.main-db-id | 阿里云RDS MySQL实例主库编号。 | 否 | STRING | 无 | 获取主库编号详情请参见RDS MySQL日志备份。 | 
| rds.download.timeout | 从OSS下载单个归档日志的超时时间。 | 否 | DURATION | 60s | 无。 | 
| rds.endpoint | 获取OSS Binlog信息的服务接入点。 | 否 | STRING | 无 | 可选值详情请参见服务接入点。 | 
| rds.binlog-directory-prefix | 保存Binlog文件的目录前缀。 | 否 | STRING | rds-binlog- | 无。 | 
| rds.use-intranet-link | 是否使用内网下载Binlog文件。 | 否 | BOOLEAN | true | 无。 | 
| rds.binlog-directories-parent-path | 保存Binlog文件的父目录的绝对路径。 | 否 | STRING | 无 | 无。 | 
| chunk-meta.group.size | chunk元信息的大小。 | 否 | INTEGER | 1000 | 如果元信息大于该值,元信息会分为多份传递。 | 
| chunk-key.even-distribution.factor.lower-bound | 是否可以均匀分片的chunk分布因子的下限。 | 否 | DOUBLE | 0.05 | 分布因子小于该值会使用非均匀分片。 chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 总数据行数。 | 
| chunk-key.even-distribution.factor.upper-bound | 是否可以均匀分片的chunk分布因子的上限。 | 否 | DOUBLE | 1000.0 | 分布因子大于该值会使用非均匀分片。 chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 总数据行数。 | 
| scan.incremental.close-idle-reader.enabled | 是否在快照结束后关闭空闲的Reader。 | 否 | BOOLEAN | false | 该配置生效,需要设置 | 
| scan.only.deserialize.captured.tables.changelog.enabled | 在增量阶段,是否仅对指定表的变更事件进行反序列化。 | 否 | BOOLEAN | 
 | 参数取值如下: 
 | 
| scan.parallel-deserialize-changelog.enabled | 在增量阶段,是否使用多线程对变更事件进行解析。 | 否 | BOOLEAN | false | 参数取值如下: 
 说明  仅Flink计算引擎VVR 8.0.11及以上版本支持。 | 
| scan.parallel-deserialize-changelog.handler.size | 多线程对变更事件进行解析时,事件处理器的数量。 | 否 | INTEGER | 2 | 说明  仅Flink计算引擎VVR 8.0.11及以上版本支持。 | 
| metadata-column.include-list | 需要传给下游的元数据列。 | 否 | STRING | 无 | 可用的元数据包括 说明  MySQL CDC YAML连接器无需也不支持添加 重要  
 | 
| scan.newly-added-table.enabled | 从Checkpoint重启时,是否同步上一次启动时未匹配到的新增表或者移除状态中保存的当前不匹配的表。 | 否 | BOOLEAN | false | 从Checkpoint或Savepoint重启时生效。 | 
| scan.binlog.newly-added-table.enabled | 在增量阶段,是否发送匹配到的新增表的数据。 | 否 | BOOLEAN | false | 不能与 | 
| scan.incremental.snapshot.chunk.key-column | 为某些表指定一列作为快照阶段切分分片的切分列。 | 否 | STRING | 无 | 
 | 
| scan.parse.online.schema.changes.enabled | 在增量阶段,是否尝试解析 RDS 无锁变更 DDL 事件。 | 否 | BOOLEAN | false | 参数取值如下: 
 实验性功能。建议在执行线上无锁变更前,先对Flink作业执行一次快照以便恢复。 说明  仅Flink计算引擎VVR 11.0及以上版本支持。 | 
| scan.incremental.snapshot.backfill.skip | 是否在快照读取阶段跳过backfill。 | 否 | BOOLEAN | false | 参数取值如下: 
 如果跳过backfill,快照阶段表的更改将在稍后的增量阶段读取,而不是合并到快照中。 重要  跳过backfill可能导致数据不一致,因为快照阶段发生的变更可能会被重放,仅保证at-least-once语义。 说明  仅Flink计算引擎VVR 11.1及以上版本支持。 | 
| treat-tinyint1-as-boolean.enabled | 是否将TINYINT(1)类型当做Boolean类型处理。 | 否 | BOOLEAN | true | 参数取值如下: 
 | 
| treat-timestamp-as-datetime-enabled | 是否将TIMESTAMP类型当作DATETIME类型处理。 | 否 | BOOLEAN | false | 参数取值如下: 
 MySQL TIMESTAMP类型存储的是UTC时间,受时区影响,MySQL DATETIME类型存储的是字面时间,不受时区影响。 开启后会根据server-time-zone将MySQL TIMESTAMP类型数据转换成DATETIME类型。 | 
| include-comments.enabled | 是否同步表注释和字段注释。 | 否 | BOOELEAN | false | 参数取值如下: 
 开启后会增加作业内存使用量。 | 
| scan.incremental.snapshot.unbounded-chunk-first.enabled | 快照读取阶段是否先分发无界的分片。 | 否 | BOOELEAN | false | 参数取值如下: 
 实验性功能。开启后能够降低TaskManager在快照阶段同步最后一个分片时遇到内存溢出 (OOM) 的风险,建议在作业第一次启动前添加。 说明  仅Flink计算引擎VVR 11.1及以上版本支持。 | 
类型映射
数据摄入类型映射如下表所示。
| MySQL CDC字段类型 | CDC字段类型 | 
| TINYINT(n) | TINYINT | 
| SMALLINT | SMALLINT | 
| TINYINT UNSIGNED | |
| TINYINT UNSIGNED ZEROFILL | |
| YEAR | |
| INT | INT | 
| MEDIUMINT | |
| MEDIUMINT UNSIGNED | |
| MEDIUMINT UNSIGNED ZEROFILL | |
| SMALLINT UNSIGNED | |
| SMALLINT UNSIGNED ZEROFILL | |
| BIGINT | BIGINT | 
| INT UNSIGNED | |
| INT UNSIGNED ZEROFILL | |
| BIGINT UNSIGNED | DECIMAL(20, 0) | 
| BIGINT UNSIGNED ZEROFILL | |
| SERIAL | |
| FLOAT [UNSIGNED] [ZEROFILL] | FLOAT | 
| DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE | 
| DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
| REAL [UNSIGNED] [ZEROFILL] | |
| NUMERIC(p, s) [UNSIGNED] [ZEROFILL]且p <= 38 | DECIMAL(p, s) | 
| DECIMAL(p, s) [UNSIGNED] [ZEROFILL]且p <= 38 | |
| FIXED(p, s) [UNSIGNED] [ZEROFILL]且p <= 38 | |
| BOOLEAN | BOOLEAN | 
| BIT(1) | |
| TINYINT(1) | |
| DATE | DATE | 
| TIME [(p)] | TIME [(p)] | 
| DATETIME [(p)] | TIMESTAMP [(p)] | 
| TIMESTAMP [(p)] | 根据 
 
 | 
| CHAR(n) | CHAR(n) | 
| VARCHAR(n) | VARCHAR(n) | 
| BIT(n) | BINARY(⌈(n + 7) / 8⌉) | 
| BINARY(n) | BINARY(n) | 
| VARBINARY(N) | VARBINARY(N) | 
| NUMERIC(p, s) [UNSIGNED] [ZEROFILL]且38 < p <= 65 | STRING 说明  在MySQL中,十进制数据类型的精度高达 65,但在Flink中,十进制数据类型的精度仅限于38。所以,如果定义精度大于38的十进制列,则应将其映射到字符串以避免精度损失。 | 
| DECIMAL(p, s) [UNSIGNED] [ZEROFILL]且38 < p <= 65 | |
| FIXED(p, s) [UNSIGNED] [ZEROFILL]且38 < p <= 65 | |
| TINYTEXT | STRING | 
| TEXT | |
| MEDIUMTEXT | |
| LONGTEXT | |
| ENUM | |
| JSON | STRING 说明  JSON数据类型将在Flink中转换为JSON格式的字符串。 | 
| GEOMETRY | STRING 说明  MySQL中的空间数据类型将转换为具有固定JSON格式的字符串,详情请参见MySQL空间数据类型映射。 | 
| POINT | |
| LINESTRING | |
| POLYGON | |
| MULTIPOINT | |
| MULTILINESTRING | |
| MULTIPOLYGON | |
| GEOMETRYCOLLECTION | |
| TINYBLOB | BYTES 说明  对于MySQL中的BLOB数据类型,仅支持长度不大于2147483647(2**31-1)的 blob。 | 
| BLOB | |
| MEDIUMBLOB | |
| LONGBLOB | 
使用示例
- CDC源表 - CREATE TEMPORARY TABLE mysqlcdc_source ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( order_id INT, customer_name STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT order_id, customer_name FROM mysqlcdc_source;
- 维表 - CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mysql_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
- 结果表 - CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mysql_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); INSERT INTO mysql_sink SELECT * FROM datagen_source;
- 数据摄入数据源 - source: type: mysql name: MySQL Source hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 7601-7604 sink: type: values name: Values Sink print.enabled: true sink.print.logger: true
关于MySQL CDC源表
- 实现原理 - MySQL CDC源表在启动时扫描全表,将表按照主键分成多个分片(chunk),记录下此时的Binlog位点。并使用增量快照算法通过select语句,逐个读取每个分片的数据。作业会周期性执行Checkpoint,记录下已经完成的分片。当发生Failover时,只需要继续读取未完成的分片。当分片全部读取完后,会从之前获取的Binlog位点读取增量的变更记录。Flink作业会继续周期性执行Checkpoint,记录下Binlog位点,当作业发生Failover,便会从之前记录的Binlog位点继续处理,从而实现Exactly Once语义。 - 更详细的增量快照算法,请参见MySQL CDC Connector。 
- 元数据 - 元数据在分库分表合并同步场景非常实用,因为分库分表合并后,一般业务还是希望区分每条数据的库名和表名来源,而元数据列可以访问源表的库名和表名信息。因此通过元数据列可以非常方便地将多张分表合并到一张目的表。 - MySQL CDC Source支持元数据列语法,您可以通过元数据列访问以下元数据。 - 元数据key - 元数据类型 - 描述 - database_name - STRING NOT NULL - 包含该行记录的库名。 - table_name - STRING NOT NULL - 包含该行记录的表名。 - op_ts - TIMESTAMP_LTZ(3) NOT NULL - 该行记录在数据库中的变更时间,如果该记录来自表的存量历史数据而不是Binlog中获取,则该值总是0。 - op_type - STRING NOT NULL - 该行记录的变更类型。 - +I:表示INSERT消息 
- -D:表示DELETE消息 
- -U:表示UPDATE_BEFORE消息 
- +U:表示UPDATE_AFTER消息 
 说明- 仅实时计算引擎VVR 8.0.7及以上版本支持。 - 将MySQL实例中多个分库下的多张orders表,合并同步到下游Hologres的holo_orders表中,代码示例如下所示。 - CREATE TEMPORARY TABLE mysql_orders ( db_name STRING METADATA FROM 'database_name' VIRTUAL, -- 读取库名。 table_name STRING METADATA FROM 'table_name' VIRTUAL, -- 读取表名。 operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- 读取变更时间。 op_type STRING METADATA FROM 'op_type' VIRTUAL, -- 读取变更类型。 order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database-name' = 'mydb_.*', -- 正则匹配多个分库。 'table-name' = 'orders_.*' -- 正则匹配多张分表。 ); INSERT INTO holo_orders SELECT * FROM mysql_orders;- 在上面代码的基础上,WITH参数里配置scan.read-changelog-as-append-only.enabled参数为true时,输出结果根据下游表主键设置情况有不同的表现: - 下游表主键为order_id时,输出结果仅包含上游表每个主键的最后一次变更。即对于某个主键最后一次变更为删除操作的数据,在下游表可以看到一条相同主键的、op_type为-D的数据。 
- 下游表主键为order_id、operation_ts、op_type时,输出结果包含上游表每个主键的完整变更。 
 
- 支持正则表达式 - MySQL CDC源表支持在表名或者库名中使用正则表达式匹配多个表或者多个库。通过正则表达式指定多张表的代码示例如下。 - CREATE TABLE products ( db_name STRING METADATA FROM 'database_name' VIRTUAL, table_name STRING METADATA FROM 'table_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- 正则表达式匹配多个库。 'table-name' = '(t[5-8]|tt)' -- 正则表达式匹配多张表。 );- 上述例子中的正则表达式解释: - ^(test).* 是前缀匹配示例,这个表达式可以匹配以test开头的库名,例如test1或test2。 
- .*[p$] 是后缀匹配示例, 这个表达式可以匹配以p结尾的库名,例如cdcp或edcp。 
- txc是指定匹配, 可以匹配指定名称的数据库名,例如txc。 
 - MySQL CDC在匹配全路径表名时,会通过库名和表名来唯一确定一张表,即使用database-name.table-name作为匹配表的模式,例如匹配模式 (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[ 5-8]|tt) 就可以匹配到数据库中的表txc.tt和test2.test5。 重要- 在SQL作业的配置中,table-name和database-name不支持使用逗号(,)分隔形式指定多张表或多个库。 - 如果需要匹配多个表或使用多个正则表达式,可以用竖线(|)连接并用小括号包围,例如需要读取表user和product,table-name可以配置为 - (user|product)。
- 如果正则表达式包含逗号,需要用竖线(|)运算符进行改写,例如正则表达式 - mytable_\d{1, 2}需要改写成等价的- (mytable_\d{1}|mytable_\d{2}),来避免使用逗号。
 
- 并发控制 - MySQL连接器支持多并发读取全量数据,能够提高数据加载效率。同时配合Flink实时计算控制台的Autopilot自动调优功能,在多并发读取完成后增量阶段,能够自动缩容,节约计算资源。 - 在实时计算开发控制台,您可以在资源配置页面的基础模式或专家模式中设置作业的并发数。设置并发的区别如下: - 基础模式设置的并发数为整个作业的全局并发数。  
- 专家模式支持按需为某个VERTEX设置并发数。  
 - 资源配置详情请参见配置作业部署信息。 重要- 无论是基础模式还是专家模式,在设置并发时,表中声明的server-id范围必须大于等于作业的并发数。例如server-id的范围为5404-5412,则共有8个唯一的server-id,因此作业最多可以设置8个并发,且不同的作业对于同一个MySQL实例的server-id范围不能有重叠,即每个作业需显式配置不同的server-id。 
- Autopilot自动缩容 - 全量阶段积累了大量历史数据,为了提高读取效率,通常采用并发的方式读取历史数据。而在Binlog增量阶段,因为Binlog数据量少且为了保证全局有序,通常只需要单并发读取。全量阶段和增量阶段对资源的不同需求,可以通过自动调优功能自动帮您实现性能和资源的平衡。 - 自动调优会监控MySQL CDC Source的每个task的流量。当进入Binlog阶段,如果只有一个task在负责Binlog读取,其他task均空闲时,自动调优便会自动缩小Source的CU数和并发。开启自动调优只需要在作业运维页面,将自动调优的模式设置为Active模式。 说明- 默认调低并发度的最小触发时间间隔为24小时。更多自动调优的参数和细节,请参见配置自动调优。 
- 启动模式 - 使用配置项scan.startup.mode可以指定MySQL CDC源表的启动模式。可选值包括: - initial (默认):在第一次启动时对数据库表进行全量读取,完成后切换至增量模式读取Binlog。 
- earliest-offset:跳过快照阶段,从可读取的最早Binlog位点开始读取。 
- latest-offset:跳过快照阶段,从Binlog的结尾处开始读取。该模式下源表只能读取在作业启动之后的数据变更。 
- specific-offset:跳过快照阶段,从指定的Binlog位点开始读取。位点可通过Binlog文件名和位置指定,或者使用GTID集合指定。 
- timestamp:跳过快照阶段,从指定的时间戳开始读取Binlog事件。 
 - 使用示例: - CREATE TABLE mysql_source (...) WITH ( 'connector' = 'mysql-cdc', 'scan.startup.mode' = 'earliest-offset', -- 从最早位点启动。 'scan.startup.mode' = 'latest-offset', -- 从最晚位点启动。 'scan.startup.mode' = 'specific-offset', -- 从特定位点启动。 'scan.startup.mode' = 'timestamp', -- 从特定位点启动。 'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- 在特定位点启动模式下指定Binlog文件名。 'scan.startup.specific-offset.pos' = '4', -- 在特定位点启动模式下指定Binlog位置。 'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- 在特定位点启动模式下指定GTID集合。 'scan.startup.timestamp-millis' = '1667232000000' -- 在时间戳启动模式下指定启动时间戳。 ... )重要- MySQL source会在Checkpoint时将当前位点以INFO级别打印到日志中,日志前缀为 - Binlog offset on checkpoint {checkpoint-id},该日志可以帮助您将作业从某个Checkpoint位点开始启动作业。
- 如果读取的表曾经发生过表结构变化,从最早位点(earliest-offset)、特定位点(specific-offset)或时间戳(timestamp)启动可能会发生错误。因为Debezium读取器会在内部保存当前的最新表结构,结构不匹配的早期数据无法被正确解析。 
 
- 关于无主键CDC源表 - 在Flink计算引擎VVR 6.0.7及以上版本支持使用MySQL CDC无主键源表,使用无主键表要求必须设置scan.incremental.snapshot.chunk.key-column,且只能选择非空类型的字段。 
- 无主键CDC源表的处理语义由scan.incremental.snapshot.chunk.key-column指定的列的行为决定: - 如果指定的列不存在更新操作,此时可以保证Exactly once语义。 
- 如果指定的列发生更新操作,此时只能保证At least once语义。但可以结合下游,通过指定下游主键,结合幂等性操作来保证数据的正确性。 
 
 
- 读取阿里云RDS MySQL备份日志 - MySQL CDC源表支持读取阿里云RDS MySQL的备份日志。这在全量阶段执行时间较长,本地Binlog文件已经被自动清理,而自动或者手动上传的备份文件依然存在的场景下非常适用。 - 使用示例: - CREATE TABLE mysql_source (...) WITH ( 'connector' = 'mysql-cdc', 'rds.region-id' = 'cn-beijing', 'rds.access-key-id' = 'xxxxxxxxx', 'rds.access-key-secret' = 'xxxxxxxxx', 'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 'rds.main-db-id' = '12345678', 'rds.download.timeout' = '60s' ... )
- 开启CDC Source复用 - 当同一个作业中有多个MySQL CDC源表时,每个源表都会启动对应的Binlog Client,如果源表数量较多并且读取的MySQL表都在同一个实例中时,会对数据库造成较大压力,详情请参见MySQL CDC常见问题。 - 实时计算引擎VVR 8.0.7及以上版本支持MySQL CDC Source复用,当不同的CDC源表配置项除了数据库、表名和server-id外的其他配置项均相同时,可以进行合并。开启Source复用后,实时计算引擎会尽可能将同一个作业中能够合并的MySQL CDC源表进行合并。 - 您可以在SQL作业中使用SET命令开启source复用功能: - SET 'table.optimizer.source-merge.enabled' = 'true';- 对已有作业启用 Source 复用后,需要无状态启动。原因是 Source 复用会导致作业拓扑改变,从原有作业状态可能无法启动或者丢失数据。 重要- VVR 8.0.8及8.0.9版本,在开启CDC Source复用时,还需要额外设置 - SET 'sql-gateway.exec-plan.enabled' = 'false'。
- 在开启CDC Source复用后,不建议将作业配置项 - pipeline.operator-chaining设为false,因为将算子链断开后,Source发送给下游算子的数据会增加序列化和反序列的开销,当合并的Source越多时,开销会越大。
- 在实时计算引擎VVR 8.0.7版本,将 - pipeline.operator-chaining设为false时会出现序列化的问题。
 
加速Binlog读取
MySQL连接器作为源表或数据摄入数据源使用时,在增量阶段会解析Binlog文件生成各种变更消息,Binlog文件使用二进制记录着所有表的变更,可以通过以下方式加速Binlog文件解析。
- 开启解析过滤配置 - 使用配置项 - scan.only.deserialize.captured.tables.changelog.enabled:仅对指定表的变更事件进行解析。
 
- 优化Debezium参数 - debezium.max.queue.size: 162580 debezium.max.batch.size: 40960 debezium.poll.interval.ms: 50- debezium.max.queue.size:阻塞队列可以容纳的记录的最大数量。当Debezium从数据库读取事件流时,它会在将事件写入下游之前将它们放入阻塞队列。默认值为8192。
- debezium.max.batch.size:该连接器每次迭代处理的事件条数最大值。默认值为2048。
- debezium.poll.interval.ms:连接器应该在请求新的变更事件前等待多少毫秒。默认值为1000毫秒,即1秒。
 
使用示例:
CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    -- Debezium配置
    'debezium.max.queue.size' = '162580',
    'debezium.max.batch.size' = '40960',
    'debezium.poll.interval.ms' = '50',
    -- 开启解析过滤
    'scan.only.deserialize.captured.tables.changelog.enabled' = 'true',  -- 仅对指定表的变更事件进行解析。
    ...
)source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604
  # Debezium配置
  debezium.max.queue.size: 162580
  debezium.max.batch.size: 40960
  debezium.poll.interval.ms: 50
  # 开启解析过滤
  scan.only.deserialize.captured.tables.changelog.enabled: trueMySQL CDC 企业版本binlog消费能力为85MB/s,约为开源社区的2倍,当Binlog文件产生速度大于 85MB/s 时(即每6s一个512MB大小的文件),Flink 作业的延迟会持续上升,在Binlog文件产生速度降低后处理延迟会逐步下降。在Binlog文件包含大事务时,可能会导致处理延迟短暂上升,读取完该事务的日志后处理延迟会下降。
MySQL CDC DataStream API
通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink,DataStream连接器设置方法请参见DataStream连接器使用方法。
创建DataStream API程序并使用MySqlSource。代码及pom依赖项示例如下:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // set captured database
        .tableList("yourDatabaseName.yourTableName") // set captured table
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
        .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // enable checkpoint
    env.enableCheckpointing(3000);
    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // set 4 parallel source tasks
      .setParallelism(4)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
    env.execute("Print MySQL Snapshot + Binlog");
  }
}<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>在构建MySqlSource时,代码中必须指定以下参数:
| 参数 | 说明 | 
| hostname | MySQL数据库的IP地址或者Hostname。 | 
| port | MySQL数据库服务的端口号。 | 
| databaseList | MySQL数据库名称。 说明  数据库名称支持正则表达式以读取多个数据库的数据,您可以使用 | 
| username | MySQL数据库服务的用户名。 | 
| password | MySQL数据库服务的密码。 | 
| deserializer | 反序列化器,将SourceRecord类型记录反序列化到指定类型。参数取值如下: 
 | 
pom依赖项必须指定以下参数:
| ${vvr.version} | 阿里云实时计算Flink版的引擎版本,例如: 说明  请以Maven上显示的版本号为准,因为我们会不定期发布Hotfix版本,而这些更新可能不会通过其他渠道通知。 | 
| ${flink.version} | Apache Flink版本,例如: 重要  请使用阿里云实时计算Flink版的引擎版本对应的Apache Flink版本,避免在作业运行时出现不兼容的问题。版本对应关系详情,请参见引擎。 | 
常见问题
CDC源表使用中可能遇到的问题,详情请参见CDC问题。
Flink CDC技术原理及企业版特性
- Flink CDC企业版特性 
- Flink CDC技术