The MySQL connector reads change data capture (CDC) streams and writes data to MySQL-compatible databases, including ApsaraDB RDS for MySQL, PolarDB for MySQL, OceanBase (MySQL mode), and self-managed MySQL.
Overview
The MySQL connector supports all MySQL-protocol-compatible databases: ApsaraDB RDS for MySQL, PolarDB for MySQL, OceanBase (MySQL mode), and self-managed MySQL.
To read data from OceanBase, enable and correctly configure the OceanBase binlog. For more information, see Binlog-related operations. This feature is in public preview. Evaluate its performance and stability before production use.
The following table summarizes the connector capabilities.
| Category | Description |
| --- | --- |
| Supported types | Source table, dimension table, sink table, and data ingestion source |
| Execution mode | Streaming only |
| Data format | Not applicable |
| Specific metrics | |
| API types | DataStream, SQL, and data ingestion YAML |
| Update/delete sink table data | Supported |
Features
The MySQL CDC source reads a full database snapshot, then seamlessly switches to the binlog for incremental updates with exactly-once semantics. It supports concurrent snapshot reads, lock-free operation, and checkpoint-based recovery through an incremental snapshot algorithm. For more information, see Understanding MySQL source.
- Unified batch and streaming: Reads snapshot and incremental data in a single pipeline.
- Horizontally scalable: Concurrent snapshot reads for horizontal scaling.
- Automated resource optimization: Automatically scales down after switching from snapshot to incremental reading.
- Enhanced stability: Checkpoint-based recovery during the snapshot phase enables resumption after interruptions.
- Lock-free snapshots: Reads snapshot data without locks, avoiding impact on online workloads.
- Cloud integration: Supports reading from the binlog of ApsaraDB RDS for MySQL instances.
- Low-latency reads: Parallel binlog parsing minimizes read latency.
Prerequisites
Complete the steps in Configure a MySQL database before using a MySQL CDC source table.
RDS for MySQL
- Run a network probe to ensure network connectivity with Realtime Compute for Apache Flink.
- MySQL version: 5.6, 5.7, or 8.0.x.
- Enable the binlog (enabled by default).
- Set the binlog format to ROW (default).
- Set binlog_row_image to FULL (default).
- Disable binary log transaction compression (introduced in MySQL 8.0.20 and disabled by default).
- Create a MySQL user and grant the SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT permissions.
- Create a MySQL database and tables. For more information, see Create a database and account for ApsaraDB RDS for MySQL. Use a high-privilege account to avoid permission failures.
- Configure an IP whitelist. For more information, see Use a database client or the CLI to connect to an ApsaraDB RDS for MySQL instance.
PolarDB for MySQL
- Run a network probe to ensure network connectivity with Realtime Compute for Apache Flink.
- MySQL version: 5.6, 5.7, or 8.0.x.
- Enable the binlog (disabled by default).
- Set the binlog format to ROW (default).
- Set binlog_row_image to FULL (default).
- Disable binary log transaction compression (introduced in MySQL 8.0.20 and disabled by default).
- Create a MySQL user and grant the SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT permissions.
- Create a MySQL database and tables. For more information, see Create a database and account for PolarDB for MySQL. Use a high-privilege account to avoid permission failures.
- Configure an IP whitelist. For more information, see Configure an IP address whitelist for PolarDB for MySQL.
Self-managed MySQL
- Run a network probe to ensure network connectivity with Realtime Compute for Apache Flink.
- MySQL version: 5.6, 5.7, or 8.0.x.
- Enable the binlog (disabled by default).
- Set the binlog format to ROW (the default is STATEMENT).
- Set binlog_row_image to FULL (default).
- Disable binary log transaction compression (introduced in MySQL 8.0.20 and disabled by default).
- Create a MySQL user and grant the SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT permissions.
- Create a MySQL database and tables. For more information, see Create a database and account for a self-managed MySQL database. Use a high-privilege account to avoid permission failures.
- Configure an IP whitelist. For more information, see Add a security group rule.
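The account and binlog prerequisites above can be sketched in MySQL as follows. This is a minimal illustration for a self-managed instance, not a definitive setup script: the account name `flink_cdc` and the `%` host pattern are hypothetical, and on managed services such as ApsaraDB RDS or PolarDB you would typically create the account and change parameters through the console instead.

```sql
-- Hypothetical account name and host pattern; replace with your own values.
CREATE USER 'flink_cdc'@'%' IDENTIFIED BY '<yourPassword>';

-- Grant the four permissions listed in the prerequisites.
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'flink_cdc'@'%';

-- Verify the binlog settings. The prerequisites call for
-- log_bin = ON, binlog_format = ROW, and binlog_row_image = FULL.
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'binlog_row_image';

-- Only present in MySQL 8.0.20 and later; the value should be OFF.
SHOW VARIABLES LIKE 'binlog_transaction_compression';
```

Run the `SHOW VARIABLES` statements with the same account that the Flink job will use, so that connectivity and permissions are checked at the same time.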
Limitations
General
- MySQL CDC source tables do not support defining watermarks. To achieve the effect of windowed aggregation, use non-windowed aggregation instead. For more information, see How can I perform windowed aggregation if watermarks are not supported?.
- In CTAS and CDAS jobs, MySQL CDC source tables support only partial schema evolution. For the supported change types, see Schema evolution synchronization policy.
- The MySQL CDC connector does not currently support binary log transaction compression. When using the MySQL CDC connector to consume incremental data, disable this feature. Otherwise, the connector may fail to retrieve incremental data.
ApsaraDB RDS for MySQL
- For ApsaraDB RDS for MySQL, avoid reading data from a standby or read-only instance. These instances typically have short retention periods for the binary log. If the logs expire before being processed, the job may fail.
- ApsaraDB RDS for MySQL enables parallel replication by default, which does not guarantee a consistent transaction order between the primary and secondary instances. This can cause data loss when the job recovers from a checkpoint after a primary-secondary switchover. To avoid this issue, enable the slave_preserve_commit_order option.
PolarDB for MySQL
MySQL CDC source tables do not support reading data from a multi-master cluster of PolarDB for MySQL version 1.0.19 or earlier. For more information, see What is a multi-master cluster?. The binary logs from these cluster versions may contain duplicate table IDs, which can cause schema mapping errors and, in turn, data parsing failures.
Open-source MySQL
By default, MySQL preserves transaction order during binary log replication. However, if a MySQL replica has parallel replication enabled (slave_parallel_workers > 1) but slave_preserve_commit_order is not set to ON, its transaction commit order can differ from that of the primary instance. This discrepancy can lead to data loss when a Flink CDC job resumes from a checkpoint. Set slave_preserve_commit_order = ON on the MySQL replica. Alternatively, set slave_parallel_workers = 1, although this may reduce replication performance.
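The replica settings described above can be checked and changed as in the following sketch. It assumes a MySQL 5.7 or 8.0 replica that still accepts the `slave_*` variable names and the `STOP SLAVE` syntax, and it assumes that the applier thread must be stopped before the variable is changed. Depending on the MySQL version, additional settings may also be required, so treat this as an illustration and confirm against the MySQL reference manual for your version.

```sql
-- Inspect the current replication settings on the replica.
SHOW VARIABLES LIKE 'slave_parallel_workers';
SHOW VARIABLES LIKE 'slave_preserve_commit_order';

-- Assumption: the replication applier (SQL) thread is stopped
-- while the variable is changed, then restarted.
STOP SLAVE SQL_THREAD;
SET GLOBAL slave_preserve_commit_order = ON;
START SLAVE SQL_THREAD;
```

`SET GLOBAL` does not survive a server restart, so the same value would also need to be written to the server configuration file.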
Usage notes
- Source table
  - During the full load phase, do not add or remove source tables after creating a savepoint. Restarting the job from that savepoint then causes data reading errors.
- Sink table
  - Do not declare an auto-increment primary key in the DDL. MySQL automatically populates this field when writing data.
  - You must declare at least one non-primary key field. Otherwise, an error occurs.
  - The NOT ENFORCED constraint in the DDL indicates that Flink does not validate the primary key. You must ensure primary key correctness and integrity. For more information, see Validity Check.
- Dimension table
  To use an index to accelerate queries, ensure that the order of the fields in the JOIN condition matches the order of the columns in the index definition. This adheres to the leftmost prefix rule. For example, if an index is defined on (a, b, c), the JOIN condition should be ON t.a = x AND t.b = y.
  The Flink optimizer may rewrite the generated SQL, preventing the query from using the intended index. To confirm that the index is used, check the execution plan (EXPLAIN) or the slow query log in MySQL to inspect the actual SELECT statement.
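The leftmost prefix rule for dimension tables can be illustrated with a lookup join. In this sketch the table names `orders` and `dim_products`, their columns, and the MySQL index on (a, b, c) are all hypothetical, and the `datagen` connector only stands in for a real stream.

```sql
-- Hypothetical stream with a processing-time attribute for the lookup join.
CREATE TEMPORARY TABLE orders (
  order_id INT,
  a INT,
  b INT,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

-- Hypothetical dimension table. The physical MySQL table is assumed
-- to have an index defined on (a, b, c).
CREATE TEMPORARY TABLE dim_products (
  a INT,
  b INT,
  c INT,
  product_name STRING
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

-- The join keys follow the index column order: a first, then b.
SELECT o.order_id, t.product_name
FROM orders AS o
JOIN dim_products FOR SYSTEM_TIME AS OF o.proc_time AS t
  ON t.a = o.a AND t.b = o.b;
```

Joining only on `t.b` or `t.c` would skip the leading index column, so MySQL could not use the index prefix for the lookup.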
SQL
Use the MySQL connector in SQL jobs as a source, dimension, or sink table.
Syntax
CREATE TEMPORARY TABLE mysqlcdc_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = '<yourHostname>',
'port' = '3306',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'database-name' = '<yourDatabaseName>',
'table-name' = '<yourTableName>'
);
- The connector writes to a target table by converting each incoming record into an SQL statement. The statement depends on the table schema:
  - For a target table without a primary key, the connector executes an INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...); statement.
  - For a target table with a primary key, the connector performs an upsert by executing an INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...; statement. Note: If the physical table has a unique index other than the primary key, inserting records with different primary keys but identical unique index values can cause a conflict, leading to data being overwritten.
- If an auto-increment primary key is defined in the MySQL database, do not declare the auto-increment column in your Flink DDL. The database automatically populates this column during the write operation. The connector supports INSERT and DELETE operations for tables with an auto-increment column, but does not support UPDATE operations.
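The auto-increment rule above can be sketched as follows. The physical table `orders_sink`, its columns, and the Flink table name `mysql_sink` are hypothetical. The connection options mirror the source example in the Syntax section and are assumed to apply to sink tables in the same way.

```sql
-- Hypothetical physical table (created in MySQL, not in Flink):
--   CREATE TABLE orders_sink (
--     id BIGINT AUTO_INCREMENT PRIMARY KEY,
--     customer_name VARCHAR(255),
--     price DECIMAL(10, 5)
--   );

-- Flink DDL: the auto-increment column `id` is intentionally not declared,
-- so MySQL populates it on each insert.
CREATE TEMPORARY TABLE mysql_sink (
  customer_name STRING,
  price DECIMAL(10, 5)
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = 'orders_sink'
);

-- Append-only write; the value is cast explicitly to the sink column type.
INSERT INTO mysql_sink
VALUES ('alice', CAST(9.99 AS DECIMAL(10, 5)));
```

Because no primary key is declared in the Flink DDL, each record is written with a plain INSERT INTO statement, as described in the list above.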
WITH parameters
- General
Parameter
Description
Required
Type
Default
Notes
connector
The connector type.
Yes
STRING
-
For a source table, the value can be mysql-cdc or mysql. These two values are equivalent. For a dimension table or sink table, the value must be mysql.
hostname
The IP address or hostname of the MySQL database.
Yes
STRING
-
A VPC address is recommended.
Note: If your MySQL database and Realtime Compute for Apache Flink workspace are in different VPCs, establish a cross-VPC network connection or connect over the public network. For more information, see Space Management and Operations and How can a fully managed Flink cluster access the public network?.
username
The username for the MySQL database.
Yes
STRING
-
-
password
The password for the MySQL database.
Yes
STRING
-
-
database-name
The database name.
Yes
STRING
-
- When used as a source, you can specify a regular expression to read data from multiple databases.
- When using regular expressions, avoid the ^ and $ symbols. See the notes for table-name.
table-name
The table name.
Yes
STRING
-
- When used as a source, you can specify a regular expression to read data from multiple tables.
  To improve performance when reading from multiple MySQL tables, submit multiple CTAS statements as a single job to avoid multiple binlog listeners. For more information, see Run multiple CTAS statements in a single job.
- When you use regular expressions, avoid using the ^ and $ symbols to match the start and end of a name. The following note explains why.
Note: When a MySQL CDC source table matches table names by using a regular expression, it concatenates the database-name and table-name values that you enter with the string \\. (for VVR versions earlier than 8.0.1, the character . is used) to form a full-path regular expression, and then uses this expression to match the fully qualified names of tables in the MySQL database.
For example, when you configure 'database-name'='db_.*' and 'table-name'='tb_.+', the connector uses the regular expression db_.*\\.tb_.+ (or db_.*.tb_.+ for versions earlier than 8.0.1) to match fully qualified table names and determine which tables to read.
port
The port of the MySQL database.
No
INTEGER
3306
-
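The regular-expression behavior of database-name and table-name described in the general parameters can be sketched as a source table that reads sharded tables. The table name `sharded_source`, its columns, and the naming scheme (`db_1`, `db_2`, ... each containing `tb_1`, `tb_2`, ...) are hypothetical.

```sql
CREATE TEMPORARY TABLE sharded_source (
  id BIGINT,
  payload STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  -- No ^ or $ anchors: the connector joins both patterns into a single
  -- expression (db_.*\\.tb_.+) and matches it against db.table names.
  'database-name' = 'db_.*',
  'table-name' = 'tb_.+'
);
```

An anchor such as `^` in table-name would end up in the middle of the combined expression, which is why the notes advise against it.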
- Source-specific
Parameter
Description
Required
Type
Default
Remarks
server-id
A numeric ID for the database client.
No
STRING
Random integer between 5400 and 6400.
The server ID must be globally unique within the MySQL cluster. Assign a different ID for each job connecting to the same database.
An ID range is also supported, for example, 5400-5408. When incremental snapshot is enabled with concurrent readers, specify an ID range that contains at least as many IDs as the source parallelism. For more information, see Server ID Usage.
scan.incremental.snapshot.enabled
Enables or disables incremental snapshot.
No
BOOLEAN
true
Incremental snapshot is enabled by default. Compared to the legacy snapshot mechanism, it offers:
- Parallel snapshot reads.
- Chunk-level checkpointing during the snapshot phase.
- No global read lock (FLUSH TABLES WITH READ LOCK) required.
To enable parallel reading, each concurrent reader requires a unique server ID. Therefore, server-id must be set to a range, such as 5400-6400, and the size of the range must be greater than or equal to the source parallelism.
Note: This parameter is removed in Flink engine VVR 11.1 and later.
scan.incremental.snapshot.chunk.size
The number of rows in each chunk.
No
INTEGER
8096
With incremental snapshot enabled, the table is split into multiple chunks. Each chunk is buffered in memory until fully read.
Smaller chunks enable more granular fault recovery but can cause OOM issues and lower throughput.
scan.snapshot.fetch.size
The maximum number of rows to fetch per batch during a snapshot read.
No
INTEGER
1024
-
scan.startup.mode
The startup mode for the connector.
No
STRING
initial
Valid values include:
- initial (default): On the first startup, the connector performs a snapshot of the existing data and then continues to read the latest binlog data.
- latest-offset: On the first startup, the connector skips the snapshot and starts reading from the end of the binlog. It reads only changes that occur after the connector starts.
- earliest-offset: The connector skips the snapshot and starts reading from the earliest available binlog offset.
- specific-offset: The connector skips the snapshot and starts from a user-specified binlog offset. You can specify the offset by setting both scan.startup.specific-offset.file and scan.startup.specific-offset.pos, or by setting only scan.startup.specific-offset.gtid-set to start from a specific GTID set.
- timestamp: The connector skips the snapshot and starts reading the binlog from a specified timestamp. The timestamp is specified in milliseconds by the scan.startup.timestamp-millis parameter.
Important: When you use the earliest-offset, specific-offset, or timestamp startup mode, ensure that the table schema does not change between the specified starting offset and the time the job is started. Schema changes can cause errors.
scan.startup.specific-offset.file
The Binlog filename for the starting offset.
No
STRING
-
When you use this configuration, scan.startup.mode must be set to specific-offset. The file name is in a format such as mysql-bin.000003.
scan.startup.specific-offset.pos
The starting offset within the specified Binlog file.
No
INTEGER
-
When you use this configuration, scan.startup.mode must be set to specific-offset.
scan.startup.specific-offset.gtid-set
The GTID set for the starting offset.
No
STRING
-
When you use this configuration, scan.startup.mode must be set to specific-offset. The GTID set is in a format such as 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.
scan.startup.timestamp-millis
The starting timestamp in milliseconds.
No
LONG
-
When you use this configuration, scan.startup.mode must be set to timestamp. The unit of the timestamp is milliseconds.
Important: When starting from a specified timestamp, the MySQL CDC connector scans the initial event of each binlog file to locate the file corresponding to the timestamp. Ensure that the binlog file for the given timestamp has not been purged and is accessible on the database server.
server-time-zone
The session time zone of the database server.
No
STRING
If this parameter is not specified, the system uses the time zone of the Flink job's runtime environment, which is the time zone of the selected availability zone.
Example: Asia/Shanghai. This parameter controls how TIMESTAMP values are converted to strings. For more information, see Debezium temporal values.
debezium.min.row.count.to.stream.results
When the number of rows in a table exceeds this value, the connector uses batch read instead of snapshot read.
No
INTEGER
1000
Flink reads MySQL source tables in one of the following ways:
- Snapshot read: Loads the entire table into memory. This method is fast but memory-intensive and can cause out of memory (OOM) errors for large tables.
- Batch read: Reads the table in smaller batches. This method is memory-efficient but can be slower than a snapshot read.
connect.timeout
The timeout for a connection attempt to the MySQL server. If the attempt times out, the connector retries.
No
DURATION
30s
-
connect.max-retries
The maximum number of times to retry a failed connection to the MySQL server.
No
INTEGER
3
-
connection.pool.size
The size of the database connection pool.
No
INTEGER
20
The connection pool reuses database connections to reduce the overall connection count.
jdbc.properties.*
Custom parameters appended to the JDBC URL.
No
STRING
-
You can pass custom connection parameters. For example, to disable SSL, set 'jdbc.properties.useSSL' = 'false'.
For a list of supported parameters, see MySQL Configuration Properties.
debezium.*
Custom Debezium parameters for reading the Binlog.
No
STRING
-
You can pass custom Debezium parameters. For example, use 'debezium.event.deserialization.failure.handling.mode'='ignore' to specify the handling mode for deserialization failures.
Warning: Do not modify Debezium parameters arbitrarily, as this can cause data reading errors. For example, the debezium.binlog.buffer.size parameter must not be configured.
heartbeat.interval
The interval for emitting heartbeat events to advance the Binlog offset.
No
DURATION
30s
Heartbeat events advance the Binlog offset. This is especially useful for slowly updated tables where the Binlog offset might not advance automatically. This prevents the Binlog offset from becoming outdated, which could cause the job to fail and require a stateless restart.
scan.incremental.snapshot.chunk.key-column
Specifies a column to use for splitting chunks during the snapshot phase.
See the Remarks column.
STRING
-
- Required for tables without a primary key. The selected column must be of a non-null type (NOT NULL).
- Optional for tables with a primary key. You can select only one column from the primary key.
rds.region-id
The region ID of the ApsaraDB RDS for MySQL instance.
Required when reading archived logs from OSS.
STRING
-
For a list of region IDs, see Regions and zones.
Important: MySQL CDC generates random GTID strings, which are not monotonically increasing like binlog file offsets. To locate a specific file by its GTID, the connector must download and parse all archived logs from OSS, a process that is resource-intensive and time-consuming. Therefore, reading from archived OSS logs is not feasible with GTID-based startup modes. This feature only supports starting from a specific timestamp or a binlog file offset. It also does not support scenarios involving primary-secondary switchovers within the archived logs, as these rely on GTIDs. Evaluate these limitations carefully before using this feature.
rds.access-key-id
The AccessKey ID for the ApsaraDB RDS for MySQL account.
Required when reading archived logs from OSS.
STRING
-
For more information, see How do I view my AccessKey ID and AccessKey secret?
Important: To avoid exposing your credentials, use Variable Management to store and retrieve your AccessKey ID.
rds.access-key-secret
The AccessKey secret for the ApsaraDB RDS for MySQL account.
Required when reading archived logs from OSS.
STRING
-
For more information, see How do I view my AccessKey ID and AccessKey secret?
Important: To avoid exposing your credentials, use Variable Management to store and retrieve your AccessKey secret.
rds.db-instance-id
The instance ID of the ApsaraDB RDS for MySQL instance.
Required when reading archived logs from OSS.
STRING
-
-
rds.main-db-id
The primary database ID of the ApsaraDB RDS for MySQL instance.
No
STRING
-
- To obtain the primary database ID, see ApsaraDB RDS for MySQL log backup.
- Supported in Flink engine VVR 8.0.7 and later.
Note: If this parameter is not specified, Flink engine VVR 11.7 and later automatically queries the primary database ID based on the ApsaraDB RDS for MySQL connection information.
rds.download.timeout
The timeout for downloading a single archived log file from OSS.
No
DURATION
60s
-
rds.endpoint
The service endpoint for retrieving Binlog information from OSS.
No
STRING
-
- For a list of available values, see Service endpoints.
- Supported in Flink engine VVR 8.0.8 and later.
scan.incremental.close-idle-reader.enabled
Specifies whether to close an idle reader after the snapshot phase is complete.
No
BOOLEAN
false
- Supported in Flink engine VVR 8.0.1 and later.
- For this setting to take effect, you must also set execution.checkpointing.checkpoints-after-tasks-finish.enabled to true.
scan.read-changelog-as-append-only.enabled
Specifies whether to convert the changelog stream to an append-only stream.
No
BOOLEAN
false
Valid values:
- true: All message types (including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER) are converted to INSERT messages. Enable this only for specific use cases, such as preserving deleted messages from an upstream table.
- false (default): All message types are emitted as-is.
Note: Supported in Flink engine VVR 8.0.8 and later.
scan.only.deserialize.captured.tables.changelog.enabled
During the incremental phase, specifies whether to deserialize change events only for the captured tables.
No
BOOLEAN
- false for Flink engine VVR 8.x versions.
- true for Flink engine VVR 11.1 and later.
Valid values:
- true: Deserializes change data only for the captured tables, which improves binlog reading speed.
- false: Deserializes change data for all tables.
Note:
- Supported in Flink engine VVR 8.0.7 and later.
- In Flink engine VVR 8.0.8 and earlier versions, this parameter is named debezium.scan.only.deserialize.captured.tables.changelog.enable.
scan.parse.online.schema.changes.enabled
During the incremental phase, specifies whether to parse DDL events from RDS lockless change.
No
BOOLEAN
false
Valid values:
- true: Parses DDL events from RDS lockless change.
- false (default): Does not parse DDL events from RDS lockless change.
This is an experimental feature. Take a savepoint of your Flink job before you perform online lockless changes to ensure you can recover if needed.
Note: Supported in Flink engine VVR 11.1 and later.
scan.incremental.snapshot.backfill.skip
Specifies whether to skip the backfill process during the snapshot phase.
No
BOOLEAN
false
Valid values:
- true: Skips the backfill process.
- false (default): Does not skip the backfill process.
If backfill is skipped, changes to the table that occur during the snapshot phase are read later in the incremental phase, instead of being merged into the snapshot.
Important: Skipping backfill can lead to data inconsistencies because changes that occur during the snapshot phase might be replayed. This provides only at-least-once semantics.
Note: Supported in Flink engine VVR 11.1 and later.
scan.incremental.snapshot.unbounded-chunk-first.enabled
Specifies whether to distribute the unbounded chunk first during the snapshot phase.
No
BOOLEAN
false
Valid values:
- true: The unbounded chunk is prioritized for distribution during the snapshot phase.
- false (default): The unbounded chunk is not prioritized.
This is an experimental feature. Enabling it can reduce the risk of out of memory (OOM) errors on the TaskManager when it synchronizes the final chunk. Enable this feature before the job starts for the first time.
Note: Supported in Flink engine VVR 11.1 and later.
binlog.session.network.timeout
The network read and write timeout for the Binlog connection.
No
DURATION
10m
If set to 0s, the connection uses the default timeout of the MySQL server.
Note: Supported in Flink engine VVR 11.5 and later.
scan.rate-limit.records-per-second
Limits the maximum number of records that the source can emit per second.
No
LONG
-
Use this parameter to limit the data read rate. This limit applies to both the snapshot and incremental phases.
The numRecordsOutPerSecond metric of the source reflects the number of records output per second for the entire data stream. You can adjust this parameter based on this metric.
During the snapshot read phase, you typically also need to reduce the number of rows read in each batch by lowering the value of the scan.incremental.snapshot.chunk.size parameter.
Note: Supported in Flink engine VVR 11.5 and later.
scan.binlog.tolerate.gtid-holes
Enables the job to ignore gaps in the GTID sequence, allowing it to bypass discontinuous events and continue running.
No
BOOLEAN
false
Before enabling this parameter, ensure that the startup offset of the job has not expired. If the job starts from a purged or expired GTID offset, the engine silently skips the missing logs, causing data loss.
Note: Supported in Flink engine VVR 11.6 and later.
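Two common combinations of the source-specific parameters above can be sketched as follows. The table names, columns, server IDs, and the timestamp value are hypothetical. Only parameters described in the preceding table are used.

```sql
-- Sketch 1: parallel snapshot read. The server-id range holds four IDs,
-- so it covers a source parallelism of up to 4.
CREATE TEMPORARY TABLE orders_parallel (
  order_id INT,
  customer_name STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  'server-id' = '5401-5404',
  'scan.incremental.snapshot.chunk.size' = '8096',
  'heartbeat.interval' = '30s'
);

-- Sketch 2: skip the snapshot and start from a point in time.
-- The timestamp is an example value in milliseconds since the epoch.
CREATE TEMPORARY TABLE orders_from_timestamp (
  order_id INT,
  customer_name STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  'server-id' = '5405',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1700000000000'
);
```

The two tables use non-overlapping server IDs because each job that connects to the same database needs its own ID.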
- Dimension table parameters
Parameter
Description
Required
Type
Default
Notes
url
The JDBC URL for the MySQL database.
No
STRING
None
The URL format is: jdbc:mysql://<endpoint>:<port>/<database name>.
lookup.max-retries
The maximum number of retries if a lookup request fails.
No
INTEGER
3
Supported only by Flink compute engine VVR 6.0.7 and later.
lookup.cache.strategy
The caching policy.
No
STRING
None
Supported values: None, LRU, and ALL. For more information, see Background information.
Note: The LRU caching policy requires the lookup.cache.max-rows parameter.
lookup.cache.max-rows
The maximum number of rows to store in the cache.
No
INTEGER
100000
- Required if the caching policy is set to LRU.
- Optional if the caching policy is set to ALL.
lookup.cache.ttl
The time-to-live (TTL) for cached entries.
No
DURATION
None
The behavior of the lookup.cache.ttl parameter depends on the value of lookup.cache.strategy:
- If lookup.cache.strategy is set to None, lookup.cache.ttl does not need to be configured, which means the cache does not expire.
- If lookup.cache.strategy is set to LRU, lookup.cache.ttl specifies the cache timeout duration. By default, the cache does not expire.
- If lookup.cache.strategy is set to ALL, lookup.cache.ttl specifies the cache reload interval. By default, the cache is not reloaded.
Specify the duration in a time format, such as 1min or 10s.
lookup.max-join-rows
The maximum number of matching rows to return from the dimension table for each row in the source table.
No
INTEGER
1024
None
lookup.filter-push-down.enabled
Specifies whether to enable filter pushdown for the dimension table.
No
BOOLEAN
false
Valid values:
- true: Enables filter pushdown. The system filters data from the dimension table based on the conditions in the SQL job before loading it.
- false (default): Disables filter pushdown. The system loads the full dataset from the dimension table.
Note: Supported only by Flink compute engine VVR 8.0.7 and later.
Important: Use filter pushdown only for Flink dimension tables. This feature is not supported for MySQL source tables. If you use the same Flink table as both a source and a dimension table with filter pushdown enabled, you must use SQL Hints to set this parameter to false when using it as a source table. Otherwise, the job may fail.
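The dimension table cache parameters above can be combined as in this sketch. The table name `mysql_dim`, its columns, and the chosen values are hypothetical. The connection options are assumed to be the same general parameters used by source tables.

```sql
CREATE TEMPORARY TABLE mysql_dim (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  -- LRU requires lookup.cache.max-rows to be set.
  'lookup.cache.strategy' = 'LRU',
  'lookup.cache.max-rows' = '100000',
  -- With LRU, the TTL is the cache timeout for each entry.
  'lookup.cache.ttl' = '1min',
  'lookup.max-retries' = '3'
);
```

With the ALL strategy, the same `lookup.cache.ttl` value would instead act as the reload interval for the whole cache.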
- Specific to sink tables
Parameter
Description
Required
Type
Default
Notes
url
The JDBC URL for the MySQL database.
No
STRING
None
The format of the URL is: jdbc:mysql://<endpoint>:<port>/<database name>.
sink.max-retries
The maximum number of retries for a failed write operation.
No
INTEGER
3
None
sink.buffer-flush.batch-size
The number of records to include in each batch write.
No
INTEGER
4096
None
sink.buffer-flush.max-rows
The maximum number of records to buffer in memory before flushing.
No
INTEGER
10000
This parameter takes effect only when a primary key is specified.
sink.buffer-flush.interval
The flush interval. The system automatically flushes the buffer when this interval is reached, even if other flush conditions are not met.
No
DURATION
1s
None
sink.ignore-delete
Whether to ignore DELETE operations.
No
BOOLEAN
false
When a Flink SQL job generates a stream that includes retractions (DELETE or UPDATE_BEFORE messages), concurrent updates to the same table by multiple sink tasks can cause data inconsistencies.
For example, if one task deletes a record while another task updates only a few of its fields, the non-updated fields might be incorrectly set to null or their default values.
To prevent this issue, set sink.ignore-delete to true to ignore upstream DELETE and UPDATE_BEFORE operations.
Note:
- UPDATE_BEFORE is part of Flink's retraction mechanism, which retracts the old value during an update operation.
- When sink.ignore-delete is true, the sink skips all DELETE and UPDATE_BEFORE records, processing only INSERT and UPDATE_AFTER records.
sink.ignore-null-when-update
How to handle null values in update records.
No
BOOLEAN
false
Valid values:
- true: Ignores null values, leaving the corresponding field in the destination table unchanged. This setting requires a primary key to be defined for the Flink table. When set to true:
  - For VVR 8.0.6 and earlier, batch write is not supported for the sink table.
  - For VVR 8.0.7 and later, batch write is supported for the sink table.
    While a batch write can significantly improve throughput, it may introduce data latency and increase the risk of out-of-memory errors. Evaluate these trade-offs based on your business requirements.
- false: Updates the corresponding field to null.
Note: This parameter is supported in VVR versions 8.0.5 and later.
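The sink parameters above can be combined as in this sketch of an upsert sink. The table name `mysql_sink_tuned` and its columns are hypothetical, and the flush values simply restate the documented defaults so that they are easy to adjust.

```sql
CREATE TEMPORARY TABLE mysql_sink_tuned (
  order_id INT,
  customer_name STRING,
  price DECIMAL(10, 5),
  -- A primary key is required by sink.ignore-null-when-update
  -- and makes the connector write with upsert statements.
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  'sink.buffer-flush.batch-size' = '4096',
  'sink.buffer-flush.max-rows' = '10000',
  'sink.buffer-flush.interval' = '1s',
  'sink.max-retries' = '3',
  -- Null fields in an update leave the existing column value unchanged.
  'sink.ignore-null-when-update' = 'true'
);
```

Larger flush sizes trade latency and memory for throughput, which is the trade-off described for batch writes above.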
-
Type mapping
- CDC source table

| MySQL CDC type | Flink type |
| --- | --- |
| TINYINT | TINYINT |
| SMALLINT, TINYINT UNSIGNED, TINYINT UNSIGNED ZEROFILL | SMALLINT |
| INT, MEDIUMINT, SMALLINT UNSIGNED, SMALLINT UNSIGNED ZEROFILL | INT |
| BIGINT, INT UNSIGNED, INT UNSIGNED ZEROFILL, MEDIUMINT UNSIGNED, MEDIUMINT UNSIGNED ZEROFILL | BIGINT |
| BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL | DECIMAL(20, 0) |
| FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
| DOUBLE [UNSIGNED] [ZEROFILL], DOUBLE PRECISION [UNSIGNED] [ZEROFILL], REAL [UNSIGNED] [ZEROFILL] | DOUBLE |
| NUMERIC(p, s) [UNSIGNED] [ZEROFILL], DECIMAL(p, s) [UNSIGNED] [ZEROFILL] | DECIMAL(p, s) |
| BOOLEAN, TINYINT(1) | BOOLEAN |
| DATE | DATE |
| TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] |
| DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
| TIMESTAMP [(p)] | TIMESTAMP [(p)], TIMESTAMP [(p)] WITH LOCAL TIME ZONE |
| CHAR(n), VARCHAR(n), TEXT | STRING |
| BINARY, VARBINARY, BLOB | BYTES |

Important: Avoid using the MySQL TINYINT(1) type to store values other than 0 and 1. By default, when property-version is 0, the MySQL CDC source table maps TINYINT(1) to the Flink BOOLEAN type, which can lead to data inaccuracies. To use the TINYINT(1) type to store other numeric values, see the catalog.table.treat-tinyint1-as-boolean configuration parameter.
- Dimension and sink tables

| MySQL type | Flink type |
| --- | --- |
| TINYINT | TINYINT |
| SMALLINT, TINYINT UNSIGNED | SMALLINT |
| INT, MEDIUMINT, SMALLINT UNSIGNED | INT |
| BIGINT, INT UNSIGNED | BIGINT |
| BIGINT UNSIGNED | DECIMAL(20, 0) |
| FLOAT | FLOAT |
| DOUBLE, DOUBLE PRECISION | DOUBLE |
| NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s). Note: p must be 38 or less. |
| BOOLEAN, TINYINT(1) | BOOLEAN |
| DATE | DATE |
| TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] |
| DATETIME [(p)], TIMESTAMP [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
| CHAR(n) | CHAR(n) |
| VARCHAR(n) | VARCHAR(n) |
| BIT(n) | BINARY(⌈n/8⌉) |
| BINARY(n) | BINARY(n) |
| VARBINARY(N) | VARBINARY(N) |
| TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT | STRING |
| TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | BYTES. Important: Flink supports MySQL BLOB records up to 2,147,483,647 bytes (2^31 - 1). |
Data ingestion
You can use the MySQL connector as a data source for Flink CDC.
Syntax
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: <username>
  password: <password>
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404

sink:
  type: xxx
Parameters
|
Parameter |
Description |
Required |
Type |
Default |
Notes |
|
type |
Data source type. |
Yes |
STRING |
None |
The value must be |
|
name |
Name of the data source. |
No |
STRING |
None |
- |
|
hostname |
The hostname or IP address of the MySQL database. |
Yes |
STRING |
None |
We recommend using a Virtual Private Cloud (VPC) address. Note
If your MySQL database and Realtime Compute for Apache Flink workspace are in different VPCs, you must connect them via a cross-VPC network or use public network access. For more information, see Workspace management and operations and How do I access the public network from a fully managed Flink cluster?. |
|
username |
The username for the MySQL database. |
Yes |
STRING |
None |
- |
|
password |
The password for the MySQL database. |
Yes |
STRING |
None |
- |
|
tables |
The MySQL tables to be synchronized. |
Yes |
STRING |
None |
Note
|
|
tables.exclude |
The tables to exclude from synchronization. |
No |
STRING |
None |
Note
The dot character separates the database and table names. To match a literal dot, you must escape it with a backslash. Examples: |
|
port |
The port of the MySQL database. |
No |
INTEGER |
3306 |
- |
|
schema-change.enabled |
Whether to send schema change events. |
No |
BOOLEAN |
true |
- |
|
server-id |
A unique numeric ID or range for the client connection to the MySQL server. |
No |
STRING |
A random integer between 5400 and 6400. |
The ID must be unique across all clients in the MySQL cluster. We recommend assigning a different ID to each job that connects to the same database. This parameter also supports an ID range, such as |
|
jdbc.properties.* |
Custom connection parameters for the JDBC URL. |
No |
STRING |
None |
You can pass custom connection parameters. For example, you can set For a list of supported parameters, see MySQL Configuration Properties. |
|
debezium.* |
Custom parameters for the Debezium connector that reads the binlog. |
No |
STRING |
None |
For example, set Warning
Do not modify Debezium parameters arbitrarily to avoid data read errors. For example, configuring the |
|
scan.incremental.snapshot.chunk.size |
The number of rows per chunk. |
No |
INTEGER |
8096 |
Tables are split into chunks for reading. Each chunk is buffered in memory until it is fully read. A smaller chunk size increases the total number of chunks. Although this provides finer-grained fault recovery, it may lead to out-of-memory (OOM) errors and reduce overall throughput. You must balance these trade-offs and set a reasonable chunk size. |
|
scan.snapshot.fetch.size |
The maximum number of records to fetch per batch when reading full data from a table. |
No |
INTEGER |
1024 |
- |
|
scan.startup.mode |
The data consumption startup mode. |
No |
STRING |
initial |
Valid values:
Important
For the earliest-offset, specific-offset, and timestamp startup modes, if the table schema at the time the job starts is different from the schema at the specified startup offset, the job reports an error. In other words, when you use these three startup modes, you must ensure that the schema of the corresponding table does not change between the specified Binlog consumption offset and the time the job starts. |
|
scan.startup.specific-offset.file |
The binlog filename from which to start when |
No |
STRING |
None |
When you use this configuration, scan.startup.mode must be set to specific-offset. The file name format is, for example, |
|
scan.startup.specific-offset.pos |
The binlog file offset from which to start when |
No |
INTEGER |
None |
When you use this configuration, scan.startup.mode must be set to specific-offset. |
|
scan.startup.specific-offset.gtid-set |
The GTID set from which to start when |
No |
STRING |
None |
For this configuration, scan.startup.mode must be set to specific-offset. A GTID set is in a format such as |
|
scan.startup.timestamp-millis |
The timestamp in milliseconds from which to start when |
No |
LONG |
None |
When you use this configuration, scan.startup.mode must be set to timestamp. The timestamp is specified in milliseconds. Important
When using a specified timestamp, the MySQL CDC connector locates the starting binlog file by checking the timestamp of the initial event in each file. Ensure that the binlog file corresponding to the specified timestamp has not been purged by the database and is accessible. |
|
server-time-zone |
The session time zone of the database. |
No |
STRING |
Defaults to the time zone of the Flink job's runtime environment, which is typically the time zone of the selected deployment zone. |
Example: |
|
scan.startup.specific-offset.skip-events |
The number of binlog events to skip when starting from a specific offset. |
No |
INTEGER |
None |
This parameter is valid only when scan.startup.mode is set to specific-offset. |
|
scan.startup.specific-offset.skip-rows |
The number of row changes to skip when starting from a specific offset. A single binlog event can contain multiple row changes. |
No |
INTEGER |
None |
This parameter is valid only when scan.startup.mode is set to specific-offset. |
|
connect.timeout |
The maximum time to wait for a connection to the MySQL server before timing out. |
No |
DURATION |
30s |
- |
|
connect.max-retries |
The maximum number of retries after a failed connection to the MySQL server. |
No |
INTEGER |
3 |
- |
|
connection.pool.size |
The size of the database connection pool. |
No |
INTEGER |
20 |
A connection pool is used to reuse connections, which can reduce the number of connections to the database. |
|
heartbeat.interval |
The interval for emitting heartbeat events from the source to advance the binlog offset. |
No |
DURATION |
30s |
For slowly updated tables where the binlog offset may not advance automatically, heartbeat events ensure the offset progresses. This prevents the offset from expiring. An expired binlog offset can cause the job to fail and require a stateless restart. |
|
rds.region-id |
The region ID of the Alibaba Cloud RDS for MySQL instance. |
Required when reading archived logs from OSS. |
STRING |
None |
For a list of region IDs, see Regions and zones. Important
MySQL CDC GTID strings are randomly generated and are not monotonically increasing like binlog file offsets. Locating a specific GTID in archived logs requires downloading and parsing all log files from OSS, which is highly inefficient and costly. Therefore, reading archived logs from OSS only supports starting from a specific timestamp or binlog offset. This feature does not support starting from a GTID set or scenarios with primary-secondary switchovers in the log history, as switchovers rely on GTIDs. Evaluate these limitations carefully before using this feature. |
|
rds.access-key-id |
The AccessKey ID for the Alibaba Cloud RDS for MySQL account. |
Required when reading archived logs from OSS. |
STRING |
None |
View AccessKey ID and AccessKey Secret. Important
To avoid exposing your credentials, use Manage variables to provide the AccessKey ID. |
|
rds.access-key-secret |
The AccessKey Secret for the Alibaba Cloud RDS for MySQL account. |
Required when reading archived logs from OSS. |
STRING |
None |
View AccessKey ID and AccessKey Secret. Important
To avoid exposing your credentials, use Manage variables to provide the AccessKey Secret. |
|
rds.db-instance-id |
The instance ID of the Alibaba Cloud RDS for MySQL instance. |
Required when reading archived logs from OSS. |
STRING |
None |
- |
|
rds.main-db-id |
The primary database ID of the Alibaba Cloud RDS for MySQL instance. |
No |
STRING |
None |
To obtain the primary database ID, see RDS for MySQL log backup. Note
If this parameter is not specified, Flink engine VVR 11.7 and later automatically queries the primary database ID based on the ApsaraDB RDS for MySQL connection information. |
|
rds.download.timeout |
The timeout for downloading a single archived log file from OSS. |
No |
DURATION |
60s |
- |
|
rds.endpoint |
The service endpoint for retrieving binlog information from OSS. |
No |
STRING |
None |
For a list of available endpoints, see Endpoints. |
|
rds.binlog-directory-prefix |
The directory prefix for storing binlog files. |
No |
STRING |
rds-binlog- |
- |
|
rds.use-intranet-link |
Specifies whether to use an internal network link to download binlog files. |
No |
BOOLEAN |
true |
- |
|
rds.binlog-directories-parent-path |
The absolute path to the parent directory where binlog files are stored. |
No |
STRING |
None |
- |
|
chunk-meta.group.size |
The group size of chunk metadata. |
No |
INTEGER |
1000 |
If the metadata size exceeds this value, it is transmitted in multiple parts. |
|
chunk-key.even-distribution.factor.lower-bound |
The lower bound of the distribution factor for even chunk splitting. |
No |
DOUBLE |
0.05 |
If the distribution factor is less than this value, uneven chunking is used. Chunk distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / Total number of rows. |
|
chunk-key.even-distribution.factor.upper-bound |
The upper bound of the distribution factor for even chunk splitting. |
No |
DOUBLE |
1000.0 |
If the distribution factor is greater than this value, uneven chunking is used. Chunk distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / Total number of rows. |
|
scan.incremental.close-idle-reader.enabled |
Specifies whether to close idle readers after the snapshot phase is complete. |
No |
BOOLEAN |
false |
For this configuration to take effect, set |
|
scan.only.deserialize.captured.tables.changelog.enabled |
During the incremental phase, specifies whether to deserialize change events only for the specified tables. |
No |
BOOLEAN |
|
Valid values:
|
|
scan.parallel-deserialize-changelog.enabled |
During the incremental phase, specifies whether to use multiple threads to parse change events. |
No |
BOOLEAN |
false |
Valid values:
Note
Supported in Ververica Runtime (VVR) 8.0.11 and later versions. |
|
scan.parallel-deserialize-changelog.handler.size |
The number of event handlers to use when multi-threaded deserialization is enabled. |
No |
INTEGER |
2 |
Note
Supported in Ververica Runtime (VVR) 8.0.11 and later versions. |
|
metadata-column.include-list |
A comma-separated list of metadata columns to pass downstream. |
No |
STRING |
None |
The available metadata includes Note
The MySQL CDC YAML connector does not require or support adding metadata columns for the database name, table name, or Important
|
|
scan.newly-added-table.enabled |
On restart from a checkpoint, specifies whether to synchronize newly added tables or remove tables that no longer match the specified pattern. |
No |
BOOLEAN |
false |
This parameter takes effect when restarting from a checkpoint or savepoint. Important
During the snapshot phase, you cannot add new tables to or remove tables from the source, take a savepoint, and then restart from that savepoint. This will prevent the job from reading data correctly. |
|
scan.binlog.newly-added-table.enabled |
Specifies whether to stream data from newly added tables discovered during the incremental (binlog) phase. |
No |
BOOLEAN |
false |
Cannot be enabled at the same time as |
|
scan.incremental.snapshot.chunk.key-column |
For specific tables, specifies a column to use as the key for splitting chunks during the snapshot phase. |
No |
STRING |
None |
|
|
scan.parse.online.schema.changes.enabled |
Specifies whether to parse DDL events from RDS lock-free schema changes during the incremental phase. |
No |
BOOLEAN |
false |
Valid values:
This is an experimental feature. We recommend that you take a savepoint of your Flink job before performing an online lock-free schema change to ensure you can recover if needed. Note
Supported in Ververica Runtime (VVR) 11.0 and later versions. |
|
scan.incremental.snapshot.backfill.skip |
Specifies whether to skip the backfill process during the snapshot phase. |
No |
BOOLEAN |
false |
Valid values:
If Important
Skipping backfill can lead to data inconsistencies because changes that occur during the snapshot phase might be replayed. This provides only at-least-once semantics. Note
Supported in Ververica Runtime (VVR) 11.1 and later versions. |
|
treat-tinyint1-as-boolean.enabled |
Specifies whether to treat the |
No |
BOOLEAN |
true |
Valid values:
|
|
treat-timestamp-as-datetime-enabled |
Specifies whether to treat the MySQL |
No |
BOOLEAN |
false |
Valid values:
The MySQL When enabled, this parameter converts MySQL |
|
include-comments.enabled |
Specifies whether to synchronize table and column comments. |
No |
BOOLEAN |
false |
Valid values:
Enabling this parameter increases the memory usage of the job. |
|
scan.incremental.snapshot.unbounded-chunk-first.enabled |
Specifies whether to distribute the unbounded chunk first during the snapshot phase. |
No |
BOOLEAN |
false |
Valid values:
This is an experimental feature. Enabling it can reduce the risk of out-of-memory (OOM) errors on the TaskManager when it synchronizes the final chunk. We recommend that you enable this parameter before the first job startup. Note
Supported in Ververica Runtime (VVR) 11.1 and later versions. |
|
binlog.session.network.timeout |
The network timeout for the binlog connection. |
No |
DURATION |
10m |
If set to Note
Supported in Ververica Runtime (VVR) 11.5 and later versions. |
|
scan.rate-limit.records-per-second |
The maximum number of records per second that the source can emit. |
No |
LONG |
None |
Use this to limit the data read rate. This limit applies to both the snapshot and incremental phases. The During the snapshot read phase, you usually need to reduce the number of data rows read in each batch by decreasing the value of the Note
Supported in Ververica Runtime (VVR) 11.5 and later versions. |
|
include-binlog-meta.enable |
Specifies whether to include original MySQL binlog information, such as GTIDs and binlog offsets, in the message. |
No |
BOOLEAN |
false |
This is useful for raw binlog synchronization, such as replacing an existing Canal-based synchronization pipeline. Note
Supported in Ververica Runtime (VVR) 11.6 and later versions. |
|
scan.binlog.tolerate.gtid-holes |
Allows the job to tolerate gaps in the GTID sequence by bypassing discontinuous events and continuing to run. |
No |
BOOLEAN |
false |
Before enabling this parameter, ensure the job's starting GTID offset has not expired. Starting from a purged GTID offset causes the engine to silently skip missing logs, resulting in data loss. Note
Supported in Ververica Runtime (VVR) 11.6 and later versions. |
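The two chunk-key.even-distribution.factor.* rows in the table above define the chunk distribution factor as (MAX(chunk-key) - MIN(chunk-key) + 1) / total number of rows. The sketch below only evaluates that formula against the documented default bounds (0.05 and 1000.0). The class and method names are hypothetical, and the key ranges are made-up sample values.

```java
/** Illustrative only: evaluates the documented chunk distribution factor formula. */
final class ChunkDistributionFactor {
    // Documented defaults for the lower-bound and upper-bound options.
    static final double LOWER_BOUND = 0.05;
    static final double UPPER_BOUND = 1000.0;

    // factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / total number of rows
    static double factor(long minKey, long maxKey, long rowCount) {
        return (maxKey - minKey + 1) / (double) rowCount;
    }

    // Uneven chunking is used when the factor falls below the lower bound
    // or above the upper bound; otherwise even splitting applies.
    static boolean usesEvenSplitting(double factor) {
        return factor >= LOWER_BOUND && factor <= UPPER_BOUND;
    }

    public static void main(String[] args) {
        // Dense auto-increment key: 1,000,000 rows with keys 1..1,000,000.
        System.out.println(usesEvenSplitting(factor(1, 1_000_000L, 1_000_000L)));
        // Sparse key: 1,000,000 rows spread over keys 1..10,000,000,000.
        System.out.println(usesEvenSplitting(factor(1, 10_000_000_000L, 1_000_000L)));
        // Heavily repeated key values: 1,000,000 rows over keys 1..100.
        System.out.println(usesEvenSplitting(factor(1, 100L, 1_000_000L)));
    }
}
```

With the defaults, the dense key has a factor of 1.0 and is split evenly, while the sparse key (10000.0) and the repeated key (0.0001) fall outside the bounds.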
Reuse an existing catalog
Starting with VVR 11.5, you can directly reference a built-in MySQL catalog from the Catalogs page in a Flink CDC data ingestion job, eliminating the need to manually configure connection properties.
source:
  type: mysql
  using.built-in-catalog: mysql_rds_catalog
Currently, data ingestion jobs automatically reuse the following parameters from the MySQL catalog:
-
hostname
-
port
-
username
-
password
-
catalog.table.metadata-columns
-
catalog.table.treat-tinyint1-as-boolean
To override any of these parameters, explicitly define them in your YAML configuration. Parameters defined in YAML take precedence.
Type mapping
The following table shows the type mappings for data ingestion.
|
MySQL CDC type |
Flink CDC type |
|
TINYINT(n) |
TINYINT |
|
SMALLINT |
SMALLINT |
|
TINYINT UNSIGNED |
|
|
TINYINT UNSIGNED ZEROFILL |
|
|
YEAR |
|
|
INT |
INT |
|
MEDIUMINT |
|
|
MEDIUMINT UNSIGNED |
|
|
MEDIUMINT UNSIGNED ZEROFILL |
|
|
SMALLINT UNSIGNED |
|
|
SMALLINT UNSIGNED ZEROFILL |
|
|
BIGINT |
BIGINT |
|
INT UNSIGNED |
|
|
INT UNSIGNED ZEROFILL |
|
|
BIGINT UNSIGNED |
DECIMAL(20, 0) |
|
BIGINT UNSIGNED ZEROFILL |
|
|
SERIAL |
|
|
FLOAT [UNSIGNED] [ZEROFILL] |
FLOAT |
|
DOUBLE [UNSIGNED] [ZEROFILL] |
DOUBLE |
|
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] |
|
|
REAL [UNSIGNED] [ZEROFILL] |
|
|
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where p <= 38 |
DECIMAL(p, s) |
|
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where p <= 38 |
|
|
FIXED(p, s) [UNSIGNED] [ZEROFILL] where p <= 38 |
|
|
BOOLEAN |
BOOLEAN |
|
BIT(1) |
|
|
TINYINT(1) |
|
|
DATE |
DATE |
|
TIME [(p)] |
TIME [(p)] |
|
DATETIME [(p)] |
TIMESTAMP [(p)] |
|
TIMESTAMP [(p)] |
The mapping depends on the value of the
|
|
CHAR(n) |
CHAR(n) |
|
VARCHAR(n) |
VARCHAR(n) |
|
BIT(n) |
BINARY(⌈n / 8⌉) |
|
BINARY(n) |
BINARY(n) |
|
VARBINARY(N) |
VARBINARY(N) |
|
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65 |
STRING Note
The MySQL decimal data type supports a precision of up to 65, while Flink's |
|
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65 |
|
|
FIXED(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65 |
|
|
TINYTEXT |
STRING |
|
TEXT |
|
|
MEDIUMTEXT |
|
|
LONGTEXT |
|
|
ENUM |
|
|
JSON |
STRING Note
Flink converts the JSON data type to a JSON-formatted string. |
|
GEOMETRY |
STRING Note
Flink converts MySQL spatial data types to a fixed JSON string format. MySQL spatial data type mapping. |
|
POINT |
|
|
LINESTRING |
|
|
POLYGON |
|
|
MULTIPOINT |
|
|
MULTILINESTRING |
|
|
MULTIPOLYGON |
|
|
GEOMETRYCOLLECTION |
|
|
TINYBLOB |
BYTES Note
The maximum supported length for MySQL BLOB data types is 2,147,483,647 (2^31 - 1). |
|
BLOB |
|
|
MEDIUMBLOB |
|
|
LONGBLOB |
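The BIT(n) row in the table above sizes the Flink BINARY column as the number of whole bytes needed to hold n bits. A minimal sketch of that calculation follows. It assumes the length is computed with integer arithmetic as (n + 7) / 8, which equals ⌈n / 8⌉; the class and method names are hypothetical.

```java
/** Illustrative only: bytes needed to store a MySQL BIT(n) value. */
final class BitWidth {
    // Integer division of (bits + 7) by 8 rounds up to the next whole byte,
    // so it gives the same result as ceil(bits / 8.0).
    static int binaryLength(int bits) {
        if (bits < 1) {
            throw new IllegalArgumentException("bits must be at least 1");
        }
        return (bits + 7) / 8;
    }

    public static void main(String[] args) {
        System.out.println(binaryLength(8));  // 1
        System.out.println(binaryLength(9));  // 2
        System.out.println(binaryLength(64)); // 8
    }
}
```

Note that BIT(1) has its own row in the table and maps to BOOLEAN rather than BINARY(1).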
Examples
-
CDC Source Table
CREATE TEMPORARY TABLE mysqlcdc_source (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

CREATE TEMPORARY TABLE blackhole_sink(
  order_id INT,
  customer_name STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT order_id, customer_name FROM mysqlcdc_source;
-
Dimension Table
CREATE TEMPORARY TABLE datagen_source(
  a INT,
  b BIGINT,
  c STRING,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mysql_dim (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

CREATE TEMPORARY TABLE blackhole_sink(
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
ON T.a = H.a;
-
Sink Table
CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mysql_sink (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

INSERT INTO mysql_sink
SELECT * FROM datagen_source;
-
Flink CDC Source
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
MySQL CDC source
-
How it works
When a MySQL CDC source starts, it performs a full table scan, divides the table into chunks based on the primary key, and records the current Binlog offset. It then uses the incremental snapshot algorithm to read each chunk sequentially using SELECT statements. The job periodically performs checkpoints to record the completed chunks. If a failover occurs, the job resumes reading the unfinished chunks. After all chunks are read, the source switches to reading incremental changes from the previously recorded Binlog offset. The Flink job continues to perform checkpoints periodically to record the Binlog offset. This ensures that if a failover occurs, the job can resume from the last recorded offset, achieving exactly-once semantics.
For details about the incremental snapshot algorithm, see MySQL CDC Connector.
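The chunk-splitting step described above can be pictured with a much simplified sketch. It assumes an evenly distributed numeric primary key and fixed-width key ranges, and it leaves the last chunk without an upper bound so that rows inserted above the scanned maximum are still covered. The class, its fields, and the splitting rule are illustrative assumptions, not the connector's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified illustration of splitting a numeric primary-key range into chunks. */
final class ChunkSplitSketch {
    /** Half-open key range [start, end); a null end means "no upper bound". */
    static final class Chunk {
        final long start;
        final Long end;

        Chunk(long start, Long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + (end == null ? "+inf" : end.toString()) + ")";
        }
    }

    // Walks the key range in steps of chunkSize; the final chunk is left open-ended.
    static List<Chunk> split(long minKey, long maxKey, int chunkSize) {
        if (chunkSize < 1 || minKey > maxKey) {
            throw new IllegalArgumentException("invalid key range or chunk size");
        }
        List<Chunk> chunks = new ArrayList<>();
        long start = minKey;
        while (maxKey - start >= chunkSize) {
            chunks.add(new Chunk(start, start + chunkSize));
            start += chunkSize;
        }
        chunks.add(new Chunk(start, null));
        return chunks;
    }

    public static void main(String[] args) {
        // Keys 1..20000 with the default chunk size of 8096 rows.
        for (Chunk chunk : split(1, 20_000, 8096)) {
            System.out.println(chunk);
        }
    }
}
```

In this picture, a checkpoint only needs to remember which chunks are already complete, so a restarted job can skip them and continue with the remaining ones.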
-
Metadata
Metadata is useful for synchronizing and merging sharded databases and tables into a single sink. When you merge sharded tables, you often need to identify the original source database and table for each record. Metadata columns allow you to access this information, making it easy to consolidate multiple sharded tables into one sink table.
The MySQL CDC source supports metadata columns, which you can use to access the following metadata.
Metadata key | Type | Description |
database_name | STRING NOT NULL | The name of the database that contains the row. |
table_name | STRING NOT NULL | The name of the table that contains the row. |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | The time at which the change occurred in the database. If the record is from the historical snapshot of the table instead of the Binlog, this value is 0. Note: The precision of this field is in milliseconds. |
op_type | STRING NOT NULL | The type of change operation for the row: +I (INSERT message), -D (DELETE message), -U (UPDATE_BEFORE message), +U (UPDATE_AFTER message). Note: Supported in VVR 8.0.7 and later. |
query_log | STRING NOT NULL | The MySQL query log record corresponding to the row. Note: The binlog_rows_query_log_events parameter must be enabled in MySQL to record query logs. |
The following example merges multiple sharded orders tables from multiple databases in a MySQL instance into a single holo_orders table in Hologres.
CREATE TEMPORARY TABLE mysql_orders (
  db_name STRING METADATA FROM 'database_name' VIRTUAL,        -- Read the database name.
  table_name STRING METADATA FROM 'table_name' VIRTUAL,        -- Read the table name.
  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- Read the change timestamp.
  op_type STRING METADATA FROM 'op_type' VIRTUAL,              -- Read the change type.
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'mydb_.*', -- Match multiple sharded databases with a regular expression.
  'table-name' = 'orders_.*'   -- Match multiple sharded tables with a regular expression.
);

INSERT INTO holo_orders SELECT * FROM mysql_orders;
If you set the scan.read-changelog-as-append-only.enabled parameter to true in the WITH clause, the output depends on how the primary key of the sink table is configured:
-
If the primary key of the sink table is order_id, the output contains only the last change for each primary key from the source table. If the last change for a primary key was a delete operation, the sink table will contain a record with the same primary key and an op_type of -D.
-
If the primary key of the sink table is a composite key of order_id, operation_ts, and op_type, the output contains the complete change history for each primary key from the source table.
-
-
Regular expression support
The MySQL CDC source supports using regular expressions in the table-name and database-name options to match multiple tables or databases. The following example shows how to specify multiple tables by using regular expressions.
CREATE TABLE products (
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  table_name STRING METADATA FROM 'table_name' VIRTUAL,
  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- Match multiple databases with a regular expression.
  'table-name' = '(t[5-8]|tt)'                              -- Match multiple tables with a regular expression.
);
The regular expressions in the preceding example are explained as follows:
-
^(test).* is a prefix match example. This expression matches database names that start with test, such as test1 or test2.
-
.*[p$] is a suffix match example. This expression matches database names that end with p, such as cdcp or edcp.
-
txc is an exact match. It matches the database named txc.
The MySQL CDC source uses the database-name.table-name pattern to match fully qualified table names. For example, the pattern (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[5-8]|tt) can match tables such as txc.tt and test2.test5 in the database.
Important
In SQL jobs, the table-name and database-name options do not support comma-separated lists for specifying multiple tables or databases.
-
To match multiple tables or use multiple regular expressions, enclose them in parentheses and separate them with a pipe character (|). For example, to read from the user and product tables, set the table-name option to (user|product).
-
If your regular expression contains a comma, you must rewrite it by using the pipe (|) operator. For example, rewrite the expression mytable_\d{1,2} as its equivalent (mytable_\d{1}|mytable_\d{2}) to avoid using a comma.
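The patterns above can be tried out locally with java.util.regex before they go into a job. The sketch below is a hedged illustration: it assumes the connector joins the two options with a dot and requires the whole qualified name to match, which is what the database-name.table-name description suggests. The class and method names are hypothetical.

```java
import java.util.regex.Pattern;

/** Illustrative only: checks qualified table names against database/table regexes. */
final class TableNameRegexDemo {
    static final String DATABASE_REGEX = "(^(test).*|^(tpc).*|txc|.*[p$]|t{2})";
    static final String TABLE_REGEX = "(t[5-8]|tt)";

    // Joins the two patterns with "." and requires a whole-string match.
    // The joining dot is an unescaped regex wildcard, as in the documented pattern.
    static boolean matches(String databaseRegex, String tableRegex, String qualifiedName) {
        return Pattern.matches(databaseRegex + "." + tableRegex, qualifiedName);
    }

    // True when two patterns agree on whether the input matches as a whole.
    static boolean sameResult(String regexA, String regexB, String input) {
        return Pattern.matches(regexA, input) == Pattern.matches(regexB, input);
    }

    public static void main(String[] args) {
        System.out.println(matches(DATABASE_REGEX, TABLE_REGEX, "txc.tt"));      // true
        System.out.println(matches(DATABASE_REGEX, TABLE_REGEX, "test2.test5")); // true
        System.out.println(matches(DATABASE_REGEX, TABLE_REGEX, "other.t5"));    // false

        // The comma-free rewrite accepts and rejects the same names as the original.
        String withComma = "mytable_\\d{1,2}";
        String withoutComma = "(mytable_\\d{1}|mytable_\\d{2})";
        for (String name : new String[] {"mytable_7", "mytable_42", "mytable_123"}) {
            System.out.println(name + " -> " + sameResult(withComma, withoutComma, name));
        }
    }
}
```

Under these assumptions, test2.test5 matches only because the greedy .* in the database part and the unescaped joining dot can absorb part of the table name. Escape the dot and anchor the table pattern if that is not the intended behavior.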
-
-
Manage parallelism
The MySQL connector supports parallel reads of the full dataset, which improves data loading efficiency. When combined with the Autopilot feature, the job can automatically scale down resources during the incremental phase after the parallel read is complete, saving computing resources.
In the Realtime Compute for Apache Flink console, you can set the job's parallelism on the Resource Configuration page in either Basic Mode or Expert Mode. The differences are as follows:
-
Basic Mode sets the parallelism for the entire job.

-
Expert Mode allows you to set the parallelism for a specific VERTEX on demand.

Resource configuration: Configure job deployment information.
Important
Regardless of the mode you use, the number of server IDs in the server-id range declared in the table must be greater than or equal to the job's parallelism. For example, the server-id range 5404-5412 contains 9 unique server IDs (both endpoints are included), so the job can have a maximum parallelism of 9. In addition, the server-id ranges for different jobs connecting to the same MySQL instance must not overlap. Each job requires a unique server-id range.
-
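Both server-id rules (enough IDs for the parallelism, no overlap between jobs) are easy to check before deployment. The following is a minimal sketch with a hypothetical helper class; it assumes that a range such as 7601-7604 includes both endpoints and that a single number is a range of size one.

```java
/** Illustrative only: validates server-id ranges against parallelism and each other. */
final class ServerIdRange {
    final int start;
    final int end;

    ServerIdRange(int start, int end) {
        if (start > end) {
            throw new IllegalArgumentException("start must not exceed end");
        }
        this.start = start;
        this.end = end;
    }

    // Accepts either a single ID ("5400") or an inclusive range ("7601-7604").
    static ServerIdRange parse(String value) {
        String[] parts = value.trim().split("-");
        int start = Integer.parseInt(parts[0].trim());
        int end = parts.length > 1 ? Integer.parseInt(parts[1].trim()) : start;
        return new ServerIdRange(start, end);
    }

    // Number of distinct server IDs, counting both endpoints.
    int size() {
        return end - start + 1;
    }

    // Each parallel reader needs its own server ID.
    boolean supportsParallelism(int parallelism) {
        return size() >= parallelism;
    }

    // Two jobs on the same MySQL instance must not share any server ID.
    boolean overlaps(ServerIdRange other) {
        return start <= other.end && other.start <= end;
    }

    public static void main(String[] args) {
        ServerIdRange jobA = parse("7601-7604");
        ServerIdRange jobB = parse("5401-5404");
        System.out.println(jobA.size());                 // 4
        System.out.println(jobA.supportsParallelism(4)); // true
        System.out.println(jobA.supportsParallelism(5)); // false
        System.out.println(jobA.overlaps(jobB));         // false
    }
}
```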
-
Autopilot automatic scaling
The full snapshot phase accumulates a large volume of historical data. To improve reading efficiency, data is typically read in parallel. In contrast, during the incremental Binlog phase, data volume is smaller and global ordering must be preserved, so a single reader is usually sufficient. The Autopilot feature automatically balances performance and resources to meet these different requirements.
Autopilot monitors the traffic of each MySQL CDC source task. When the job enters the Binlog phase, if only one task is actively reading from the Binlog while other tasks are idle, Autopilot automatically scales down the source's CU count and parallelism. To enable Autopilot, go to the job's O&M page and set the Autopilot mode to Active.
Note
The minimum trigger interval for scaling down is 24 hours by default. For more information, see Configure Autopilot.
-
Startup modes
Use the scan.startup.mode option to specify the startup mode for the MySQL CDC source. The available values are:
-
initial (Default): Performs a full snapshot of the database table on the first startup, and then switches to incremental mode to read the Binlog.
-
earliest-offset: Skips the snapshot phase and starts reading from the earliest available Binlog offset.
-
latest-offset: Skips the snapshot phase and starts reading from the end of the Binlog. In this mode, the source can only read data changes that occur after the job has started.
-
specific-offset: Skips the snapshot phase and starts reading from a specified Binlog offset. You can specify the offset using a Binlog filename and position, or a GTID set.
-
timestamp: Skips the snapshot phase and starts reading Binlog events from a specified timestamp.
Example:
CREATE TABLE mysql_source (...) WITH (
  'connector' = 'mysql-cdc',
  -- The four scan.startup.mode lines below are alternatives. Keep only one of them.
  'scan.startup.mode' = 'earliest-offset', -- Start from the earliest offset.
  'scan.startup.mode' = 'latest-offset',   -- Start from the latest offset.
  'scan.startup.mode' = 'specific-offset', -- Start from a specific offset.
  'scan.startup.mode' = 'timestamp',       -- Start from a specific timestamp.
  'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Specify the Binlog filename in specific-offset mode.
  'scan.startup.specific-offset.pos' = '4',                 -- Specify the Binlog position in specific-offset mode.
  'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- Specify the GTID set in specific-offset mode.
  'scan.startup.timestamp-millis' = '1667232000000'         -- Specify the startup timestamp in timestamp mode.
  ...
)
Important
-
During each checkpoint, the MySQL source logs the current offset at the INFO level. The log prefix is Binlog offset on checkpoint {checkpoint-id}. You can use this log information to start a job from a specific checkpoint offset.
-
If the schema of a table has changed, starting from earliest-offset, specific-offset, or timestamp may cause errors. The underlying Debezium reader maintains the latest schema internally and cannot correctly parse earlier data that has a different schema.
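The scan.startup.timestamp-millis value is an epoch timestamp in milliseconds, so it is usually derived from a wall-clock time and a UTC offset. The sketch below uses java.time for that conversion. The helper class is hypothetical, and the UTC+8 offset is only an example choice.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/** Illustrative only: derives a scan.startup.timestamp-millis value. */
final class StartupTimestamp {
    // Converts a local date-time at the given UTC offset to epoch milliseconds.
    static long toEpochMillis(LocalDateTime local, ZoneOffset offset) {
        return local.toInstant(offset).toEpochMilli();
    }

    // Convenience wrapper for midnight on a given date at a whole-hour UTC offset.
    static long startOfDayMillis(int year, int month, int day, int offsetHours) {
        return toEpochMillis(
                LocalDateTime.of(year, month, day, 0, 0),
                ZoneOffset.ofHours(offsetHours));
    }

    public static void main(String[] args) {
        // Midnight on 2022-11-01 at UTC+8 gives the value used in the example above.
        System.out.println(startOfDayMillis(2022, 11, 1, 8)); // 1667232000000
    }
}
```

The same wall-clock time at a different offset yields a different value, so make sure the offset matches the time zone you have in mind when you pick the starting point.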
-
-
About CDC source tables without a primary key
-
To use a table without a primary key, you must set the scan.incremental.snapshot.chunk.key-column option to a non-nullable column with unique values.
-
The processing semantics for a table without a primary key depend on whether the column specified by scan.incremental.snapshot.chunk.key-column is updated:
-
If the specified column is never updated, exactly-once semantics are guaranteed.
-
If the specified column is updated, only at-least-once semantics can be guaranteed. However, you can ensure data correctness by defining a primary key in the sink and using idempotent operations.
-
-
-
Read ApsaraDB RDS for MySQL backup logs
The MySQL CDC source supports reading backup logs from ApsaraDB RDS for MySQL. This is useful when the full snapshot phase is lengthy, causing local Binlog files to be purged, while backup files (either automatically or manually uploaded) are still available.
Example:
CREATE TABLE mysql_source (...) WITH (
  'connector' = 'mysql-cdc',
  'rds.region-id' = 'cn-beijing',
  'rds.access-key-id' = 'xxxxxxxxx',
  'rds.access-key-secret' = 'xxxxxxxxx',
  'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx',
  'rds.main-db-id' = '12345678',
  'rds.download.timeout' = '60s'
  ...
)
-
Enable CDC source reuse
Multiple MySQL CDC source tables in a single job each start a separate Binlog client, increasing database load on the same instance. MySQL CDC FAQ.
Solution
VVR versions 8.0.7 and later support MySQL CDC Source reuse. The reuse feature merges compatible MySQL CDC source tables. A merge occurs when the source tables have the same configuration options, except for the database name, table name, and server-id. The engine automatically merges MySQL CDC sources within the same job.
Procedure
-
Use the SET command in an SQL job:
SET 'table.optimizer.source-merge.enabled' = 'true';
-- For VVR 8.0.8 and 8.0.9, also set this option:
SET 'sql-gateway.exec-plan.enabled' = 'false';
Source reuse is enabled by default in VVR 11.1 and later.
-
Start the job without state. Modifying the Source reuse configuration option changes the job topology. You must start the job without state. Otherwise, the job may fail to start or lose data. If sources are merged, you will see a MergetableSourceScan node.
Important
-
We recommend that you do not disable operator chaining after you enable reuse. Setting pipeline.operator-chaining to false increases the overhead of data serialization and deserialization. The more Sources that are merged, the higher the overhead.
-
In VVR 8.0.7, disabling operator chaining causes serialization issues.
-
Accelerate binlog reading
When used as a source, the MySQL connector parses the binlog during the incremental phase to generate change events. The binlog records all table changes in a binary format. The following methods accelerate binlog parsing:
-
Enable parsing filters
-
Use the scan.only.deserialize.captured.tables.changelog.enabled option to parse change events only for specified tables.
-
-
Tune Debezium parameters
debezium.max.queue.size: 162580
debezium.max.batch.size: 40960
debezium.poll.interval.ms: 50
-
debezium.max.queue.size: Specifies the maximum number of records in the blocking queue. This queue holds events that Debezium reads from the database before writing them downstream. Default: 8192.
-
debezium.max.batch.size: Specifies the maximum number of events the connector processes per iteration. Default: 2048.
-
debezium.poll.interval.ms: Specifies the interval in milliseconds that the connector waits between requests for new change events. Default: 1000 (1 second).
-
Example:
CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    -- Debezium configuration
    'debezium.max.queue.size' = '162580',
    'debezium.max.batch.size' = '40960',
    'debezium.poll.interval.ms' = '50',
    -- Enable parsing filters
    'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- Parse change events for specified tables only.
    ...
)
The equivalent data ingestion YAML configuration:
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604
  # Debezium configuration
  debezium.max.queue.size: 162580
  debezium.max.batch.size: 40960
  debezium.poll.interval.ms: 50
  # Enable parsing filters
  scan.only.deserialize.captured.tables.changelog.enabled: true
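When choosing values for the three Debezium options, it can help to check them before they go into a job. The following Java sketch builds the settings as a java.util.Properties object and rejects combinations where the queue is not larger than the batch size, a constraint that Debezium's own documentation recommends. The class DebeziumTuning and its method are hypothetical, and how the resulting properties reach the connector is deliberately left out.

```java
import java.util.Properties;

// Hypothetical helper for illustration; only java.util.Properties is a real API here.
final class DebeziumTuning {
    // Builds the three tuning options using the same keys as the SQL and YAML examples.
    static Properties tunedProperties(int maxQueueSize, int maxBatchSize, long pollIntervalMs) {
        if (maxBatchSize <= 0 || pollIntervalMs <= 0) {
            throw new IllegalArgumentException("batch size and poll interval must be positive");
        }
        // Debezium recommends a queue that is larger than a single batch.
        if (maxQueueSize <= maxBatchSize) {
            throw new IllegalArgumentException("max.queue.size must be larger than max.batch.size");
        }
        Properties props = new Properties();
        props.setProperty("debezium.max.queue.size", String.valueOf(maxQueueSize));
        props.setProperty("debezium.max.batch.size", String.valueOf(maxBatchSize));
        props.setProperty("debezium.poll.interval.ms", String.valueOf(pollIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // Values taken from the tuning example in this section.
        Properties props = tunedProperties(162580, 40960, 50);
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Larger queue and batch sizes keep more events in memory, so any increase should be weighed against the memory available to the job.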
The fully managed MySQL connector can consume the binlog at up to 85 MB/s, approximately twice the throughput of the open-source Flink connector for MySQL. If the binlog is generated faster than 85 MB/s (about one 512 MB file every 6 seconds), the latency of the Flink job keeps increasing until the generation rate drops, after which it decreases. A large transaction in the binlog can also cause a temporary spike in processing latency, which subsides after the transaction is fully read.
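The throughput figures above lend themselves to a quick back-of-the-envelope calculation. The Java sketch below uses the 85 MB/s limit and the 512 MB file size from the text. The backlog model (backlog grows at the generation rate minus the consumption limit, then drains at the spare capacity) is a simplifying assumption, and the class and method names are hypothetical.

```java
// Hypothetical calculator; the linear backlog model is a simplifying assumption.
final class BinlogBacklog {
    // Maximum consumption rate stated in the text, in MB/s.
    static final double MAX_CONSUME_MB_PER_S = 85.0;

    // Seconds needed to produce one binlog file at the given generation rate.
    static double secondsPerBinlogFile(double fileSizeMb, double rateMbPerS) {
        return fileSizeMb / rateMbPerS;
    }

    // Unread binlog (MB) accumulated while generation exceeds the consumption limit.
    static double backlogMb(double generationMbPerS, double durationS) {
        return Math.max(0.0, generationMbPerS - MAX_CONSUME_MB_PER_S) * durationS;
    }

    // Seconds needed to drain a backlog once the generation rate has dropped.
    static double catchUpSeconds(double backlogMb, double laterGenerationMbPerS) {
        double spare = MAX_CONSUME_MB_PER_S - laterGenerationMbPerS;
        return spare <= 0 ? Double.POSITIVE_INFINITY : backlogMb / spare;
    }

    public static void main(String[] args) {
        System.out.printf("Seconds per 512 MB file at 85 MB/s: %.2f%n",
                secondsPerBinlogFile(512, MAX_CONSUME_MB_PER_S));
        // Example scenario: 100 MB/s for 10 minutes, then 40 MB/s.
        double backlog = backlogMb(100, 600);
        System.out.printf("Backlog after the burst: %.0f MB%n", backlog);
        System.out.printf("Catch-up time at 40 MB/s: %.0f s%n", catchUpSeconds(backlog, 40));
    }
}
```

In the example scenario, a 10-minute burst at 100 MB/s leaves 9,000 MB unread, which takes 200 seconds to drain once the rate falls to 40 MB/s.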
MySQL CDC DataStream API
To read and write data with the DataStream API, use the corresponding DataStream connector. For setup instructions, see DataStream connector usage.
The following example demonstrates how to use MySqlSource in a DataStream API program and lists the required POM dependencies.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlSourceExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("yourHostname")
                .port(yourPort) // Replace with the port number of your MySQL database
                .databaseList("yourDatabaseName") // Set the database to capture
                .tableList("yourDatabaseName.yourTableName") // Set the table to capture
                .username("yourUsername")
                .password("yourPassword")
                .deserializer(new JsonDebeziumDeserializationSchema()) // Converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing every 3000 ms
        env.enableCheckpointing(3000);

        env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // Set 4 parallel source tasks
                .setParallelism(4)
                .print().setParallelism(1); // Use parallelism 1 for the sink to keep message ordering

        env.execute("Print MySQL Snapshot + Binlog");
    }
}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>
To build the MySqlSource, specify the following parameters:
| Parameter | Description |
| hostname | The IP address or hostname of the MySQL database. |
| port | The port number of the MySQL database. |
| databaseList | The names of the databases to capture. Note: The database name supports regular expressions to read data from multiple databases. |
| username | The username for the MySQL database. |
| password | The password for the MySQL database. |
| deserializer | A deserializer that converts captured change records to the output type. For example, JsonDebeziumDeserializationSchema converts a SourceRecord to a JSON string. |
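Because databaseList accepts regular expressions, it can be useful to check which database names a pattern would select before using it. The following Java sketch does this with java.util.regex. It assumes that a pattern must match the entire database name, which is an assumption about the connector's matching behavior, and the class name, pattern, and database names are hypothetical.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper; assumes a pattern must match the whole database name.
final class DatabasePatternDemo {
    // Returns the database names that the regular expression matches in full.
    static List<String> matching(String regex, List<String> databaseNames) {
        Pattern pattern = Pattern.compile(regex);
        return databaseNames.stream()
                .filter(name -> pattern.matcher(name).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> databases = List.of("shop_01", "shop_02", "shop_archive", "crm");
        // Selects only the numbered shop databases.
        System.out.println(matching("shop_\\d+", databases));
    }
}
```

Testing a pattern this way helps avoid capturing more databases than intended, for example when a broad pattern also matches archive or test databases.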
Specify the following placeholders in your POM dependencies:
| Placeholder | Description |
| ${vvr.version} | The engine version of Realtime Compute for Apache Flink. Note: Use the version number from Maven, as hotfixes may be released without a separate announcement. |
| ${flink.version} | The Apache Flink version. Important: Use the Apache Flink version that corresponds to your Realtime Compute for Apache Flink engine version to avoid incompatibility. For version compatibility, see Engines. |
FAQ
For issues with CDC source tables, see CDC issues.
Flink CDC principles and enterprise features
-
Flink CDC Enterprise Features
-
Flink CDC Technical Principles