Synchronize PolarDB for MySQL to Kafka

更新时间:
复制 MD 格式

Learn how to use Data Transmission Service (DTS) to synchronize data from a PolarDB for MySQL cluster to a Message Queue for Apache Kafka instance to expand your message processing capabilities.

Prerequisites

  • You have created a destination ApsaraMQ for Kafka instance. For more information, see Overview.

    Note

    For information about the versions supported by the source and destination, see Overview of data synchronization solutions.

  • You have created a topic in the destination ApsaraMQ for Kafka instance to receive synchronized data. For more information, see Step 1: Create a topic.

Limitations

Note

DTS does not synchronize foreign keys from the source database to the destination database. Therefore, cascading operations on the source database are not synchronized to the destination database.

Type

Description

Source database limits

  • The tables to be synchronized must have a primary key or a UNIQUE constraint, and the fields must be unique. Otherwise, duplicate data may appear in the destination database.

  • If you synchronize data at the table level and need to edit the tables, such as mapping column names, a single data synchronization task supports a maximum of 1,000 tables. If you exceed this limit, an error is reported when you submit the task. In this case, split the tables into multiple synchronization tasks or configure a task to synchronize the entire database.

  • Binary logs:

    • You must enable binary logging and set the loose_polar_log_bin parameter to ON. Otherwise, an error is reported during the precheck and the DTS instance fails to start. For more information about how to enable binary logging and modify parameters, see Enable binary logging and Set cluster and node parameters.

      Note

      Enabling binary logging for a PolarDB for MySQL cluster consumes storage space and incurs fees.

    • The binary logs of the PolarDB for MySQL cluster must be retained for at least 3 days. We recommend that you retain them for 7 days. Otherwise, the DTS task may fail because DTS cannot obtain the binary logs. In extreme cases, this may cause data inconsistency or data loss. Issues caused by a binary log retention period shorter than the required period are not covered by the DTS Service-Level Agreement (SLA).

      Note

      For more information about how to set the retention period for binary logs of a PolarDB for MySQL cluster, see Modify the retention period.

  • Do not run DDL operations that change database or table schemas during schema synchronization or full synchronization. Otherwise, the synchronization task fails.

    Note

    During full synchronization, DTS queries the source database. This creates metadata locks that may block DDL operations on the source database.

Other limits

  • You cannot synchronize data from a read-only node of the source PolarDB for MySQL cluster.

  • You cannot synchronize OSS foreign tables from the source PolarDB for MySQL cluster.

  • Synchronization of INDEX, PARTITION, VIEW, PROCEDURE, FUNCTION, TRIGGER, and FK is not supported.

  • Primary/secondary failover of the database instance is not supported during initial full data synchronization. If a failover occurs, reconfigure the synchronization task promptly.

  • Before you synchronize data, evaluate the performance of the source and destination databases. We recommend that you synchronize data during off-peak hours. Otherwise, initial full data synchronization consumes read and write resources on the source and destination databases, which may increase the database load.

  • Initial full data synchronization runs concurrent INSERT operations, which causes fragmentation in the destination database tables. As a result, the tablespace of the destination instance is larger than that of the source instance after initial full data synchronization is complete.

  • For table-level data synchronization, do not use tools such as pt-online-schema-change to perform online DDL operations on the synchronization objects in the source database. Otherwise, the synchronization fails.

  • For table-level data synchronization, if no data other than the data from DTS is written to the destination database, you can use Data Management (DMS) to perform online DDL operations. For more information, see Change schemas without locking tables.

  • During DTS synchronization, do not write data other than the data from DTS to the destination database. Otherwise, data inconsistency between the source and destination databases may occur. For example, if you use DMS to perform online DDL operations while other data is being written to the destination database, data may be lost in the destination database.

  • During synchronization, if the destination Kafka cluster is scaled out or scaled in, you must restart the instance.

  • If a task fails, DTS support staff will attempt to restore it within eight hours. During restoration, they may restart the task or adjust its parameters.

    Note

    Only DTS task parameters are modified—not database parameters. Parameters that may be adjusted include those listed in Modify instance parameters.

Other notes

DTS periodically runs the CREATE DATABASE IF NOT EXISTS `test` command on the source database to advance the binary log offset.

Single record size limit

The maximum size for a single record written to Kafka is 10 MB. If a source data row exceeds this limit, DTS fails to write the record to Kafka, which interrupts the task. In this scenario, we recommend that you do not synchronize the table. If you must synchronize it, configure the DTS task to exclude the columns that contain large fields. For a running task, modify the synchronization objects: remove the table, re-add it, and then reconfigure the task to exclude the columns that contain large fields.

Supported synchronization topologies

  • One-way one-to-one synchronization.

  • One-way one-to-many synchronization.

  • One-way many-to-one synchronization.

  • One-way cascade synchronization.

For more details and important usage notes, see Introduction to data synchronization topologies.

Supported SQL operations

Operation type

SQL statement

DML

INSERT, UPDATE, and DELETE

DDL

  • CREATE TABLE, ALTER TABLE, DROP TABLE, RENAME TABLE, and TRUNCATE TABLE

  • CREATE VIEW, ALTER VIEW, and DROP VIEW

  • CREATE PROCEDURE, ALTER PROCEDURE, and DROP PROCEDURE

  • CREATE FUNCTION, DROP FUNCTION, CREATE TRIGGER, and DROP TRIGGER

  • CREATE INDEX and DROP INDEX

Database account permissions

Database

Permissions

Source PolarDB for MySQL cluster

Read permissions on objects to be synchronized.

Procedure

  1. Go to the data synchronization task list page in the destination region. You can do this in one of two ways.

    DTS console

    1. Log on to the DTS console.

    2. In the navigation pane on the left, click Data Synchronization.

    3. In the upper-left corner of the page, select the region where the synchronization instance is located.

    DMS console

    Note

    The actual steps may vary depending on the mode and layout of the DMS console. For more information, see Simple mode console and Customize the layout and style of the DMS console.

    1. Log on to the DMS console.

    2. In the top menu bar, choose Data + AI > DTS (DTS) > Data Synchronization.

    3. To the right of Data Synchronization Tasks, select the region of the synchronization instance.

  2. Click Create Task to navigate to the task configuration page.

  3. Configure the source and destination databases.

    Warning

    After you select the source and destination instances, review the Limits at the top of the page. Otherwise, the task may fail or data inconsistency may occur.

    Category

    Parameter

    Description

    N/A

    Task Name

    DTS automatically generates a task name. We recommend that you specify a descriptive name for easy identification. The name does not need to be unique.

    Source Database

    Select Existing Connection

    • Select the registered database instance with DTS from the drop-down list. The database information below is automatically configured.

      Note

      In the DMS console, this configuration item is Select a DMS database instance.

    • If you have not registered the database instance or do not need to use a registered instance, manually configure the database information below.

    Database Type

    Select PolarDB for MySQL.

    Connection Type

    Select Alibaba Cloud Instance.

    Instance Region

    Select the region where the source PolarDB for MySQL cluster resides.

    Cross-account

    In this scenario, data is synchronized within the same Alibaba Cloud account. Select No.

    PolarDB Cluster ID

    Select the ID of the source PolarDB for MySQL cluster.

    Database Account

    Enter the database account of the source PolarDB for MySQL cluster. For the required permissions, see Permissions required for the database account.

    Database Password

    Enter the password for the specified database account.

    Connection method

    Select Non-encrypted or SSL-encrypted as needed. If you set this to SSL-encrypted, you must enable SSL encryption for the RDS for MySQL instance beforehand. For more information, see Use a cloud certificate to quickly enable SSL link encryption.

    Encryption

    Select a method based on your actual situation. For more information about SSL encryption, see Configure SSL encryption.

    Destination Database

    Select Existing Connection

    • Select the registered database instance with DTS from the drop-down list. The database information below is automatically configured.

      Note

      In the DMS console, this configuration item is Select a DMS database instance.

    • If you have not registered the database instance or do not need to use a registered instance, manually configure the database information below.

    Database Type

    Select Kafka.

    Connection Type

    Select Express Connect, VPN Gateway, or Smart Access Gateway.

    Note

    In this step, the Message Queue for Apache Kafka instance is configured as a self-managed Kafka database.

    Instance Region

    Select the region where the destination Message Queue for Apache Kafka instance resides.

    Connected VPC

    Select the ID of the Virtual Private Cloud (VPC) to which the destination Message Queue for Apache Kafka instance belongs. You can find the VPC ID on the Basic Information page of the Kafka instance.

    Hostname or IP address

    Enter an IP address from the Default Endpoint of the Message Queue for Apache Kafka instance.

    Note

    You can obtain the IP address for the Default Endpoint on the Basic Information page of the Message Queue for Apache Kafka instance.

    Port Number

    The service port of the Message Queue for Apache Kafka instance. The default value is 9092.

    Database Account

    Enter the database account for the destination Message Queue for Apache Kafka instance.

    Note

    If the instance type of the Message Queue for Apache Kafka instance is VPC instance, you do not need to configure Database Account and Database Password.

    Database Password

    Enter the password for the specified database account.

    Kafka Version

    Select the version that matches your Kafka instance.

    Connection method

    Select Non-encrypted or SCRAM-SHA-256 based on your business and security requirements.

    Topic

    From the drop-down list, select the topic to receive the synchronized data.

    Use Kafka Schema Registry

    Kafka Schema Registry is a metadata service layer with a RESTful API for storing and retrieving Avro schemas.

    • No: Do not use Kafka Schema Registry.

    • Yes: Use Kafka Schema Registry. You need to enter the URL or IP address registered for the Avro schema in Kafka Schema Registry.

  4. After completing the configuration, click Test Connectivity and Proceed at the bottom of the page.

    Note
    • Ensure that you add the CIDR blocks of the DTS servers (either automatically or manually) to the security settings of both the source and destination databases to allow access. For more information, see Add the IP address whitelist of DTS servers.

    • If the source or destination is a self-managed database (i.e., the Access Method is not Alibaba Cloud Instance), you must also click Test Connectivity in the CIDR Blocks of DTS Servers dialog box.

  5. Configure the task objects.

    1. On the Configure Objects page, specify the objects to synchronize.

      Parameter

      Description

      Synchronization Type

      DTS always selects Incremental Data Synchronization. By default, you must also select Schema Synchronization and Full Data Synchronization. After the precheck, DTS initializes the destination cluster with the full data of the selected source objects, which serves as the baseline for subsequent incremental synchronization.

      Note

      If the Access Method for the destination Kafka instance is Alibaba Cloud Instance, Schema Synchronization is not supported.

      Processing Mode for Existing Destination Tables

      • Precheck and Report Errors: Checks for tables with the same names in the destination database. If any tables with the same names are found, an error is reported during the precheck and the data synchronization task does not start. Otherwise, the precheck is successful.

        Note

        If you cannot delete or rename the table with the same name in the destination database, you can map it to a different name in the destination. For more information, see Database Table Column Name Mapping.

      • Ignore Errors and Proceed: Skips the check for tables with the same name in the destination database.

        Warning

        Selecting Ignore Errors and Proceed may cause data inconsistency and put your business at risk. For example:

        • If the table schemas are consistent and a record in the destination database has the same primary key or unique key value as a record in the source database:

          • During full data synchronization, DTS retains the destination record and skips the source record.

          • During incremental synchronization, DTS overwrites the destination record with the source record.

        • If the table schemas are inconsistent, data initialization may fail. This can result in only partial data synchronization or a complete synchronization failure. Use with caution.

      Data Format in Kafka

      Select the data storage format for the Kafka instance based on your requirements.

      Kafka Data Compression Format

      Select a compression format for Kafka messages.

      • LZ4 (Default): Offers a low compression ratio and a high compression speed.

      • GZIP: Offers a high compression ratio and a low compression speed.

        Note

        This format has high CPU consumption.

      • Snappy: Offers a medium compression ratio and a medium compression speed.

      Policy for Shipping Data to Kafka Partitions

      Select a policy based on your business requirements.

      Message acknowledgement mechanism

      Select a message acknowledgment mechanism based on your business requirements.

      Topic That Stores DDL Information

      Select a topic to store DDL information.

      Note

      If you do not select a topic, DTS stores DDL information by default in the same topic that receives data.

      Case Policy for Destination Object Names

      Configure the case-sensitivity policy for database, table, and column names in the destination instance. By default, the DTS default policy is selected. You can also choose to use the default policy of the source or destination database. For more information, see Case policy for destination object names.

      Source Objects

      In the Source Objects box, click the objects, and then click 向右 to move them to the Selected Objects box.

      Note

      You can select objects to synchronize only at the table level.

      Selected Objects

      No additional configuration is required in this example. You can use the mapping feature to set the topic name, number of partitions, and partition key for the source table in the destination Kafka instance. For more information, see Mapping information.

      Note
      • To select the SQL operations to synchronize at the schema or table level, right-click an object in the Selected Objects pane and select the desired SQL operations in the dialog box that appears.

      • If you use object name mapping, the synchronization of dependent objects may fail.

    2. Click Next: Advanced Settings.

      Parameter

      Description

      Dedicated Cluster for Task Scheduling

      By default, DTS uses a shared cluster for tasks, so you do not need to make a selection. For greater task stability, you can purchase a dedicated cluster to run the DTS synchronization task. For more information, see What is a DTS dedicated cluster?.

      Retry Time for Failed Connections

      If the connection to the source or destination database fails after the synchronization task starts, DTS reports an error and immediately begins to retry the connection. The default retry duration is 720 minutes. You can customize the retry time to a value from 10 to 1,440 minutes. We recommend a duration of 30 minutes or more. If the connection is restored within this period, the task resumes automatically. Otherwise, the task fails.

      Note
      • If multiple DTS instances (e.g., Instance A and B) share a source or destination, DTS uses the shortest configured retry duration (e.g., 30 minutes for A, 60 for B, so 30 minutes is used) for all instances.

      • DTS charges for task runtime during connection retries. Set a custom duration based on your business needs, or release the DTS instance promptly after you release the source/destination instances.

      Retry Time for Other Issues

      If a non-connection issue (e.g., a DDL or DML execution error) occurs, DTS reports an error and immediately retries the operation. The default retry duration is 10 minutes. You can also customize the retry time to a value from 1 to 1,440 minutes. We recommend a duration of 10 minutes or more. If the related operations succeed within the set retry time, the synchronization task automatically resumes. Otherwise, the task fails.

      Important

      The value of Retry Time for Other Issues must be less than that of Retry Time for Failed Connections.

      Enable Throttling for Full Data Synchronization

      During full data synchronization, DTS consumes read and write resources from the source and destination databases, which can increase their load. To mitigate pressure on the destination database, you can limit the migration rate by setting Queries per second (QPS) to the source database, RPS of Full Data Migration, and Data migration speed for full migration (MB/s).

      Note

      Enable Throttling for Incremental Data Synchronization

      You can also limit the incremental synchronization rate to reduce pressure on the destination database by setting RPS of Incremental Data Synchronization and Data synchronization speed for incremental synchronization (MB/s).

      Whether to delete SQL operations on heartbeat tables of forward and reverse tasks

      Choose whether DTS writes heartbeat SQL information to the source database while the instance is running.

      • Yes: Does not write heartbeat SQL information to the source database. The DTS instance may display latency.

      • No: Writes heartbeat SQL information to the source database. This may interfere with source database operations like physical backups and cloning.

      Environment Tag

      You can select an environment tag to identify the instance. No selection is required for this example.

      Configure ETL

      Choose whether to enable the extract, transform, and load (ETL) feature. For more information, see What is ETL? Valid values:

      Monitoring and Alerting

      Choose whether to set up alerts. If the synchronization fails or the latency exceeds the specified threshold, DTS sends a notification to the alert contacts.

  6. Save the task and perform a precheck.

    • To view the parameters for configuring this instance via an API operation, hover over the Next: Save Task Settings and Precheck button and click Preview OpenAPI parameters in the tooltip.

    • If you have finished viewing the API parameters, click Next: Save Task Settings and Precheck at the bottom of the page.

    Note
    • Before a synchronization task starts, DTS performs a precheck. You can start the task only if the precheck passes.

    • If the precheck fails, click View Details next to the failed item, fix the issue as prompted, and then rerun the precheck.

    • If the precheck generates warnings:

      • For non-ignorable warning, click View Details next to the item, fix the issue as prompted, and run the precheck again.

      • For ignorable warnings, you can bypass them by clicking Confirm Alert Details, then Ignore, and then OK. Finally, click Precheck Again to skip the warning and run the precheck again. Ignoring precheck warnings may lead to data inconsistencies and other business risks. Proceed with caution.

  7. Purchase an instance.

    1. When the Success Rate reaches 100%, click Next: Purchase Instance.

    2. On the Purchase page, select the billing method and link specifications for the data synchronization instance. For more information, see the following table.

      Category

      Parameter

      Description

      New Instance Class

      Billing Method

      • Subscription: You pay upfront for a specific duration. This is cost-effective for long-term, continuous tasks.

      • Pay-as-you-go: You are billed hourly for actual usage. This is ideal for short-term or test tasks, as you can release the instance at any time to save costs.

      Resource Group Settings

      The resource group to which the instance belongs. The default is default resource group. For more information, see What is Resource Management?.

      Instance Class

      DTS offers synchronization specifications at different performance levels that affect the synchronization rate. Select a specification based on your business requirements. For more information, see Data synchronization link specifications.

      Subscription Duration

      In subscription mode, select the duration and quantity of the instance. Monthly options range from 1 to 9 months. Yearly options include 1, 2, 3, or 5 years.

      Note

      This option appears only when the billing method is Subscription.

    3. Read and select the checkbox for Data Transmission Service (Pay-as-you-go) Service Terms.

    4. Click Buy and Start, and then click OK in the OK dialog box.

      You can monitor the task progress on the data synchronization page.

Mapping information

  1. In the Selected Objects section, hover over the target topic name (at the table level).

  2. Click Edit next to the topic name.

  3. In the Edit Table dialog box, configure the mapping information.

    Note
    • At the schema level, the Edit Schema dialog box appears and offers fewer parameters. At the table level, the Edit Table dialog box appears.

    • If you are not synchronizing an entire database, you cannot modify Name of target Topic or Number of Partitions in the Edit Schema dialog box.

    Parameter

    Description

    Name of target Topic

    The name of the destination topic for the source table. By default, DTS uses the Topic selected for the Destination Database during the Configurations for Source and Destination Databases step.

    Important
    • If the destination is a Message Queue for Apache Kafka instance, the topic name that you specify must already exist in the destination Kafka instance. Otherwise, data synchronization fails. If the destination is a self-managed Kafka cluster and the data synchronization instance includes schema synchronization, DTS attempts to create the topic that you specify in the destination cluster.

    • If you change the Name of target Topic, DTS writes the data to the topic that you specify.

    Filter Conditions

    For more information, see Set filter conditions.

    Number of Partitions

    The number of partitions in the destination topic.

    Partition Key

    If you set the Policy for Shipping Data to Kafka Partitions to Ship Data to Separate Partitions Based on Hash Values of Primary Keys, you can specify one or more columns as the partition key. DTS calculates a hash value based on the specified key and uses it to distribute rows among the partitions of the destination topic.

    Note

    You can configure Partition Key only in the Edit Table dialog box.

  4. Click OK.

FAQ