Subscribe to binary logs by using Flink

Realtime Compute for Apache Flink can subscribe to the binary logs of an AnalyticDB for MySQL cluster to capture and process database changes in real time. This enables efficient data synchronization and stream computing.

Prerequisites

  • Your AnalyticDB for MySQL cluster must be one of the following editions: Enterprise Edition, Basic Edition, Data Lakehouse Edition, or Data Warehouse Edition (in elastic mode).

  • The AnalyticDB for MySQL cluster must meet the following kernel version requirements:

    • xuanwu_v1 table engine: 3.2.1.0 or later.

    • xuanwu_v2 table engine: 3.2.5.0 or later.

    Note

    To view and update the minor version, go to the Configuration Information section on the Cluster Information page in the AnalyticDB for MySQL console.

  • The Flink workspace must use Ververica Runtime (VVR) 8.0.4 or a later version.

  • The AnalyticDB for MySQL cluster and the fully managed Flink workspace must be in the same VPC.

  • The CIDR block of the Flink workspace must be added to the whitelist of the AnalyticDB for MySQL cluster.

Limitations

  • Flink can process only basic data types and the JSON complex data type from the AnalyticDB for MySQL binary log.

  • Flink ignores DDL operations, and it ignores the records that the automatic deletion of partitions in partitioned tables writes to the AnalyticDB for MySQL binary log.

Step 1: Enable binary logging

  1. Enable binary logging. This example uses a table named source_table.

    Note

    AnalyticDB for MySQL supports enabling binary logging only at the table level.

    When creating a table

    CREATE TABLE source_table (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`)
    ) DISTRIBUTED BY HASH (id) BINLOG=true;

    For an existing table

    ALTER TABLE source_table BINLOG=true;
  2. (Optional) Modify the binary log retention period.

    You can modify the binlog_ttl parameter to adjust the binary log retention period. The default value of this parameter is 6 hours. For example, you can set the binary log retention period for the source_table table to 1 day.

    ALTER TABLE source_table binlog_ttl='1d';

    The binlog_ttl parameter supports the following formats:

    • Milliseconds: A number without a unit. Example: 60 represents 60 milliseconds.

    • Seconds: A number followed by s. Example: 30s represents 30 seconds.

    • Hours: A number followed by h. Example: 2h represents 2 hours.

    • Days: A number followed by d. Example: 1d represents 1 day.

    Note
    • The maximum binary log retention period is 365 days for clusters whose kernel version is one of the following or a later version in the same series: 3.2.1.9, 3.2.2.14, 3.2.3.8, 3.2.4.4, and 3.2.5.1. For clusters with earlier kernel versions, the maximum retention period is 21 days.

    • We recommend that you set the binary log retention period to a value no less than the default value of the binlog_ttl parameter. If the retention period is too short, binary log files may be purged before they are consumed, which can affect data synchronization.

    • To view the current binary log retention period, run the SHOW CREATE TABLE source_table; statement.
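The retention formats listed in this step use the same ALTER TABLE syntax as the 1-day example. The following sketch sets the retention period in two of those formats and then checks the result. The values 12h and 7d are arbitrary examples. Both are no shorter than the 6-hour default and stay within the 21-day maximum that applies to earlier kernel versions.

```sql
-- Retain the binary logs of source_table for 12 hours (hours format: a number followed by h).
ALTER TABLE source_table binlog_ttl='12h';

-- Alternatively, retain them for 7 days (days format: a number followed by d).
ALTER TABLE source_table binlog_ttl='7d';

-- View the table definition, which includes the current binary log retention setting.
SHOW CREATE TABLE source_table;
```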

Step 2: Upload the AnalyticDB for MySQL connector

  1. Download the connector JAR file: flink-sql-connector-adb-mysql-cdc-2.4-20260420.jar.

  2. Log on to the Realtime Compute for Apache Flink console.

  3. On the Flink tab, find the target workspace and click Console in the Actions column.

  4. In the left-side navigation pane, click Connectors.

  5. On the Connectors page, click Create Custom Connector.

  6. Upload the downloaded connector and click Next.

  7. Click Finish. The created custom connector appears in the list of connectors.

Step 3: Subscribe to the binary log

  1. Log on to the Realtime Compute for Apache Flink console and create an SQL job.

  2. Create a source table that connects to AnalyticDB for MySQL and reads binary log data from a specific table (source_table).

    Note
    • The primary key defined in the Flink DDL must match the primary key in the physical table in the AnalyticDB for MySQL cluster. This includes both the key columns and the primary key name. Otherwise, the data may be incorrect.

    • The Flink data types must be compatible with those in AnalyticDB for MySQL. For details about the data type mappings, see Type mapping.

    CREATE TEMPORARY TABLE adb_source (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb-mysql-cdc',
      'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
      'username' = 'testUser',
      'password' = 'Test12****',
      'database-name' = 'binlog',
      'table-name' = 'source_table'
    );

    The following list describes the WITH parameters. Each entry shows whether the parameter is required, its default value, and its data type.

    • connector (required; default: none; STRING)
      The connector to use. This is a custom connector. Set this parameter to adb-mysql-cdc.

    • hostname (required; default: none; STRING)
      The endpoint of the AnalyticDB for MySQL cluster.

    • username (required; default: none; STRING)
      The database account of the AnalyticDB for MySQL cluster.

    • password (required; default: none; STRING)
      The password of the AnalyticDB for MySQL database account.

    • database-name (required; default: none; STRING)
      The name of the AnalyticDB for MySQL database. Because AnalyticDB for MySQL implements binary logging at the table level, you can specify only one database.

    • table-name (required; default: none; STRING)
      The name of the table in the AnalyticDB for MySQL database. Because AnalyticDB for MySQL implements binary logging at the table level, you can specify only one table.

    • port (optional; default: 3306; INTEGER)
      The port number.

    • scan.incremental.snapshot.enabled (optional; default: true; BOOLEAN)
      Specifies whether to enable the incremental snapshot reading mechanism. This feature is enabled by default. Incremental snapshot is a newer mechanism for reading table snapshots. Compared with the previous snapshot mechanism, it has the following benefits:

        • The source can read a snapshot in parallel.

        • The source supports chunk-level checkpoints during snapshot reading.

        • The source does not need to acquire a database lock before it reads a snapshot.

    • scan.incremental.snapshot.chunk.size (optional; default: 8096; INTEGER)
      The number of rows in each chunk of a table snapshot.

    • scan.snapshot.fetch.size (optional; default: 1024; INTEGER)
      The maximum number of rows to fetch at a time when a table snapshot is read.

    • scan.startup.mode (optional; default: initial; STRING)
      The startup mode for data consumption. Valid values:

        • initial (default): Takes an initial snapshot of the table and then reads the latest binary log.

        • earliest-offset: Skips the snapshot phase and starts reading from the earliest available binary log.

        • specific-offset: Skips the snapshot phase and starts reading from a specified binary log position. Specify the binary log file name and position by setting both the scan.startup.specific-offset.file and scan.startup.specific-offset.pos parameters.

        • latest-offset: Skips the snapshot phase and reads only the changes that occur after the connector starts.

        • timestamp: Skips the snapshot phase and starts reading from a specified timestamp. Set the timestamp, in milliseconds, by using the scan.startup.timestamp-millis parameter.

      Important: If you use the earliest-offset, specific-offset, or timestamp startup mode, make sure that the table schema does not change between the specified starting position and the time the job starts. Otherwise, the job may fail because of the schema change.

    • scan.startup.specific-offset.file (optional; default: none; STRING)
      The name of the binary log file at the startup position in specific-offset startup mode. To obtain the name of the latest binary log file, run the SHOW MASTER STATUS FOR table_name; statement.

    • scan.startup.specific-offset.pos (optional; default: none; LONG)
      The position within the binary log file at the startup position in specific-offset startup mode. To obtain the latest binary log position, run the SHOW MASTER STATUS FOR table_name; statement.

    • scan.startup.specific-offset.skip-events (optional; default: none; LONG)
      The number of binary log events to skip after the specified startup position.

    • scan.startup.specific-offset.skip-rows (optional; default: none; LONG)
      The number of data rows to skip after the specified startup position.

    • scan.startup.timestamp-millis (optional; default: none; LONG)
      The timestamp of the startup position in timestamp startup mode, in milliseconds. If you set this parameter, you must also set scan.startup.mode to timestamp.

    • server-time-zone (optional; default: system default; STRING)
      The session time zone of the database server, such as "Asia/Shanghai". It controls how the TIMESTAMP type in AnalyticDB for MySQL is converted to the STRING type. If this parameter is not set, ZoneId.systemDefault() is used to determine the server time zone.

    • debezium.min.row.count.to.stream.result (optional; default: 1000; INTEGER)
      If a table contains more rows than this value, the connector streams the query results during the snapshot phase. If you set this parameter to 0, all table size checks are skipped, and all results are streamed during the snapshot phase.

    • connect.timeout (optional; default: 30s; DURATION)
      The maximum time to wait for a database connection before the attempt times out. The default unit is seconds (s).

    • connect.max-retries (optional; default: 3; INTEGER)
      The maximum number of retries after a database connection fails.

  3. Create a physical table in the destination database to store the processed data. This topic uses AnalyticDB for MySQL as the destination. For connectors supported by Flink, see Supported connectors.

    CREATE TABLE target_table (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`)
    ) DISTRIBUTED BY HASH (id);
  4. Create a sink table that connects to the table created in the previous step. The sink table writes processed data to the specified table in AnalyticDB for MySQL.

    CREATE TEMPORARY TABLE adb_sink (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb3.0',
      'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest',
      'userName' = 'testUser',
      'password' = 'Test12****',
      'tableName' = 'target_table'
    );

    For more information about the WITH parameters and type mappings for the sink table, see AnalyticDB for MySQL V3.0 connector.

  5. Use an INSERT INTO statement to send data from the source table to the sink table.

    INSERT INTO adb_sink
    SELECT * FROM adb_source;
  6. Click Save.

  7. Click Validate.

    The validation feature checks the SQL semantics, network connectivity, and metadata of the tables that are used by the job. You can also click SQL Advice in the result area to view SQL risk prompts and optimization suggestions.

  8. (Optional) Click Debug.

    You can use the job debugging feature to simulate job runs, check output results, verify the business logic of SELECT or INSERT statements, improve development efficiency, and reduce data quality risks.

  9. Click Deploy.

    After you develop and validate the job, deploy it to the production environment. Then, go to the O&M page and start the job.

  10. (Optional) View binary log information.

    Note

    If binary logging is enabled but no subscription to the binary log has been established yet, the following statements return 0. Binary log information appears only after a subscription is successfully established.

    • To obtain the file name and position of the latest binary log entry, run the following SQL statement:

      SHOW MASTER STATUS FOR source_table;
    • To view all unpurged binary log files and their sizes, run the following SQL statement:

      SHOW BINARY LOGS FOR source_table;
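The specific-offset startup mode connects the SHOW MASTER STATUS output with the scan.startup.specific-offset.file and scan.startup.specific-offset.pos parameters of the source table. The following sketch restarts consumption from a recorded binary log position and shows the timestamp startup mode as an alternative. The table names adb_source_from_offset and adb_source_from_time are hypothetical. The file name and position are placeholders that you replace with your own values. The connection values reuse the example source table. The table schema must not change between the starting position and the job startup. The starting point should also fall within the binary log retention period, because older binary log files may already be purged.

```sql
-- Run in AnalyticDB for MySQL: returns the file name and position of the latest binary log entry.
SHOW MASTER STATUS FOR source_table;

-- Flink SQL, specific-offset mode: start reading from a recorded binary log position.
-- '<binlog_file_name>' and '<binlog_position>' are hypothetical placeholders; replace them
-- with the values returned by the statement above.
CREATE TEMPORARY TABLE adb_source_from_offset (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb-mysql-cdc',
  'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
  'username' = 'testUser',
  'password' = 'Test12****',
  'database-name' = 'binlog',
  'table-name' = 'source_table',
  'scan.startup.mode' = 'specific-offset',
  'scan.startup.specific-offset.file' = '<binlog_file_name>',
  'scan.startup.specific-offset.pos' = '<binlog_position>'
);

-- Flink SQL, timestamp mode: start reading from a point in time.
-- 1714492800000 ms is an example value that corresponds to 2024-05-01 00:00:00 (UTC+8).
CREATE TEMPORARY TABLE adb_source_from_time (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb-mysql-cdc',
  'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
  'username' = 'testUser',
  'password' = 'Test12****',
  'database-name' = 'binlog',
  'table-name' = 'source_table',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1714492800000'
);
```

Either table can replace adb_source in the INSERT INTO statement. Use a separate job for each startup position.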

Type mapping

The following table maps AnalyticDB for MySQL data types to their Flink equivalents.

AnalyticDB for MySQL type       Flink type
BOOLEAN                         BOOLEAN
TINYINT                         TINYINT
SMALLINT                        SMALLINT
INT                             INT
BIGINT                          BIGINT
FLOAT                           FLOAT
DOUBLE                          DOUBLE
DECIMAL(p,s) or NUMERIC(p,s)    DECIMAL(p,s)
VARCHAR                         STRING
BINARY                          BYTES
DATE                            DATE
TIME                            TIME
DATETIME                        TIMESTAMP
TIMESTAMP                       TIMESTAMP
POINT                           STRING
JSON                            STRING
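The following sketch illustrates several of these mappings. It pairs a hypothetical AnalyticDB for MySQL table that uses non-integer types with a Flink source table that declares the corresponding Flink types. The table name orders_src, the Flink table name orders_source, and all column names are assumptions. The sketch also assumes that orders_src is created in the binlog database used in Step 3. The connection values reuse the example values from that step.

```sql
-- AnalyticDB for MySQL (hypothetical table), with binary logging enabled at creation.
CREATE TABLE orders_src (
  `id` BIGINT,
  `customer` VARCHAR(64),
  `amount` DECIMAL(10,2),
  `created_at` DATETIME,
  `attrs` JSON,
  PRIMARY KEY (`id`)
) DISTRIBUTED BY HASH (id) BINLOG=true;

-- Flink SQL source table declared with the mapped Flink types.
CREATE TEMPORARY TABLE orders_source (
  `id` BIGINT,              -- BIGINT -> BIGINT
  `customer` STRING,        -- VARCHAR -> STRING
  `amount` DECIMAL(10, 2),  -- DECIMAL(p,s) -> DECIMAL(p,s)
  `created_at` TIMESTAMP,   -- DATETIME -> TIMESTAMP
  `attrs` STRING,           -- JSON -> STRING
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb-mysql-cdc',
  'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
  'username' = 'testUser',
  'password' = 'Test12****',
  'database-name' = 'binlog',
  'table-name' = 'orders_src'
);
```

The primary key column and its name are the same on both sides, as the note in Step 3 requires.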