MySQL

The MySQL connector reads change data capture (CDC) streams and writes data to MySQL-compatible databases, including ApsaraDB RDS for MySQL, PolarDB for MySQL, OceanBase (MySQL mode), and self-managed MySQL.

Overview

The MySQL connector supports all MySQL-protocol-compatible databases: ApsaraDB RDS for MySQL, PolarDB for MySQL, OceanBase (MySQL mode), and self-managed MySQL.

Important

To read data from OceanBase, first enable and correctly configure the OceanBase binlog service. For details, see Binlog-related operations. This feature is in public preview. Evaluate its performance and stability before production use.

The following table summarizes the connector capabilities.

Category

Description

Supported types

Source table, dimension table, sink table, and data ingestion source

Execution mode

Streaming only

Data format

Not applicable

Specific metrics

Metrics

  • Source table

    • currentFetchEventTimeLag: The latency between the event generation time and the time the record is fetched by the source operator.

      This metric is available only during the binlog phase; its value is 0 during the snapshot phase.

    • currentEmitEventTimeLag: The latency between the event generation time and the time the record is emitted by the source operator.

      This metric is available only during the binlog phase; its value is 0 during the snapshot phase.

    • sourceIdleTime: The time elapsed since the source operator last emitted a record.

  • Dimension table and sink table: None.

API types

DataStream, SQL, and data ingestion YAML

Update/delete sink table data

Supported

Features

The MySQL CDC source first reads a full snapshot of the database and then switches seamlessly to the binlog to read incremental updates, with exactly-once semantics. Its incremental snapshot algorithm supports concurrent snapshot reads, lock-free operation, and checkpoint-based recovery. For more information, see Understanding MySQL source.

  • Unified batch and streaming: Reads snapshot and incremental data in a single pipeline.

  • Horizontally scalable: Reads snapshot data concurrently, so the snapshot phase scales out with the source parallelism.

  • Automated resource optimization: Automatically scales down after switching from snapshot to incremental reading.

  • Enhanced stability: Checkpoint-based recovery during the snapshot phase enables resumption after interruptions.

  • Lock-free snapshots: Reads snapshot data without locks, avoiding impact on online workloads.

  • Cloud integration: Supports reading from the binlog of ApsaraDB RDS for MySQL instances.

  • Low-latency reads: Parallel binlog parsing minimizes read latency.

Prerequisites

Complete the steps in Configure a MySQL database before using a MySQL CDC source table.

RDS for MySQL

PolarDB for MySQL

  • Run a network probe to ensure network connectivity with Realtime Compute for Apache Flink.

  • MySQL version: 5.6, 5.7, or 8.0.x.

  • Enable the binlog (disabled by default).

  • Set the binlog format to ROW (default).

  • Set binlog_row_image to FULL (default).

  • Disable binary log transaction compression (introduced in MySQL 8.0.20; disabled by default).

  • Create a MySQL user and grant the SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT permissions.

  • Create a MySQL database and tables. For details, see Create a database and account for PolarDB for MySQL. Use a high-privilege account to avoid permission failures.

  • Configure an IP whitelist. For details, see Configure an IP address whitelist for PolarDB for MySQL.

Self-managed MySQL

  • Run a network probe to ensure network connectivity with Realtime Compute for Apache Flink.

  • MySQL version: 5.6, 5.7, or 8.0.x.

  • Enable the binlog (disabled by default).

  • Set the binlog format to ROW (the default is STATEMENT).

  • Set binlog_row_image to FULL (default).

  • Disable binary log transaction compression (introduced in MySQL 8.0.20; disabled by default).

  • Create a MySQL user and grant the SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT permissions.

  • Create a MySQL database and tables. For details, see Create a database and account for a self-managed MySQL database. Use a high-privilege account to avoid permission failures.

  • Configure an IP whitelist. For details, see Add a security group rule.
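On a self-managed instance, the checklist above can be verified, and the account created, with statements along the following lines. This is a minimal sketch rather than a definitive procedure: the account name flink_cdc is hypothetical, the password is a placeholder, and managed services such as ApsaraDB RDS or PolarDB may expect accounts to be created through their console instead.

```sql
-- Run in a MySQL client against the source database.
-- Verify the binlog settings required by the MySQL CDC source.
SHOW VARIABLES LIKE 'log_bin';           -- should be ON
SHOW VARIABLES LIKE 'binlog_format';     -- should be ROW
SHOW VARIABLES LIKE 'binlog_row_image';  -- should be FULL
-- This variable exists only in MySQL 8.0.20 and later; it should be OFF.
SHOW VARIABLES LIKE 'binlog_transaction_compression';

-- Create a dedicated account (hypothetical name) and grant the listed privileges.
-- SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT are global
-- privileges, so the grant must be made ON *.*.
CREATE USER 'flink_cdc'@'%' IDENTIFIED BY '<yourPassword>';
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'flink_cdc'@'%';
```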

Limitations

General

  • MySQL CDC source tables do not support watermark definitions. To get the effect of a windowed aggregation, use a non-windowed aggregation instead. For more information, see How can I perform windowed aggregation if watermarks are not supported?

  • In CTAS and CDAS jobs, MySQL CDC source tables support only partial schema evolution. For the supported change types, see Schema evolution synchronization policy.

  • The MySQL CDC connector does not currently support binary log transaction compression. When using the MySQL CDC connector to consume incremental data, disable this feature. Otherwise, the connector may fail to retrieve incremental data.

ApsaraDB RDS for MySQL

  • For ApsaraDB RDS for MySQL, avoid reading data from a standby or read-only instance. These instances typically have short retention periods for the binary log. If the logs expire before being processed, the job may fail.

  • ApsaraDB RDS for MySQL enables parallel replication by default, which does not guarantee a consistent transaction order between the primary and secondary instances. This can cause data loss when the job recovers from a checkpoint after a primary-secondary switchover. To avoid this issue, enable the slave_preserve_commit_order option.

PolarDB for MySQL

MySQL CDC source tables cannot read data from a multi-master cluster of PolarDB for MySQL version 1.0.19 or earlier. For more information about multi-master clusters, see What is a multi-master cluster? The binary logs of these cluster versions may contain duplicate table IDs, which can cause schema mapping errors and, in turn, data parsing failures.

Open-source MySQL

By default, MySQL preserves transaction order during binary log replication. However, if a MySQL replica has parallel replication enabled (slave_parallel_workers > 1) but slave_preserve_commit_order is not set to ON, its transaction commit order can differ from that of the primary instance. This discrepancy can lead to data loss when a Flink CDC job resumes from a checkpoint. Set slave_preserve_commit_order = ON on the MySQL replica. Alternatively, set slave_parallel_workers = 1, although this may reduce replication performance.
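The replica-side setting described above could be checked and changed roughly as follows. This sketch assumes a self-managed replica that still uses the slave_* variable names from the text; newer MySQL 8.0 releases also provide replica_* equivalents. It also assumes that the replication SQL thread has to be stopped before the variable is changed, so confirm this against the documentation for your MySQL version.

```sql
-- Run on the MySQL replica, not on the primary instance.
-- Inspect the current parallel replication settings.
SHOW VARIABLES LIKE 'slave_parallel_workers';
SHOW VARIABLES LIKE 'slave_preserve_commit_order';

-- Preserve the commit order of the primary while keeping parallel replication.
STOP SLAVE SQL_THREAD;
SET GLOBAL slave_preserve_commit_order = ON;
START SLAVE SQL_THREAD;

-- Alternative from the text: turn off parallel replication instead
-- (this may reduce replication performance).
-- SET GLOBAL slave_parallel_workers = 1;
```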

Usage notes

  • Source table

    • Each MySQL CDC data source must be explicitly configured with a unique server ID.

      Purpose of a server ID

      If multiple data sources that cannot be reused share the same server ID, their binary log offsets can become inconsistent, resulting in duplicated or lost data.

      Server ID configuration for different scenarios

      You can specify the server ID in the DDL, but we recommend using dynamic hints instead.

      • Parallelism = 1 or incremental snapshot disabled

        -- When the incremental snapshot feature is disabled or the parallelism is 1, specify a single server ID.
        SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */;
      • Parallelism > 1 and incremental snapshot enabled

        -- Specify a server ID range that contains at least as many IDs as the parallelism. For example, with a parallelism of 3:
        SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */;
      • Data synchronization with CTAS

        CTAS data synchronization automatically reuses CDC data sources that have the same configuration, so multiple sources can share the same server ID. For an example, see Example 4: Multiple CTAS statements.

      • Multiple non-CTAS source tables

        If a job contains multiple MySQL CDC source tables and does not use CTAS for synchronization, the data sources cannot be reused. You must assign a unique server ID to each CDC source table. Similarly, if the incremental snapshot framework is enabled and the parallelism is greater than 1, you must specify a server ID range.

        SELECT * FROM
          source_table1 /*+ OPTIONS('server-id'='123456-123457') */
        LEFT JOIN
          source_table2 /*+ OPTIONS('server-id'='123458-123459') */
        ON source_table1.id = source_table2.id;
    • If you create a savepoint during the full load phase, do not add or remove source tables before restarting the job from that savepoint. Otherwise, data reading errors occur.

  • Sink table

    • Do not declare an auto-increment primary key in the DDL. MySQL automatically populates this field when writing data.

    • You must declare at least one non-primary key field. Otherwise, an error occurs.

    • The NOT ENFORCED constraint in the DDL indicates that Flink does not validate the primary key. You must ensure the correctness and integrity of the primary key yourself. For more information, see Validity Check.

  • Dimension table

    To use an index to accelerate queries, ensure that the order of the fields in the JOIN condition matches the order of the columns in the index definition. This adheres to the leftmost prefix rule. For example, if an index is defined on (a, b, c), the JOIN condition should be ON t.a = x AND t.b = y.

    The Flink optimizer may rewrite the generated SQL, preventing the query from using the intended index. To confirm that the index is used, check the execution plan (EXPLAIN) or the slow query log in MySQL to inspect the actual SELECT statement.
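As an illustration of the index guidance above, the following sketch joins a stream against a dimension table by using the leading columns of an index on (a, b, c), and then checks the plan on the MySQL side. All table and column names are hypothetical: orders is assumed to be a stream with a processing-time attribute proc_time, dim_t is assumed to be a MySQL dimension table, and a and b are assumed to be integer columns.

```sql
-- Flink SQL: lookup join whose conditions follow the index column order (a, then b).
SELECT o.order_id, d.c
FROM orders AS o
JOIN dim_t FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON o.a = d.a AND o.b = d.b;

-- MySQL: confirm that a query of the same shape uses the index.
-- Check the "key" column of the output for the expected index name.
EXPLAIN SELECT a, b, c FROM dim_t WHERE a = 1 AND b = 2;
```

If the plan does not show the index, the slow query log reveals the SELECT statement that the connector actually issued.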

SQL

Use the MySQL connector in SQL jobs as a source, dimension, or sink table.

Syntax

CREATE TEMPORARY TABLE mysqlcdc_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);
Note
  • The connector writes to a target table by converting each incoming record into an SQL statement. The specific SQL statement executed depends on the table schema:

    • For a target table without a primary key, the connector executes an INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...); statement.

    • For a target table with a primary key, the connector performs an upsert by executing an INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...; statement. Note that if the physical table has a unique index in addition to the primary key, inserting records that have different primary keys but identical unique index values causes a conflict, and existing data can be overwritten.

  • If an auto-increment primary key is defined in the MySQL database, do not declare the auto-increment column in your Flink DDL. The database automatically populates this column during the write operation. The connector supports INSERT and DELETE operations for tables with an auto-increment column, but does not support UPDATE operations.
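The auto-increment rule can be illustrated with a sink declaration like the one below. It is a sketch under stated assumptions: the physical table orders_sink and its columns are hypothetical, and the connection values are placeholders.

```sql
-- Hypothetical physical table in MySQL:
--   CREATE TABLE orders_sink (
--     id BIGINT AUTO_INCREMENT PRIMARY KEY,
--     order_id INT,
--     customer_name VARCHAR(255)
--   );

-- Flink DDL: the auto-increment column `id` is intentionally not declared,
-- so MySQL populates it when the connector writes each row.
CREATE TEMPORARY TABLE mysql_sink (
  order_id INT,
  customer_name STRING
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = 'orders_sink'
);
```

Because this Flink table declares no primary key, the connector writes with plain INSERT INTO statements, as described in the first bullet of the note.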

WITH parameters

  • General

    Parameter

    Description

    Required

    Type

    Default

    Notes

    connector

    The connector type.

    Yes

    STRING

    -

    For a source table, the value can be mysql-cdc or mysql. These two values are equivalent. For a dimension table or sink table, the value must be mysql.

    hostname

    The IP address or hostname of the MySQL database.

    Yes

    STRING

    -

    A VPC address is recommended.

    Note

    If your MySQL database and Realtime Compute for Apache Flink workspace are in different VPCs, establish a cross-VPC network connection or connect over the public network. For more information, see Space Management and Operations and How can a fully managed Flink cluster access the public network?

    username

    The username for the MySQL database.

    Yes

    STRING

    -

    -

    password

    The password for the MySQL database.

    Yes

    STRING

    -

    -

    database-name

    The database name.

    Yes

    STRING

    -

    • When used as a source, you can specify a regular expression to read data from multiple databases.

    • When using regular expressions, avoid the ^ and $ symbols. See the notes for table-name.

    table-name

    The table name.

    Yes

    STRING

    -

    • When used as a source, you can specify a regular expression to read data from multiple tables.

      To improve performance when reading from multiple MySQL tables, submit multiple CTAS statements as a single job to avoid starting multiple binlog listeners. For more information, see Run multiple CTAS statements in a single job.

    • When you use regular expressions, do not use the ^ and $ symbols to match the start and end of a name. The following note explains why.

    Note

    When a MySQL CDC source table matches table names by using a regular expression, it concatenates the database-name and table-name that you enter with the string \\. (for VVR versions before 8.0.1, the character . is used) to form a full-path regular expression, and then uses this expression to match the fully qualified names of tables in the MySQL database.

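    For example, when you configure 'database-name'='db_.*' and 'table-name'='tb_.+', the connector uses the regular expression db_.*\\.tb_.+ (or db_.*.tb_.+ for versions earlier than 8.0.1) to match fully qualified table names and determine which tables to read. Because the two patterns are joined into a single expression, a $ in database-name or a ^ in table-name ends up in the middle of that expression and prevents it from matching.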

    port

    The port of the MySQL database.

    No

    INTEGER

    3306

    -
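A source table that reads from sharded databases and tables through regular expressions might be declared as follows. This is a sketch only: the table name mysqlcdc_sharded_source and its columns are hypothetical, and the connection values are placeholders.

```sql
CREATE TEMPORARY TABLE mysqlcdc_sharded_source (
  order_id INT,
  customer_name STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  -- The two patterns are combined into db_.*\\.tb_.+ (VVR 8.0.1 and later)
  -- and matched against fully qualified table names, so neither pattern
  -- uses ^ or $.
  'database-name' = 'db_.*',
  'table-name' = 'tb_.+'
);
```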

  • Source-specific

    Parameter

    Description

    Required

    Type

    Default

    Remarks

    server-id

    A numeric ID for the database client.

    No

    STRING

    Random integer between 5400 and 6400.

    The server ID must be globally unique within the MySQL cluster. Assign a different ID for each job connecting to the same database.

    The parameter also accepts an ID range, such as 5400-5408. When incremental snapshot is enabled with concurrent readers, specify a range that contains at least as many IDs as the source parallelism. For more information, see Server ID Usage.

    scan.incremental.snapshot.enabled

    Enables or disables incremental snapshot.

    No

    BOOLEAN

    true

    Incremental snapshot is enabled by default. Compared to the legacy snapshot mechanism, it offers:

    • Parallel snapshot reads.

    • Chunk-level checkpointing during the snapshot phase.

    • No global read lock (FLUSH TABLES WITH READ LOCK) required.

    To enable parallel reading, each concurrent reader requires a unique server ID. Therefore, the server-id must be set to a range, such as 5400-6400, and the size of the range must be greater than or equal to the source parallelism.

    Note

    This parameter is removed in Flink engine VVR 11.1 and later.

    scan.incremental.snapshot.chunk.size

    The number of rows in each chunk.

    No

    INTEGER

    8096

    With incremental snapshot enabled, the table is split into multiple chunks. Each chunk is buffered in memory until fully read.

    A smaller chunk size produces more chunks, which enables more granular fault recovery but can cause OOM issues and lower throughput.

    scan.snapshot.fetch.size

    The maximum number of rows to fetch per batch during a snapshot read.

    No

    INTEGER

    1024

    -

    scan.startup.mode

    The startup mode for the connector.

    No

    STRING

    initial

    Valid values include:

    • initial (default): On the first startup, the connector performs a snapshot of the existing data and then continues to read the latest Binlog data.

    • latest-offset: On the first startup, the connector skips the snapshot and starts reading from the end of the Binlog. It reads only changes that occur after the connector starts.

    • earliest-offset: The connector skips the snapshot and starts reading from the earliest available Binlog offset.

    • specific-offset: The connector skips the snapshot and starts from a user-specified Binlog offset. You can specify the offset by setting both scan.startup.specific-offset.file and scan.startup.specific-offset.pos, or by setting only scan.startup.specific-offset.gtid-set to start from a specific GTID set.

    • timestamp: The connector skips the snapshot and starts reading the Binlog from a specified timestamp. The timestamp is specified in milliseconds by the scan.startup.timestamp-millis parameter.

    Important

    When you use the earliest-offset, specific-offset, or timestamp startup mode, ensure that the table schema does not change between the specified starting offset and the time the job is started. Schema changes can cause errors.

    scan.startup.specific-offset.file

    The Binlog filename for the starting offset.

    No

    STRING

    -

    When you use this configuration, scan.startup.mode must be set to specific-offset. The file name is in a format such as mysql-bin.000003.

    scan.startup.specific-offset.pos

    The starting offset within the specified Binlog file.

    No

    INTEGER

    -

    When you use this configuration, scan.startup.mode must be set to specific-offset.

    scan.startup.specific-offset.gtid-set

    The GTID set for the starting offset.

    No

    STRING

    -

    When you use this configuration, scan.startup.mode must be set to specific-offset. The GTID set is in a format such as 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

    scan.startup.timestamp-millis

    The starting timestamp in milliseconds.

    No

    LONG

    -

    When you use this configuration, scan.startup.mode must be set to timestamp. The unit of the timestamp is milliseconds.

    Important

    When starting from a specified timestamp, the MySQL CDC connector scans the initial event of each Binlog file to locate the file corresponding to the timestamp. Ensure that the Binlog file for the given timestamp has not been purged and is accessible on the database server.

    server-time-zone

    The session time zone of the database server.

    No

    STRING

    If this parameter is not specified, the system uses the time zone of the Flink job's runtime environment, which is the time zone of the selected availability zone.

    Example: Asia/Shanghai. This parameter controls how MySQL TIMESTAMP values are converted to strings. For more information, see Debezium temporal values.

    debezium.min.row.count.to.stream.results

    When the number of rows in a table exceeds this value, the connector uses batch read instead of snapshot read.

    No

    INTEGER

    1000

    Flink reads MySQL source tables in one of the following ways:

    • Snapshot read: Loads the entire table into memory. This method is fast but memory-intensive and can cause out of memory (OOM) errors for large tables.

    • Batch read: Reads the table in smaller batches. This method is memory-efficient but can be slower than a snapshot read.

    connect.timeout

    The timeout for a connection attempt to the MySQL server. If the attempt times out, the connector retries.

    No

    DURATION

    30s

    -

    connect.max-retries

    The maximum number of times to retry a failed connection to the MySQL server.

    No

    INTEGER

    3

    -

    connection.pool.size

    The size of the database connection pool.

    No

    INTEGER

    20

    The connection pool reuses database connections to reduce the overall connection count.

    jdbc.properties.*

    Custom parameters appended to the JDBC URL.

    No

    STRING

    -

    You can pass custom connection parameters. For example, to disable SSL, set 'jdbc.properties.useSSL' = 'false'.

    For a list of supported parameters, see MySQL Configuration Properties.

    debezium.*

    Custom Debezium parameters for reading the Binlog.

    No

    STRING

    -

    You can pass custom Debezium parameters. For example, use 'debezium.event.deserialization.failure.handling.mode'='ignore' to specify the handling mode for deserialization failures.

    Warning

    Do not modify Debezium parameters arbitrarily, as this can cause data reading errors. For example, the debezium.binlog.buffer.size parameter must not be configured.

    heartbeat.interval

    The interval for emitting heartbeat events to advance the Binlog offset.

    No

    DURATION

    30s

    Heartbeat events advance the Binlog offset. This is especially useful for slowly updated tables where the Binlog offset might not advance automatically. This prevents the Binlog offset from becoming outdated, which could cause the job to fail and require a stateless restart.

    scan.incremental.snapshot.chunk.key-column

    Specifies a column to use for splitting chunks during the snapshot phase.

    See the Remarks column.

    STRING

    -

    • Required for tables without a primary key. The selected column must be of a non-null type (NOT NULL).

    • Optional for tables with a primary key. You can select only one column from the primary key.

    rds.region-id

    The region ID of the ApsaraDB RDS for MySQL instance.

    Required when reading archived logs from OSS.

    STRING

    -

    For a list of region IDs, see Regions and zones.

    Important

    MySQL CDC generates random GTID strings, which are not monotonically increasing like Binlog file offsets. To locate a specific file by its GTID, the connector must download and parse all archived logs from OSS, a process that is resource-intensive and time-consuming. Therefore, reading from archived OSS logs is not feasible with GTID-based startup modes. This feature only supports starting from a specific timestamp or a Binlog file offset. It also does not support scenarios involving primary-secondary switchovers within the archived logs, as these rely on GTIDs. Evaluate these limitations carefully before using this feature.

    rds.access-key-id

    The AccessKey ID for the ApsaraDB RDS for MySQL account.

    Required when reading archived logs from OSS.

    STRING

    -

    For more information, see How do I view my AccessKey ID and AccessKey secret?

    Important

    Security Note: To avoid exposing your credentials, use Variable Management to store and retrieve your AccessKey ID.

    rds.access-key-secret

    The AccessKey secret for the ApsaraDB RDS for MySQL account.

    Required when reading archived logs from OSS.

    STRING

    -

    For more information, see How do I view my AccessKey ID and AccessKey secret?

    Important

    Security Note: To avoid exposing your credentials, use Variable Management to store and retrieve your AccessKey secret.

    rds.db-instance-id

    The instance ID of the ApsaraDB RDS for MySQL instance.

    Required when reading archived logs from OSS.

    STRING

    -

    -

    rds.main-db-id

    The primary database ID of the ApsaraDB RDS for MySQL instance.

    No

    STRING

    -

    Note

    If this parameter is not specified, Flink engine VVR 11.7 and later automatically queries the primary database ID based on the ApsaraDB RDS for MySQL connection information.

    rds.download.timeout

    The timeout for downloading a single archived log file from OSS.

    No

    DURATION

    60s

    -

    rds.endpoint

    The service endpoint for retrieving Binlog information from OSS.

    No

    STRING

    -

    • For a list of available values, see Service endpoints.

    • Supported in Flink engine VVR 8.0.8 and later.

    scan.incremental.close-idle-reader.enabled

    Specifies whether to close an idle reader after the snapshot phase is complete.

    No

    BOOLEAN

    false

    • Supported in Flink engine VVR 8.0.1 and later.

    • For this setting to take effect, you must also set execution.checkpointing.checkpoints-after-tasks-finish.enabled to true.

    scan.read-changelog-as-append-only.enabled

    Specifies whether to convert the changelog stream to an append-only stream.

    No

    BOOLEAN

    false

    Valid values:

    • true: All message types (including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER) are converted to INSERT messages. Enable this only for specific use cases, such as preserving delete messages from an upstream table.

    • false (default): All message types are emitted as-is.

    Note

    Supported in Flink engine VVR 8.0.8 and later.

    scan.only.deserialize.captured.tables.changelog.enabled

    During the incremental phase, specifies whether to deserialize change events only for the captured tables.

    No

    BOOLEAN

    • false for Flink engine VVR 8.x versions.

    • true for Flink engine VVR 11.1 and later.

    Valid values:

    • true: Deserializes change data only for the captured tables, which improves Binlog reading speed.

    • false: Deserializes change data for all tables.

    Note
    • Supported in Flink engine VVR 8.0.7 and later.

    • In Flink engine VVR 8.0.8 and earlier versions, this parameter is named debezium.scan.only.deserialize.captured.tables.changelog.enable.

    scan.parse.online.schema.changes.enabled

    During the incremental phase, specifies whether to parse DDL events from RDS lockless change.

    No

    BOOLEAN

    false

    Valid values:

    • true: Parses DDL events from RDS lockless change.

    • false (default): Does not parse DDL events from RDS lockless change.

    This is an experimental feature. Take a savepoint of your Flink job before you perform online lockless changes to ensure you can recover if needed.

    Note

    Supported in Flink engine VVR 11.1 and later.

    scan.incremental.snapshot.backfill.skip

    Specifies whether to skip the backfill process during the snapshot phase.

    No

    BOOLEAN

    false

    Valid values:

    • true: Skips the backfill process.

    • false (default): Does not skip the backfill process.

    If backfill is skipped, changes to the table that occur during the snapshot phase are read later in the incremental phase, instead of being merged into the snapshot.

    Important

    Skipping backfill can lead to data inconsistencies because changes that occur during the snapshot phase might be replayed. This provides only at-least-once semantics.

    Note

    Supported in Flink engine VVR 11.1 and later.

    scan.incremental.snapshot.unbounded-chunk-first.enabled

    Specifies whether to distribute the unbounded chunk first during the snapshot phase.

    No

    BOOLEAN

    false

    Valid values:

    • true: The unbounded chunk is prioritized for distribution during the snapshot phase.

    • false (default): The unbounded chunk is not prioritized.

    This is an experimental feature. Enabling it can reduce the risk of out of memory (OOM) errors on the TaskManager when it synchronizes the final chunk. Enable this feature before the job starts for the first time.

    Note

    Supported in Flink engine VVR 11.1 and later.

    binlog.session.network.timeout

    The network read and write timeout for the Binlog connection.

    No

    DURATION

    10m

    If set to 0s, the connection uses the default timeout of the MySQL server.

    Note

    Supported in Flink engine VVR 11.5 and later.

    scan.rate-limit.records-per-second

    Limits the maximum number of records that the source can emit per second.

    No

    LONG

    -

    Use this parameter to limit the data read rate. This limit applies to both the snapshot and incremental phases.

    The numRecordsOutPerSecond metric of the Source reflects the number of records output per second for the entire data stream. You can adjust this parameter based on this metric.

    During the snapshot read phase, you typically need to reduce the number of data rows read in each batch by reducing the value of the scan.incremental.snapshot.chunk.size parameter.

    Note

    Supported in Flink engine VVR 11.5 and later.

    scan.binlog.tolerate.gtid-holes

    Enables the job to ignore gaps in the GTID sequence, allowing it to bypass discontinuous events and continue running.

    No

    BOOLEAN

    false

    Before enabling this parameter, ensure that the startup offset of the job has not expired. If the job starts from a purged or expired GTID offset, the engine silently skips the missing logs, causing data loss.

    Note

    Supported in Flink engine VVR 11.6 and later.
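The source-specific options above can also be supplied through dynamic hints, in the same way as the server ID. The following sketch shows two of the startup modes. It assumes the mysqlcdc_source table from the Syntax section, and the binlog file name, position, and timestamp are placeholder values; the timestamp is assumed to be expressed in epoch milliseconds.

```sql
-- Skip the snapshot and start from a specific binlog file and position.
-- The server ID range covers a source parallelism of up to 4.
SELECT * FROM mysqlcdc_source /*+ OPTIONS(
  'server-id' = '5400-5403',
  'scan.startup.mode' = 'specific-offset',
  'scan.startup.specific-offset.file' = 'mysql-bin.000003',
  'scan.startup.specific-offset.pos' = '4'
) */;

-- Skip the snapshot and start from a timestamp.
SELECT * FROM mysqlcdc_source /*+ OPTIONS(
  'server-id' = '5404-5407',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1700000000000'
) */;
```

With either mode, the table schema must not have changed between the chosen offset and the time the job starts, and the referenced binlog file must still be available on the server.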

  • Dimension table-specific

    Parameter

    Description

    Required

    Type

    Default

    Notes

    url

    The JDBC URL for the MySQL database.

    No

    STRING

    None

    The URL format is: jdbc:mysql://<endpoint>:<port>/<database name>.

    lookup.max-retries

    The maximum number of retries if a lookup request fails.

    No

    INTEGER

    3

    Supported only by Flink compute engine VVR 6.0.7 and later.

    lookup.cache.strategy

    The caching policy.

    No

    STRING

    None

    Supported values: None, LRU, and ALL. For more information, see Background information.

    Note

    The LRU caching policy requires the lookup.cache.max-rows parameter.

    lookup.cache.max-rows

    The maximum number of rows to store in the cache.

    No

    INTEGER

    100000

    • Required if the caching policy is set to LRU.

    • Optional if the caching policy is set to ALL.

    lookup.cache.ttl

    The time-to-live (TTL) for cached entries.

    No

    DURATION

    None

    The behavior of the lookup.cache.ttl parameter depends on the value of lookup.cache.strategy:

    • If lookup.cache.strategy is set to None, lookup.cache.ttl does not need to be configured, which means the cache does not expire.

    • If lookup.cache.strategy is set to LRU, lookup.cache.ttl specifies the cache timeout duration. By default, the cache does not expire.

    • If lookup.cache.strategy is set to ALL, lookup.cache.ttl specifies the cache reload interval. By default, the cache is not reloaded.

    Specify the duration in a time format, such as 1min or 10s.

    lookup.max-join-rows

    The maximum number of matching rows to return from the dimension table for each row in the source table.

    No

    INTEGER

    1024

    None

    lookup.filter-push-down.enabled

    Specifies whether to enable filter pushdown for the dimension table.

    No

    BOOLEAN

    false

    Valid values:

    • true: Enables filter pushdown. The system filters data from the dimension table based on the conditions in the SQL job before loading it.

    • false (default): Disables filter pushdown. The system loads the full dataset from the dimension table.

    Note

    Supported only by Flink compute engine VVR 8.0.7 and later.

    Important

    Use filter pushdown only for Flink dimension tables. This feature is not supported for MySQL source tables. If you use the same Flink table as both a source and a dimension table with filter pushdown enabled, you must use SQL Hints to set this parameter to false when using it as a source table. Otherwise, the job may fail.
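A dimension table that uses the LRU caching policy could be declared as shown below. This is a minimal sketch: the table name mysql_dim and its columns are hypothetical, the connection values are placeholders, and the cache sizes are illustrative rather than recommended.

```sql
CREATE TEMPORARY TABLE mysql_dim (
  product_id INT,
  product_name STRING,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  -- The LRU policy requires lookup.cache.max-rows.
  'lookup.cache.strategy' = 'LRU',
  'lookup.cache.max-rows' = '100000',
  -- With LRU, the TTL is the timeout after which a cached entry expires.
  'lookup.cache.ttl' = '10min',
  'lookup.max-retries' = '3'
);
```

With the ALL policy, the same lookup.cache.ttl option would instead set the interval at which the whole cache is reloaded.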

  • Sink-specific

    Parameter

    Description

    Required

    Type

    Default

    Notes

    url

    The JDBC URL for the MySQL database.

    No

    STRING

    None

    The format of the URL is: jdbc:mysql://<endpoint>:<port>/<database name>.

    sink.max-retries

    The maximum number of retries for a failed write operation.

    No

    INTEGER

    3

    None

    sink.buffer-flush.batch-size

    The number of records to include in each batch write.

    No

    INTEGER

    4096

    None

    sink.buffer-flush.max-rows

    The maximum number of records to buffer in memory before flushing.

    No

    INTEGER

    10000

    This parameter takes effect only when a primary key is specified.

    sink.buffer-flush.interval

    The flush interval. The system automatically flushes the buffer when this interval is reached, even if other flush conditions are not met.

    No

    DURATION

    1s

    None

    sink.ignore-delete

    Whether to ignore DELETE operations.

    No

    BOOLEAN

    false

    When a Flink SQL job generates a stream that includes retractions (DELETE or UPDATE_BEFORE messages), concurrent updates to the same table by multiple sink tasks can cause data inconsistencies.

    For example, if one task deletes a record while another task updates only a few of its fields, the non-updated fields might be incorrectly set to null or their default values.

    To prevent this issue, set sink.ignore-delete to true to ignore upstream DELETE and UPDATE_BEFORE operations.

    Note
    • UPDATE_BEFORE is part of Flink's retraction mechanism, which retracts the old value during an update operation.

    • When sink.ignore-delete is true, the sink skips all DELETE and UPDATE_BEFORE records, processing only INSERT and UPDATE_AFTER records.

    sink.ignore-null-when-update

    How to handle null values in update records.

    No

    BOOLEAN

    false

    Valid values:

    • true: Ignores null values, leaving the corresponding field in the destination table unchanged. This setting requires a primary key to be defined for the Flink table. When set to true:

      • For VVR 8.0.6 and earlier, batch write is not supported for the sink table.

      • For VVR 8.0.7 and later, batch write is supported for the sink table.

        While a batch write can significantly improve throughput, it may introduce data latency and increase the risk of out-of-memory errors. Evaluate these trade-offs based on your business requirements.

    • false: Updates the corresponding field to null.

    Note

    This parameter is supported in VVR versions 8.0.5 and later.
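
    The sink-specific options above can be combined in one table definition. The following is a minimal sketch, assuming a partial-update scenario in which upstream DELETE and UPDATE_BEFORE records should be skipped and null fields should not overwrite existing values. The table name, columns, and option values are illustrative assumptions, not tuning recommendations.

    ```sql
    -- Hypothetical sink table; a primary key is declared because
    -- sink.buffer-flush.max-rows and sink.ignore-null-when-update depend on it.
    CREATE TEMPORARY TABLE mysql_sink_tuned (
      `name` VARCHAR,
      `age` INT,
      PRIMARY KEY (`name`) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>',
      'sink.max-retries' = '5',                 -- retry failed writes up to 5 times
      'sink.buffer-flush.max-rows' = '5000',    -- flush after 5000 buffered records
      'sink.buffer-flush.interval' = '2s',      -- or after 2 seconds, whichever comes first
      'sink.ignore-delete' = 'true',            -- skip DELETE and UPDATE_BEFORE records
      'sink.ignore-null-when-update' = 'true'   -- leave fields unchanged when the update value is null
    );
    ```

    Larger buffers and longer intervals trade latency and memory for throughput, so the values are best chosen against the job's own latency requirements.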

Type mapping

  • CDC source table

    MySQL CDC type

    Flink type

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    TINYINT UNSIGNED ZEROFILL

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    SMALLINT UNSIGNED ZEROFILL

    BIGINT

    BIGINT

    INT UNSIGNED

    INT UNSIGNED ZEROFILL

    MEDIUMINT UNSIGNED

    MEDIUMINT UNSIGNED ZEROFILL

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    BIGINT UNSIGNED ZEROFILL

    SERIAL

    FLOAT [UNSIGNED] [ZEROFILL]

    FLOAT

    DOUBLE [UNSIGNED] [ZEROFILL]

    DOUBLE

    DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

    REAL [UNSIGNED] [ZEROFILL]

    NUMERIC(p, s) [UNSIGNED] [ZEROFILL]

    DECIMAL(p, s)

    DECIMAL(p, s) [UNSIGNED] [ZEROFILL]

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)] WITH LOCAL TIME ZONE

    CHAR(n)

    STRING

    VARCHAR(n)

    TEXT

    BINARY

    BYTES

    VARBINARY

    BLOB

    Important

    Avoid using the MySQL TINYINT(1) type to store values other than 0 and 1. By default, when property-version is 0, the MySQL CDC source table maps TINYINT(1) to the Flink BOOLEAN type, which can lead to data inaccuracies. To use the TINYINT(1) type to store other numeric values, see the catalog.table.treat-tinyint1-as-boolean configuration parameter.

  • Dimension and sink tables

    MySQL type

    Flink type

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    BIGINT

    BIGINT

    INT UNSIGNED

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DOUBLE PRECISION

    NUMERIC(p, s)

    DECIMAL(p, s)

    Note

    p must be 38 or less.

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(⌈n/8⌉)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    TEXT

    MEDIUMTEXT

    LONGTEXT

    TINYBLOB

    BYTES

    Important

    Flink supports MySQL BLOB records up to 2,147,483,647 bytes (2^31 - 1).

    BLOB

    MEDIUMBLOB

    LONGBLOB

Data ingestion

You can use the MySQL connector as a data source for Flink CDC.

Syntax

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: <username>
  password: <password>
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404

sink:
  type: xxx

Parameters

Parameter

Description

Required

Type

Default

Notes

type

Data source type.

Yes

STRING

None

The value must be mysql.

name

Name of the data source.

No

STRING

None

-

hostname

The hostname or IP address of the MySQL database.

Yes

STRING

None

We recommend using a Virtual Private Cloud (VPC) address.

Note

If your MySQL database and Realtime Compute for Apache Flink workspace are in different VPCs, connect them through a cross-VPC network or use public network access. For more information, see Workspace management and operations and How do I access the public network from a fully managed Flink cluster?

username

The username for the MySQL database.

Yes

STRING

None

-

password

The password for the MySQL database.

Yes

STRING

None

-

tables

The MySQL tables to be synchronized.

Yes

STRING

None

  • Use a regular expression to read data from multiple tables.

  • Separate multiple regular expressions with a comma.

Note
  • Do not use the start ^ and end $ anchors in regular expressions. In version 11.2, the regular expression for a database is obtained by splitting a string by a dot. These anchors will make the resulting database regular expression unusable. For example, change ^db.user_[0-9]+$ to db.user_[0-9]+.

  • The dot character separates the database and table names. To use a dot as the regular expression wildcard that matches any character, escape it with a backslash. Examples: db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*.

tables.exclude

The tables to exclude from synchronization.

No

STRING

None

  • Use a regular expression to exclude multiple tables.

  • Separate multiple regular expressions with a comma.

Note

The dot character separates the database and table names. To use a dot as the regular expression wildcard that matches any character, escape it with a backslash. Examples: db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*.

port

The port of the MySQL database.

No

INTEGER

3306

-

schema-change.enabled

Whether to send schema change events.

No

BOOLEAN

true

-

server-id

A unique numeric ID or range for the client connection to the MySQL server.

No

STRING

A random integer between 5400 and 6400.

The ID must be unique across all clients in the MySQL cluster. We recommend assigning a different ID to each job that connects to the same database.

This parameter also supports an ID range, such as 5400-5408. Specifying a range is recommended when incremental read with multiple readers is enabled, allowing each concurrent reader to use a different ID.

jdbc.properties.*

Custom connection parameters for the JDBC URL.

No

STRING

None

You can pass custom connection parameters. For example, set jdbc.properties.useSSL to false to disable SSL.

For a list of supported parameters, see MySQL Configuration Properties.

debezium.*

Custom parameters for the Debezium connector that reads the binlog.

No

STRING

None

For example, set debezium.event.deserialization.failure.handling.mode to ignore to specify how deserialization failures are handled.

Warning

Do not modify Debezium parameters arbitrarily to avoid data read errors. For example, configuring the debezium.binlog.buffer.size parameter is prohibited.

scan.incremental.snapshot.chunk.size

The number of rows per chunk.

No

INTEGER

8096

Tables are split into chunks for reading. Each chunk is buffered in memory until it is fully read.

A smaller chunk size increases the total number of chunks. Although this provides finer-grained fault recovery, it may lead to out-of-memory (OOM) errors and reduce overall throughput. You must balance these trade-offs and set a reasonable chunk size.

scan.snapshot.fetch.size

The maximum number of records to fetch per batch when reading full data from a table.

No

INTEGER

1024

-

scan.startup.mode

The data consumption startup mode.

No

STRING

initial

Valid values:

  • initial (default): On its first startup, the connector performs an initial snapshot of the full data, then reads the latest binlog data.

  • latest-offset: On first startup, skips the snapshot of historical data and starts reading from the end of the binlog. It captures only changes that occur after the connector starts.

  • earliest-offset: Skips the snapshot and starts reading from the earliest available binlog offset.

  • specific-offset: Skips the snapshot and starts reading from a specific binlog offset. Specify the offset either by setting both scan.startup.specific-offset.file and scan.startup.specific-offset.pos (binlog filename and position), or by setting only scan.startup.specific-offset.gtid-set (GTID set).

  • timestamp: Skips the snapshot and starts reading the binlog from a specified timestamp. Specify the timestamp in milliseconds by using scan.startup.timestamp-millis.

Important

For the earliest-offset, specific-offset, and timestamp startup modes, the table schema must not change between the specified binlog offset and the time the job starts. If the schema at job startup differs from the schema at the specified offset, the job reports an error.

scan.startup.specific-offset.file

The binlog filename from which to start when scan.startup.mode is set to specific-offset.

No

STRING

None

When you use this parameter, scan.startup.mode must be set to specific-offset. Example filename: mysql-bin.000003.

scan.startup.specific-offset.pos

The binlog file offset from which to start when scan.startup.mode is set to specific-offset.

No

INTEGER

None

When you use this configuration, scan.startup.mode must be set to specific-offset.

scan.startup.specific-offset.gtid-set

The GTID set from which to start when scan.startup.mode is set to specific-offset.

No

STRING

None

For this configuration, scan.startup.mode must be set to specific-offset. A GTID set is in a format such as 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

scan.startup.timestamp-millis

The timestamp in milliseconds from which to start when scan.startup.mode is set to timestamp.

No

LONG

None

When you use this configuration, scan.startup.mode must be set to timestamp. The timestamp is specified in milliseconds.

Important

When using a specified timestamp, the MySQL CDC connector locates the starting binlog file by checking the timestamp of the initial event in each file. Ensure that the binlog file corresponding to the specified timestamp has not been purged by the database and is accessible.

server-time-zone

The session time zone of the database.

No

STRING

Defaults to the time zone of the Flink job's runtime environment, which is typically the time zone of the selected deployment zone.

Example: Asia/Shanghai. This parameter controls how MySQL TIMESTAMP values are converted to strings. For more information, see Debezium temporal types.

scan.startup.specific-offset.skip-events

The number of binlog events to skip when starting from a specific offset.

No

INTEGER

None

This parameter is valid only when scan.startup.mode is set to specific-offset.

scan.startup.specific-offset.skip-rows

The number of row changes to skip when starting from a specific offset. A single binlog event can contain multiple row changes.

No

INTEGER

None

This parameter is valid only when scan.startup.mode is set to specific-offset.

connect.timeout

The maximum time to wait for a connection to the MySQL server before timing out.

No

DURATION

30s

-

connect.max-retries

The maximum number of retries after a failed connection to the MySQL server.

No

INTEGER

3

-

connection.pool.size

The size of the database connection pool.

No

INTEGER

20

A connection pool is used to reuse connections, which can reduce the number of connections to the database.

heartbeat.interval

The interval for emitting heartbeat events from the source to advance the binlog offset.

No

DURATION

30s

For slowly updated tables where the binlog offset may not advance automatically, heartbeat events ensure the offset progresses. This prevents the offset from expiring. An expired binlog offset can cause the job to fail and require a stateless restart.

rds.region-id

The region ID of the Alibaba Cloud RDS for MySQL instance.

Required when reading archived logs from OSS.

STRING

None

For a list of region IDs, see Regions and zones.

Important

MySQL CDC GTID strings are randomly generated and are not monotonically increasing like binlog file offsets. Locating a specific GTID in archived logs requires downloading and parsing all log files from OSS, which is highly inefficient and costly. Therefore, reading archived logs from OSS only supports starting from a specific timestamp or binlog offset. This feature does not support starting from a GTID set or scenarios with primary-secondary switchovers in the log history, as switchovers rely on GTIDs. Evaluate these limitations carefully before using this feature.

rds.access-key-id

The AccessKey ID for the Alibaba Cloud RDS for MySQL account.

Required when reading archived logs from OSS.

STRING

None

View AccessKey ID and AccessKey Secret.

Important

To avoid exposing your credentials, use Manage variables to provide the AccessKey ID.

rds.access-key-secret

The AccessKey Secret for the Alibaba Cloud RDS for MySQL account.

Required when reading archived logs from OSS.

STRING

None

View AccessKey ID and AccessKey Secret.

Important

To avoid exposing your credentials, use Manage variables to provide the AccessKey Secret.

rds.db-instance-id

The instance ID of the Alibaba Cloud RDS for MySQL instance.

Required when reading archived logs from OSS.

STRING

None

-

rds.main-db-id

The primary database ID of the Alibaba Cloud RDS for MySQL instance.

No

STRING

None

To obtain the primary database ID, see RDS for MySQL log backup.

Note

If this parameter is not specified, Flink engine VVR 11.7 and later automatically queries the primary database ID based on the ApsaraDB RDS for MySQL connection information.

rds.download.timeout

The timeout for downloading a single archived log file from OSS.

No

DURATION

60s

-

rds.endpoint

The service endpoint for retrieving binlog information from OSS.

No

STRING

None

For a list of available endpoints, see Endpoints.

rds.binlog-directory-prefix

The directory prefix for storing binlog files.

No

STRING

rds-binlog-

-

rds.use-intranet-link

Specifies whether to use an internal network link to download binlog files.

No

BOOLEAN

true

-

rds.binlog-directories-parent-path

The absolute path to the parent directory where binlog files are stored.

No

STRING

None

-

chunk-meta.group.size

The group size of chunk metadata.

No

INTEGER

1000

If the metadata size exceeds this value, it is transmitted in multiple parts.

chunk-key.even-distribution.factor.lower-bound

The lower bound of the distribution factor for even chunk splitting.

No

DOUBLE

0.05

If the distribution factor is less than this value, uneven chunking is used.

Chunk distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / Total number of rows.

chunk-key.even-distribution.factor.upper-bound

The upper bound of the distribution factor for even chunk splitting.

No

DOUBLE

1000.0

If the distribution factor is greater than this value, uneven chunking is used.

Chunk distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / Total number of rows.

scan.incremental.close-idle-reader.enabled

Specifies whether to close idle readers after the snapshot phase is complete.

No

BOOLEAN

false

For this configuration to take effect, set execution.checkpointing.checkpoints-after-tasks-finish.enabled to true.

scan.only.deserialize.captured.tables.changelog.enabled

During the incremental phase, specifies whether to deserialize change events only for the specified tables.

No

BOOLEAN

  • false in VVR 8.x versions.

  • true in VVR 11.1 and later versions.

Valid values:

  • true: Deserializes change data only for the target tables, which improves binlog reading speed.

  • false: Deserializes change data for all tables.

scan.parallel-deserialize-changelog.enabled

During the incremental phase, specifies whether to use multiple threads to parse change events.

No

BOOLEAN

false

Valid values:

  • true: Uses multiple threads for deserialization while preserving binlog event order, which improves reading speed.

  • false (default): Uses a single thread for deserialization.

Note

Supported in Ververica Runtime (VVR) 8.0.11 and later versions.

scan.parallel-deserialize-changelog.handler.size

The number of event handlers to use when multi-threaded deserialization is enabled.

No

INTEGER

2

Note

Supported in Ververica Runtime (VVR) 8.0.11 and later versions.

metadata-column.include-list

A comma-separated list of metadata columns to pass downstream.

No

STRING

None

The available metadata columns are op_ts, es_ts, query_log, file, and pos. Separate multiple metadata columns with commas.

Note

The MySQL CDC YAML connector does not require or support adding metadata columns for the database name, table name, or op_type. You can directly use __data_event_type__ in a Transform expression to retrieve the change data type, or use __schema_name__ and __table_name__ to retrieve the database name and table name.

Important
  • The file metadata column represents the binlog file that contains the data, and the pos metadata column represents the data's offset within that file. During the full load phase, file is an empty string ("") and pos is "0". During the incremental phase, file is the binlog filename and pos is the data's offset in the binlog file. These two metadata columns are supported starting from Ververica Runtime (VVR) version 11.5.

  • The es_ts metadata column represents the start time of the transaction in MySQL that corresponds to the changelog. This column is supported only for MySQL 8.0.x. Do not add this column when you use an earlier version of MySQL.

  • The op_ts timestamp is precise to the second, and the es_ts timestamp is precise to the millisecond.

scan.newly-added-table.enabled

On restart from a checkpoint, specifies whether to synchronize newly added tables or remove tables that no longer match the specified pattern.

No

BOOLEAN

false

This parameter takes effect when restarting from a checkpoint or savepoint.

Important

Do not add or remove captured tables while the job is still in the snapshot phase and then restart the job from a savepoint taken in that phase. Otherwise, the job cannot read data correctly.

scan.binlog.newly-added-table.enabled

Specifies whether to stream data from newly added tables discovered during the incremental (binlog) phase.

No

BOOLEAN

false

Cannot be enabled at the same time as scan.newly-added-table.enabled.

scan.incremental.snapshot.chunk.key-column

For specific tables, specifies a column to use as the key for splitting chunks during the snapshot phase.

No

STRING

None

  • Define a rule by joining a table name and a column name with a colon (:). The table name can be a regular expression. Separate multiple rules with semicolons (;). For example: db1.user_table_[0-9]+:col1;db[1-2].[app|web]_order_\.*:col2.

  • Required for tables without a primary key. The specified column must be non-nullable (NOT NULL). For tables with a primary key, this parameter is optional, and you can only specify a single column from the primary key.

scan.parse.online.schema.changes.enabled

Specifies whether to parse DDL events from RDS lock-free schema changes during the incremental phase.

No

BOOLEAN

false

Valid values:

  • true: Parses DDL events from RDS lock-free schema changes.

  • false (default): Does not parse DDL events from RDS lock-free schema changes.

This is an experimental feature. We recommend that you take a savepoint of your Flink job before performing an online lock-free schema change to ensure you can recover if needed.

Note

Supported in Ververica Runtime (VVR) 11.0 and later versions.

scan.incremental.snapshot.backfill.skip

Specifies whether to skip the backfill process during the snapshot phase.

No

BOOLEAN

false

Valid values:

  • true: Skips the backfill process during the snapshot phase.

  • false (default): Does not skip the backfill process.

If true, table changes that occur during the snapshot are read later in the incremental phase, not merged into the snapshot.

Important

Skipping backfill can lead to data inconsistencies because changes that occur during the snapshot phase might be replayed. This provides only at-least-once semantics.

Note

Supported in Ververica Runtime (VVR) 11.1 and later versions.

treat-tinyint1-as-boolean.enabled

Specifies whether to treat the TINYINT(1) type as a BOOLEAN type.

No

BOOLEAN

true

Valid values:

  • true (default): Treats the TINYINT(1) type as a BOOLEAN type.

  • false: Does not treat the TINYINT(1) type as a BOOLEAN type.

treat-timestamp-as-datetime-enabled

Specifies whether to treat the MySQL TIMESTAMP type as a DATETIME type.

No

BOOLEAN

false

Valid values:

  • true: Treats the MySQL TIMESTAMP type as a DATETIME type and maps it to the CDC TIMESTAMP type.

  • false (default): Maps the MySQL TIMESTAMP type to the CDC TIMESTAMP_LTZ type.

The MySQL TIMESTAMP type stores UTC time and is affected by the time zone, whereas the MySQL DATETIME type stores literal time and is not affected by the time zone.

When enabled, this parameter converts MySQL TIMESTAMP data to the DATETIME type based on the server-time-zone.

include-comments.enabled

Specifies whether to synchronize table and column comments.

No

BOOLEAN

false

Valid values:

  • true: Synchronizes table and column comments.

  • false (default): Does not synchronize table and column comments.

Enabling this parameter increases the memory usage of the job.

scan.incremental.snapshot.unbounded-chunk-first.enabled

Specifies whether to distribute the unbounded chunk first during the snapshot phase.

No

BOOLEAN

false

Valid values:

  • true: Prioritizes the distribution of the unbounded chunk during the snapshot phase.

  • false (default): Does not prioritize the distribution of the unbounded chunk.

This is an experimental feature. Enabling it can reduce the risk of out-of-memory (OOM) errors on the TaskManager when it synchronizes the final chunk. We recommend that you enable this parameter before the first job startup.

Note

Supported in Ververica Runtime (VVR) 11.1 and later versions.

binlog.session.network.timeout

The network timeout for the binlog connection.

No

DURATION

10m

If set to 0s, the default timeout of the MySQL server is used.

Note

Supported in Ververica Runtime (VVR) 11.5 and later versions.

scan.rate-limit.records-per-second

The maximum number of records per second that the source can emit.

No

LONG

None

Use this to limit the data read rate. This limit applies to both the snapshot and incremental phases.

The numRecordsOutPerSecond metric of the source reflects the number of records output by the entire data stream per second, and you can adjust this parameter based on this metric.

During the snapshot read phase, you usually need to reduce the number of data rows read in each batch by decreasing the value of the scan.incremental.snapshot.chunk.size parameter.

Note

Supported in Ververica Runtime (VVR) 11.5 and later versions.

include-binlog-meta.enable

Specifies whether to include original MySQL binlog information, such as GTIDs and binlog offsets, in the message.

No

BOOLEAN

false

This is useful for raw binlog synchronization, such as replacing an existing Canal-based synchronization pipeline.

Note

Supported in Ververica Runtime (VVR) 11.6 and later versions.

scan.binlog.tolerate.gtid-holes

Allows the job to tolerate gaps in the GTID sequence by bypassing discontinuous events and continuing to run.

No

BOOLEAN

false

Before enabling this parameter, ensure the job's starting GTID offset has not expired. Starting from a purged GTID offset causes the engine to silently skip missing logs, resulting in data loss.

Note

Supported in Ververica Runtime (VVR) 11.6 and later versions.

Reuse an existing catalog

Starting with VVR 11.5, you can directly reference a built-in MySQL catalog from the Catalogs page in a Flink CDC data ingestion job, eliminating the need to manually configure connection properties.

source:
  type: mysql
  using.built-in-catalog: mysql_rds_catalog

Currently, data ingestion jobs automatically reuse the following parameters from the MySQL catalog:

  • hostname

  • port

  • username

  • password

  • catalog.table.metadata-columns

  • catalog.table.treat-tinyint1-as-boolean

To override any of these parameters, explicitly define them in your YAML configuration. Parameters defined in YAML take precedence.

Type mapping

The following table shows the type mappings for data ingestion.

MySQL CDC type

Flink CDC type

TINYINT(n)

TINYINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

TINYINT UNSIGNED ZEROFILL

YEAR

INT

INT

MEDIUMINT

MEDIUMINT UNSIGNED

MEDIUMINT UNSIGNED ZEROFILL

SMALLINT UNSIGNED

SMALLINT UNSIGNED ZEROFILL

BIGINT

BIGINT

INT UNSIGNED

INT UNSIGNED ZEROFILL

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT UNSIGNED ZEROFILL

SERIAL

FLOAT [UNSIGNED] [ZEROFILL]

FLOAT

DOUBLE [UNSIGNED] [ZEROFILL]

DOUBLE

DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

REAL [UNSIGNED] [ZEROFILL]

NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where p <= 38

DECIMAL(p, s)

DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where p <= 38

FIXED(p, s) [UNSIGNED] [ZEROFILL] where p <= 38

BOOLEAN

BOOLEAN

BIT(1)

TINYINT(1)

DATE

DATE

TIME [(p)]

TIME [(p)]

DATETIME [(p)]

TIMESTAMP [(p)]

TIMESTAMP [(p)]

The mapping depends on the value of the treat-timestamp-as-datetime-enabled parameter:

true: TIMESTAMP[(p)]

false: TIMESTAMP_LTZ[(p)]

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

BIT(n)

BINARY(⌈n / 8⌉)

BINARY(n)

BINARY(n)

VARBINARY(N)

VARBINARY(N)

NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65

STRING

Note

The MySQL decimal data type supports a precision of up to 65, while Flink's DECIMAL type is limited to a precision of 38. If you define a decimal column with a precision greater than 38, map it to a string to prevent precision loss.

DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65

FIXED(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65

TINYTEXT

STRING

TEXT

MEDIUMTEXT

LONGTEXT

ENUM

JSON

STRING

Note

Flink converts the JSON data type to a JSON-formatted string.

GEOMETRY

STRING

Note

Flink converts MySQL spatial data types to a JSON string with a fixed format. For more information, see MySQL spatial data type mapping.

POINT

LINESTRING

POLYGON

MULTIPOINT

MULTILINESTRING

MULTIPOLYGON

GEOMETRYCOLLECTION

TINYBLOB

BYTES

Note

The maximum supported length for MySQL BLOB data types is 2,147,483,647 (2^31 - 1).

BLOB

MEDIUMBLOB

LONGBLOB

Examples

  • CDC Source Table

    CREATE TEMPORARY TABLE mysqlcdc_source (
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      order_id INT,
      customer_name STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT order_id, customer_name FROM mysqlcdc_source;
  • Dimension Table

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T
    JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
      ON T.a = H.a;
  • Sink Table

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    INSERT INTO mysql_sink
    SELECT * FROM datagen_source;
  • Flink CDC Source

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 7601-7604
    
    sink:
      type: values
      name: Values Sink
      print.enabled: true
      sink.print.logger: true

MySQL CDC source

  • How it works

    When a MySQL CDC source starts, it scans the table, divides it into chunks based on the primary key, and records the current binlog offset. It then uses the incremental snapshot algorithm to read the chunks one by one with SELECT statements. The job periodically performs checkpoints to record the completed chunks, so after a failover it only needs to read the unfinished chunks. After all chunks are read, the source switches to reading incremental changes from the previously recorded binlog offset. The job continues to perform checkpoints to record the binlog offset, so after a failover it resumes from the last recorded offset. This achieves exactly-once semantics.

    For details about the incremental snapshot algorithm, see MySQL CDC Connector.
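
    To make the chunk-by-chunk snapshot read more concrete, the statements below sketch what reading a table in primary-key ranges can look like. This is only a conceptual illustration: the table orders, the key order_id, and the range boundaries are hypothetical, and these are not the exact queries that the connector issues.

    ```sql
    -- Hypothetical table `orders` with an evenly distributed integer primary key
    -- order_id starting at 1, split into ranges of 8096 keys each
    -- (8096 is the default of scan.incremental.snapshot.chunk.size).

    -- First chunk: all keys below the first split point.
    SELECT * FROM orders WHERE order_id < 8097;

    -- Middle chunk: a key range bounded on both sides.
    SELECT * FROM orders WHERE order_id >= 8097 AND order_id < 16193;

    -- Last chunk: unbounded on the upper side, so it also covers
    -- rows inserted with larger keys while the snapshot is running.
    SELECT * FROM orders WHERE order_id >= 16193;
    ```

    Because each range is read independently, a checkpoint only needs to record which ranges are complete, which is what allows a restarted job to skip them.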

  • Metadata

    Metadata is useful for synchronizing and merging sharded databases and tables into a single sink. When you merge sharded tables, you often need to identify the original source database and table for each record. Metadata columns allow you to access this information, making it easy to consolidate multiple sharded tables into one sink table.

    The MySQL CDC source supports metadata columns, which you can use to access the following metadata.

    Metadata key

    Type

    Description

    database_name

    STRING NOT NULL

    The name of the database that contains the row.

    table_name

    STRING NOT NULL

    The name of the table that contains the row.

    op_ts

    TIMESTAMP_LTZ(3) NOT NULL

    The time at which the change occurred in the database. If the record comes from the historical snapshot of the table instead of the binlog, this value is 0.

    Note

    The precision of this field is in milliseconds.

    op_type

    STRING NOT NULL

    The type of change operation for the row.

    • +I: INSERT message

    • -D: DELETE message

    • -U: UPDATE_BEFORE message

    • +U: UPDATE_AFTER message

    Note

    Supported in VVR 8.0.7 and later.

    query_log

    STRING NOT NULL

    The MySQL query log record corresponding to the row.

    Note

    The binlog_rows_query_log_events parameter must be enabled in MySQL to record query logs.

    The following example merges multiple sharded orders tables from multiple databases in a MySQL instance into a single holo_orders table in Hologres.

    CREATE TEMPORARY TABLE mysql_orders (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,  -- Read the database name.
      table_name STRING METADATA  FROM 'table_name' VIRTUAL, -- Read the table name.
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- Read the change timestamp.
      op_type STRING METADATA FROM 'op_type' VIRTUAL, -- Read the change type.
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'flinkuser',
      'password' = 'flinkpw',
      'database-name' = 'mydb_.*', -- Match multiple sharded databases with a regular expression.
      'table-name' = 'orders_.*'   -- Match multiple sharded tables with a regular expression.
    );
    
    INSERT INTO holo_orders SELECT * FROM mysql_orders;

    If you set the scan.read-changelog-as-append-only.enabled parameter to true in the WITH clause, the output depends on how the primary key of the sink table is configured:

    • If the primary key of the sink table is order_id, the output contains only the last change for each primary key from the source table. If the last change for a primary key was a delete operation, the sink table will contain a record with the same primary key and an op_type of -D.

    • If the primary key of the sink table is a composite key of order_id, operation_ts, and op_type, the output contains the complete change history for each primary key from the source table.
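The effect of the sink primary key can be simulated with a short Java sketch. It models the sink as an in-memory map with upsert semantics, which is an assumption for illustration; the class AppendOnlySinkSketch, the Row type, and the sample timestamps are hypothetical and not part of the connector.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; it simulates a sink with upsert semantics.
class AppendOnlySinkSketch {

    // One row emitted by the source in append-only mode.
    static final class Row {
        final int orderId;
        final long operationTs;
        final String opType;

        Row(int orderId, long operationTs, String opType) {
            this.orderId = orderId;
            this.operationTs = operationTs;
            this.opType = opType;
        }
    }

    // Sink keyed by order_id: a later row overwrites an earlier row with the same key.
    static Map<Integer, Row> writeKeyedByOrderId(List<Row> rows) {
        Map<Integer, Row> sink = new LinkedHashMap<>();
        for (Row row : rows) {
            sink.put(row.orderId, row);
        }
        return sink;
    }

    // Sink keyed by (order_id, operation_ts, op_type): every change is kept.
    static Map<String, Row> writeKeyedByComposite(List<Row> rows) {
        Map<String, Row> sink = new LinkedHashMap<>();
        for (Row row : rows) {
            sink.put(row.orderId + "|" + row.operationTs + "|" + row.opType, row);
        }
        return sink;
    }

    // Sample change history for order 1: insert, update, then delete.
    static List<Row> sampleChanges() {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row(1, 1000L, "+I"));
        rows.add(new Row(1, 2000L, "-U"));
        rows.add(new Row(1, 2000L, "+U"));
        rows.add(new Row(1, 3000L, "-D"));
        return rows;
    }

    public static void main(String[] args) {
        List<Row> rows = sampleChanges();
        System.out.println(writeKeyedByOrderId(rows).get(1).opType); // prints -D
        System.out.println(writeKeyedByComposite(rows).size());      // prints 4
    }
}
```

In the first case only the final -D row survives for order 1; in the second case all four changes are retained because each has a distinct composite key.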

  • Regular expression support

    The MySQL CDC source supports using regular expressions in the table-name and database-name options to match multiple tables or databases. The following example shows how to specify multiple tables by using regular expressions.

    CREATE TABLE products (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,
      table_name STRING METADATA  FROM 'table_name' VIRTUAL,
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'root',
      'password' = '123456',
      'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- Match multiple databases with a regular expression.
      'table-name' = '(t[5-8]|tt)' -- Match multiple tables with a regular expression.
    );

    The regular expressions in the preceding example are explained as follows:

    • ^(test).* is a prefix match example. This expression matches database names that start with test, such as test1 or test2.

    • .*[p$] is a suffix match example. Because [p$] is a character class, this expression matches database names that end with p (or with a literal $), such as cdcp or edcp.

    • txc is an exact match. It matches the database named txc.

    The MySQL CDC source uses the database-name.table-name pattern to match fully qualified table names. For example, the pattern (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[5-8]|tt) matches tables such as txc.tt and test2.t5.

    Important

    In SQL jobs, the table-name and database-name options do not support comma-separated lists for specifying multiple tables or databases.

    • To match multiple tables or use multiple regular expressions, enclose them in parentheses and separate them with a pipe character (|). For example, to read from the user and product tables, set the table-name option to (user|product).

    • If your regular expression contains a comma, you must rewrite it by using the pipe (|) operator. For example, rewrite the expression mytable_\d{1,2} as its equivalent (mytable_\d{1}|mytable_\d{2}) to avoid using a comma.
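To try a pattern before using it in a job, a quick local check with java.util.regex can help. The sketch below assumes that the combined database-name.table-name pattern must match the whole fully qualified name and that the dot between the two parts is treated as a literal; both are assumptions for illustration, and the class TableNameRegexSketch is hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical illustration only; assumes whole-string matching of "<database>.<table>".
class TableNameRegexSketch {

    static final String DATABASE_PATTERN = "(^(test).*|^(tpc).*|txc|.*[p$]|t{2})";
    static final String TABLE_PATTERN = "(t[5-8]|tt)";

    // Checks a fully qualified name against the combined pattern.
    static boolean matches(String database, String table) {
        String combined = DATABASE_PATTERN + "\\." + TABLE_PATTERN;
        return Pattern.matches(combined, database + "." + table);
    }

    // Original expression that contains a comma.
    static boolean matchesCommaForm(String table) {
        return Pattern.matches("mytable_\\d{1,2}", table);
    }

    // Comma-free rewrite that uses the pipe operator.
    static boolean matchesPipeForm(String table) {
        return Pattern.matches("(mytable_\\d{1}|mytable_\\d{2})", table);
    }

    public static void main(String[] args) {
        System.out.println(matches("txc", "tt"));   // prints true
        System.out.println(matches("test2", "t5")); // prints true
        System.out.println(matches("prod", "t9"));  // prints false
        // The comma form and the pipe form agree on every sample name.
        for (String name : new String[] {"mytable_1", "mytable_12", "mytable_123"}) {
            System.out.println(name + ": " + (matchesCommaForm(name) == matchesPipeForm(name)));
        }
    }
}
```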

  • Manage parallelism

    The MySQL connector supports parallel reads of the full dataset, which improves data loading efficiency. When combined with the Autopilot feature, the job can automatically scale down resources during the incremental phase after the parallel read is complete, saving computing resources.

    In the Realtime Compute for Apache Flink console, you can set the job's parallelism on the Resource Configuration page in either Basic Mode or Expert Mode. The differences are as follows:

    • Basic Mode sets the parallelism for the entire job.

    • Expert Mode allows you to set the parallelism for a specific vertex on demand.

    For more information about resource configuration, see Configure job deployment information.

    Important

    Regardless of the mode you use, the number of server IDs in the server-id range declared in the table must be greater than or equal to the job's parallelism. For example, the server-id range 5404-5412 is inclusive and contains 9 unique server IDs, so the job can have a maximum parallelism of 9. In addition, the server-id ranges for different jobs connecting to the same MySQL instance must not overlap. Each job requires a unique server-id range.
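A range can be checked before deployment with a few lines of Java. The sketch assumes the range is written as start-end and is inclusive at both ends; the class ServerIdRangeSketch and its methods are hypothetical helpers, not part of the connector API.

```java
// Hypothetical illustration only; assumes an inclusive "start-end" range.
class ServerIdRangeSketch {

    // Parses "7601-7604" into {7601, 7604}; a single value "7601" becomes {7601, 7601}.
    static int[] parse(String range) {
        String[] parts = range.trim().split("-");
        int start = Integer.parseInt(parts[0].trim());
        int end = parts.length > 1 ? Integer.parseInt(parts[1].trim()) : start;
        if (end < start) {
            throw new IllegalArgumentException("range end is smaller than range start");
        }
        return new int[] {start, end};
    }

    // Number of server IDs available in the range.
    static int countServerIds(String range) {
        int[] bounds = parse(range);
        return bounds[1] - bounds[0] + 1;
    }

    // True if every parallel source reader can get its own server ID.
    static boolean supportsParallelism(String range, int parallelism) {
        return countServerIds(range) >= parallelism;
    }

    // True if two jobs would share at least one server ID.
    static boolean overlaps(String rangeA, String rangeB) {
        int[] a = parse(rangeA);
        int[] b = parse(rangeB);
        return a[0] <= b[1] && b[0] <= a[1];
    }

    public static void main(String[] args) {
        System.out.println(countServerIds("7601-7604"));          // prints 4
        System.out.println(supportsParallelism("7601-7604", 5));   // prints false
        System.out.println(overlaps("5400-5408", "5409-5412"));    // prints false
    }
}
```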

  • Autopilot automatic scaling

    The full snapshot phase accumulates a large volume of historical data. To improve reading efficiency, data is typically read in parallel. In contrast, during the incremental Binlog phase, data volume is smaller and global ordering must be preserved, so a single reader is usually sufficient. The Autopilot feature automatically balances performance and resources to meet these different requirements.

    Autopilot monitors the traffic of each MySQL CDC source task. When the job enters the Binlog phase, if only one task is actively reading from the Binlog while other tasks are idle, Autopilot automatically scales down the source's CU count and parallelism. To enable Autopilot, go to the job's O&M page and set the Autopilot mode to Active.

    Note

    The minimum trigger interval for scaling down is 24 hours by default. For more information, see Configure Autopilot.

  • Startup modes

    Use the scan.startup.mode option to specify the startup mode for the MySQL CDC source. The available values are:

    • initial (Default): Performs a full snapshot of the database table on the first startup, and then switches to incremental mode to read the Binlog.

    • earliest-offset: Skips the snapshot phase and starts reading from the earliest available Binlog offset.

    • latest-offset: Skips the snapshot phase and starts reading from the end of the Binlog. In this mode, the source can only read data changes that occur after the job has started.

    • specific-offset: Skips the snapshot phase and starts reading from a specified Binlog offset. You can specify the offset using a Binlog filename and position, or a GTID set.

    • timestamp: Skips the snapshot phase and starts reading Binlog events from a specified timestamp.

    Example:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        -- Set scan.startup.mode to exactly one of the following values.
        'scan.startup.mode' = 'earliest-offset', -- Start from the earliest offset.
        -- 'scan.startup.mode' = 'latest-offset', -- Start from the latest offset.
        -- 'scan.startup.mode' = 'specific-offset', -- Start from a specific offset.
        -- 'scan.startup.mode' = 'timestamp', -- Start from a specific timestamp.
        -- 'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Specify the Binlog filename in specific-offset mode.
        -- 'scan.startup.specific-offset.pos' = '4', -- Specify the Binlog position in specific-offset mode.
        -- 'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- Specify the GTID set in specific-offset mode.
        -- 'scan.startup.timestamp-millis' = '1667232000000', -- Specify the startup timestamp in timestamp mode.
        ...
    )

    Important

    • During each checkpoint, the MySQL source logs the current offset at the INFO level. The log prefix is Binlog offset on checkpoint {checkpoint-id}. You can use this log information to start a job from a specific checkpoint offset.

    • If the schema of a table has changed, starting from earliest-offset, specific-offset, or timestamp may cause errors. The underlying Debezium reader maintains the latest schema internally and cannot correctly parse earlier data that has a different schema.
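The value of scan.startup.timestamp-millis is a Unix epoch timestamp in milliseconds. A minimal Java sketch for deriving it from a wall-clock time follows; the class StartupTimestampSketch is a hypothetical helper, and the UTC+8 offset is an assumption chosen to reproduce the sample value used in this topic.

```java
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Hypothetical helper for computing a value for scan.startup.timestamp-millis.
class StartupTimestampSketch {

    // Converts a date-time with an explicit UTC offset to epoch milliseconds.
    static long toEpochMillis(OffsetDateTime startTime) {
        return startTime.toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // 2022-11-01 00:00:00 at UTC+8.
        OffsetDateTime startTime =
            OffsetDateTime.of(2022, 11, 1, 0, 0, 0, 0, ZoneOffset.ofHours(8));
        System.out.println(toEpochMillis(startTime)); // prints 1667232000000
    }
}
```

Specifying the offset explicitly avoids depending on the default time zone of the machine that runs the conversion.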

  • About CDC source tables without a primary key

    • To use a table without a primary key, you must set the scan.incremental.snapshot.chunk.key-column option to a non-nullable column with unique values.

    • The processing semantics for a table without a primary key depend on whether the column specified by scan.incremental.snapshot.chunk.key-column is updated:

      • If the specified column is never updated, exactly-once semantics are guaranteed.

      • If the specified column is updated, only at-least-once semantics can be guaranteed. However, you can ensure data correctness by defining a primary key in the sink and using idempotent operations.
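Why a sink primary key restores correctness under at-least-once delivery can be shown with a small simulation. The sketch models the sink as a map that applies upserts by primary key; this model, the class IdempotentUpsertSketch, and the sample events are assumptions for illustration only.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; it simulates an idempotent upsert sink.
class IdempotentUpsertSketch {

    // Applies {primaryKey, value} events in order; the last value per key wins.
    static Map<String, String> applyUpserts(List<String[]> events) {
        Map<String, String> sink = new LinkedHashMap<>();
        for (String[] event : events) {
            sink.put(event[0], event[1]);
        }
        return sink;
    }

    public static void main(String[] args) {
        List<String[]> deliveredOnce = Arrays.asList(
            new String[] {"1", "created"},
            new String[] {"2", "created"},
            new String[] {"1", "paid"});
        // After a failover, the last two events are delivered a second time.
        List<String[]> deliveredWithReplay = Arrays.asList(
            deliveredOnce.get(0), deliveredOnce.get(1), deliveredOnce.get(2),
            deliveredOnce.get(1), deliveredOnce.get(2));
        // Both deliveries leave the sink in the same final state.
        System.out.println(applyUpserts(deliveredOnce).equals(applyUpserts(deliveredWithReplay))); // prints true
    }
}
```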

  • Read ApsaraDB RDS for MySQL backup logs

    The MySQL CDC source supports reading backup logs from ApsaraDB RDS for MySQL. This is useful when the full snapshot phase is lengthy, causing local Binlog files to be purged, while backup files (either automatically or manually uploaded) are still available.

    Example:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'rds.region-id' = 'cn-beijing',
        'rds.access-key-id' = 'xxxxxxxxx', 
        'rds.access-key-secret' = 'xxxxxxxxx', 
        'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 
        'rds.main-db-id' = '12345678',
        'rds.download.timeout' = '60s'
        ...
    )

  • Enable CDC source reuse

    Multiple MySQL CDC source tables in a single job each start a separate Binlog client, increasing database load on the same instance. For more information, see MySQL CDC FAQ.

    Solution

    VVR versions 8.0.7 and later support MySQL CDC Source reuse. The reuse feature merges compatible MySQL CDC source tables. A merge occurs when the source tables have the same configuration options, except for the database name, table name, and server-id. The engine automatically merges MySQL CDC sources within the same job.

    Procedure

    1. Use the SET command in an SQL job:

      SET 'table.optimizer.source-merge.enabled' = 'true';

      -- For VVR 8.0.8 and 8.0.9, also set this option:
      SET 'sql-gateway.exec-plan.enabled' = 'false';

      Source reuse is enabled by default in VVR 11.1 and later.

    2. Start the job without state. Changing the source reuse option changes the job topology, so a job that is started from existing state may fail to start or lose data. If sources are merged, you will see a MergetableSourceScan node.

    Important
    • We recommend that you do not disable operator chaining after you enable reuse. Setting pipeline.operator-chaining to false increases the overhead of data serialization and deserialization. The more Sources that are merged, the higher the overhead.

    • In VVR 8.0.7, disabling operator chaining causes serialization issues.
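The merge condition described in the Solution section can be expressed as a comparison of connector options. The following Java sketch is a simplified model and not the engine's optimizer logic: the class SourceMergeSketch, the helper methods, and the sample host names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; not the engine's actual merge rule.
class SourceMergeSketch {

    // Options that are allowed to differ between sources that are merged.
    static final Set<String> IGNORED_OPTIONS = Set.of("database-name", "table-name", "server-id");

    // Returns a copy of the options without the ignored keys.
    static Map<String, String> comparableOptions(Map<String, String> options) {
        Map<String, String> copy = new HashMap<>(options);
        copy.keySet().removeAll(IGNORED_OPTIONS);
        return copy;
    }

    // Two sources are merge candidates if all remaining options are identical.
    static boolean canMerge(Map<String, String> a, Map<String, String> b) {
        return comparableOptions(a).equals(comparableOptions(b));
    }

    // Builds a sample WITH-clause option map.
    static Map<String, String> options(String hostname, String database, String table, String serverId) {
        Map<String, String> options = new HashMap<>();
        options.put("connector", "mysql-cdc");
        options.put("hostname", hostname);
        options.put("database-name", database);
        options.put("table-name", table);
        options.put("server-id", serverId);
        return options;
    }

    public static void main(String[] args) {
        Map<String, String> orders = options("db-host-1", "mydb", "orders", "5401");
        Map<String, String> products = options("db-host-1", "mydb", "products", "5402");
        Map<String, String> otherHost = options("db-host-2", "mydb", "orders", "5403");
        System.out.println(canMerge(orders, products));  // prints true
        System.out.println(canMerge(orders, otherHost)); // prints false
    }
}
```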

Accelerate binlog reading

When used as a source, the MySQL connector parses the binlog during the incremental phase to generate change events. The binlog records all table changes in a binary format. The following methods accelerate binlog parsing:

  • Enable parsing filters

    • Use the scan.only.deserialize.captured.tables.changelog.enabled option to parse change events only for specified tables.

  • Tune Debezium parameters

    debezium.max.queue.size: 162580
    debezium.max.batch.size: 40960
    debezium.poll.interval.ms: 50

    • debezium.max.queue.size: Specifies the maximum number of records in the blocking queue. This queue holds events that Debezium reads from the database before writing them downstream. Default: 8192.

    • debezium.max.batch.size: Specifies the maximum number of events the connector processes per iteration. Default: 2048.

    • debezium.poll.interval.ms: Specifies the interval in milliseconds that the connector waits between requests for new change events. Default: 1000 (1 second).

Example (SQL job):

CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    -- Debezium configuration
    'debezium.max.queue.size' = '162580',
    'debezium.max.batch.size' = '40960',
    'debezium.poll.interval.ms' = '50',
    -- Enable parsing filters
    'scan.only.deserialize.captured.tables.changelog.enabled' = 'true',  -- Parse change events for specified tables only.
    ...
)

Example (data ingestion YAML job):

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604
  # Debezium configuration
  debezium.max.queue.size: 162580
  debezium.max.batch.size: 40960
  debezium.poll.interval.ms: 50
  # Enable parsing filters
  scan.only.deserialize.captured.tables.changelog.enabled: true

The fully managed MySQL connector can consume the binlog at a rate of up to 85 MB/s, which is approximately twice the throughput of the open-source Flink connector for MySQL. If the binlog generation rate exceeds 85 MB/s (equivalent to one 512 MB file every 6 seconds), the Flink job's latency will continuously increase. The latency will decrease once the generation rate drops. A large transaction in the binlog can cause a temporary spike in processing latency. This spike subsides after the transaction is fully read.
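The arithmetic behind these figures can be reproduced with a short Java sketch. The 85 MB/s rate and the 512 MB file size come from the text above; the class BinlogLagSketch, the linear backlog model, and the 100 MB/s sample generation rate are assumptions for illustration.

```java
// Hypothetical illustration only; uses a simple linear backlog model.
class BinlogLagSketch {

    // Seconds needed to consume one binlog file at the given rate.
    static double secondsPerFile(double fileSizeMb, double consumeRateMbPerSec) {
        return fileSizeMb / consumeRateMbPerSec;
    }

    // Unread binlog volume after the given time; zero if consumption keeps up.
    static double backlogMb(double generateRateMbPerSec, double consumeRateMbPerSec, double seconds) {
        return Math.max(0.0, (generateRateMbPerSec - consumeRateMbPerSec) * seconds);
    }

    public static void main(String[] args) {
        // One 512 MB file at 85 MB/s takes roughly 6 seconds.
        System.out.printf("%.2f s per file%n", secondsPerFile(512, 85));
        // At 100 MB/s generated versus 85 MB/s consumed, the backlog grows by 15 MB per second.
        System.out.printf("%.0f MB backlog after 60 s%n", backlogMb(100, 85, 60));
    }
}
```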

MySQL CDC DataStream API

Important

To read and write data with the DataStream API, use the corresponding DataStream connector. For setup instructions, see DataStream connector usage.

The following example demonstrates how to use MySqlSource in a DataStream API program and lists the required POM dependencies.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(3306) // Replace with the port of your MySQL database
        .databaseList("yourDatabaseName") // Set the database to capture
        .tableList("yourDatabaseName.yourTableName") // Set the table to capture
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // Convert SourceRecord to a JSON string
        .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Enable checkpointing every 3000 ms
    env.enableCheckpointing(3000);
    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // Use 4 parallel source tasks
      .setParallelism(4)
      .print().setParallelism(1); // Use parallelism 1 for the sink to keep message ordering
    env.execute("Print MySQL Snapshot + Binlog");
  }
}

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>

To build the MySqlSource, specify the following parameters:

Parameter

Description

hostname

The IP address or hostname of the MySQL database.

port

The port number of the MySQL database.

databaseList

The names of the databases to capture.

Note

The database name supports regular expressions to read data from multiple databases. You can use .* to match all databases.

username

The username for the MySQL database.

password

The password for the MySQL database.

deserializer

A deserializer that converts SourceRecord records to a specific type. Valid values include:

  • RowDataDebeziumDeserializeSchema: Converts SourceRecord to Flink's internal RowData data structure, which is used by the Table and SQL APIs.

  • JsonDebeziumDeserializationSchema: Converts SourceRecord to a JSON-formatted string.

Specify the following placeholders in your POM dependencies:

${vvr.version}

The engine version of Realtime Compute for Apache Flink, such as 1.17-vvr-8.0.4-3.

Note

Use the version number from Maven, as hotfixes may be released without a separate announcement.

${flink.version}

The Apache Flink version, such as 1.17.2.

Important

Use the Apache Flink version that corresponds to your Realtime Compute for Apache Flink engine version to avoid incompatibility. For version compatibility, see Engines.

FAQ

For issues with CDC source tables, see CDC issues.

Flink CDC principles and enterprise features

  • Flink CDC Enterprise Features

  • Flink CDC Technical Principles