数据传输服务DTS(Data Transmission Service)支持将云数据库MongoDB版(副本集架构)实例的增量数据同步至函数计算FC的指定函数。您可以编写函数代码,结合同步至函数中的数据,对数据进行二次加工。
前提条件
注意事项
| 类型 | 说明 | 
| 源库限制 | 
 | 
| 其他限制 | 
 | 
| 特殊情况 | 当源库为自建MongoDB时: 
 说明  如果同步对象选择为整库,您还可以创建心跳表,心跳表每秒定期更新或者写入数据。 | 
费用说明
| 同步类型 | 链路配置费用 | 
| 增量数据同步 | 收费,详情请参见计费概述。 | 
支持同步的操作
| 同步类型 | 说明 | 
| 增量同步 | 使用Oplog增量同步不支持在任务开始运行后新建的数据库,支持同步的增量更新如下: 
 使用ChangeStream支持同步的增量更新如下: 
 | 
数据库账号的权限要求
| 数据库 | 所需权限 | 创建及授权方式 | 
| 源云数据库MongoDB版 | 待同步库、admin库和local库的read权限。 | 
操作步骤
- 进入目标地域的同步任务列表页面(二选一)。 - 通过DTS控制台进入- 登录数据传输服务DTS控制台。 
- 在左侧导航栏,单击数据同步。 
- 在页面左上角,选择同步实例所属地域。 
 - 通过DMS控制台进入说明- 实际操作可能会因DMS的模式和布局不同,而有所差异。更多信息,请参见极简模式控制台和自定义DMS界面布局与样式。 - 登录DMS数据管理服务。 
- 在顶部菜单栏中,选择。 
- 在同步任务右侧,选择同步实例所属地域。 
 
- 单击创建任务,进入任务配置页面。 
- 配置源库及目标库信息。 - 类别 - 配置 - 说明 - 无 - 任务名称 - DTS会自动生成一个任务名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。 - 源库信息 - 选择已有连接信息 - 若您需要使用已录入系统(新建或保存)的数据库实例,请在下拉列表中选择所需的数据库实例,下方的数据库信息将自动进行配置。 说明- DMS控制台的配置项为选择DMS数据库实例。 
- 若您未将数据库实例录入到系统,或无需使用已录入系统的数据库实例,则需要手动配置下方的数据库信息。 
 - 数据库类型 - 选择MongoDB。 - 接入方式 - 选择云实例。 - 实例地区 - 选择源云数据库MongoDB版实例所属地域。 - 是否跨阿里云账号 - 本示例为同一阿里云账号间的同步,选择不跨账号。 - 架构类型 - 选择副本集架构。 - 迁移方式 - 请根据实际情况,选择增量数据同步的方式。 - Oplog(推荐): - 若源库已开启Oplog日志,则支持此选项。 说明- 本地自建MongoDB和云数据库MongoDB版默认已开启Oplog日志,且使用此方式同步增量数据时增量同步任务的延迟较小(拉取日志的速度较快),因此推荐选择Oplog。 
- ChangeStream: - 若源库已开启变更流(Change Streams),则支持此选项。 说明- 源库为Amazon DocumentDB(非弹性集群)时,仅支持选择ChangeStream。 
- 源库架构类型选择为分片集群架构,无需填写Shard账号和Shard密码。 
 
 - 实例ID - 选择源云数据库MongoDB版实例ID。 - 鉴权数据库名称 - 填入源云数据库MongoDB版实例数据库账号所属的数据库名称,若未修改过则为默认的admin。 - 数据库账号 - 填入源云数据库MongoDB版实例的数据库账号,权限要求请参见数据库账号的权限要求。 - 数据库密码 - 填入该数据库账号对应的密码。 - 连接方式 - DTS支持非加密连接、SSL安全连接和Mongo Atlas SSL三种连接方式。连接方式的选项与接入方式和架构类型有关,请以控制台为准。 说明- 架构类型为分片集群架构,且迁移方式为Oplog的MongoDB数据库,不支持SSL安全连接。 
- 若源库为自建(接入方式不为云实例)副本集架构的MongoDB数据库,并且选择了SSL安全连接,DTS还支持上传CA证书对连接进行校验。 
 - 目标库信息 - 选择已有连接信息 - 若您需要使用已录入系统(新建或保存)的数据库实例,请在下拉列表中选择所需的数据库实例,下方的数据库信息将自动进行配置。 说明- DMS控制台的配置项为选择DMS数据库实例。 
- 若您未将数据库实例录入到系统,或无需使用已录入系统的数据库实例,则需要手动配置下方的数据库信息。 
 - 数据库类型 - 选择函数计算 FC。 - 接入方式 - 选择云实例。 - 实例地区 - 默认与源库实例地区一致,且不支持修改。 - 服务 - 选择目标函数计算FC的服务名称。 - 函数 - 选择目标函数计算FC用于接收数据的函数。 - 服务版本和别名 - 请根据实际情况进行选择。 - 默认版本:服务版本固定为LATEST。 
- 指定版本:您还需选择服务版本。 
- 指定别名:您还需选择服务别名。 
 说明- 目标函数计算FC的专有名词,请参见基本概念。 
- 配置完成后,在页面下方单击测试连接以进行下一步。 说明- 请确保DTS服务的IP地址段能够被自动或手动添加至源库和目标库的安全设置中,以允许DTS服务器的访问。更多信息,请参见添加DTS服务器的IP地址段。 
- 若源库或目标库为自建数据库(接入方式不是云实例),则还需要在弹出的DTS服务器访问授权对话框单击测试连接。 
 
- 配置任务对象。 - 在对象配置页面,配置待同步的对象。 - 配置 - 说明 - 同步类型 - 仅支持增量同步,且默认已选中。 - 数据格式 - 同步到FC函数中的数据存储格式,当前仅支持Canal Json。 说明- Canal Json的参数说明和示例,请参见Canal Json说明。 - 源库对象 - 在源库对象框中单击待同步对象,然后单击  将其移动至已选择对象框。说明 将其移动至已选择对象框。说明- 同步对象的选择粒度为库或集合。 - 已选择对象 - 请在已选择对象框中确认待同步的数据。 说明- 如需移除已选择的对象,可以在已选择对象框中勾选目标对象,并单击  进行移除。 进行移除。
- 如需按库或集合级别筛选需要同步的增量更新操作,请在已选择对象框中右键单击待同步的对象,并在弹出的对话框中进行选择。 
 
- 单击下一步高级配置,进行高级参数配置。 - 配置 - 说明 - 选择调度该任务的专属集群 - DTS默认将任务调度到共享集群上,您无需选择。若您希望任务更加稳定,可以购买专属集群来运行DTS同步任务。更多信息,请参见什么是DTS专属集群。 - 源库、目标库无法连接后的重试时间 - 在同步任务启动后,若源库或目标库连接失败则DTS会报错,并会立即进行持续的重试连接,默认持续重试时间为720分钟,您也可以在取值范围(10~1440分钟)内自定义重试时间,建议设置30分钟以上。如果DTS在设置的重试时间内重新连接上源库、目标库,同步任务将自动恢复。否则,同步任务将会失败。 说明- 针对同源或者同目标的多个DTS实例,如DTS实例A和DTS实例B,设置网络重试时间时A设置30分钟,B设置60分钟,则重试时间以低的30分钟为准。 
- 由于连接重试期间,DTS将收取任务运行费用,建议您根据业务需要自定义重试时间,或者在源和目标库实例释放后尽快释放DTS实例。 
 - 源库、目标库出现其他问题后的重试时间 - 在同步任务启动后,若源库或目标库出现非连接性的其他问题(如DDL或DML执行异常),则DTS会报错并会立即进行持续的重试操作,默认持续重试时间为10分钟,您也可以在取值范围(1~1440分钟)内自定义重试时间,建议设置10分钟以上。如果DTS在设置的重试时间内相关操作执行成功,同步任务将自动恢复。否则,同步任务将会失败。 重要- 源库、目标库出现其他问题后的重试时间的值需要小于源库、目标库无法连接后的重试时间的值。 - 是否获取更新操作后的完整文档 - 增量数据同步阶段,是否将更新操作后对应文档(Document)的完整数据同步到目标端。 说明- 仅当迁移方式选择ChangeStream时,才有此配置项。 - 是:同步更新字段对应文档的完整数据。 重要- 此功能基于MongoDB的原生能力实现,可能会导致源库的负载增加,从而降低增量数据采集的速度,并进一步导致同步实例产生延迟。 
- 若DTS无法成功获取完整数据,则仅同步更新字段的数据。 
 
- 否:只同步更新字段的数据。 
 - 是否限制增量同步速率 - 您也可以根据实际情况,选择是否对增量同步任务进行限速设置(设置每秒增量同步的行数RPS和每秒增量同步的数据量(MB)BPS),以缓解目标库的压力。 - 环境标签 - 您可以根据实际情况,选择用于标识实例的环境标签。本示例无需选择。 - 配置ETL功能 - 选择是否配置ETL功能。关于ETL的更多信息,请参见什么是ETL。 - 是:配置ETL功能,并在文本框中填写数据处理语句,详情请参见在DTS迁移或同步任务中配置ETL。 
- 否:不配置ETL功能。 
 - 监控告警 - 是否设置告警,当同步失败或延迟超过阈值后,将通知告警联系人。 - 不设置:不设置告警。 
- 设置:设置告警,您还需要设置告警阈值和告警联系人。更多信息,请参见在配置任务过程中配置监控告警。 
 
 
- 保存任务并进行预检查。 - 若您需要查看调用API接口配置该实例时的参数信息,请将鼠标光标移动至下一步保存任务并预检查按钮上,然后单击气泡中的预览OpenAPI参数。 
- 若您无需查看或已完成查看API参数,请单击页面下方的下一步保存任务并预检查。 
 说明- 在同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动同步作业。 
- 如果预检查失败,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。 
- 如果预检查产生警告: - 对于不可以忽略的检查项,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。 
- 对于可以忽略无需修复的检查项,您可以依次单击点击确认告警详情、确认屏蔽、确定、重新进行预检查,跳过告警检查项重新进行预检查。如果选择屏蔽告警检查项,可能会导致数据不一致等问题,给业务带来风险。 
 
 
- 购买实例。 - 预检查通过率显示为100%时,单击下一步购买。 
- 在购买页面,选择数据同步实例的计费方式、链路规格,详细说明请参见下表。 - 类别 - 参数 - 说明 - 信息配置 - 计费方式 - 预付费(包年包月):在新建实例时支付费用。适合长期需求,价格比按量付费更实惠,且购买时长越长,折扣越多。 
- 后付费(按量付费):按小时扣费。适合短期需求,用完可立即释放实例,节省费用。 
 - 资源组配置 - 实例所属的资源组,默认为default resource group。更多信息,请参见什么是资源管理。 - 链路规格 - DTS为您提供了不同性能的同步规格,同步链路规格的不同会影响同步速率,您可以根据业务场景进行选择。更多信息,请参见数据同步链路规格说明。 - 订购时长 - 在预付费模式下,选择包年包月实例的时长和数量,包月可选择1~9个月,包年可选择1年、2年、3年和5年。 说明- 该选项仅在付费类型为预付费时出现。 
- 配置完成后,阅读并勾选《数据传输(按量付费)服务条款》。 
- 单击购买并启动,并在弹出的确认对话框,单击确定。 - 您可在数据同步界面查看具体任务进度。 
 
后续步骤
- 若待同步的单条数据大于16 MB,导致DTS任务报错,您可以修改同步对象或者使用ETL功能,过滤大字段的数据。操作方法,请参见在已有同步任务上修改ETL配置和修改同步对象。 
- 根据实际需求,编写函数代码。更多信息,请参见代码开发概述。 
目标服务接收的数据格式
FC接收到的数据类型为Object,源端增量数据以数组的方式存储在Records字段中,数组中的每一个元素为一条Object类型的数据记录。其中Object字段和含义如下表所示。
FC接收到的数据有DML和DDL两种:
- DDL:记录更改数据库的结构信息,如CreateIndex、CreateCollection、DropIndex、DropCollection等。 
- DML:记录管理数据库中的数据信息,如INSERT、UPDATE、DELETE。 
| 字段 | 类型 | 说明 | 
| 
 | Boolean | 是否为DDL操作。 
 | 
| 
 | String | SQL操作的类型。 
 | 
| 
 | String | MongoDB的数据库名。 | 
| 
 | String | MongoDB的集合名。 | 
| 
 | String | MongoDB的主键名,固定为_id。 | 
| 
 | Long | 操作在源库的执行时间,13位Unix时间戳,单位为毫秒。 说明  Unix时间戳转换工具可用搜索引擎获取。 | 
| 
 | Long | 操作开始写入到目标库的时间,13位Unix时间戳,单位为毫秒。 说明  Unix时间戳转换工具可用搜索引擎获取。 | 
| 
 | Object Array | Array中仅包含一个元素,类型为Object,其中Key为doc,value的类型为Json String。 说明  将value反序列化即可得到数据记录。 | 
| old | Object Array | 存储更新前的数据,格式同 data 字段。 重要  仅当 | 
| 
 | Int | 操作的序列号。 | 
DDL操作数据格式示例
创建集合
SQL语句
db.createCollection("testCollection")FC接收到的数据
{
	'Records': [{
		'data': [{
			'doc': '{"create": "testCollection", "idIndex": {"v": 2, "key": {"_id": 1}, "name": "_id_"}}'
		}],
		'pkNames': ['_id'],
		'type': 'DDL',
		'es': 1694056437000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': True,
		'table': 'testCollection',
		'ts': 1694056437510
	}]
}删除集合
SQL语句
db.testCollection.drop()FC接收到的数据
{
	'Records': [{
		'data': [{
			'doc': '{"drop": "testCollection"}'
		}],
		'pkNames': ['_id'],
		'type': 'DDL',
		'es': 1694056577000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': True,
		'table': 'testCollection',
		'ts': 1694056577789
	}]
}创建索引
SQL语句
db.testCollection.createIndex({name:1})FC接收到的数据
{
	'Records': [{
		'data': [{
			'doc': '{"createIndexes": "testCollection", "v": 2, "key": {"name": 1}, "name": "name_1"}'
		}],
		'pkNames': ['_id'],
		'type': 'DDL',
		'es': 1694056670000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': True,
		'table': 'testCollection',
		'ts': 1694056670719
	}]
}删除索引
SQL语句
db.testCollection.dropIndex({name:1})FC接收到的数据
{
	'Records': [{
		'data': [{
			'doc': '{"dropIndexes": "testCollection", "index": "name_1"}'
		}],
		'pkNames': ['_id'],
		'type': 'DDL',
		'es': 1694056817000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': True,
		'table': '$cmd',
		'ts': 1694056818035
	}]
}DML操作数据格式示例
插入数据
SQL语句
// 批量插入
db.runCommand({insert: "user", documents: [{"name":"jack","age":20},{"name":"lili","age":20}]})
// 逐条插入
db.user.insert({"name":"jack","age":20})
db.user.insert({"name":"lili","age":20})FC接收到的数据
{
	'Records': [{
		'data': [{
			'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "jack", "age": 20}'
		}],
		'pkNames': ['_id'],
		'type': 'INSERT',
		'es': 1694054783000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': False,
		'table': 'user',
		'ts': 1694054784427
	}, {
		'data': [{
			'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "lili", "age": 20}'
		}],
		'pkNames': ['_id'],
		'type': 'INSERT',
		'es': 1694054783000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': False,
		'table': 'user',
		'ts': 1694054784428
	}]
}更新数据
SQL语句
db.user.update({"name":"jack"},{$set:{"age":30}}) FC接收到的数据
{
	'Records': [{
		'data': [{
			'doc': '{"$set": {"age": 30}}'
		}],
		'pkNames': ['_id'],
		'old': [{
			'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}'
		}],
		'type': 'UPDATE',
		'es': 1694054989000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': False,
		'table': 'user',
		'ts': 1694054990555
	}]
}删除数据
SQL语句
db.user.remove({"name":"jack"})FC接收到的数据
{
	'Records': [{
		'data': [{
			'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}'
		}],
		'pkNames': ['_id'],
		'type': 'DELETE',
		'es': 1694055452000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': False,
		'table': 'user',
		'ts': 1694055452852
	}]
}