Flink订阅Binlog
实时计算 Flink 版通过订阅云原生数据仓库 AnalyticDB MySQL 版,可以实时捕获和处理数据库变更数据,实现高效的数据同步和流式计算。本文为您介绍如何使用Flink订阅AnalyticDB for MySQL Binlog。
前提条件
- AnalyticDB for MySQL产品系列为企业版、基础版、湖仓版和数仓版弹性模式。 
- AnalyticDB for MySQL集群的内核版本需为3.2.1.0及以上版本。 说明- 请在云原生数据仓库AnalyticDB MySQL控制台集群信息页面的配置信息区域,查看和升级内核版本。 
- Flink实时计算引擎需为VVR 8.0.4及以上版本。 
- AnalyticDB for MySQL集群和Flink全托管工作空间需要位于同一VPC下。 
使用限制
- XUANWU_V2表不支持开启Binlog,因此不能通过订阅Binlog实现AnalyticDB for MySQL集群XUANWU_V2表的数据同步和流式计算。 
- Flink仅支持处理AnalyticDB for MySQL Binlog中的所有基础数据类型和复杂数据类型JSON。 
- Flink不会处理AnalyticDB for MySQL Binlog中的DDL操作记录和分区表自动分区删除的操作记录。 
步骤一:开启Binlog功能
- 开启Binlog功能,本文以表名为source_table为例。 说明- AnalyticDB for MySQL仅支持按表开启Binlog功能。 - 建表时,开启Binlog- CREATE TABLE source_table ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) )DISTRIBUTED BY HASH (id) BINLOG=true;- 建表后,开启Binlog- ALTER TABLE source_table BINLOG=true;
- (可选)修改Binlog保留时长。 - 您可以通过修改 - binlog_ttl参数来调整Binlog的保留时长,参数默认值为6h。以下示例表示将表source_table的Binlog保留时长设置为1天。- ALTER TABLE source_table binlog_ttl='1d';- binlog_ttl参数取值支持以下格式:- 毫秒:纯数字。示例: - 60代表60毫秒。
- 秒:数字+s。示例: - 30s代表30秒。
- 小时:数字+h。示例: - 2h代表2小时。
- 天:数字+d。示例: - 1d代表1天。
 说明- 内核版本为3.2.1且为3.2.1.9及以上、3.2.2且为3.2.2.14及以上、3.2.3且为3.2.3.8及以上、3.2.4且为3.2.4.4及以上、3.2.5且为3.2.5.1及以上的集群,Binlog保留时长上限为365天。内核版本低于上述的集群,Binlog保留时长上限为21天。 
- 建议您设置的Binlog保留时间不小于 - binlog_ttl参数的默认值。若设置的保留时间过短,可能会导致文件被清理,影响数据同步。
- 如果您需要查看当前Binlog保留时长,执行语句 - SHOW CREATE TABLE source_table;。
 
步骤二:上传AnalyticDB for MySQL连接器到Flink
步骤三:订阅Binlog
- 创建源表,连接到AnalyticDB for MySQL并读取指定表(source_table)的Binlog数据。 说明- Flink DDL中定义的主键必须和AnalyticDB for MySQL集群物理表中的主键保持一致,主键一致包括主键和主键名称一致。如果不一致,会影响数据正确性。 
- Flink的数据类型需要和AnalyticDB for MySQL兼容。映射关系,请参见类型映射。 
 - CREATE TEMPORARY TABLE adb_source ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'adb-mysql-cdc', 'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com', 'username' = 'testUser', 'password' = 'Test12****', 'database-name' = 'binlog', 'table-name' = 'source_table' );- WITH参数说明: - 参数 - 是否必填 - 默认值 - 数据类型 - 说明 - connector - 是 - 无 - STRING - 使用的连接器。 - 这里填写自定义连接器,固定填写 - adb-mysql-cdc。- hostname - 是 - 无 - STRING - AnalyticDB for MySQL的VPC地址。 - username - 是 - 无 - STRING - AnalyticDB for MySQL数据库账号。 - password - 是 - 无 - STRING - AnalyticDB for MySQL数据库密码。 - database-name - 是 - 无 - STRING - AnalyticDB for MySQL数据库名称。 - 由于AnalyticDB for MySQL实现的是表级Binlog,此处仅支持设置一个数据库。 - table-name - 是 - 无 - STRING - AnalyticDB for MySQL数据库的表名。 - 由于AnalyticDB for MySQL实现的是表级Binlog,此处仅支持设置一个表。 - port - 否 - 3306 - INTEGER - 端口号。 - scan.incremental.snapshot.enabled - 否 - true - BOOLEAN - 增量快照。 - 默认开启。增量快照是一种读取表快照的新机制,与旧的快照机制相比,增量快照有许多优点,包括: - 在读取快照期间,Source支持并发读取。 
- 在读取快照期间,Source支持进行Chunk粒度的Checkpoint。 
- 在读取快照之前,Source不需要获取数据库锁权限。 
 - scan.incremental.snapshot.chunk.size - 否 - 8096 - INTEGER - 表快照的Chunk大小(包含的行数)。 - 当开启增量快照读取时,表会被切分成多个Chunk读取。 - scan.snapshot.fetch.size - 否 - 1024 - INTEGER - 读取表快照时,每次读取数据的最大行数。 - scan.startup.mode - 否 - initial - STRING - 消费数据的启动模式。 - 取值如下: - initial(默认):在第一次启动时,会先扫描历史全量数据,然后读取最新的Binlog数据。 
- earliest-offset:不扫描历史全量数据,直接从可读取的最早Binlog开始读取。 
- specific-offset:不扫描历史全量数据,从您指定的Binlog位点启动,位点可通过同时配置 - scan.startup.specific-offset.file和- scan.startup.specific-offset.pos参数来指定从特定Binlog文件名和偏移量启动。
- latest-offset:在第一次启动时,不会扫描历史全量数据,直接从Binlog的末尾(最新的Binlog处)开始读取,即只读取该连接器启动以后的最新变更。 
- timestamp:不扫描历史全量数据,从指定的时间戳开始读取Binlog。时间戳通过 - scan.startup.timestamp-millis指定,单位为毫秒(ms)。
 重要- 使用earliest-offset、specific-offset或timestamp启动模式时,请确保在指定的Binlog消费位置到作业启动期间,对应表的结构保持不变,避免因表结构变更导致作业执行失败。 - scan.startup.specific-offset.file - 否 - 无 - STRING - 在specific-offset启动模式下,启动位点的Binlog文件名。 - 最新Binlog文件名可使用 - SHOW MASTER STATUS for table_name;获取。- scan.startup.specific-offset.pos - 否 - 无 - LONG - 在specific-offset启动模式下,启动位点的Binlog文件位置。 - 最新Binlog位置可使用 - SHOW MASTER STATUS for table_name;获取。- scan.startup.specific-offset.skip-events - 否 - 无 - LONG - 在指定的启动位点后需要跳过的事件数量。 - scan.startup.specific-offset.skip-rows - 否 - 无 - LONG - 在指定的启动位点后需要跳过的数据行数。 - scan.startup.timestamp-millis - 否 - 无 - LONG - 使用指定时间模式启动时,启动位点的毫秒时间戳。 - 使用该配置时, - scan.startup.mode必须配置为timestamp。时间戳单位为毫秒(ms)。- server-time-zone - 否 - 无 - STRING - 数据库服务器中的会话时区。 - 例如:"Asia/Shanghai"。它控制AnalyticDB for MySQL中的TIMESTAMP类型如何转成STRING类型。如果没有设置,则使用 - ZONELD.SYSTEMDEFAULT()来确定服务器时区。- debezium.min.row.count.to.stream.result - 否 - 1000 - INTEGER - 当表的行数大于该值时,连接器会对结果进行流式处理。 - 若将此参数设置为 - 0,会跳过所有表大小检查,始终在快照期间对所有结果进行流式处理。- connect.timeout - 否 - 30s - DURATION - 连接数据库服务器超时,重试连接之前等待超时的最长时间。 - 默认单位为秒(s)。 - connect.max-retries - 否 - 3 - INTEGER - 连接数据库服务时,连接失败后重试的最大次数。 
- 在目标端创建表,用于存储处理后的数据。本文以AnalyticDB for MySQL作为目标端。Flink支持的连接器请参见支持的连接器。 - CREATE TABLE target_table ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) )
- 创建结果表,连接步骤3创建的表,用于将处理后的数据写入到AnalyticDB for MySQL指定的表。 - CREATE TEMPORARY TABLE adb_sink ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'adb3.0', 'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest', 'userName' = 'testUser', 'password' = 'Test12****', 'tableName' = 'target_table' );- 结果表对应的WITH参数和映射类型的详情,请参见:云原生数据仓库AnalyticDB MySQL版(ADB)3.0。 
- 将捕获到的源数据变化同步到结果表,并由结果表将数据同步到目标端。 - INSERT INTO adb_sink SELECT * FROM adb_source;
- 单击保存。 
- 单击深度检查。 - 深度检查能够检查作业的SQL语义、网络连通性以及作业使用的表的元数据信息。同时,您可以单击结果区域的SQL优化,展开查看SQL风险问题提示以及对应的SQL优化建议。 
- (可选)单击调试。 - 您可以使用作业调试功能模拟作业运行、检查输出结果,验证SELECT或INSERT业务逻辑的正确性,提升开发效率,降低数据质量风险。 
- 单击部署。 
- (可选)查看Binlog信息。 说明- 使用以下语句查看Binlog日志信息时,若仅开启Binlog功能,日志信息显示为0。只有成功订阅Binlog后,才会显示日志信息。 - 若您需要获取Binlog最新写入的文件名和位置信息,请执行以下SQL语句: - SHOW MASTER STATUS FOR source_table;
- 若您需要了解所有未清理的历史Binlog文件及其大小,请执行以下SQL语句: - SHOW BINARY LOGS FOR source_table;
 
类型映射
AnalyticDB for MySQL与Flink的数据类型映射关系如下:
| AnalyticDB for MySQL字段类型 | Flink字段类型 | 
| BOOLEAN | BOOLEAN | 
| TINYINT | TINYINT | 
| SMALLINT | SMALLINT | 
| INT | INT | 
| BIGINT | BIGINT | 
| FLOAT | FLOAT | 
| DOUBLE | DOUBLE | 
| DECIMAL(p,s)或NUMERIC(p,s) | DECIMAL(p,s) | 
| VARCHAR | STRING | 
| BINARY | BYTES | 
| DATE | DATE | 
| TIME | TIME | 
| DATETIME | TIMESTAMP | 
| TIMESTAMP | TIMESTAMP | 
| POINT | STRING | 
| JSON | STRING |