Flink订阅Binlog
实时计算 Flink 版通过订阅云原生数据仓库 AnalyticDB MySQL 版,可以实时捕获和处理数据库变更数据,实现高效的数据同步和流式计算。本文为您介绍如何使用Flink订阅AnalyticDB for MySQL Binlog。
前提条件
AnalyticDB for MySQL产品系列为企业版、基础版、湖仓版和数仓版弹性模式。
AnalyticDB for MySQL集群的内核版本需为3.2.1.0及以上版本。
请在云原生数据仓库AnalyticDB MySQL控制台集群信息页面的配置信息区域,查看和升级内核版本。
Flink实时计算引擎需为VVR 8.0.4及以上版本。
AnalyticDB for MySQL集群和Flink全托管工作空间需要位于同一VPC下。
使用限制
XUANWU_V2表不支持开启Binlog,因此不能通过订阅Binlog实现AnalyticDB for MySQL集群XUANWU_V2表的数据同步和流式计算。
Flink仅支持处理AnalyticDB for MySQL Binlog中的所有基础数据类型和复杂数据类型JSON。
Flink不会处理AnalyticDB for MySQL Binlog中的DDL操作记录和分区表自动分区删除的操作记录。
步骤一:开启Binlog功能
开启Binlog功能,本文以表名为source_table为例。
AnalyticDB for MySQL仅支持按表开启Binlog功能。
建表时,开启Binlog建表后,开启BinlogCREATE TABLE source_table ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) )DISTRIBUTED BY HASH (id) BINLOG=true;
ALTER TABLE source_table BINLOG=true;
(可选)修改Binlog保留时长。
您可以通过修改
binlog_ttl
参数来调整Binlog的保留时长,参数默认值为6h。以下示例表示将表source_table的Binlog保留时长设置为1天。ALTER TABLE source_table binlog_ttl='1d';
binlog_ttl
参数取值支持以下格式:毫秒:纯数字。示例:
60
代表60毫秒。秒:数字+s。示例:
30s
代表30秒。小时:数字+h。示例:
2h
代表2小时。天:数字+d。示例:
1d
代表1天。
建议您设置的Binlog保留时间不小于
binlog_ttl
参数的默认值。若设置的保留时间过短,可能会导致文件被清理,影响数据同步。如果您需要查看当前Binlog保留时长,执行语句
SHOW CREATE TABLE source_table;
。
步骤二:上传AnalyticDB for MySQL连接器到Flink
步骤三:订阅Binlog
创建源表,连接到AnalyticDB for MySQL并读取指定表(source_table)的Binlog数据。
Flink DDL中定义的主键必须和AnalyticDB for MySQL集群物理表中的主键保持一致,主键一致包括主键和主键名称一致。如果不一致,会影响数据正确性。
Flink的数据类型需要和AnalyticDB for MySQL兼容。映射关系,请参见类型映射。
CREATE TEMPORARY TABLE adb_source ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'adb-mysql-cdc', 'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com', 'username' = 'testUser', 'password' = 'Test12****', 'database-name' = 'binlog', 'table-name' = 'source_table' );
WITH参数说明:
参数
是否必填
默认值
数据类型
说明
参数
是否必填
默认值
数据类型
说明
connector
是
无
STRING
使用的连接器。
这里填写自定义连接器,固定填写
adb-mysql-cdc
。hostname
是
无
STRING
AnalyticDB for MySQL的VPC地址。
username
是
无
STRING
AnalyticDB for MySQL数据库账号。
password
是
无
STRING
AnalyticDB for MySQL数据库密码。
database-name
是
无
STRING
AnalyticDB for MySQL数据库名称。
由于AnalyticDB for MySQL实现的是表级Binlog,此处仅支持设置一个数据库。
table-name
是
无
STRING
AnalyticDB for MySQL数据库的表名。
由于AnalyticDB for MySQL实现的是表级Binlog,此处仅支持设置一个表。
port
否
3306
INTEGER
端口号。
scan.incremental.snapshot.enabled
否
true
BOOLEAN
增量快照。
默认开启。增量快照是一种读取表快照的新机制,与旧的快照机制相比,增量快照有许多优点,包括:
在读取快照期间,Source支持并发读取。
在读取快照期间,Source支持进行Chunk粒度的Checkpoint。
在读取快照之前,Source不需要获取数据库锁权限。
scan.incremental.snapshot.chunk.size
否
8096
INTEGER
表快照的Chunk大小(包含的行数)。
当开启增量快照读取时,表会被切分成多个Chunk读取。
scan.snapshot.fetch.size
否
1024
INTEGER
读取表快照时,每次读取数据的最大行数。
scan.startup.mode
否
initial
STRING
消费数据的启动模式。
取值如下:
initial(默认):在第一次启动时,会先扫描历史全量数据,然后读取最新的Binlog数据。
earliest-offset:不扫描历史全量数据,直接从可读取的最早Binlog开始读取。
specific-offset:不扫描历史全量数据,从您指定的Binlog位点启动,位点可通过同时配置
scan.startup.specific-offset.file
和scan.startup.specific-offset.pos
参数来指定从特定Binlog文件名和偏移量启动。latest-offset:在第一次启动时,不会扫描历史全量数据,直接从Binlog的末尾(最新的Binlog处)开始读取,即只读取该连接器启动以后的最新变更。
timestamp:不扫描历史全量数据,从指定的时间戳开始读取Binlog。时间戳通过
scan.startup.timestamp-millis
指定,单位为毫秒(ms)。
使用earliest-offset、specific-offset或timestamp启动模式时,请确保在指定的Binlog消费位置到作业启动期间,对应表的结构保持不变,避免因表结构变更导致作业执行失败。
scan.startup.specific-offset.file
否
无
STRING
在specific-offset启动模式下,启动位点的Binlog文件名。
最新Binlog文件名可使用
SHOW MASTER STATUS for table_name;
获取。scan.startup.specific-offset.pos
否
无
LONG
在specific-offset启动模式下,启动位点的Binlog文件位置。
最新Binlog位置可使用
SHOW MASTER STATUS for table_name;
获取。scan.startup.specific-offset.skip-events
否
无
LONG
在指定的启动位点后需要跳过的事件数量。
scan.startup.specific-offset.skip-rows
否
无
LONG
在指定的启动位点后需要跳过的数据行数。
scan.startup.timestamp-millis
否
无
LONG
使用指定时间模式启动时,启动位点的毫秒时间戳。
使用该配置时,
scan.startup.mode
必须配置为timestamp。时间戳单位为毫秒(ms)。server-time-zone
否
无
STRING
数据库服务器中的会话时区。
例如:"Asia/Shanghai"。它控制AnalyticDB for MySQL中的TIMESTAMP类型如何转成STRING类型。如果没有设置,则使用
ZONELD.SYSTEMDEFAULT()
来确定服务器时区。debezium.min.row.count.to.stream.result
否
1000
INTEGER
当表的行数大于该值时,连接器会对结果进行流式处理。
若将此参数设置为
0
,会跳过所有表大小检查,始终在快照期间对所有结果进行流式处理。connect.timeout
否
30s
DURATION
连接数据库服务器超时,重试连接之前等待超时的最长时间。
默认单位为秒(s)。
connect.max-retries
否
3
INTEGER
连接数据库服务时,连接失败后重试的最大次数。
在目标端创建表,用于存储处理后的数据。本文以AnalyticDB for MySQL作为目标端。Flink支持的连接器请参见支持的连接器。
CREATE TABLE target_table ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) )
创建结果表,连接步骤3创建的表,用于将处理后的数据写入到AnalyticDB for MySQL指定的表。
CREATE TEMPORARY TABLE adb_sink ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'adb3.0', 'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest', 'userName' = 'testUser', 'password' = 'Test12****', 'tableName' = 'target_table' );
结果表对应的WITH参数和映射类型的详情,请参见:云原生数据仓库AnalyticDB MySQL版(ADB)3.0。
将捕获到的源数据变化同步到结果表,并由结果表将数据同步到目标端。
INSERT INTO adb_sink SELECT * FROM adb_source;
单击保存。
单击深度检查。
深度检查能够检查作业的SQL语义、网络连通性以及作业使用的表的元数据信息。同时,您可以单击结果区域的SQL优化,展开查看SQL风险问题提示以及对应的SQL优化建议。
(可选)单击调试。
您可以使用作业调试功能模拟作业运行、检查输出结果,验证SELECT或INSERT业务逻辑的正确性,提升开发效率,降低数据质量风险。
单击部署。
(可选)查看Binlog信息。
使用以下语句查看Binlog日志信息时,若仅开启Binlog功能,日志信息显示为0。只有成功订阅Binlog后,才会显示日志信息。
若您需要获取Binlog最新写入的文件名和位置信息,请执行以下SQL语句:
SHOW MASTER STATUS FOR source_table;
若您需要了解所有未清理的历史Binlog文件及其大小,请执行以下SQL语句:
SHOW BINARY LOGS FOR source_table;
类型映射
AnalyticDB for MySQL与Flink的数据类型映射关系如下:
AnalyticDB for MySQL字段类型 | Flink字段类型 |
AnalyticDB for MySQL字段类型 | Flink字段类型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p,s)或NUMERIC(p,s) | DECIMAL(p,s) |
VARCHAR | STRING |
BINARY | BYTES |
DATE | DATE |
TIME | TIME |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
POINT | STRING |
JSON | STRING |
- 本页导读 (1)
- 前提条件
- 使用限制
- 步骤一:开启Binlog功能
- 步骤二:上传AnalyticDB for MySQL连接器到Flink
- 步骤三:订阅Binlog
- 类型映射