Learn how to use Data Transmission Service (DTS) to synchronize data from a PolarDB for MySQL cluster to a Message Queue for Apache Kafka instance to expand your message processing capabilities.
Prerequisites
-
You have created a destination ApsaraMQ for Kafka instance. For more information, see Overview.
NoteFor information about the versions supported by the source and destination, see Overview of data synchronization solutions.
-
You have created a topic in the destination ApsaraMQ for Kafka instance to receive synchronized data. For more information, see Step 1: Create a topic.
Limitations
DTS does not synchronize foreign keys from the source database to the destination database. Therefore, cascading operations on the source database are not synchronized to the destination database.
|
Type |
Description |
|
Source database limits |
|
|
Other limits |
|
|
Other notes |
DTS periodically runs the CREATE DATABASE IF NOT EXISTS `test` command on the source database to advance the binary log offset. |
Single record size limit
The maximum size for a single record written to Kafka is 10 MB. If a source data row exceeds this limit, DTS fails to write the record to Kafka, which interrupts the task. In this scenario, we recommend that you do not synchronize the table. If you must synchronize it, configure the DTS task to exclude the columns that contain large fields. For a running task, modify the synchronization objects: remove the table, re-add it, and then reconfigure the task to exclude the columns that contain large fields.
Supported synchronization topologies
-
One-way one-to-one synchronization.
-
One-way one-to-many synchronization.
-
One-way many-to-one synchronization.
-
One-way cascade synchronization.
For more details and important usage notes, see Introduction to data synchronization topologies.
Supported SQL operations
|
Operation type |
SQL statement |
|
DML |
INSERT, UPDATE, and DELETE |
|
DDL |
|
Database account permissions
|
Database |
Permissions |
|
Source PolarDB for MySQL cluster |
Read permissions on objects to be synchronized. |
Procedure
Go to the data synchronization task list page in the destination region. You can do this in one of two ways.
DTS console
Log on to the DTS console.
In the navigation pane on the left, click Data Synchronization.
In the upper-left corner of the page, select the region where the synchronization instance is located.
DMS console
NoteThe actual steps may vary depending on the mode and layout of the DMS console. For more information, see Simple mode console and Customize the layout and style of the DMS console.
Log on to the DMS console.
In the top menu bar, choose .
To the right of Data Synchronization Tasks, select the region of the synchronization instance.
-
Click Create Task to navigate to the task configuration page.
-
Configure the source and destination databases.
WarningAfter you select the source and destination instances, review the Limits at the top of the page. Otherwise, the task may fail or data inconsistency may occur.
Category
Parameter
Description
N/A
Task Name
DTS automatically generates a task name. We recommend that you specify a descriptive name for easy identification. The name does not need to be unique.
Source Database
Select Existing Connection
Select the registered database instance with DTS from the drop-down list. The database information below is automatically configured.
NoteIn the DMS console, this configuration item is Select a DMS database instance.
If you have not registered the database instance or do not need to use a registered instance, manually configure the database information below.
Database Type
Select PolarDB for MySQL.
Connection Type
Select Alibaba Cloud Instance.
Instance Region
Select the region where the source PolarDB for MySQL cluster resides.
Cross-account
In this scenario, data is synchronized within the same Alibaba Cloud account. Select No.
PolarDB Cluster ID
Select the ID of the source PolarDB for MySQL cluster.
Database Account
Enter the database account of the source PolarDB for MySQL cluster. For the required permissions, see Permissions required for the database account.
Database Password
Enter the password for the specified database account.
Connection method
Select Non-encrypted or SSL-encrypted as needed. If you set this to SSL-encrypted, you must enable SSL encryption for the RDS for MySQL instance beforehand. For more information, see Use a cloud certificate to quickly enable SSL link encryption.
Encryption
Select a method based on your actual situation. For more information about SSL encryption, see Configure SSL encryption.
Destination Database
Select Existing Connection
Select the registered database instance with DTS from the drop-down list. The database information below is automatically configured.
NoteIn the DMS console, this configuration item is Select a DMS database instance.
If you have not registered the database instance or do not need to use a registered instance, manually configure the database information below.
Database Type
Select Kafka.
Connection Type
Select Express Connect, VPN Gateway, or Smart Access Gateway.
NoteIn this step, the Message Queue for Apache Kafka instance is configured as a self-managed Kafka database.
Instance Region
Select the region where the destination Message Queue for Apache Kafka instance resides.
Connected VPC
Select the ID of the Virtual Private Cloud (VPC) to which the destination Message Queue for Apache Kafka instance belongs. You can find the VPC ID on the Basic Information page of the Kafka instance.
Hostname or IP address
Enter an IP address from the Default Endpoint of the Message Queue for Apache Kafka instance.
NoteYou can obtain the IP address for the Default Endpoint on the Basic Information page of the Message Queue for Apache Kafka instance.
Port Number
The service port of the Message Queue for Apache Kafka instance. The default value is 9092.
Database Account
Enter the database account for the destination Message Queue for Apache Kafka instance.
NoteIf the instance type of the Message Queue for Apache Kafka instance is VPC instance, you do not need to configure Database Account and Database Password.
Database Password
Enter the password for the specified database account.
Kafka Version
Select the version that matches your Kafka instance.
Connection method
Select Non-encrypted or SCRAM-SHA-256 based on your business and security requirements.
Topic
From the drop-down list, select the topic to receive the synchronized data.
Use Kafka Schema Registry
Kafka Schema Registry is a metadata service layer with a RESTful API for storing and retrieving Avro schemas.
-
No: Do not use Kafka Schema Registry.
-
Yes: Use Kafka Schema Registry. You need to enter the URL or IP address registered for the Avro schema in Kafka Schema Registry.
After completing the configuration, click Test Connectivity and Proceed at the bottom of the page.
NoteEnsure that you add the CIDR blocks of the DTS servers (either automatically or manually) to the security settings of both the source and destination databases to allow access. For more information, see Add the IP address whitelist of DTS servers.
If the source or destination is a self-managed database (i.e., the Access Method is not Alibaba Cloud Instance), you must also click Test Connectivity in the CIDR Blocks of DTS Servers dialog box.
-
Configure the task objects.
-
On the Configure Objects page, specify the objects to synchronize.
Parameter
Description
Synchronization Type
DTS always selects Incremental Data Synchronization. By default, you must also select Schema Synchronization and Full Data Synchronization. After the precheck, DTS initializes the destination cluster with the full data of the selected source objects, which serves as the baseline for subsequent incremental synchronization.
NoteIf the Access Method for the destination Kafka instance is Alibaba Cloud Instance, Schema Synchronization is not supported.
Processing Mode for Existing Destination Tables
Precheck and Report Errors: Checks for tables with the same names in the destination database. If any tables with the same names are found, an error is reported during the precheck and the data synchronization task does not start. Otherwise, the precheck is successful.
NoteIf you cannot delete or rename the table with the same name in the destination database, you can map it to a different name in the destination. For more information, see Database Table Column Name Mapping.
Ignore Errors and Proceed: Skips the check for tables with the same name in the destination database.
WarningSelecting Ignore Errors and Proceed may cause data inconsistency and put your business at risk. For example:
If the table schemas are consistent and a record in the destination database has the same primary key or unique key value as a record in the source database:
During full data synchronization, DTS retains the destination record and skips the source record.
During incremental synchronization, DTS overwrites the destination record with the source record.
If the table schemas are inconsistent, data initialization may fail. This can result in only partial data synchronization or a complete synchronization failure. Use with caution.
Data Format in Kafka
Select the data storage format for the Kafka instance based on your requirements.
-
If you select DTS Avro, you must parse the data based on the DTS Avro schema definition. For more information, see DTS Avro Schema Definition and DTS Avro Deserialization Example.
-
If you select Canal JSON, see Canal JSON format for parameter descriptions and examples.
-
If you select Shareplex JSON, see Shareplex JSON format for parameter descriptions and examples.
Kafka Data Compression Format
Select a compression format for Kafka messages.
-
LZ4 (Default): Offers a low compression ratio and a high compression speed.
-
GZIP: Offers a high compression ratio and a low compression speed.
NoteThis format has high CPU consumption.
-
Snappy: Offers a medium compression ratio and a medium compression speed.
Policy for Shipping Data to Kafka Partitions
Select a policy based on your business requirements.
Message acknowledgement mechanism
Select a message acknowledgment mechanism based on your business requirements.
Topic That Stores DDL Information
Select a topic to store DDL information.
NoteIf you do not select a topic, DTS stores DDL information by default in the same topic that receives data.
Case Policy for Destination Object Names
Configure the case-sensitivity policy for database, table, and column names in the destination instance. By default, the DTS default policy is selected. You can also choose to use the default policy of the source or destination database. For more information, see Case policy for destination object names.
Source Objects
In the Source Objects box, click the objects, and then click
to move them to the Selected Objects box.NoteYou can select objects to synchronize only at the table level.
Selected Objects
No additional configuration is required in this example. You can use the mapping feature to set the topic name, number of partitions, and partition key for the source table in the destination Kafka instance. For more information, see Mapping information.
Note-
To select the SQL operations to synchronize at the schema or table level, right-click an object in the Selected Objects pane and select the desired SQL operations in the dialog box that appears.
-
If you use object name mapping, the synchronization of dependent objects may fail.
-
Click Next: Advanced Settings.
Parameter
Description
Dedicated Cluster for Task Scheduling
By default, DTS uses a shared cluster for tasks, so you do not need to make a selection. For greater task stability, you can purchase a dedicated cluster to run the DTS synchronization task. For more information, see What is a DTS dedicated cluster?.
Retry Time for Failed Connections
If the connection to the source or destination database fails after the synchronization task starts, DTS reports an error and immediately begins to retry the connection. The default retry duration is 720 minutes. You can customize the retry time to a value from 10 to 1,440 minutes. We recommend a duration of 30 minutes or more. If the connection is restored within this period, the task resumes automatically. Otherwise, the task fails.
NoteIf multiple DTS instances (e.g., Instance A and B) share a source or destination, DTS uses the shortest configured retry duration (e.g., 30 minutes for A, 60 for B, so 30 minutes is used) for all instances.
DTS charges for task runtime during connection retries. Set a custom duration based on your business needs, or release the DTS instance promptly after you release the source/destination instances.
Retry Time for Other Issues
If a non-connection issue (e.g., a DDL or DML execution error) occurs, DTS reports an error and immediately retries the operation. The default retry duration is 10 minutes. You can also customize the retry time to a value from 1 to 1,440 minutes. We recommend a duration of 10 minutes or more. If the related operations succeed within the set retry time, the synchronization task automatically resumes. Otherwise, the task fails.
ImportantThe value of Retry Time for Other Issues must be less than that of Retry Time for Failed Connections.
Enable Throttling for Full Data Synchronization
During full data synchronization, DTS consumes read and write resources from the source and destination databases, which can increase their load. To mitigate pressure on the destination database, you can limit the migration rate by setting Queries per second (QPS) to the source database, RPS of Full Data Migration, and Data migration speed for full migration (MB/s).
NoteThis parameter is available only if Synchronization Types is set to Full Data Synchronization.
You can also adjust the rate of full data synchronization when the synchronization instance is running.
Enable Throttling for Incremental Data Synchronization
You can also limit the incremental synchronization rate to reduce pressure on the destination database by setting RPS of Incremental Data Synchronization and Data synchronization speed for incremental synchronization (MB/s).
Whether to delete SQL operations on heartbeat tables of forward and reverse tasks
Choose whether DTS writes heartbeat SQL information to the source database while the instance is running.
Yes: Does not write heartbeat SQL information to the source database. The DTS instance may display latency.
No: Writes heartbeat SQL information to the source database. This may interfere with source database operations like physical backups and cloning.
Environment Tag
You can select an environment tag to identify the instance. No selection is required for this example.
Configure ETL
Choose whether to enable the extract, transform, and load (ETL) feature. For more information, see What is ETL? Valid values:
-
Yes: Enables the ETL feature. Enter data processing statements in the code editor. For more information, see Configure ETL in a data migration or data synchronization task.
-
No: Disables the ETL feature.
Monitoring and Alerting
Choose whether to set up alerts. If the synchronization fails or the latency exceeds the specified threshold, DTS sends a notification to the alert contacts.
No: No alerts are configured.
Yes: Configures alerts. You must also set the alert threshold and alert contact. For more information, see Configure monitoring and alerting during task configuration.
-
Save the task and perform a precheck.
To view the parameters for configuring this instance via an API operation, hover over the Next: Save Task Settings and Precheck button and click Preview OpenAPI parameters in the tooltip.
If you have finished viewing the API parameters, click Next: Save Task Settings and Precheck at the bottom of the page.
NoteBefore a synchronization task starts, DTS performs a precheck. You can start the task only if the precheck passes.
If the precheck fails, click View Details next to the failed item, fix the issue as prompted, and then rerun the precheck.
If the precheck generates warnings:
For non-ignorable warning, click View Details next to the item, fix the issue as prompted, and run the precheck again.
For ignorable warnings, you can bypass them by clicking Confirm Alert Details, then Ignore, and then OK. Finally, click Precheck Again to skip the warning and run the precheck again. Ignoring precheck warnings may lead to data inconsistencies and other business risks. Proceed with caution.
-
Purchase an instance.
When the Success Rate reaches 100%, click Next: Purchase Instance.
On the Purchase page, select the billing method and link specifications for the data synchronization instance. For more information, see the following table.
Category
Parameter
Description
New Instance Class
Billing Method
Subscription: You pay upfront for a specific duration. This is cost-effective for long-term, continuous tasks.
Pay-as-you-go: You are billed hourly for actual usage. This is ideal for short-term or test tasks, as you can release the instance at any time to save costs.
Resource Group Settings
The resource group to which the instance belongs. The default is default resource group. For more information, see What is Resource Management?.
Instance Class
DTS offers synchronization specifications at different performance levels that affect the synchronization rate. Select a specification based on your business requirements. For more information, see Data synchronization link specifications.
Subscription Duration
In subscription mode, select the duration and quantity of the instance. Monthly options range from 1 to 9 months. Yearly options include 1, 2, 3, or 5 years.
NoteThis option appears only when the billing method is Subscription.
Read and select the checkbox for Data Transmission Service (Pay-as-you-go) Service Terms.
Click Buy and Start, and then click OK in the OK dialog box.
You can monitor the task progress on the data synchronization page.
Mapping information
-
In the Selected Objects section, hover over the target topic name (at the table level).
-
Click Edit next to the topic name.
-
In the Edit Table dialog box, configure the mapping information.
Note-
At the schema level, the Edit Schema dialog box appears and offers fewer parameters. At the table level, the Edit Table dialog box appears.
-
If you are not synchronizing an entire database, you cannot modify Name of target Topic or Number of Partitions in the Edit Schema dialog box.
Parameter
Description
Name of target Topic
The name of the destination topic for the source table. By default, DTS uses the Topic selected for the Destination Database during the Configurations for Source and Destination Databases step.
Important-
If the destination is a Message Queue for Apache Kafka instance, the topic name that you specify must already exist in the destination Kafka instance. Otherwise, data synchronization fails. If the destination is a self-managed Kafka cluster and the data synchronization instance includes schema synchronization, DTS attempts to create the topic that you specify in the destination cluster.
-
If you change the Name of target Topic, DTS writes the data to the topic that you specify.
Filter Conditions
For more information, see Set filter conditions.
Number of Partitions
The number of partitions in the destination topic.
Partition Key
If you set the Policy for Shipping Data to Kafka Partitions to Ship Data to Separate Partitions Based on Hash Values of Primary Keys, you can specify one or more columns as the partition key. DTS calculates a hash value based on the specified key and uses it to distribute rows among the partitions of the destination topic.
NoteYou can configure Partition Key only in the Edit Table dialog box.
-
-
Click OK.
FAQ
-
Can I modify the Kafka Data Compression Format?
Yes. For details, see Modify the objects to be synchronized.
-
Can I modify the Message acknowledgement mechanism?
Yes. For details, see Modify the objects to be synchronized.