Subscribe to binary logs by using Flink
Realtime Compute for Apache Flink can subscribe to the binary logs of an AnalyticDB for MySQL cluster to capture and process database changes in real time. This enables efficient data synchronization and stream computing.
Prerequisites
Your AnalyticDB for MySQL cluster must be one of the following editions: Enterprise Edition, Basic Edition, Data Lakehouse Edition, or Data Warehouse Edition (in elastic mode).
The AnalyticDB for MySQL cluster must meet the following kernel version requirements:
xuanwu_v1 table engine: 3.2.1.0 or later.
xuanwu_v2 table engine: 3.2.5.0 or later.
Note: To view and update the minor version, go to the Configuration Information section of the Cluster Information page in the AnalyticDB for MySQL console.
The Flink workspace must use Ververica Runtime (VVR) 8.0.4 or a later version.
The AnalyticDB for MySQL cluster and the fully managed Flink workspace must be in the same VPC.
Add the CIDR block of the Flink workspace to the whitelist of the AnalyticDB for MySQL cluster.
Limitations
Flink can process only basic data types and the JSON complex data type from the AnalyticDB for MySQL binary log.
Flink ignores DDL operations and the records of automatic partition deletion for partitioned tables in the AnalyticDB for MySQL binary log.
Step 1: Enable binary logging
Enable binary logging. This example uses a table named source_table.
Note: AnalyticDB for MySQL supports binary logging only at the table level.
When creating a table:
CREATE TABLE source_table (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`)
) DISTRIBUTED BY HASH (`id`) BINLOG=true;
For an existing table:
ALTER TABLE source_table BINLOG=true;
(Optional) Modify the binary log retention period.
You can modify the binlog_ttl parameter to adjust the binary log retention period. The default value of this parameter is 6 hours. For example, the following statement sets the binary log retention period of the source_table table to 1 day:
ALTER TABLE source_table binlog_ttl='1d';
The binlog_ttl parameter supports the following formats:
Milliseconds: a number. Example: 60 represents 60 milliseconds.
Seconds: a number followed by s. Example: 30s represents 30 seconds.
Hours: a number followed by h. Example: 2h represents 2 hours.
Days: a number followed by d. Example: 1d represents 1 day.
Note: The maximum binary log retention period is 365 days for clusters that run the following kernel versions or later versions in their respective series: 3.2.1.9, 3.2.2.14, 3.2.3.8, 3.2.4.4, and 3.2.5.1. For clusters with earlier kernel versions, the maximum retention period is 21 days.
We recommend that you set the binary log retention period to a value no less than the default value of the binlog_ttl parameter. If the retention period is too short, binary log files may be purged before they are read, which can affect data synchronization.
To view the current binary log retention period, run SHOW CREATE TABLE source_table;.
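Because every binlog_ttl format describes the same quantity, one retention period can be written in several ways. The following sketch shows equivalent settings for a 1-day retention period (1 day = 24 h = 86,400 s = 86,400,000 ms). It assumes that the millisecond form is accepted as a quoted value in the same way as '1d'; run only one of the ALTER TABLE statements.

```sql
-- Each statement sets a 1-day binary log retention period for source_table.
ALTER TABLE source_table binlog_ttl='1d';        -- days
ALTER TABLE source_table binlog_ttl='24h';       -- hours
ALTER TABLE source_table binlog_ttl='86400s';    -- seconds
ALTER TABLE source_table binlog_ttl='86400000';  -- milliseconds (assumed quoted form, no suffix)

-- Check the current retention period in the table definition.
SHOW CREATE TABLE source_table;
```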
Step 2: Upload the AnalyticDB for MySQL connector
Download the connector JAR file: flink-sql-connector-adb-mysql-cdc-2.4-20260420.jar.
Log on to the Realtime Compute for Apache Flink console.
On the Flink tab, find the target workspace and click Console in the Actions column.
In the left-side navigation pane, click Connectors.
On the Connectors page, click Create Custom Connector.
Upload the downloaded connector and click Next.
Click Finish. The created custom connector appears in the list of connectors.
Step 3: Subscribe to the binary log
Log on to the Realtime Compute for Apache Flink console and create an SQL job.
Create a source table that connects to AnalyticDB for MySQL and reads binary log data from a specific table (source_table).
Note: The primary key defined in the Flink DDL must match the primary key of the physical table in the AnalyticDB for MySQL cluster, including both the key columns and the primary key name. Otherwise, the data may be incorrect.
The Flink data types must be compatible with those in AnalyticDB for MySQL. For details about the data type mappings, see Type mapping.
CREATE TEMPORARY TABLE adb_source (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb-mysql-cdc',
  'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
  'username' = 'testUser',
  'password' = 'Test12****',
  'database-name' = 'binlog',
  'table-name' = 'source_table'
);
The following table describes the WITH parameters.
Parameter | Required | Default | Type | Description |
connector | Yes | None | STRING | The connector to use. This is a custom connector. Set this parameter to adb-mysql-cdc. |
hostname | Yes | None | STRING | The endpoint of the AnalyticDB for MySQL cluster. |
username | Yes | None | STRING | The database account of the AnalyticDB for MySQL cluster. |
password | Yes | None | STRING | The password of the AnalyticDB for MySQL database account. |
database-name | Yes | None | STRING | The name of the AnalyticDB for MySQL database. Because AnalyticDB for MySQL implements table-level binary logging, you can specify only one database. |
table-name | Yes | None | STRING | The name of the table in the AnalyticDB for MySQL database. Because AnalyticDB for MySQL implements table-level binary logging, you can specify only one table. |
port | No | 3306 | INTEGER | The port number. |
scan.incremental.snapshot.enabled | No | true | BOOLEAN | Specifies whether to enable the incremental snapshot reading mechanism. This feature is enabled by default. Incremental snapshot is a newer mechanism for reading table snapshots. Compared with the previous snapshot mechanism, it has the following benefits: the source can read the snapshot concurrently; the source supports chunk-level checkpoints during snapshot reading; the source does not need database lock permissions before reading a snapshot. |
scan.incremental.snapshot.chunk.size | No | 8096 | INTEGER | The number of rows in each chunk of a table snapshot. |
scan.snapshot.fetch.size | No | 1024 | INTEGER | The maximum number of rows to fetch at a time when reading a table snapshot. |
scan.startup.mode | No | initial | STRING | The startup mode for data consumption. Valid values: initial (default): takes an initial snapshot of the table and then reads the latest binary log. earliest-offset: skips the snapshot phase and starts reading from the earliest available binary log entry. specific-offset: skips the snapshot phase and starts from a specified binary log position; specify the binary log file name and position by setting both scan.startup.specific-offset.file and scan.startup.specific-offset.pos. latest-offset: skips the snapshot phase and reads only changes that occur after the connector starts. timestamp: skips the snapshot phase and starts reading from a specified timestamp; set the timestamp in milliseconds (ms) by using scan.startup.timestamp-millis. Important: If you use the earliest-offset, specific-offset, or timestamp startup mode, make sure that the table schema does not change between the specified starting position and the time the job starts. Otherwise, the job may fail due to schema changes. |
scan.startup.specific-offset.file | No | None | STRING | The binary log file name of the startup position in specific-offset startup mode. To obtain the latest binary log file name, run the SHOW MASTER STATUS FOR table_name; statement. |
scan.startup.specific-offset.pos | No | None | LONG | The position in the binary log file of the startup position in specific-offset startup mode. To obtain the latest binary log position, run the SHOW MASTER STATUS FOR table_name; statement. |
scan.startup.specific-offset.skip-events | No | None | LONG | The number of events to skip after the specified startup position. |
scan.startup.specific-offset.skip-rows | No | None | LONG | The number of data rows to skip after the specified startup position. |
scan.startup.timestamp-millis | No | None | LONG | The timestamp of the startup position, in milliseconds (ms), for the timestamp startup mode. When you use this parameter, you must set scan.startup.mode to timestamp. |
server-time-zone | No | System default | STRING | The session time zone of the database server, such as "Asia/Shanghai". It controls how TIMESTAMP values in AnalyticDB for MySQL are converted to the STRING data type. If this parameter is not set, ZoneId.systemDefault() is used to determine the server time zone. |
debezium.min.row.count.to.stream.result | No | 1000 | INTEGER | If the number of rows in a table exceeds this value, the connector streams the results. If you set this parameter to 0, all table size checks are skipped and all results are streamed during the snapshot phase. |
connect.timeout | No | 30s | DURATION | The maximum time to wait for a database connection before the attempt times out. The default unit is seconds (s). |
connect.max-retries | No | 3 | INTEGER | The maximum number of retries after a database connection fails. |
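The following sketch combines two rules from this step: the Flink primary key lists the same columns as the primary key of the physical table, and the job starts from a point in time instead of taking a snapshot. The orders table and its columns are hypothetical, and the endpoint, credentials, and database name reuse the placeholders from the earlier example. The value 1704067200000 corresponds to 2024-01-01 00:00:00 UTC. This is an illustration, not a definitive configuration.

```sql
-- Run in AnalyticDB for MySQL: hypothetical table with a two-column primary key
-- (the primary key contains the distribution key) and binary logging enabled.
CREATE TABLE orders (
  `order_id` BIGINT,
  `user_id`  INT,
  `amount`   DECIMAL(10, 2),
  PRIMARY KEY (`order_id`, `user_id`)
) DISTRIBUTED BY HASH (`order_id`) BINLOG=true;

-- Run in the Flink SQL job: the source table declares the same key columns.
CREATE TEMPORARY TABLE adb_orders_source (
  `order_id` BIGINT,
  `user_id`  INT,
  `amount`   DECIMAL(10, 2),
  PRIMARY KEY (`order_id`, `user_id`) NOT ENFORCED
) WITH (
  'connector' = 'adb-mysql-cdc',
  'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
  'port' = '3306',
  'username' = 'testUser',
  'password' = 'Test12****',
  'database-name' = 'binlog',
  'table-name' = 'orders',
  -- Skip the snapshot phase and read binary log entries from this point in time.
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1704067200000'  -- 2024-01-01 00:00:00 UTC
);
```

The chosen timestamp must still fall within the binary log retention period set by binlog_ttl; entries older than that may already be purged.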
Create a physical table in the destination database to store the processed data. This topic uses AnalyticDB for MySQL as the destination. For connectors supported by Flink, see Supported connectors.
CREATE TABLE target_table (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`)
);
Create a sink table that connects to the table created in the previous step. The sink table writes the processed data to the specified table in AnalyticDB for MySQL.
CREATE TEMPORARY TABLE adb_sink (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb3.0',
  'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest',
  'userName' = 'testUser',
  'password' = 'Test12****',
  'tableName' = 'target_table'
);
For more information about the WITH parameters and type mappings of the sink table, see AnalyticDB for MySQL V3.0 connector.
Use an INSERT INTO statement to send data from the source table to the sink table.
INSERT INTO adb_sink SELECT * FROM adb_source;
Click Save.
Click Validate.
The validation feature checks the SQL semantics, network connectivity, and metadata of the tables that are used by the job. You can also click SQL Advice in the result area to view SQL risk prompts and optimization suggestions.
(Optional) Click Debug.
You can use the job debugging feature to simulate job runs, check output results, verify the business logic of SELECT or INSERT statements, improve development efficiency, and reduce data quality risks.
Click Deploy.
After you develop and validate the job, deploy it to the production environment. Then, go to the O&M page and start the job.
(Optional) View binary log information.
Note: If binary logging is enabled but no subscription has been established yet, the following statements return 0. Binary log information appears only after a subscription is successfully established.
To obtain the file name and position of the latest binary log entry, run the following SQL statement:
SHOW MASTER STATUS FOR source_table;
To view all unpurged binary log files and their sizes, run the following SQL statement:
SHOW BINARY LOGS FOR source_table;
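The file name and position returned by SHOW MASTER STATUS are the values that the specific-offset startup mode expects. The following sketch starts a new source table at such a position. The table name adb_source_from_offset is hypothetical, and <binlog-file-name> and <binlog-position> are placeholders that you replace with the values returned by your cluster; the other options reuse the placeholders from the earlier source table.

```sql
-- 1. In AnalyticDB for MySQL: get the latest binary log file name and position.
SHOW MASTER STATUS FOR source_table;

-- 2. In the Flink SQL job: skip the snapshot phase and start from that position.
--    Replace both placeholders with the values returned in step 1.
CREATE TEMPORARY TABLE adb_source_from_offset (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb-mysql-cdc',
  'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
  'username' = 'testUser',
  'password' = 'Test12****',
  'database-name' = 'binlog',
  'table-name' = 'source_table',
  'scan.startup.mode' = 'specific-offset',
  'scan.startup.specific-offset.file' = '<binlog-file-name>',
  'scan.startup.specific-offset.pos' = '<binlog-position>'
);
```

As with the other non-snapshot modes, the table schema should not change between the chosen position and the time the job starts.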
Type mapping
The following table maps AnalyticDB for MySQL data types to their Flink equivalents.
AnalyticDB for MySQL type | Flink type |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p,s) or NUMERIC(p,s) | DECIMAL(p,s) |
VARCHAR | STRING |
BINARY | BYTES |
DATE | DATE |
TIME | TIME |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
POINT | STRING |
JSON | STRING |
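The following sketch applies the mapping table to a hypothetical table named typed_source: each Flink column type is taken from the Flink type column for the corresponding AnalyticDB for MySQL type. The column names and sizes are illustrative; the connection options reuse the placeholders from the earlier source table.

```sql
-- Run in AnalyticDB for MySQL: hypothetical table with several data types.
CREATE TABLE typed_source (
  `id`         BIGINT,
  `flag`       BOOLEAN,
  `price`      DECIMAL(12, 2),
  `name`       VARCHAR(64),
  `created_at` DATETIME,
  `attrs`      JSON,
  PRIMARY KEY (`id`)
) DISTRIBUTED BY HASH (`id`) BINLOG=true;

-- Run in the Flink SQL job: types follow the mapping table.
CREATE TEMPORARY TABLE adb_typed_source (
  `id`         BIGINT,          -- BIGINT -> BIGINT
  `flag`       BOOLEAN,         -- BOOLEAN -> BOOLEAN
  `price`      DECIMAL(12, 2),  -- DECIMAL(p,s) -> DECIMAL(p,s)
  `name`       STRING,          -- VARCHAR -> STRING
  `created_at` TIMESTAMP,       -- DATETIME -> TIMESTAMP
  `attrs`      STRING,          -- JSON -> STRING
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb-mysql-cdc',
  'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
  'username' = 'testUser',
  'password' = 'Test12****',
  'database-name' = 'binlog',
  'table-name' = 'typed_source'
);
```

Because JSON values arrive as STRING, downstream Flink SQL can parse them with JSON functions if individual fields are needed.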