本文为您介绍MongoDB Writer支持的数据类型、写入方式、字段映射和数据源等参数和配置示例。
背景信息
- 在开始配置MongoDB Writer插件前,请首先配置好数据源,详情请参见配置MongoDB数据源。
- 如果您使用的是云数据库MongoDB版,MongoDB默认会有用root账号。
- 出于安全策略的考虑,数据集成仅支持使用 MongoDB数据库对应账号进行连接。您在添加使用MongoDB数据源时,请避免使用root作为访问账号。
MongoDB Writer通过数据集成框架获取Reader生成的协议数据,然后将支持的类型通过逐一判断转换为MongoDB支持的类型。数据集成本身不支持数组类型,但MongoDB支持数组类型,并且数组类型具有强大的索引功能。
您可以通过参数的特殊配置,将字符串转换为MongoDB中的数组。转换类型后,即可并行写入MongoDB。
类型转换列表
MongoDB Writer支持大部分MongoDB类型,但也存在部分没有支持的情况,请注意检查您的数据类型。
类型分类 | MongoDB数据类型 |
---|---|
整数类 | INT和LONG |
浮点类 | DOUBLE |
字符串类 | STRING和ARRAY |
日期时间类 | DATE |
布尔型 | BOOL |
二进制类 | BYTES |
参数说明
参数 | 描述 | 是否必选 | 默认值 |
---|---|---|---|
datasource | 数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。 | 是 | 无 |
collectionName | MongoDB的集合名。 | 是 | 无 |
column | MongoDB的文档列名,配置为数组形式表示MongoDB的多个列。
|
是 | 无 |
writeMode | 指定了传输数据时是否覆盖的信息,包括isReplace和replaceKey:
说明 当
isReplace设置为true,且将非
_id 字段配置为
replaceKey,后续运行时会出现类似以下的报错:
原因是写入数据中,存在
_id 与
replaceKey不匹配的数据,详情请参见
报错:After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"。
|
否 | 无 |
preSql | 表示数据同步写出MongoDB前的前置操作,例如清理历史数据等。如果preSql为空,表示没有配置前置操作。配置preSql时,需要确保preSql符合JSON语法要求。 | 否 | 无 |
执行数据集成作业时,会首先执行您已配置的preSql。完成preSql的执行后,才可以进入实际的数据写出阶段。preSql本身不会影响写出的数据内容。数据集成通过preSql参数,可以具备幂等执行特性。例如,您的preSql在每次任务执行前都会清理历史数据(根据您的业务规则进行清理)。此时,如果任务失败,您只需要重新执行数据集成作业即可。
- 需要配置type字段,表示前置操作类别,支持drop和remove,例如
"preSql":{"type":"remove"}
:- drop:表示删除集合和集合内的数据,collectionName参数配置的集合即是待删除的集合。
- remove:表示根据条件删除数据。
- json:您可以通过JSON控制待删除的数据条件,例如
"preSql":{"type":"remove", "json":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"}
。此处的${last_day}
为DataWorks调度参数,格式为$[yyyy-mm-dd]
。您可以根据需要具体使用其它MongoDB支持的条件操作符号($gt、$lt、$gte和$lte等)、逻辑操作符(and和or等)或函数(max、min、sum、avg和ISODate等)。数据集成通过如下MongoDB标准API执行您的数据,删除query。query=(BasicDBObject) com.mongodb.util.JSON.parse(json); col.deleteMany(query);
说明 如果您需要条件删除数据,建议优先使用JSON配置形式。 - item:您可以在item中配置数据过滤的列名(name)、条件(condition)和列值(value)。例如
"preSql":{"type":"remove","item":[{"name":"pv","value":"100","condition":"$gt"},{"name":"pid","value":"10"}]}
。数据集成会基于您配置的item条件项,构造查询query条件,进而通过MongoDB标准API执行删除。例如
col.deleteMany(query);
。
- 不识别的preSql,无需进行任何前置删除操作。
向导开发介绍
- 选择数据源。
配置同步任务的 数据来源和 数据去向。
参数 描述 数据源 即上述参数说明中的datasource,通常输入您配置的数据源名称。 集合名称 即上述参数说明中的collectionName。 写入模式(是否覆盖) 即上述参数说明中的writeMode。 说明 当 写入模式(是否覆盖)设置为是,且将非_id
字段配置为 业务主键,后续运行时会出现类似以下的报错:
原因是写入数据中,存在After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"
_id
与 业务主键不匹配的数据,详情请参见 报错:After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"。前置条件 即上述参数说明中的preSql。表示数据同步写出MongoDB前的前置操作,例如清理历史数据等。如果preSql为空,表示没有配置前置操作。配置preSql时,需要确保preSql符合JSON语法要求。 - 字段映射,即上述参数说明中的column。默认使用同行映射。您可以单击
图标手动编辑目标表字段。
- 通道控制。
参数 描述 任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。 同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。 错误记录数 错误记录数,表示脏数据的最大容忍条数。 分布式处理能力 数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组和新增和使用独享数据集成资源组。
脚本开发介绍
使用脚本模式开发的详情请参见通过脚本模式配置离线同步任务。
{
"type": "job",
"version": "2.0",//版本号。
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "mongodb",//插件名。
"parameter": {
"datasource": "",//数据源名。
"column": [
{
"name": "_id",//列名。
"type": "ObjectId"//数据类型。如果replacekey为_id,则此处的type必须配置为ObjectID。如果配置为string,会无法进行替换。
},
{
"name": "age",
"type": "int"
},
{
"name": "id",
"type": "long"
},
{
"name": "wealth",
"type": "double"
},
{
"name": "hobby",
"type": "array",
"splitter": " "
},
{
"name": "valid",
"type": "boolean"
},
{
"name": "date_of_join",
"format": "yyyy-MM-dd HH:mm:ss",
"type": "date"
}
],
"writeMode": {//写入模式。
"isReplace": "true",
"replaceKey": "_id"
},
"collectionName": "datax_test"//连接名称。
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {//错误记录数。
"record": "0"
},
"speed": {
"throttle": true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent": 1,//作业并发数。
"mbps": "1"//限流的速度。
},
"jvmOption": "-Xms1024m -Xmx1024m"
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
报错:no such cmd splitVector
- 可能原因:
在同步任务运行时,默认优先使用
splitVector
命令进行任务分片,在部分MongoDB版本中,不支持splitVector
命令,进而会导致报错no such cmd splitVector
。 - 解决方案:
- 进入同步任务配置页面后,单击顶部的转换脚本
按钮。将任务修改为脚本模式。
- 在MongoDB的parameter配置中,增加参数
以避免使用"useSplitVector" : false
splitVector
。
- 进入同步任务配置页面后,单击顶部的转换脚本
报错:After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"
- 报错现象:
同步任务中,以向导模式为例,当配置了 写入模式(是否覆盖)为 是,且配置了非
_id
字段作为 业务主键,可能会出现此问题。 - 可能原因:
写入数据中,存在_id与配置的业务主键(如上述配置示例的
my_id
)不匹配的数据。 - 解决方案:
- 方案一:修改离线同步任务,确保配置的业务主键与_id一致。
- 方案二:进行数据同步时,将_id作为业务主键。