Migrate data from an ApsaraDB for MongoDB instance to an ApsaraMQ for Kafka instance

Data Transmission Service (DTS) can stream MongoDB data, including all existing documents and ongoing changes, into a Kafka topic in Canal JSON format. This topic describes how to set up a migration task from an ApsaraDB for MongoDB replica set instance to an ApsaraMQ for Kafka instance.

What DTS supports

Capability | Details
Full data migration | Copies all existing documents from the selected collections to the destination Kafka topic.
Incremental data migration | Captures ongoing changes (inserts, updates, deletes, and DDL events) after full data migration completes.
Incremental methods | Oplog (recommended; lower latency) or ChangeStream (can optionally deliver the full document after each update).
Data format | Canal JSON, the only supported format for Kafka destinations.
Supported source types | ApsaraDB for MongoDB replica set and sharded cluster instances, Azure Cosmos DB for MongoDB clusters, and Amazon DocumentDB elastic clusters. Standalone instances support full data migration only.

Billing

Migration type | Task configuration fee | Data transfer fee
Full data migration | Free | Free. Internet traffic charges apply if Access Method for the destination is set to Public IP Address. See Billable items.
Incremental data migration | Charged. See Billing overview. | Same as for full data migration.

Prerequisites

Before you begin, make sure you have:

  • A destination ApsaraMQ for Kafka instance and a topic created in it to receive data

  • A source ApsaraDB for MongoDB instance with the required account permissions (see Required permissions)

  • (Sharded cluster only) Endpoints applied for all shard nodes, with the same account and password on every shard node. See Apply for an endpoint for a shard.

For supported source and destination database versions, see Overview of data migration scenarios.

Required permissions

Database | Full data migration | Incremental data migration
Source ApsaraDB for MongoDB | Read permissions on the source database and the config database | Read permissions on the source database, the admin database, and the local database

For account management, see Account management.
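If you manage the source account with a MongoDB driver or shell rather than the console, the read permissions in the table above can be expressed as role documents. The sketch below is a hypothetical helper (the names dts_source_roles and create_user_command and the database name mydb are illustrative) that builds a createUser command granting the built-in read role on each required database.

```python
# Hypothetical helpers: derive the read-only roles for a DTS source account
# from the "Required permissions" table, and build a createUser command.


def dts_source_roles(source_dbs, full=True, incremental=False):
    """Return role documents granting read access for the selected migration types."""
    dbs = list(source_dbs)
    if full:
        dbs.append("config")  # full data migration: source databases + config
    if incremental:
        dbs.extend(["admin", "local"])  # incremental: source databases + admin + local
    roles, seen = [], set()
    for name in dbs:
        if name not in seen:  # skip duplicate database names
            seen.add(name)
            roles.append({"role": "read", "db": name})
    return roles


def create_user_command(user, password, roles):
    """Build a createUser command document (the command name must be the first key)."""
    return {"createUser": user, "pwd": password, "roles": roles}
```

With PyMongo, you could run the command against the admin database, which is the default Authentication Database, for example `MongoClient(uri).admin.command(create_user_command("dts_user", "<password>", dts_source_roles(["mydb"], incremental=True)))`. Verify the resulting privileges against Account management before relying on them.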

Limitations

Source database

  • The source server must have sufficient outbound bandwidth; otherwise, migration speed decreases.

  • A single task can migrate up to 1,000 collections. Exceeding this limit causes a request error; split the collections across multiple tasks instead.

  • DTS cannot connect to MongoDB over an SRV endpoint.

  • DTS cannot migrate data from the admin, config, or local databases.

  • For incremental data migration, one of the following requirements must be met:

    • The oplog feature is enabled for the source database and operation logs are retained for at least 7 days.

    • Change streams are enabled for the source database and DTS can use change streams to subscribe to data changes within the last 7 days.

    If DTS cannot access the logs within this 7-day window, the task may fail, and data inconsistency or loss may occur. Such issues are not covered by the DTS service level agreement (SLA).

  • For an inelastic Amazon DocumentDB cluster: enable change streams, and set Migration Method to ChangeStream and Architecture to Sharded Cluster.

  • If the source is a sharded cluster:

    • The _id field in all collections must be unique; otherwise, data inconsistency may occur.

    • The number of mongos nodes cannot exceed 10.

    • The source must not contain orphaned documents. See the MongoDB documentation and the DTS FAQ.

    • During full data migration, disable the balancer until each subtask reaches the incremental migration phase.

    • If Migration Method is Oplog, DTS cannot guarantee write order across different shards.

    • Data rebalancing by the balancer may increase migration latency.

  • During full data migration: do not make schema changes (including array type updates) and do not write to the source database. Both can cause data inconsistency.
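On a replica set, the mongo shell method db.getReplicationInfo() reports timeDiff, the time in seconds between the first and last entries in the oplog. The hypothetical helper below compares that window with the 7-day requirement; for a sharded cluster, you would check each shard's replica set separately.

```python
# Hypothetical helper: check an oplog window against the 7-day requirement.
# time_diff_seconds is assumed to come from db.getReplicationInfo().timeDiff
# in the mongo shell (seconds between the first and last oplog entries).

SECONDS_PER_DAY = 24 * 60 * 60
REQUIRED_DAYS = 7


def oplog_window_ok(time_diff_seconds, required_days=REQUIRED_DAYS):
    """Return (meets_requirement, window_in_days) for the given oplog window."""
    window_days = time_diff_seconds / SECONDS_PER_DAY
    return window_days >= required_days, window_days
```

For example, a timeDiff of 518400 (6 days) returns a failing result, which means the oplog size should be increased before you start incremental migration.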

Other limitations

  • Only collections, not databases, can be selected as migration objects.

  • Single document size limit: 10 MB. Tasks fail if exceeded.

  • Transactions are not retained; each transaction is converted to a single record in the destination.

  • If broker nodes are added to or removed from the destination Kafka instance while a DTS task is running, restart the task.

  • DTS must be able to connect to both the source and destination. Check that security settings, and the listeners and advertised.listeners parameters in the Kafka server.properties file, do not block DTS.

  • Full data migration uses read and write resources on both databases. Run migration during off-peak hours (for example, when CPU utilization is below 30%).

  • DTS attempts to resume failed tasks for up to 7 days. Before you switch workloads to the destination, stop or release the task; otherwise, the task may resume automatically and overwrite data in the destination.

  • Incremental migration latency is calculated based on the latest migrated data timestamp vs. the current source timestamp. If the source has no updates for a long period, the latency display may be inaccurate. Trigger an update operation on the source to refresh it.

  • If a DTS task fails, DTS technical support tries to restore it within 8 hours. During restoration, the task may be restarted and its task parameters may be modified. Only task parameters are changed; database parameters are not.
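The latency rule described in these limitations can be modeled in a few lines. This is an illustrative sketch, not DTS code: it shows that when the source is idle, the reported value keeps growing (0, 600000, then 3600000 ms over an hour) even though nothing is waiting to be migrated, which is why an update on the source refreshes it.

```python
# Illustrative model of the latency definition (not DTS code).
# Timestamps are milliseconds since the epoch.


def displayed_latency_ms(source_now_ms, latest_migrated_ms):
    """Latency = current source timestamp - timestamp of the latest migrated change."""
    return source_now_ms - latest_migrated_ms


# The last change was migrated at t = 1,000,000 ms and the source has been idle since.
latest = 1_000_000
for minutes_idle in (0, 10, 60):
    now = latest + minutes_idle * 60_000
    print(minutes_idle, displayed_latency_ms(now, latest))
```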

What gets migrated

Full data migration

DTS migrates existing data from all selected collections.

Incremental data migration

Use Oplog

Oplog captures changes from these operations:

  • CREATE COLLECTION, INDEX

  • DROP DATABASE, COLLECTION, INDEX

  • RENAME COLLECTION

  • Insert, update, and delete on documents

A DTS task does not migrate incremental data from databases created after the task starts.

Use ChangeStream

ChangeStream captures changes from these operations:

  • DROP DATABASE, COLLECTION

  • RENAME COLLECTION

  • Insert, update, and delete on documents

Set up the migration task

Step 1: Go to the Data Migration page

Use one of the following methods.

DTS console

From the DTS console:

  1. Log on to the DTS console.

  2. In the left-side navigation pane, click Data Migration.

  3. In the upper-left corner, select the region where the migration instance resides.

DMS console

From the DMS console:

Steps may vary based on the DMS console mode and layout. See Simple mode and Customize the layout and style of the DMS console.
  1. Log on to the DMS console.

  2. In the top navigation bar, move the pointer over Data + AI > DTS (DTS) > Data Migration.

  3. From the drop-down list next to Data Migration Tasks, select the region.

Step 2: Configure source and destination databases

Click Create Task, then configure the following parameters.

Source database

Parameter | Description
Task Name | A descriptive name for the DTS task. DTS auto-generates a name. Unique names are not required.
Select Existing Connection | Select a registered instance to auto-fill parameters, or configure the following fields manually.
Database Type | Select MongoDB.
Access Method | Select Alibaba Cloud Instance.
Instance Region | The region of the source ApsaraDB for MongoDB instance.
Replicate Data Across Alibaba Cloud Accounts | Select No to use an instance in the current account.
Architecture | Select Replica Set for this example. If the source is a sharded cluster, also configure the Shard account and Shard password parameters. These are not required if Migration Method is set to ChangeStream.
Migration Method | How incremental data is captured. Options: Oplog (recommended) or ChangeStream. See Choose a migration method.
Instance ID | The ID of the source ApsaraDB for MongoDB instance.
Authentication Database | The database that stores account credentials. Default: admin.
Database Account | The account for the source instance. See Required permissions.
Database Password | The password for the database account.
Encryption | Non-encrypted, SSL-encrypted, or Mongo Atlas SSL. Available options depend on the Access Method and Architecture values; the options displayed in the DTS console prevail. If Architecture is Sharded Cluster and Migration Method is Oplog, SSL-encrypted is unavailable. If the source is a self-managed replica set (Access Method is not Alibaba Cloud Instance) and SSL-encrypted is selected, upload a CA certificate to verify the connection.

Destination database

Parameter | Description
Select Existing Connection | Select a registered instance to auto-fill parameters, or configure the following fields manually.
Database Type | Select Kafka.
Access Method | Select Alibaba Cloud Instance.
Instance Region | The region of the destination Kafka instance.
Kafka Instance ID | The ID of the destination Kafka instance.
Encryption | Non-encrypted or SCRAM-SHA-256.
Topic | The topic that receives the migrated data. The topic must already exist in the Kafka instance.
Topic That Stores DDL Information | The topic for DDL events. If left blank, DDL information goes to the topic set in Topic.
Use Kafka Schema Registry | No (default) or Yes. If Yes, enter the URL or IP address registered in Kafka Schema Registry for your Avro schemas.

Step 3: Test connectivity

Click Test Connectivity and Proceed.

DTS server CIDR blocks must be added (automatically or manually) to the security settings of your databases. See Add the CIDR blocks of DTS servers.
For self-managed databases not using Alibaba Cloud Instance as the access method, click Test Connectivity in the CIDR Blocks of DTS Servers dialog box.

Step 4: Configure objects to migrate

On the Configure Objects page, set the following parameters.

Parameter | Description
Migration Types | Select Full Data Migration only, or select both Full Data Migration and Incremental Data Migration for continuous sync.
Processing Mode of Conflicting Tables | Precheck and Report Errors: the precheck fails if the destination contains collections with the same names as the source collections. Use object name mapping to rename collections if needed. Ignore Errors and Proceed: skips the check for identically named collections. DTS does not migrate records whose primary keys match existing records in the destination. Warning: Selecting this option may cause data inconsistency.
Data Format in Kafka | Canal JSON only.
Kafka Data Compression Format | LZ4 (default; low compression ratio, high speed), GZIP (high compression ratio, low speed, high CPU usage), or Snappy (balanced).
Policy for Shipping Data to Kafka Partitions | See Specify the policy for migrating data to Kafka partitions.
Message acknowledgement mechanism | See Message acknowledgement mechanism.
Capitalization of Object Names in Destination Instance | Default: DTS default policy. See Specify the capitalization of object names in the destination instance.
Source Objects | Select collections in the Source Objects section, then click the right arrow icon to add them to the Selected Objects section.

Click Next: Advanced Settings.

Step 5: Configure advanced settings

Parameter | Description
Dedicated Cluster for Task Scheduling | By default, DTS uses the shared cluster. For higher stability, use a dedicated cluster. See What is a DTS dedicated cluster.
Retry Time for Failed Connections | How long DTS retries failed connections. Range: 10–1,440 minutes. Default: 720. Set to at least 30 minutes. If DTS reconnects within this window, the task resumes; otherwise, it fails. Charges apply during retries.
Retry Time for Other Issues | How long DTS retries failed DML or DDL operations. Range: 1–1,440 minutes. Default: 10. Must be less than Retry Time for Failed Connections.
Obtain the entire document after it is updated | Available only when Migration Method is ChangeStream and Incremental Data Migration is selected. Yes: sends the full document after each update. No: sends only the changed fields.
Enable Throttling for Full Data Migration | Limits the migration rate through QPS to the source database, RPS of Full Data Migration, and Data migration speed (MB/s). Available only when Full Data Migration is selected.
Only one data type for primary key _id in a table of the data to be synchronized | Whether all _id values within each collection share a single data type. Yes: DTS skips the data type scan during full data migration. No: DTS scans data types. Available only when Full Data Migration is selected.
Enable Throttling for Incremental Data Migration | Configure RPS of Incremental Data Migration and Data migration speed (MB/s). Available only when Incremental Data Migration is selected.
Environment Tag | An optional tag to identify the DTS instance.
Configure ETL | Yes: apply extract, transform, and load (ETL) processing using data processing statements. See Configure ETL in a data migration or data synchronization task. No: skip ETL.
Monitoring and Alerting | Yes: configure alert thresholds and notification settings for task failures or high latency. See Configure monitoring and alerting. No: no alerting.
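Before you submit the task, you could check the two retry settings against the documented ranges. The sketch below is a hypothetical client-side validator; the RetrySettings class and the message texts are illustrative, and the DTS console remains the authority on accepted values.

```python
from dataclasses import dataclass


@dataclass
class RetrySettings:
    connection_retry_min: int = 720  # Retry Time for Failed Connections (default 720)
    other_retry_min: int = 10  # Retry Time for Other Issues (default 10)


def validate_retry_settings(s):
    """Return a list of problems; an empty list means the settings look valid."""
    problems = []
    if not 10 <= s.connection_retry_min <= 1440:
        problems.append("Retry Time for Failed Connections must be 10-1440 minutes")
    elif s.connection_retry_min < 30:
        # Within range, but below the recommended minimum of 30 minutes.
        problems.append("Retry Time for Failed Connections should be at least 30 minutes")
    if not 1 <= s.other_retry_min <= 1440:
        problems.append("Retry Time for Other Issues must be 1-1440 minutes")
    if s.other_retry_min >= s.connection_retry_min:
        problems.append(
            "Retry Time for Other Issues must be less than Retry Time for Failed Connections"
        )
    return problems
```

The defaults (720 and 10) pass with no problems; a connection retry of 20 minutes is accepted by the range check but flagged as below the recommended 30 minutes.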

Step 6: Save and run the precheck

Click Next: Save Task Settings and Precheck.

To preview OpenAPI parameters before saving, move the pointer over the button and click Preview OpenAPI parameters.

DTS runs a precheck before starting the migration. The task can start only after it passes the precheck.
If any item fails, click View Details next to it, fix the issue, and click Precheck Again.
If an alert item can be safely ignored, click Confirm Alert Details > Ignore > OK, then click Precheck Again. Ignoring alerts may cause data inconsistency.

Step 7: Purchase and start the instance

  1. Wait until Success Rate reaches 100%, then click Next: Purchase Instance.

  2. On the Purchase Instance page, configure the following.

Parameter | Description
Resource Group | The resource group for the migration instance. Default: default resource group. See What is Resource Management?
Instance Class | Select based on your migration speed requirements. See Instance classes of data migration instances.

  3. Read and agree to Data Transmission Service (Pay-as-you-go) Service Terms.

  4. Click Buy and Start, then click OK.

View the task progress on the Data Migration page.

Full-only migration tasks stop automatically when complete (Completed status).
Tasks with incremental migration run continuously (Running status) and do not stop automatically.

Map collections to Kafka topics

By default, all collections are written to the topic specified by the Topic parameter. To send specific collections to different topics or configure partitions and filters:

  1. In the Selected Objects section, move the pointer over a collection name and click Edit.

  2. In the Edit Table dialog box, configure the following.

Parameter | Description
Table Name | The destination topic for this collection. The topic must already exist in the Kafka instance. If changed, data is written to the specified topic instead of the default.
Filter Conditions | SQL-based filter to include only matching documents. See Specify filter conditions.
Number of Partitions | The number of partitions used when writing to the destination topic.

  3. Click OK.
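The routing rules for Topic, Topic That Stores DDL Information, and the per-collection Table Name can be summarized as a small function. This is an illustrative model, not DTS code; in particular, it assumes that DDL records are not affected by per-collection topic mappings, which this topic does not state.

```python
def destination_topic(record, default_topic, collection_topics=None, ddl_topic=None):
    """Return the Kafka topic a Canal JSON record would be written to.

    Illustrative model only:
      default_topic     -> the Topic parameter
      ddl_topic         -> Topic That Stores DDL Information (None if left blank)
      collection_topics -> {"database.collection": topic} set in the Edit Table dialog box
    """
    if record.get("isDdl"):
        # Assumption: DDL records ignore per-collection topics.
        # A blank DDL topic falls back to the topic set in Topic.
        return ddl_topic or default_topic
    name = f'{record["database"]}.{record["table"]}'
    return (collection_topics or {}).get(name, default_topic)
```

For example, with collection_topics={"kafkadb.kafka_test": "kafka_test_topic"}, data records for kafkadb.kafka_test go to kafka_test_topic, while records for other collections still go to the default topic.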

Choose a migration method

DTS supports three incremental migration scenarios. All examples use Canal JSON format for the data delivered to the destination topic.
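The guidance in this section (Oplog by default; ChangeStream for inelastic Amazon DocumentDB clusters or when you need the full document after each update; MongoDB 4.0 or later for change streams) can be condensed into a decision helper. The function name and parameters are hypothetical.

```python
def choose_migration_method(mongodb_major_version, inelastic_documentdb=False,
                            want_full_document_on_update=False):
    """Suggest a Migration Method value following the guidance in this topic."""
    if inelastic_documentdb or want_full_document_on_update:
        # Both cases need change streams, which require MongoDB 4.0 or later.
        if mongodb_major_version < 4:
            raise ValueError("ChangeStream requires MongoDB 4.0 or later")
        return "ChangeStream"
    return "Oplog"  # recommended default: lower latency
```

If you choose ChangeStream because you want full documents, also set Obtain the entire document after it is updated to Yes in the advanced settings.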

Oplog (recommended)

Main instance configuration

Settings: Migration Method = Oplog, Migration Types = Incremental Data Migration

Oplog is enabled by default on both self-managed MongoDB databases and ApsaraDB for MongoDB instances. It offers lower latency due to fast log pulling.

Important

Use Oplog unless your source requires change streams (for example, inelastic Amazon DocumentDB clusters).

Data delivery examples

Change type: insert
Source statement: db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})
Data received by the destination topic: {"data":[{"person":{"skills":["database","ai"],"name":"testName","age":18},"_id":{"$oid":"67d27da49591697476e1**"},"cid":"a"}],"database":"kafkadb","es":1741847972000,"gtid":null,"id":174184797200000**,"isDdl":false,"mysqlType":null,"old":null,"pkNames":["_id"],"serverId":null,"sql":null,"sqlType":null,"table":"kafka_test","ts":1741847973438,"type":"INSERT"}

Change type: update $set
Source statement: db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})
Data received by the destination topic: {"data":[{"$set":{"person.age":20}}],"database":"kafkadb","es":1741848051000,"gtid":null,"id":174184805100000**,"isDdl":false,"mysqlType":null,"old":[{"_id":{"$oid":"67d27da49591697476e1**"}}],"pkNames":["_id"],"serverId":null,"sql":null,"sqlType":null,"table":"kafka_test","ts":1741848051984,"type":"UPDATE"}

Change type: update $set new field
Source statement: db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})
Data received by the destination topic: {"data":[{"$set":{"salary":100.0}}],"database":"kafkadb","es":1741848146000,"gtid":null,"id":174184814600000**,"isDdl":false,"mysqlType":null,"old":[{"_id":{"$oid":"67d27da49591697476e1**"}}],"pkNames":["_id"],"serverId":null,"sql":null,"sqlType":null,"table":"kafka_test","ts":1741848147734,"type":"UPDATE"}

Change type: update $unset
Source statement: db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})
Data received by the destination topic: {"data":[{"$unset":{"salary":true}}],"database":"kafkadb","es":1741848207000,"gtid":null,"id":174184820700000**,"isDdl":false,"mysqlType":null,"old":[{"_id":{"$oid":"67d27da49591697476e1**"}}],"pkNames":["_id"],"serverId":null,"sql":null,"sqlType":null,"table":"kafka_test","ts":1741848208186,"type":"UPDATE"}

Change type: delete
Source statement: db.kafka_test.deleteOne({"cid":"a"})
Data received by the destination topic: {"data":[{"_id":{"$oid":"67d27da49591697476e1**"}}],"database":"kafkadb","es":1741848289000,"gtid":null,"id":174184828900000**,"isDdl":false,"mysqlType":null,"old":null,"pkNames":["_id"],"serverId":null,"sql":null,"sqlType":null,"table":"kafka_test","ts":1741848289798,"type":"DELETE"}

Change type: ddl drop
Source statement: db.kafka_test.drop()
Data received by the destination topic: {"data":null,"database":"kafkadb","es":1741847893000,"gtid":null,"id":1741847893000000005,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"serverId":null,"sql":{"drop":"kafka_test"},"sqlType":null,"table":"kafka_test","ts":1741847893760,"type":"DDL"}
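A consumer usually needs the document key for each message. In the Oplog examples above, INSERT and DELETE records carry _id in data, while UPDATE records carry only update operators in data and the _id in old. The sketch below (hypothetical function names) extracts keys accordingly. Note that the id and $oid values in the examples are masked with asterisks, so the sample lines are not valid JSON as printed; real messages contain complete values.

```python
import json


def record_keys(record):
    """Return the _id values a Canal JSON record refers to ([] for DDL records).

    INSERT and DELETE records carry _id inside "data"; UPDATE records carry
    update operators in "data" and the matching _id entries in "old".
    """
    if record.get("isDdl"):
        return []
    source = record["old"] if record["type"] == "UPDATE" else record["data"]
    return [entry["_id"] for entry in source]


def parse_message(raw):
    """Decode one Kafka message value (str or bytes) into (type, keys, data)."""
    record = json.loads(raw)
    return record["type"], record_keys(record), record["data"]
```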

ChangeStream — updated fields only

Main instance configuration

Settings: Migration Method = ChangeStream, Migration Types = Incremental Data Migration, Obtain the entire document after it is updated = No

Important

Change streams require MongoDB V4.0 or later.

Each update event delivers only the changed fields.

Data delivery examples

Change type: insert
Source statement: db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})
Data received by the destination topic: {"data":[{"person":{"skills":["database","ai"],"name":"testName","age":18},"_id":{"$oid":"67d27da49591697476e1**"},"cid":"a"}],"database":"kafkadb","es":1741847972000,"gtid":null,"id":174184797200000**,"isDdl":false,"mysqlType":null,"old":null,"pkNames":["_id"],"serverId":null,"sql":null,"sqlType":null,"table":"kafka_test","ts":1741847973803,"type":"INSERT"}

Change type: update $set
Source statement: db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})
Data received by the destination topic: {"data":[{"$set":{"person.age":20}}],"database":"kafkadb","es":1741848051000,"gtid":null,"id":174184805100000**,"isDdl":false,"mysqlType":null,"old":[{"_id":{"$oid":"67d27da49591697476e1**"}}],"pkNames":["_id"],"serverId":null,"sql":null,"sqlType":null,"table":"kafka_test","ts":1741848052912,"type":"UPDATE"}

Change type: update $set new field
Source statement: db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})
Data received by the destination topic: {"data":[{"$set":{"salary":100.0}}],"database":"kafkadb","es":1741848146000,"gtid":null,"id":174184814600000**,"isDdl":false,"mysqlType":null,"old":[{"_id":{"$oid":"67d27da49591697476e1**"}}],"pkNames":["_id"],"serverId":null,"sql":null,"sqlType":null,"table":"kafka_test","ts":1741848148056,"type":"UPDATE"}

Change type: update $unset
Source statement: db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})
Data received by the destination topic: {"data":[{"$unset":{"salary":1}}],"database":"kafkadb","es":1741848207000,"gtid":null,"id":174184820700000**,"isDdl":false,"mysqlType":null,"old":[{"_id":{"$oid":"67d27da49591697476e1**"}}],"pkNames":["_id"],"serverId":null,"sql":null,"sqlType":null,"table":"kafka_test","ts":1741848209142,"type":"UPDATE"}

Change type: delete
Source statement: db.kafka_test.deleteOne({"cid":"a"})
Data received by the destination topic: {"data":[{"_id":{"$oid":"67d27da49591697476e1**"}}],"database":"kafkadb","es":1741848289000,"gtid":null,"id":174184828900000**,"isDdl":false,"mysqlType":null,"old":null,"pkNames":["_id"],"serverId":null,"sql":null,"sqlType":null,"table":"kafka_test","ts":1741848290254,"type":"DELETE"}

Change type: ddl drop
Source statement: db.kafka_test.drop()
Data received by the destination topic: {"data":null,"database":"kafkadb","es":1741847893000,"gtid":null,"id":174184789300000****,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"serverId":null,"sql":{"drop":"kafka_test"},"sqlType":null,"table":"kafka_test","ts":1741847894679,"type":"DDL"}
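With ChangeStream and Obtain the entire document after it is updated set to No, an update arrives, as with Oplog, as an operator such as $set or $unset with dotted field paths. A consumer that keeps its own copy of each document can apply these operators as sketched below. The function is hypothetical; it handles only $set and $unset, ignores the $unset value (true in the Oplog example, 1 in the ChangeStream example), and skips paths that pass through arrays.

```python
import copy


def apply_update_ops(doc, ops):
    """Return a copy of doc with a {"$set": {...}} / {"$unset": {...}} entry applied."""
    result = copy.deepcopy(doc)
    for op, fields in ops.items():
        if op not in ("$set", "$unset"):
            raise ValueError(f"unsupported update operator: {op}")
        for path, value in fields.items():
            *parents, leaf = path.split(".")  # "person.age" -> ["person"], "age"
            target = result
            for part in parents:
                # $set creates missing intermediate objects; $unset only follows existing ones
                target = target.setdefault(part, {}) if op == "$set" else target.get(part)
                if not isinstance(target, dict):
                    break  # path does not lead to a nested object (e.g. an array); skip it
            else:
                if op == "$set":
                    target[leaf] = value
                else:
                    target.pop(leaf, None)  # the $unset value (true or 1) is ignored
    return result
```

Applying the three update examples in order to the inserted document yields a document with person.age set to 20 and no salary field.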

ChangeStream — full document on update

Main instance configuration

Settings: Migration Method = ChangeStream, Migration Types = Incremental Data Migration, Obtain the entire document after it is updated = Yes

Important

Change streams require MongoDB V4.0 or later.

Each update event delivers the complete document state after the change.

Data delivery examples

Change type: insert
Source statement: db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})
Data received by the destination topic: {"data":[{"person":{"skills":["database","ai"],"name":"testName","age":18},"_id":{"$oid":"67d27da49591697476e1**"},"cid":"a"}],"database":"kafkadb","es":1741847972000,"gtid":null,"id":174184797200000**,"isDdl":false,"mysqlType":null,"old":null,"pkNames":["_id"],"serverId":null,"sql":null,"sqlType":null,"table":"kafka_test","ts":1741847973128,"type":"INSERT"}

Change type: update $set
Source statement: db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})
Data received by the destination topic: {"data":[{"person":{"skills":["database","ai"],"name":"testName","age":20},"_id":{"$oid":"67d27da49591697476e1**"},"cid":"a"}],"database":"kafkadb","es":1741848051000,"gtid":null,"id":174184805100000,"isDdl":false,"mysqlType":null,"old":[{"_id":{"$oid":"67d27da49591697476e1**"}}],"pkNames":["_id"],"serverId":null,"sql":null,"sqlType":null,"table":"kafka_test","ts":1741848052219,"type":"UPDATE"}

Change type: update $set new field
Source statement: db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})
Data received by the destination topic: {"data":[{"person":{"skills":["database","ai"],"name":"testName","age":20},"_id":{"$oid":"67d27da49591697476e1**"},"salary":100.0,"cid":"a"}],"database":"kafkadb","es":1741848146000,"gtid":null,"id":174184814600000,"isDdl":false,"mysqlType":null,"old":[{"_id":{"$oid":"67d27da49591697476e1**"}}],"pkNames":["_id"],"serverId":null,"sql":null,"sqlType":null,"table":"kafka_test","ts":1741848147327,"type":"UPDATE"}

Change type: update $unset
Source statement: db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})
Data received by the destination topic: {"data":[{"person":{"skills":["database","ai"],"name":"testName","age":20},"_id":{"$oid":"67d27da49591697476e1**"},"cid":"a"}],"database":"kafkadb","es":1741848207000,"gtid":null,"id":174184820700000,"isDdl":false,"mysqlType":null,"old":[{"_id":{"$oid":"67d27da49591697476e1**"}}],"pkNames":["_id"],"serverId":null,"sql":null,"sqlType":null,"table":"kafka_test","ts":1741848208401,"type":"UPDATE"}

Change type: delete
Source statement: db.kafka_test.deleteOne({"cid":"a"})
Data received by the destination topic: {"data":[{"_id":{"$oid":"67d27da49591697476e1**"}}],"database":"kafkadb","es":1741848289000,"gtid":null,"id":174184828900000**,"isDdl":false,"mysqlType":null,"old":null,"pkNames":["_id"],"serverId":null,"sql":null,"sqlType":null,"table":"kafka_test","ts":1741848290499,"type":"DELETE"}

Change type: ddl drop
Source statement: db.kafka_test.drop()
Data received by the destination topic: {"data":null,"database":"kafkadb","es":1741847893000,"gtid":null,"id":174184789300000****,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"serverId":null,"sql":{"drop":"kafka_test"},"sqlType":null,"table":"kafka_test","ts":1741847894045,"type":"DDL"}
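In full-document mode, old contains only the _id, so the message does not say which fields changed. If you need that information, one option is to diff the new document against a cached copy of the previous version. The helper below is a hypothetical sketch; it compares nested objects field by field, treats arrays as single values, and reports an absent field as None.

```python
def changed_paths(before, after, prefix=""):
    """Return {dotted_path: (old, new)} for fields that differ between two documents."""
    changes = {}
    for key in before.keys() | after.keys():
        path = f"{prefix}{key}"
        old, new = before.get(key), after.get(key)
        if isinstance(old, dict) and isinstance(new, dict):
            # Recurse into nested objects so changes are reported with dotted paths.
            changes.update(changed_paths(old, new, prefix=path + "."))
        elif old != new or (key in before) != (key in after):
            # Also catches a field that is present with value None on only one side.
            changes[path] = (old, new)
    return changes
```

Diffing the inserted document against the first update example, for instance, yields {"person.age": (18, 20)}.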

Special case: missing fullDocument

Even when Obtain the entire document after it is updated is set to Yes, an update event may lack the fullDocument field (for example, for a sharded collection). In that case, the delivery result is the same as in the Oplog scenario: only the changed fields are sent.

Example

Base data:
use admin
db.runCommand({ enablesharding:"dts_test" })
use dts_test
sh.shardCollection("dts_test.cstest",{"name":"hashed"})
db.cstest.insert({"_id":1,"name":"a"})

Change statement:
db.cstest.updateOne({"_id":1,"name":"a"},{$set:{"name":"b"}})

Data received:
{"data":[{"$set":{"name":"b"}}],"database":"dts_test","es":1740720994000,"gtid":null,"id":174072099400000****,"isDdl":false,"mysqlType":null,"old":[{"name":"a","_id":1.0}],"pkNames":["_id"],"serverId":null,"sql":null,"sqlType":null,"table":"cstest","ts":1740721007099,"type":"UPDATE"}
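Because a stream configured for full documents can still contain delta updates like the one above, a consumer may need to tell the two shapes apart. The sketch below (hypothetical names) assumes that a delta entry contains only $-prefixed operator keys, while a full document always includes at least one regular field such as _id.

```python
def is_operator_update(entry):
    """True if an update entry contains only update operators such as $set or $unset."""
    return bool(entry) and all(key.startswith("$") for key in entry)


def update_shapes(record):
    """Label each entry in an UPDATE record's "data" list as "delta" or "full_document"."""
    return ["delta" if is_operator_update(e) else "full_document" for e in record["data"]]
```

A consumer could then replace its cached copy for "full_document" entries and apply the operators for "delta" entries, as it would for Oplog-mode updates.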

What's next