本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
您可以在数据传输服务DTS(Data Transmission Service)控制台创建数据同步任务,实时或定时将云数据库 MongoDB 版的分片集群架构实例的增量数据同步至函数计算的指定函数。然后结合同步至函数中的数据,在函数计算编写相应的代码,对接收到的增量数据进行分析、存储等业务逻辑处理。
前提条件
注意事项
类型 | 说明 |
源库限制 |
|
其他限制 |
|
费用说明
同步类型 | 链路配置费用 |
增量数据同步 | 收费,详情请参见计费概述。 |
支持同步的操作
同步类型 | 说明 |
增量同步 | 使用Oplog增量同步不支持在任务开始运行后新建的数据库,支持同步的增量更新如下:
使用ChangeStream支持同步的增量更新如下:
|
操作步骤
进入目标地域的同步任务列表页面(二选一)。
通过DTS控制台进入
登录数据传输服务DTS控制台。
在左侧导航栏,单击数据同步。
在页面左上角,选择同步实例所属地域。
通过DMS控制台进入
说明实际操作可能会因DMS的模式和布局不同,而有所差异。更多信息,请参见极简模式控制台和自定义DMS界面布局与样式。
登录DMS数据管理服务。
在顶部菜单栏中,选择
。在同步任务右侧,选择同步实例所属地域。
单击创建任务,进入任务配置页面。
可选:在页面右上角,单击试用新版配置页。
说明若您已进入新版配置页(页面右上角的按钮为返回旧版配置页),则无需执行此操作。
新版配置页和旧版配置页部分参数有差异,建议使用新版配置页。
配置源库及目标库信息。
类别
配置
说明
无
任务名称
DTS会自动生成一个任务名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
源库信息
选择已有的DMS数据库实例(可选,如未创建可忽略此处选择,直接在下方配置数据库信息即可)
您可以按实际需求,选择是否使用已有实例。
如使用已有实例,下方数据库信息将自动填入,您无需重复输入。
如不使用已有实例,您需要输入下方的数据库信息。
数据库类型
选择MongoDB。
接入方式
选择云实例。
实例地区
选择源云数据库MongoDB版实例所属地域。
是否跨阿里云账号
本示例为同一阿里云账号间的同步,选择不跨账号。
架构类型
选择分片集群架构。
迁移方式
请根据实际情况,选择增量数据同步的方式。
Oplog(推荐):
若源库已开启Oplog日志,则支持此选项。
说明本地自建MongoDB和云数据库MongoDB版默认已开启Oplog日志,且使用此方式同步增量数据时增量同步任务的延迟较小(拉取日志的速度较快),因此推荐选择Oplog。
ChangeStream:
若源库已开启变更流(Change Streams),则支持此选项。
说明源库为Amazon DocumentDB(非弹性集群)时,仅支持选择ChangeStream。
源库架构类型选择为分片集群架构,无需填写Shard账号和Shard密码。
实例ID
选择源云数据库MongoDB版实例ID。
鉴权数据库名称
填入源云数据库MongoDB版实例数据库账号所属的数据库名称,若未修改过则为默认的admin。
数据库账号
填入源云数据库MongoDB版实例的数据库账号,权限要求请参见数据库账号的权限要求。
数据库密码
填入该数据库账号对应的密码。
Shard账号
填入源云数据库MongoDB版的数据库Shard账号。
Shard密码
填入源云数据库MongoDB版数据库Shard账号的密码。
目标库信息
选择已有的DMS数据库实例(可选,如未创建可忽略此处选择,直接在下方配置数据库信息即可)
您可以按实际需求,选择是否使用已有实例。
如使用已有实例,下方数据库信息将自动填入,您无需重复输入。
如不使用已有实例,您需要输入下方的数据库信息。
数据库类型
选择函数计算 FC。
接入方式
选择云实例。
实例地区
默认与源库实例地区一致,且不支持修改。
服务
选择目标函数计算FC的服务名称。
函数
选择目标函数计算FC用于接收数据的函数。
服务版本和别名
请根据实际情况进行选择。
默认版本:服务版本固定为LATEST。
指定版本:您还需选择服务版本。
指定别名:您还需选择服务别名。
说明目标函数计算FC的专有名词,请参见基本概念。
配置完成后,单击页面下方的测试连接以进行下一步。
如果源或目标数据库是阿里云数据库实例(例如RDS MySQL、云数据库MongoDB版等),DTS会自动将对应地区DTS服务的IP地址添加到阿里云数据库实例的白名单中;如果源或目标数据库是ECS上的自建数据库,DTS会自动将对应地区DTS服务的IP地址添加到ECS的安全规则中,您还需确保自建数据库没有限制ECS的访问(若数据库是集群部署在多个ECS实例,您需要手动将DTS服务对应地区的IP地址添到其余每个ECS的安全规则中);如果源或目标数据库是IDC自建数据库或其他云数据库,则需要您手动添加对应地区DTS服务的IP地址,以允许来自DTS服务器的访问。DTS服务的IP地址,请参见DTS服务器的IP地址段。
警告DTS自动添加或您手动添加DTS服务的公网IP地址段可能会存在安全风险,一旦使用本产品代表您已理解和确认其中可能存在的安全风险,并且需要您做好基本的安全防护,包括但不限于加强账号密码强度防范、限制各网段开放的端口号、内部各API使用鉴权方式通信、定期检查并限制不需要的网段,或者使用通过内网(专线/VPN网关/智能网关)的方式接入。
配置任务对象及高级配置。
配置
说明
同步类型
仅支持增量同步,且默认已选中。
数据格式
同步到FC函数中的数据存储格式,当前仅支持Canal Json。
说明Canal Json的参数说明和示例,请参见Canal Json说明。
源库对象
在源库对象框中单击待同步对象,然后单击将其移动至已选择对象框。
说明同步对象的选择粒度为库或集合。
已选择对象
请在已选择对象框中确认待同步的数据。
说明若您需要移除已选择的对象,可以在已选择对象框中勾选目标对象,并单击进行移除。
单击下一步高级配置,进行高级配置。
配置
说明
选择调度该任务的专属集群
DTS默认将任务调度到共享集群上,您无需选择。若您希望任务更加稳定,可以购买专属集群来运行DTS同步任务。更多信息,请参见什么是DTS专属集群。
设置告警
是否设置告警,当同步失败或延迟超过阈值后,将通知告警联系人。
不设置:不设置告警。
设置:设置告警,您还需要设置告警阈值和告警联系人。更多信息,请参见在配置任务过程中配置监控告警。
源库、目标库无法连接后的重试时间
在同步任务启动后,若源库或目标库连接失败则DTS会报错,并会立即进行持续的重试连接,默认持续重试时间为720分钟,您也可以在取值范围(10~1440分钟)内自定义重试时间,建议设置30分钟以上。如果DTS在设置的重试时间内重新连接上源库、目标库,同步任务将自动恢复。否则,同步任务将会失败。
说明针对同源或者同目标的多个DTS实例,如DTS实例A和DTS实例B,设置网络重试时间时A设置30分钟,B设置60分钟,则重试时间以低的30分钟为准。
由于连接重试期间,DTS将收取任务运行费用,建议您根据业务需要自定义重试时间,或者在源和目标库实例释放后尽快释放DTS实例。
源库、目标库出现其他问题后的重试时间
在同步任务启动后,若源库或目标库出现非连接性的其他问题(如DDL或DML执行异常),则DTS会报错并会立即进行持续的重试操作,默认持续重试时间为10分钟,您也可以在取值范围(1~1440分钟)内自定义重试时间,建议设置10分钟以上。如果DTS在设置的重试时间内相关操作执行成功,同步任务将自动恢复。否则,同步任务将会失败。
重要源库、目标库出现其他问题后的重试时间的值需要小于源库、目标库无法连接后的重试时间的值。
是否限制增量同步速率
您也可以根据实际情况,选择是否对增量同步任务进行限速设置(设置每秒增量同步的行数RPS和每秒增量同步的数据量(MB)BPS),以缓解目标库的压力。
环境标签
您可以根据实际情况,选择用于标识实例的环境标签。本示例无需选择。
配置ETL功能
选择是否配置ETL功能。关于ETL的更多信息,请参见什么是ETL。
是:配置ETL功能,并在文本框中填写数据处理语句,详情请参见在DTS迁移或同步任务中配置ETL。
否:不配置ETL功能。
保存任务并进行预检查。
若您需要查看调用API接口配置该实例时的参数信息,请将鼠标光标移动至下一步保存任务并预检查按钮上,然后单击气泡中的预览OpenAPI参数。
若您无需查看或已完成查看API参数,请单击页面下方的下一步保存任务并预检查。
说明在同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动同步作业。
如果预检查失败,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。
如果预检查产生警告:
对于不可以忽略的检查项,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。
对于可以忽略无需修复的检查项,您可以依次单击点击确认告警详情、确认屏蔽、确定、重新进行预检查,跳过告警检查项重新进行预检查。如果选择屏蔽告警检查项,可能会导致数据不一致等问题,给业务带来风险。
预检查通过率显示为100%时,单击下一步购买。
在购买页面,选择数据同步实例的计费方式、链路规格,详细说明请参见下表。
类别
参数
说明
信息配置
计费方式
预付费(包年包月):在新建实例时支付费用。适合长期需求,价格比按量付费更实惠,且购买时长越长,折扣越多。
后付费(按量付费):按小时扣费。适合短期需求,用完可立即释放实例,节省费用。
资源组配置
实例所属的资源组,默认为default resource group。更多信息,请参见什么是资源管理。
链路规格
DTS为您提供了不同性能的同步规格,同步链路规格的不同会影响同步速率,您可以根据业务场景进行选择。更多信息,请参见数据同步链路规格说明。
订购时长
在预付费模式下,选择包年包月实例的时长和数量,包月可选择1~9个月,包年可选择1年、2年、3年和5年。
说明该选项仅在付费类型为预付费时出现。
配置完成后,阅读并勾选《数据传输(按量付费)服务条款》。
单击购买并启动,并在弹出的确认对话框,单击确定。
您可在数据同步界面查看具体任务进度。
预检查通过率显示为100%时,单击下一步购买。
目标服务接收的数据格式
FC接收到的数据类型为Object,源端增量数据以数组的方式存储在Records字段中,数组中的每一个元素为一条Object类型的数据记录。其中Object字段和含义如下表所示。
FC接收到的数据有DML和DDL两种:
DDL:记录更改数据库的结构信息,如CreateIndex、CreateCollection、DropIndex、DropCollection等。
DML:记录管理数据库中的数据信息,如INSERT、UPDATE、DELETE。
字段 | 类型 | 说明 |
| Boolean | 是否为DDL操作。
|
| String | SQL操作的类型。
|
| String | MongoDB的数据库名。 |
| String | MongoDB的集合名。 |
| String | MongoDB的主键名,固定为_id。 |
| Long | 操作在源库的执行时间,13位Unix时间戳,单位为毫秒。 说明 Unix时间戳转换工具可用搜索引擎获取。 |
| Long | 操作开始写入到目标库的时间,13位Unix时间戳,单位为毫秒。 说明 Unix时间戳转换工具可用搜索引擎获取。 |
| Object Array | Array中仅包含一个元素,类型为Object,其中Key为doc,value的类型为Json String。 说明 将value反序列化即可得到数据记录。 |
| Int | 操作的序列号。 |
DDL操作数据格式示例
创建集合
SQL语句
db.createCollection("testCollection")
FC接收到的数据
{
'Records': [{
'data': [{
'doc': '{"create": "testCollection", "idIndex": {"v": 2, "key": {"_id": 1}, "name": "_id_"}}'
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056437000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': 'testCollection',
'ts': 1694056437510
}]
}
删除集合
SQL语句
db.testCollection.drop()
FC接收到的数据
{
'Records': [{
'data': [{
'doc': '{"drop": "testCollection"}'
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056577000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': 'testCollection',
'ts': 1694056577789
}]
}
创建索引
SQL语句
db.testCollection.createIndex({name:1})
FC接收到的数据
{
'Records': [{
'data': [{
'doc': '{"createIndexes": "testCollection", "v": 2, "key": {"name": 1}, "name": "name_1"}'
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056670000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': 'testCollection',
'ts': 1694056670719
}]
}
删除索引
SQL语句
db.testCollection.dropIndex({name:1})
FC接收到的数据
{
'Records': [{
'data': [{
'doc': '{"dropIndexes": "testCollection", "index": "name_1"}'
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056817000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': '$cmd',
'ts': 1694056818035
}]
}
DML操作数据格式示例
插入数据
SQL语句
// 批量插入
db.runCommand({insert: "user", documents: [{"name":"jack","age":20},{"name":"lili","age":20}]})
// 逐条插入
db.user.insert({"name":"jack","age":20})
db.user.insert({"name":"lili","age":20})
FC接收到的数据
{
'Records': [{
'data': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "jack", "age": 20}'
}],
'pkNames': ['_id'],
'type': 'INSERT',
'es': 1694054783000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694054784427
}, {
'data': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "lili", "age": 20}'
}],
'pkNames': ['_id'],
'type': 'INSERT',
'es': 1694054783000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694054784428
}]
}
更新数据
SQL语句
db.user.update({"name":"jack"},{$set:{"age":30}})
FC接收到的数据
{
'Records': [{
'data': [{
'doc': '{"$set": {"age": 30}}'
}],
'pkNames': ['_id'],
'old': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}'
}],
'type': 'UPDATE',
'es': 1694054989000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694054990555
}]
}
删除数据
SQL语句
db.user.remove({"name":"jack"})
FC接收到的数据
{
'Records': [{
'data': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}'
}],
'pkNames': ['_id'],
'type': 'DELETE',
'es': 1694055452000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694055452852
}]
}