云数据库MongoDB版迁移至阿里云消息队列Kafka版

更新时间:2025-04-17 10:35:27
重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

数据传输服务DTS(Data Transmission Service)支持将MongoDB的数据迁移到Kafka集群中。本文以云数据库MongoDB(副本集架构)实例为源库,且以阿里云消息队列Kafka实例为目标库,为您介绍迁移操作的步骤。

前提条件

  • 已创建目标阿里云消息队列Kafka实例

    说明

    源库和目标库支持的版本,请参见迁移方案概览

  • 已在目标阿里云消息队列Kafka实例中创建用于接收数据的Topic

  • 若源库为分片集群架构的云数据库MongoDB,则还需为所有Shard节点申请连接地址,且各Shard的账号和密码需保持一致。申请方法,请参见申请ShardConfigServer节点连接地址

注意事项

类型

说明

类型

说明

源库限制

  • 带宽要求:源库所属的服务器需具备足够的出口带宽,否则将影响数据迁移速率。

  • 若需进行编辑(如集合的名称映射),则单次迁移任务仅支持迁移至多1000个集合。当超出数量限制,任务提交后会显示请求报错,此时建议您拆分待迁移的集合,分批配置多个任务。

  • 源库为单节点架构的MongoDB、Azure Cosmos DB for MongoDB或弹性集群的Amazon DocumentDB时,仅支持全量迁移。

  • 如需进行增量迁移:

    源库需开启Oplog日志,并确保Oplog日志至少保留7天以上;或者开启变更流(Change Streams),并确保DTS能够通过Change Streams订阅到源库最近7天内的数据变更。否则可能会因无法获取源库的数据变更而导致任务失败,极端情况下甚至可能会导致数据不一致或丢失。由此导致的问题,不在DTSSLA保障范围内。

    重要
    • 建议通过Oplog日志获取源库的数据变更。

    • 4.0及以上版本的MongoDB支持通过Change Streams获取数据变更。

    • 源库为Amazon DocumentDB(非弹性集群)时,需要手动开启Change Streams,并在配置任务时将迁移方式选择为ChangeStream,将架构类型选择为分片集群架构

  • 若源MongoDB为分片集群架构的实例,则待迁移集合中的_id字段需具有唯一性,否则可能会导致数据不一致。

  • 不支持通过SRV地址连接MongoDB数据库。

  • 若源MongoDB为分片集群架构的实例,则源Mongos节点的数量不能超过10个。同时,请确保分片集群架构的MongoDB实例中没有孤立文档,否则可能会导致数据不一致甚至任务失败。更多信息,请参见孤立文档如何清理MongoDB(分片集群架构)的孤立文档

  • 源库的操作限制:

    • 在全量迁移阶段,请勿对库或集合执行结构变更操作(包含数组类型数据的更新),否则会导致数据迁移任务失败或源库与目标库的数据不一致。

    • 若仅执行全量数据迁移,请勿向源实例中写入新的数据,否则会导致源库与目标库的数据不一致。

    • 若源MongoDB为分片集群实例,在迁移实例运行期间,请勿在源库对待迁移的对象执行更改数据分布的相关命令(例如shardCollection、reshardCollection、unshardCollection、moveCollection、movePrimary等),否则可能会导致数据不一致。

  • 当源库为分片集群架构的MongoDB时,若源库的均衡器Balancer存在均衡数据的行为,则可能会导致实例产生延迟。

其他限制

  • 仅支持集合级别的迁移。

  • 不支持迁移adminlocal库中的数据。

  • 若待迁移的单条数据超过10 MB,会导致任务失败。

  • 若源库为分片集群架构的MongoDB:

    • 全量迁移期间必须关闭源MongoDB数据库的均衡器(Balancer),直至每个子任务都运行到增量阶段,否则可能会造成数据不一致。关于均衡器的操作,请参见管理MongoDB均衡器Balancer

    • 若增量数据迁移的方式为Oplog,则DTS无法保证源库不同分片数据在目标Kafka的写入顺序。

  • 不保留事务信息,即源库中的事务迁移到目标库时会转变为单条的记录。

  • DTS实例运行期间,若目标Kafka发生了扩容或缩容(如增加或减少KafkaBroker节点),需要重启DTS实例。

  • 阿里云消息队列Kafka实例的Broker数量大于3个,需以专线/VPN网关/智能网关的方式接入DTS。

  • 请确保DTS能够正常连接源端和目标端。例如,数据库实例的安全设置、自建Kafka配置文件server.properties中的listenersadvertised.listeners参数,均未对DTS的访问进行限制。

  • 在全量任务运行时,DTS将会占用源和目标库一定的读写资源,可能会导致数据库的负载上升。因此建议您在DTS实例运行前评估源库和目标库的性能,并在业务低峰期运行实例(例如源库和目标库的CPU负载在30%以下)。

  • DTS将尝试恢复在七天内运行失败的实例。因此,在业务切换至目标实例之前,请务必结束或释放该实例,以避免该实例被自动恢复后导致目标数据库的数据被覆盖。

  • 由于DTS增量迁移的延迟时间是根据迁移到目标库最后一条数据的时间戳和当前时间戳对比得出,源库长时间未执行更新操作可能导致延迟信息不准确。如果任务显示的延迟时间过大,您可以在源库执行一个更新操作来更新延迟信息。

  • 若实例运行失败,DTS技术支持人员将在8小时内尝试恢复该实例。在恢复失败实例的过程中,可能会对该实例进行重启、调整参数等操作。

    说明

    在调整参数时,仅会修改实例的参数,不会对数据库中的参数进行修改。可能修改的参数,包括但不限于修改实例参数中的参数。

费用说明

迁移类型

链路配置费用

公网流量费用

迁移类型

链路配置费用

公网流量费用

全量数据迁移

不收费。

本示例不收费。当目标库的接入方式公网IP时,收取公网流量费用

增量数据迁移

收费,详情请参见计费概述

迁移类型说明

迁移类型

说明

迁移类型

说明

全量迁移

将源云数据库MongoDB迁移对象的存量数据全部迁移到目标Kafka实例中。

说明

支持全量迁移DATABASECOLLECTION中的数据。

增量迁移

在全量迁移的基础上,将源云数据库MongoDB的增量更新迁移到目标Kafka实例中。

使用Oplog
使用ChangeStream

增量迁移不支持在任务开始运行后新建的数据库,支持迁移的增量更新如下:

  • CREATE COLLECTION、INDEX

  • DROP DATABASE、COLLECTION、INDEX

  • RENAME COLLECTION

  • 在集合中插入、更新、删除文档的操作。

支持迁移的增量更新如下:

  • DROP DATABASE、COLLECTION

  • RENAME COLLECTION

  • 在集合中插入、更新、删除文档的操作。

数据库账号的权限要求

数据库

全量迁移

增量迁移

账号创建及授权方法

数据库

全量迁移

增量迁移

账号创建及授权方法

云数据库MongoDB

待迁移库和config库的read权限。

待迁移库、admin库和local库的read权限。

账号管理

操作步骤

  1. 进入目标地域的迁移任务列表页面(二选一)。

    通过DTS控制台进入
    通过DMS控制台进入
    1. 登录数据传输服务DTS控制台

    2. 在左侧导航栏,单击数据迁移

    3. 在页面左上角,选择迁移实例所属地域。

    说明

    实际操作可能会因DMS的模式和布局不同,而有所差异。更多信息。请参见极简模式控制台自定义DMS界面布局与样式

    1. 登录DMS数据管理服务

    2. 在顶部菜单栏中,选择Data + AI > 数据传输(DTS) > 数据迁移

    3. 迁移任务右侧,选择迁移实例所属地域。

  2. 单击创建任务,进入任务配置页面。

  3. 配置源库及目标库信息。

    类别

    配置

    说明

    类别

    配置

    说明

    任务名称

    DTS会自动生成一个任务名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。

    源库信息

    选择已有连接信息

    • 若您需要使用已录入系统(新建或保存)的数据库实例,请在下拉列表中选择所需的数据库实例,下方的数据库信息将自动进行配置。

      说明

      DMS控制台的配置项为选择DMS数据库实例

    • 若您未将数据库实例录入到系统,或无需使用已录入系统的数据库实例,则需要手动配置下方的数据库信息。

    数据库类型

    选择MongoDB

    接入方式

    选择云实例

    实例地区

    选择源云数据库MongoDB所属的地域。

    是否跨阿里云账号

    本示例使用当前阿里云账号下的数据库实例,需选择不跨账号

    架构类型

    本示例选择副本集架构

    说明

    若您的源云数据库MongoDB分片集群架构,您还需要填写Shard账号Shard密码

    迁移方式

    请根据实际情况,选择增量数据迁移的方式。

    • Oplog(推荐):

      若源库已开启Oplog日志,则支持此选项。

      说明

      本地自建MongoDB云数据库MongoDB默认已开启Oplog日志,且使用此方式迁移增量数据时增量迁移任务的延迟较小(拉取日志的速度较快),因此推荐选择Oplog

    • ChangeStream:若源库已开启变更流(Change Streams),则支持此选项。

      说明
      • 源库为Amazon DocumentDB(非弹性集群)时,仅支持选择ChangeStream

      • 源库架构类型选择为分片集群架构,无需填写Shard账号Shard密码

    实例ID

    选择源云数据库MongoDB实例的ID。

    鉴权数据库名称

    填入源云数据库MongoDB实例中数据库账号所属的数据库名称,若未修改过则为默认的admin

    数据库账号

    填入源云数据库MongoDB的数据库账号。权限要求,请参见数据库账号的权限要求

    数据库密码

    填入该数据库账号对应的密码。

    连接方式

    DTS支持非加密连接SSL安全连接Mongo Atlas SSL三种连接方式。连接方式的选项与接入方式架构类型有关,请以控制台为准。

    说明
    • 架构类型分片集群架构,且迁移方式OplogMongoDB数据库,不支持SSL安全连接

    • 若源库为自建(接入方式不为云实例副本集架构MongoDB数据库,并且选择了SSL安全连接,DTS还支持上传CA证书对连接进行校验。

    目标库信息

    选择已有连接信息

    • 若您需要使用已录入系统(新建或保存)的数据库实例,请在下拉列表中选择所需的数据库实例,下方的数据库信息将自动进行配置。

      说明

      DMS控制台的配置项为选择DMS数据库实例

    • 若您未将数据库实例录入到系统,或无需使用已录入系统的数据库实例,则需要手动配置下方的数据库信息。

    数据库类型

    选择Kafka

    接入方式

    选择云实例

    实例地区

    选择目标Kafka实例所属的地域。

    Kafka实例ID

    选择目标Kafka实例的ID。

    连接方式

    请根据业务及安全需求,选择非加密连接SCRAM-SHA-256

    Topic

    在下拉框中选择用于接收数据的Topic。

    存储DDLTopic

    在下拉框中选择用于存储DDL信息的Topic。

    说明

    若未选择,DDL信息默认存储在Topic选择的Topic中。

    是否使用Kafka Schema Registry

    Kafka Schema Registry是元数据提供服务层,提供了一个RESTful接口,用于存储和检索Avro Schema。

    • :不使用Kafka Schema Registry。

    • :使用Kafka Schema Registry。您需要输入Avro SchemaKafka Schema Registry注册的URLIP。

  4. 配置完成后,在页面下方单击测试连接以进行下一步

    说明
    • 请确保DTS服务的IP地址段能够被自动或手动添加至源库和目标库的安全设置中,以允许DTS服务器的访问。更多信息,请参见添加DTS服务器的IP地址段

    • 若源库或目标库为自建数据库(接入方式不是云实例),则还需要在弹出的DTS服务器访问授权对话框单击测试连接

  5. 配置任务对象。

    1. 对象配置页面,配置待迁移的对象。

      配置

      说明

      配置

      说明

      迁移类型

      • 如果只需要进行全量迁移,请选中全量迁移

      • 如果需要进行不停机迁移,请同时选中全量迁移增量迁移

      说明

      若未选中增量迁移,为保障数据一致性,数据迁移期间请勿在源实例中写入新的数据。

      目标已存在表的处理模式

      • 预检查并报错拦截:检查目标数据库中是否有同名的集合。如果目标数据库中没有同名的集合,则通过该检查项目;如果目标数据库中有同名的集合,则在预检查阶段提示错误,数据迁移任务不会被启动。

        说明

        如果目标库中同名的集合不方便删除或重命名,您可以更改该集合在目标库中的名称,请参见库表列名映射

      • 忽略报错并继续执行:跳过目标数据库中是否有同名集合的检查项。

        警告

        选择为忽略报错并继续执行,可能导致数据不一致,给业务带来风险,例如:

        • 在目标库遇到与源库主键的值相同的记录,则会保留目标库中的该条记录,即源库中的该条记录不会迁移至目标库中。

        • 可能会导致无法初始化数据、只能迁移部分的数据或迁移失败。

      投递到Kafka的数据格式

      仅支持选择Canal JSON

      说明

      Kafka接收到的数据可以分为三种场景

      Kafka压缩格式

      根据需求选择Kafka压缩消息的压缩格式。

      • LZ4(默认):压缩率较低,压缩速率较高。

      • GZIP:压缩率较高,压缩速率较低。

        说明

        CPU的消耗较高。

      • Snappy:压缩率中等,压缩速率中等。

      投递到Kafka Partition策略

      根据业务需求选择策略

      消息确认机制

      根据业务需求选择消息确认机制

      目标库对象名称大小写策略

      您可以配置目标实例中迁移对象的库名、集合名的英文大小写策略。默认情况下选择DTS默认策略,您也可以选择与源库、目标库默认策略保持一致。更多信息,请参见目标库对象名称大小写策略

      源库对象

      源库对象框中单击待迁移的对象,然后单击向右小箭头将其移动到已选择对象框。

      说明

      迁移对象选择的粒度为集合。

      已选择对象

      本示例无需额外配置。

      您可以使用映射功能,设置源库中的集合在目标Kafka实例中的映射信息

    2. 单击下一步高级配置,进行高级参数配置。

      配置

      说明

      配置

      说明

      选择调度该任务的专属集群

      DTS默认将任务调度到共享集群上,您无需选择。若您希望任务更加稳定,可以购买专属集群来运行DTS迁移任务。更多信息,请参见什么是DTS专属集群

      源库、目标库无法连接后的重试时间

      在迁移任务启动后,若源库或目标库连接失败则DTS会报错,并会立即进行持续的重试连接,默认重试720分钟,您也可以在取值范围(10~1440分钟)内自定义重试时间,建议设置30分钟以上。如果DTS在设置的时间内重新连接上源、目标库,迁移任务将自动恢复。否则,迁移任务将失败。

      说明
      • 针对同源或者同目标的多个DTS实例,网络重试时间以后创建任务的设置为准。

      • 由于连接重试期间,DTS将收取任务运行费用,建议您根据业务需要自定义重试时间,或者在源和目标库实例释放后尽快释放DTS实例。

      源库、目标库出现其他问题后的重试时间

      在迁移任务启动后,若源库或目标库出现非连接性的其他问题(如DDLDML执行异常),则DTS会报错并会立即进行持续的重试操作,默认持续重试时间为10分钟,您也可以在取值范围(1~1440分钟)内自定义重试时间,建议设置10分钟以上。如果DTS在设置的重试时间内相关操作执行成功,迁移任务将自动恢复。否则,迁移任务将会失败。

      重要

      源库、目标库出现其他问题后的重试时间的值需要小于源库、目标库无法连接后的重试时间的值。

      是否获取更新操作后的完整文档

      增量数据迁移阶段,是否将更新操作后对应文档(Document)的完整数据迁移到目标端。

      说明

      仅当迁移方式选择ChangeStream,且迁移类型选择了增量迁移时,才有此配置项。

      • :迁移更新字段对应文档的完整数据。

      • :只迁移更新字段的数据。

      是否限制全量迁移速率

      在全量迁移阶段,DTS将占用源库和目标库一定的读写资源,可能会导致数据库的负载上升。您可以根据实际情况,选择是否对全量迁移任务进行限速设置(设置每秒查询源库的速率QPS每秒全量迁移的行数RPS每秒全量迁移的数据量(MB)BPS),以缓解目标库的压力。

      说明

      仅当迁移类型选择了全量迁移,才有此配置项。

      待同步的数据中,同一张表内主键_id的数据类型是否唯一

      待迁移的数据中,同一个集合内主键_id的数据类型是否唯一。

      重要
      • 请根据实际情况选择,否则可能会导致数据丢失。

      • 仅当迁移类型选择了全量迁移,才有此配置项。

      • :唯一。在全量迁移阶段,DTS将不会扫描源库待迁移数据中主键的数据类型。

      • :不唯一。在全量迁移阶段,DTS将扫描源库待迁移数据中主键的数据类型。

      是否限制增量迁移速率

      您也可以根据实际情况,选择是否对增量迁移任务进行限速设置(设置每秒增量迁移的行数RPS每秒增量迁移的数据量(MB)BPS),以缓解目标库的压力。

      说明

      仅当迁移类型选择了增量迁移,才有此配置项。

      环境标签

      您可以根据实际情况,选择用于标识实例的环境标签。本示例无需选择。

      配置ETL功能

      选择是否配置ETL功能。关于ETL的更多信息,请参见什么是ETL

      监控告警

      是否设置告警,当迁移失败或延迟超过阈值后,将通知告警联系人。

  6. 保存任务并进行预检查。

    • 若您需要查看调用API接口配置该实例时的参数信息,请将鼠标光标移动至下一步保存任务并预检查按钮上,然后单击气泡中的预览OpenAPI参数

    • 若您无需查看或已完成查看API参数,请单击页面下方的下一步保存任务并预检查

    说明
    • 在迁移任务正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动迁移任务。

    • 如果预检查失败,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。

    • 如果预检查产生警告:

      • 对于不可以忽略的检查项,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。

      • 对于可以忽略无需修复的检查项,您可以依次单击点击确认告警详情确认屏蔽确定重新进行预检查,跳过告警检查项重新进行预检查。如果选择屏蔽告警检查项,可能会导致数据不一致等问题,给业务带来风险。

  7. 购买实例。

    1. 预检查通过率显示为100%时,单击下一步购买

    2. 购买页面,选择数据迁移实例的链路规格,详细说明请参见下表。

      类别

      参数

      说明

      类别

      参数

      说明

      信息配置

      资源组配置

      选择实例所属的资源组,默认为default resource group。更多信息,请参见什么是资源管理

      链路规格

      DTS为您提供了不同性能的迁移规格,迁移链路规格的不同会影响迁移速率,您可以根据业务场景进行选择。更多信息,请参见数据迁移链路规格说明

    3. 配置完成后,阅读并选中《数据传输(按量付费)服务条款》

    4. 单击购买并启动,并在弹出的确认对话框,单击确定

      您可以在迁移任务列表页面,查看迁移实例的具体进度。

      说明
      • 若迁移实例不包含增量迁移任务,则迁移实例会自动结束。迁移实例自动结束后,运行状态已完成

      • 若迁移实例包含增量迁移任务,则迁移实例不会自动结束,增量迁移任务会持续进行。在增量迁移任务正常运行期间,迁移实例的运行状态运行中

映射信息

  1. 已选择对象区域框中,将鼠标指针放置在目标Topic名(集合级别)上。

  2. 单击目标Topic名后出现的编辑

  3. 在弹出的编辑表对话框中,配置映射信息。

    配置

    说明

    配置

    说明

    表名称

    源集合迁移到的目标Topic名称,默认为源库及目标库配置阶段在目标库信息选择的Topic

    重要
    • 填写的Topic名称必须在目标Kafka实例中真实存在,否则将会导致数据迁移失败。

    • 若您修改了表名称,数据将会被写入到您填写的Topic中。

    过滤条件

    详情请参见设置过滤条件

    设置新建TopicPartition数量

    数据写入到目标Topic时的分区数。

  4. 单击确定

投递数据场景

场景一:使用Oplog方式迁移增量数据

实例主要配置
数据投递示例

迁移方式选择为Oplog,且迁移类型选择了增量迁移

源库增量变更类型

源库增量变更语句

目标Topic接收到的数据

insert

db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})

查看数据(单击展开)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 18
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741847972000,
	"gtid": null,
	"id": 174184797200000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847973438,
	"type": "INSERT"
}

update $set

db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})

查看数据(单击展开)

{
	"data": [{
		"$set": {
			"person.age": 20
		}
	}],
	"database": "kafkadb",
	"es": 1741848051000,
	"gtid": null,
	"id": 174184805100000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848051984,
	"type": "UPDATE"
}

update $set new filed

db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})

查看数据(单击展开)

{
	"data": [{
		"$set": {
			"salary": 100.0
		}
	}],
	"database": "kafkadb",
	"es": 1741848146000,
	"gtid": null,
	"id": 174184814600000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848147734,
	"type": "UPDATE"
}

update $unset remove field

db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})

查看数据(单击展开)

{
	"data": [{
		"$unset": {
			"salary": true
		}
	}],
	"database": "kafkadb",
	"es": 1741848207000,
	"gtid": null,
	"id": 174184820700000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848208186,
	"type": "UPDATE"
}

delete

db.kafka_test.deleteOne({"cid":"a"})

查看数据(单击展开)

{
	"data": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"database": "kafkadb",
	"es": 1741848289000,
	"gtid": null,
	"id": 174184828900000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848289798,
	"type": "DELETE"
}

ddl drop

db.kafka_test.drop()

查看数据(单击展开)

{
	"data": null,
	"database": "kafkadb",
	"es": 1741847893000,
	"gtid": null,
	"id": 1741847893000000005,
	"isDdl": true,
	"mysqlType": null,
	"old": null,
	"pkNames": null,
	"serverId": null,
	"sql": {
		"drop": "kafka_test"
	},
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847893760,
	"type": "DDL"
}

场景二:使用ChangeStream方式迁移增量数据(迁移更新字段的数据)

实例主要配置
数据投递示例

迁移方式选择为ChangeStream迁移类型选择了增量迁移,且是否获取更新操作后的完整文档选择为

源库增量变更类型

源库增量变更语句

目标Topic接收到的数据

insert

db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})

查看数据(单击展开)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 18
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741847972000,
	"gtid": null,
	"id": 174184797200000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847973803,
	"type": "INSERT"
}

update $set

db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})

查看数据(单击展开)

{
	"data": [{
		"$set": {
			"person.age": 20
		}
	}],
	"database": "kafkadb",
	"es": 1741848051000,
	"gtid": null,
	"id": 174184805100000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848052912,
	"type": "UPDATE"
}

update $set new filed

db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})

查看数据(单击展开)

{
	"data": [{
		"$set": {
			"salary": 100.0
		}
	}],
	"database": "kafkadb",
	"es": 1741848146000,
	"gtid": null,
	"id": 174184814600000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848148056,
	"type": "UPDATE"
}

update $unset remove field

db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})

查看数据(单击展开)

{
	"data": [{
		"$unset": {
			"salary": 1
		}
	}],
	"database": "kafkadb",
	"es": 1741848207000,
	"gtid": null,
	"id": 174184820700000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848209142,
	"type": "UPDATE"
}

delete

db.kafka_test.deleteOne({"cid":"a"})

查看数据(单击展开)

{
	"data": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"database": "kafkadb",
	"es": 1741848289000,
	"gtid": null,
	"id": 174184828900000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848290254,
	"type": "DELETE"
}

ddl drop

db.kafka_test.drop()

查看数据(单击展开)

{
	"data": null,
	"database": "kafkadb",
	"es": 1741847893000,
	"gtid": null,
	"id": 174184789300000****,
	"isDdl": true,
	"mysqlType": null,
	"old": null,
	"pkNames": null,
	"serverId": null,
	"sql": {
		"drop": "kafka_test"
	},
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847894679,
	"type": "DDL"
}

场景三:使用ChangeStream方式迁移增量数据(迁移更新字段对应文档的完整数据)

实例主要配置
数据投递示例
特殊情况

迁移方式选择为ChangeStream迁移类型选择了增量迁移,且是否获取更新操作后的完整文档选择为

源库增量变更类型

源库增量变更语句

目标Topic接收到的数据

insert

db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})

查看数据(单击展开)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 18
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741847972000,
	"gtid": null,
	"id": 174184797200000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847973128,
	"type": "INSERT"
}

update $set

db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})

查看数据(单击展开)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 20
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741848051000,
	"gtid": null,
	"id": 174184805100000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848052219,
	"type": "UPDATE"
}

update $set new filed

db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})

查看数据(单击展开)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 20
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"salary": 100.0,
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741848146000,
	"gtid": null,
	"id": 174184814600000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848147327,
	"type": "UPDATE"
}

update $unset remove field

db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})

查看数据(单击展开)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 20
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741848207000,
	"gtid": null,
	"id": 174184820700000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848208401,
	"type": "UPDATE"
}

delete

db.kafka_test.deleteOne({"cid":"a"})

查看数据(单击展开)

{
	"data": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"database": "kafkadb",
	"es": 1741848289000,
	"gtid": null,
	"id": 174184828900000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848290499,
	"type": "DELETE"
}

ddl drop

db.kafka_test.drop()

查看数据(单击展开)

{
	"data": null,
	"database": "kafkadb",
	"es": 1741847893000,
	"gtid": null,
	"id": 174184789300000****,
	"isDdl": true,
	"mysqlType": null,
	"old": null,
	"pkNames": null,
	"serverId": null,
	"sql": {
		"drop": "kafka_test"
	},
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847894045,
	"type": "DDL"
}

注意事项

当更新事件的 fullDocument 字段缺失时,数据的投递结果等同于使用Oplog方式迁移增量数据

示例

源库基础数据

源库增量变更语句

目标Topic接收到的数据

use admin
db.runCommand({ enablesharding:"dts_test" }) 
use dts_test
sh.shardCollection("dts_test.cstest",{"name":"hashed"})
db.cstest.insert({"_id":1,"name":"a"})
db.cstest.updateOne({"_id":1,"name":"a"},{$set:{"name":"b"}})

查看数据(单击展开)

{
	"data": [{
		"$set": {
			"name": "b"
		}
	}],
	"database": "dts_test",
	"es": 1740720994000,
	"gtid": null,
	"id": 174072099400000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"name": "a",
		"_id": 1.0
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "cstest",
	"ts": 1740721007099,
	"type": "UPDATE"
}
  • 本页导读 (1)
  • 前提条件
  • 注意事项
  • 费用说明
  • 迁移类型说明
  • 数据库账号的权限要求
  • 操作步骤
  • 映射信息
  • 投递数据场景
  • 场景一:使用Oplog方式迁移增量数据
  • 场景二:使用ChangeStream方式迁移增量数据(迁移更新字段的数据)
  • 场景三:使用ChangeStream方式迁移增量数据(迁移更新字段对应文档的完整数据)
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等