实时计算 Flink 版通过订阅云原生数据仓库 AnalyticDB MySQL 版,可以实时捕获和处理数据库变更数据,实现高效的数据同步和流式计算。本文为您介绍如何使用Flink订阅AnalyticDB for MySQL Binlog。
前提条件
AnalyticDB for MySQL产品系列为企业版、基础版、湖仓版或数仓版弹性模式。
AnalyticDB for MySQL集群的内核版本需为3.2.1.0及以上版本。
说明查看企业版、基础版或湖仓版集群的内核版本,请执行
SELECT adb_version();
。如需升级内核版本,请联系技术支持。查看和升级数仓版集群的内核版本,请参见查看和升级版本。
Flink实时计算引擎需为VVR 8.0.4及以上版本。
AnalyticDB for MySQL和Flink全托管工作空间需要位于同一VPC下,详情请参见创建集群和开通实时计算Flink版。
已将Flink工作空间所属的网段加入AnalyticDB for MySQL的白名单,详情请参见Flink所属网段查看方法和设置白名单。
使用限制
AnalyticDB for MySQL仅支持按表开启Binlog功能。
Flink仅支持处理AnalyticDB for MySQL Binlog中的所有基础数据类型和复杂数据类型JSON,详情请参见数据类型。
Flink不会处理AnalyticDB for MySQL Binlog中的DDL操作记录和分区表自动分区删除的操作记录。
步骤一:开启Binlog功能
开启Binlog功能,本文以表名为source_table为例。
建表时,开启Binlog
CREATE TABLE source_table ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) )DISTRIBUTED BY HASH (id) BINLOG=true;
建表后,开启Binlog
ALTER TABLE source_table BINLOG=true;
(可选)查看Binlog信息。
说明使用以下语句查看Binlog文件的信息时,若仅开启Binlog功能,日志信息显示为0。只有Flink成功订阅Binlog后,才会显示日志信息。
查看当前写入的Binlog的位点,SQL语句如下:
SHOW MASTER STATUS FOR source_table;
查看集群内对应表所有Binlog文件的信息,SQL语句如下:
SHOW BINARY LOGS FOR source_table;
(可选)修改Binlog保留时长。
您可以通过修改
binlog_ttl
参数来调整Binlog的保留时长。以下示例表示将表source_table的Binlog保留时长设置为1天。ALTER TABLE source_table binlog_ttl='1d';
binlog_ttl
参数取值支持以下格式:纯数字,表示毫秒。例如,60代表60毫秒。
数字+s,表示秒。例如,30s代表30秒。
数字+h,表示小时。例如,2h代表2小时。
数字+d,表示天。例如,1d代表1天。
步骤二:配置Flink连接器
登录实时计算控制台。
在Flink全托管页签,单击目标工作空间操作列下的控制台。
在左侧导航栏,单击数据连接。
在数据连接页面,单击创建自定义连接器
上传自定义连接器JAR包。下载链接:AnalyticDB for MySQL Connector。
上传完成后,单击下一步
单击完成。创建完成的自定义连接器会出现在连接器列表中。
步骤三:订阅Binlog
创建源表,连接到AnalyticDB for MySQL并读取指定表(source_table)的Binlog数据。
说明Flink DDL中定义的主键必须和AnalyticDB for MySQL集群物理表中的主键保持一致,主键一致包括主键和主键名称一致。如果不一致,会影响数据正确性。
Flink的数据类型需要和AnalyticDB for MySQL兼容。映射关系,请参见类型映射。
CREATE TEMPORARY TABLE adb_source ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'adb-mysql-cdc', 'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com', 'username' = 'testUser', 'password' = 'Test12****', 'database-name' = 'binlog', 'table-name' = 'source_table' );
WITH参数说明:
参数
是否必填
默认值
数据类型
说明
connector
是
无
STRING
使用的连接器。
这里填写自定义连接器,固定填写
adb-mysql-cdc
。hostname
是
无
STRING
AnalyticDB for MySQL的VPC地址。
username
是
无
STRING
AnalyticDB for MySQL数据库账号。
password
是
无
STRING
AnalyticDB for MySQL数据库密码。
database-name
是
无
STRING
AnalyticDB for MySQL数据库名称。
由于AnalyticDB for MySQL实现的是表级Binlog,此处仅支持设置一个数据库。
table-name
是
无
STRING
AnalyticDB for MySQL数据库的表名。
由于AnalyticDB for MySQL实现的是表级Binlog,此处仅支持设置一个表。
port
否
3306
INTEGER
端口号。
scan.incremental.snapshot.enabled
否
true
BOOLEAN
增量快照。
默认开启。增量快照是一种读取表快照的新机制,与旧的快照机制相比,增量快照有许多优点,包括:
在读取快照期间,Source支持并发读取。
在读取快照期间,Source支持进行Chunk粒度的Checkpoint。
在读取快照之前,Source不需要获取数据库锁权限。
scan.incremental.snapshot.chunk.size
否
8096
INTEGER
表快照的Chunk大小(包含的行数)。
当开启增量快照读取时,表会被切分成多个Chunk读取。
scan.snapshot.fetch.size
否
1024
INTEGER
读取表快照时,每次读取数据的最大行数。
scan.startup.mode
否
initial
STRING
消费数据的启动模式。
取值如下:
initial(默认):在第一次启动时,会先扫描历史全量数据,然后读取最新的Binlog数据。
earliest-offset:不扫描历史全量数据,直接从可读取的最早Binlog开始读取。
specific-offset:不扫描历史全量数据,从您指定的Binlog位点启动,位点可通过同时配置
scan.startup.specific-offset.file
和scan.startup.specific-offset.pos
参数来指定从特定Binlog文件名和偏移量启动。
scan.startup.specific-offset.file
否
无
STRING
在specific-offset启动模式下,启动位点的Binlog文件名。
最新Binlog文件名可使用
SHOW MASTER STATUES for table_name
获取。scan.startup.specific-offset.pos
否
无
LONG
在specific-offset启动模式下,启动位点的Binlog文件位置。
最新Binlog位置可使用
SHOW MASTER STATUES for table_name
获取。scan.startup.specific-offset.skip-events
否
无
LONG
在指定的启动位点后需要跳过的事件数量。
scan.startup.specific-offset.skip-rows
否
无
LONG
在指定的启动位点后需要跳过的数据行数。
server-time-zone
否
无
STRING
数据库服务器中的会话时区。
例如:"Asia/Shanghai"。它控制AnalyticDB for MySQL中的TIMESTAMP类型如何转成STRING类型。如果没有设置,则使用
ZONELD.SYSTEMDEFAULT()
来确定服务器时区。debezium.min.row.count.to.stream.result
否
1000
INTEGER
当表的行数大于该值时,连接器会对结果进行流式处理。
若将此参数设置为
0
,会跳过所有表大小检查,始终在快照期间对所有结果进行流式处理。connect.timeout
否
30s
DURATION
连接数据库服务器超时,重试连接之前等待超时的最长时间。
默认单位为秒(s)。
connect.max-retries
否
3
INTEGER
连接数据库服务时,连接失败后重试的最大次数。
在目标端创建表,用于存储处理后的数据。本文以AnalyticDB for MySQL作为目标端。Flink支持的连接器请参见支持的连接器。
CREATE TABLE target_table ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) )
创建结果表,连接步骤3创建的表,用于将处理后的数据写入到AnalyticDB for MySQL指定的表。
CREATE TEMPORARY TABLE adb_sink ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'adb3.0', 'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest', 'userName' = 'testUser', 'password' = 'Test12****', 'tableName' = 'target_table' );
结果表对应的WITH参数和映射类型,详情请见:云原生数据仓库AnalyticDB MySQL版(ADB)3.0。
将捕获到的源数据变化同步到结果表,并由结果表将数据同步到目标端。
INSERT INTO adb_sink SELECT * FROM adb_source;
单击保存。
单击深度检查。
深度检查能够检查作业的SQL语义、网络连通性以及作业使用的表的元数据信息。同时,您可以单击结果区域的SQL优化,展开查看SQL风险问题提示以及对应的SQL优化建议。
(可选)单击调试。
您可以使用作业调试功能模拟作业运行、检查输出结果,验证SELECT或INSERT业务逻辑的正确性,提升开发效率,降低数据质量风险。详情请参见作业调试。
单击部署,详情请参见部署SQL作业。
完成作业开发和深度检查后,即可部署作业,将数据发布至生产环境。部署后,您可以在作业运维页面启动作业至运行阶段,详情请参见作业启动。
类型映射
AnalyticDB for MySQL与Flink的数据类型映射关系如下:
AnalyticDB for MySQL字段类型 | Flink字段类型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p,s)或NUMERIC(p,s) | DECIMAL(p,s) |
VARCHAR | STRING |
BINARY | BYTES |
DATE | DATE |
TIME | TIME |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
POINT | STRING |
JSON | STRING |