MongoDB数据源为您提供读取和写入MongoDB双向通道的功能,本文为您介绍DataWorks的MongoDB数据同步的能力支持情况。
支持的版本
仅支持4.x、5.x、6.x、7.x版本的MongoDB。
使用说明
- 数据集成支持使用MongoDB数据库对应账号进行连接,如果您使用的是云数据库MongoDB版,默认会有一个root账号。出于安全策略的考虑,在添加使用MongoDB数据源时,请避免使用root作为访问账号。 
- 如果MongoDB为分片集群,则在配置数据源时,需要配置mongos地址,避免配置mongod/shard节点地址。否则同步任务在抽取MongoDB中数据时,可能会导致只查询到指定shard的数据,而非预期的全集。关于mongos、mongod,详情请参考mongos、mongod。 
- 暂不支持MongoDB主备集群。 
- 在并发大于1的情况下,同步任务配置的集合中所有 - _id字段类型必须一致(例如,- _id字段都为string类型或者ObjectId类型),否则会出现部分数据无法同步的问题。说明- 并发大于1时,任务拆分会使用 - _id字段进行划分,因此在此场景下- _id字段不支持混合类型。如果- _id有多种字段类型,您可以使用单并发的形式进行数据同步,且不配置splitFactor或splitFactor配置为1。
 
- 数据集成本身不支持数组类型,但MongoDB支持数组类型,并且数组类型具有强大的索引功能。您可以通过参数的特殊配置,将字符串转换为MongoDB中的数组。转换类型后,即可并行写入MongoDB。 
- 自建MongoDB数据库不支持公网访问,仅支持阿里云内网访问。 
- 暂不支持基于Docker部署的MongoDB集群。 
- 数据集成目前不支持在数据查询(参数query)配置中读取指定列的数据。 
- 离线同步任务中,如果MongoDB无法获取字段结构,将默认按照6个字段生成字段映射,字段名分别为 - col1,- col2,- col3,- col4,- col5,- col6。
- 在同步任务运行时,默认优先使用 - splitVector命令进行任务分片,在部分MongoDB版本中,不支持- splitVector命令,进而会导致报错- no such cmd splitVector,您可以在同步任务配置中,单击 按钮,进入脚本模式,在MongoDB的parameter配置中,增加以下参数,避免使用 按钮,进入脚本模式,在MongoDB的parameter配置中,增加以下参数,避免使用- splitVector。- "useSplitVector" : false
支持的字段类型
MongoDB Reader支持的MongoDB数据类型
数据集成支持大部分MongoDB类型,但也存在部分不被支持的情况,请注意检查您的数据类型。
对于支持读取的数据类型,数据集成在读取时:
- 基本类型的数据,会根据同步任务配置的读取字段(column,详见下文的附录:MongoDB脚本Demo与参数说明)中的name自动读取对应path下的数据,并根据数据类型做自动转换,您无需指定column的type属性。 - 类型 - 离线读(MongoDB Reader) - 说明 - ObjectId - 支持 - 对象ID类型。 - Double - 支持 - 64位浮点数类型。 - 32-bit integer - 支持 - 32位整数。 - 64-bit integer - 支持 - 64位整数。 - Decimal128 - 支持 - Decimal128类型。 说明- 如果配置为嵌套类型、Combine类型,JSON序列化时会被当做对象处理,需增加参数 - decimal128OutputType为- bigDecimal,才能输出为decimal。- String - 支持 - 字符串类型。 - Boolean - 支持 - 布尔类型。 - Timestamp - 支持 - 时间戳类型。 说明- BsonTimestamp存储的是时间戳,无需考虑时区影响,详情请参见MongoDB中的时区问题。 - Date - 支持 - 日期类型。 
- 部分复杂类型的数据,您可通过配置column的type属性,进行自定义处理。 - 类型 - 离线读(MongoDB Reader) - 说明 - Document - 支持 - 嵌入文档类型。 - 如果没有配置type属性,则直接将Document转JSON序列化处理。 
- 如果配置了type属性为 - document,则属于嵌套类型,MongoDB Reader会按path读取Document属性。详细示例请参见下文的数据类型示例2:递归解析处理多层嵌套的Document。
 - Array - 支持 - 数组类型。 - 如果type配置为 - array.json、- arrays,直接JSON序列化处理。
- 如果type配置为 - array、- document.array,则拼接为字符串,分隔符(column中的splitter)默认为英文逗号。
 重要- 数据集成本身不支持数组类型,但MongoDB支持数组类型,并且数组类型具有强大的索引功能。您可以通过参数的特殊配置,将字符串转换为MongoDB中的数组。转换类型后,即可并行写入MongoDB。 
数据集成特殊数据类型:combine
| 类型 | 离线读(MongoDB Reader) | 说明 | 
| Combine | 支持 | 数据集成自定义类型。 如果type配置为 | 
MongoDB Reader数据类型转换
结合上文可见,MongoDB Reader针对MongoDB类型的转换列表,如下表所示。
| 转换后的类型分类 | MongoDB数据类型 | 
| LONG | INT、LONG、document.INT和document.LONG | 
| DOUBLE | DOUBLE和document.DOUBLE | 
| STRING | STRING、ARRAY、document.STRING、document.ARRAY和COMBINE | 
| DATE | DATE和document.DATE | 
| BOOLEAN | BOOL和document.BOOL | 
| BYTES | BYTES和document.BYTES | 
MongoDB Writer数据类型转换
| 类型分类 | MongoDB数据类型 | 
| 整数类 | INT和LONG | 
| 浮点类 | DOUBLE | 
| 字符串类 | STRING和ARRAY | 
| 日期时间类 | DATE | 
| 布尔型 | BOOL | 
| 二进制类 | BYTES | 
数据类型示例1:Combine类型使用示例
MongoDB Reader插件的Combine数据类型支持将MongoDB document中的多个字段合并成一个JSON串。例如,导入MongoDB中的字段至MaxCompute,有字段如下(下文均省略了value使用key来代替整个字段)的三个document,其中a、b是所有document均有的公共字段,x_n是不固定字段。
- doc1: a b x_1 x_2
- doc2: a b x_2 x_3 x_4
- doc3: a b x_5
配置文件中要明确指出需要一一对应的字段,需要合并的字段则需另取名称(不可以与document中已存在字段同名),并指定类型为COMBINE,如下所示。
"column": [
{
"name": "a",
"type": "string",
},
{
"name": "b",
"type": "string",
},
{
"name": "doc",
"type": "combine",
}
]最终导出的MaxCompute结果如下所示。
| odps_column1 | odps_column2 | odps_column3 | 
| a | b | {x_1,x_2} | 
| a | b | {x_2,x_3,x_4} | 
| a | b | {x_5} | 
使用COMBINE类型合并MongoDB Document中的多个字段后,输出结果映射至MaxCompute时会自动删除公共字段,仅保留Document的特有字段。
例如,a、b为所有Document均有的公共字段,Document文件doc1: a b x_1 x_2使用COMBINE类型合并字段后,输出结果本应该为{a,b,x_1,x_2},该结果映射至MaxCompute后,会删除公共字段a和b,最终输出的结果为{x_1,x_2}。
数据类型示例2:递归解析处理多层嵌套的Document
当MongoDB中Document存在多层嵌套时,可通过配置document类型进行递归解析处理。示例如下:
- MongoDB源端数据为: - { "name": "name1", "a": { "b": { "c": "this is value" } } }
- MongoDB列可配置为: - {"name":"_id","type":"string"} {"name":"name","type":"string"} {"name":"a.b.c","type":"document"} 
如上配置,可将源端嵌套字段a.b.c的值写入目标端c字段中,同步任务运行后,目标端写入数据为this is value。
创建数据源
在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见数据源管理,详细的配置参数解释可在配置界面查看对应参数的文案提示。
数据同步任务开发
数据同步任务的配置入口和通用配置流程可参见下文的配置指导。
单表离线同步任务配置指导
- 脚本模式配置的全量参数和脚本Demo请参见下文的附录:MongoDB脚本Demo与参数说明。 
单表实时同步任务配置指导
操作流程请参见数据集成侧实时同步任务配置、DataStudio侧实时同步任务配置。
整库级别同步任务配置指导
整库离线、整库(实时)全增量、整库(实时)分库分表等整库级别同步任务的配置操作,请参见整库离线同步任务、整库实时同步任务配置。
最佳实践
常见问题
附录:MongoDB脚本Demo与参数说明
离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见脚本模式配置,以下为您介绍脚本模式下数据源的参数配置详情。
Reader脚本Demo
配置一个从MongoDB抽取数据到本地的作业,详情请参见下文的参数说明。
- 实际运行时,请删除下述代码中的注释。 
- 暂时不支持取出array中的指定元素。 
{
    "type":"job",
    "version":"2.0",//版本号。
    "steps":[
        {
            "category": "reader",
            "name": "Reader",
            "parameter": {
                "datasource": "datasourceName", //数据源名称。
                "collectionName": "tag_data", //集合名称。
                "query": "", // 数据查询过滤。
                "column": [
                    {
                        "name": "unique_id", //字段名称。
                        "type": "string" //字段类型。
                    },
                    {
                        "name": "sid",
                        "type": "string"
                    },
                    {
                        "name": "user_id",
                        "type": "string"
                    },
                    {
                        "name": "auction_id",
                        "type": "string"
                    },
                    {
                        "name": "content_type",
                        "type": "string"
                    },
                    {
                        "name": "pool_type",
                        "type": "string"
                    },
                    {
                        "name": "frontcat_id",
                        "type": "array",
                        "splitter": ""
                    },
                    {
                        "name": "categoryid",
                        "type": "array",
                        "splitter": ""
                    },
                    {
                        "name": "gmt_create",
                        "type": "string"
                    },
                    {
                        "name": "taglist",
                        "type": "array",
                        "splitter": " "
                    },
                    {
                        "name": "property",
                        "type": "string"
                    },
                    {
                        "name": "scorea",
                        "type": "int"
                    },
                    {
                        "name": "scoreb",
                        "type": "int"
                    },
                    {
                        "name": "scorec",
                        "type": "int"
                    },
                    {
                        "name": "a.b",
                        "type": "document.int"
                    },
                    {
                        "name": "a.b.c",
                        "type": "document.array",
                        "splitter": " "
                    }
                ]
            },
            "stepType": "mongodb"
        },
        { 
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "common": { 
            "column": { 
                "timeZone": "GMT+0" //时区
            } 
        },
        "errorLimit":{
            "record":"0"//错误记录数。
        },
        "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1 //作业并发数。
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}| 参数 | 描述 | 
| datasource | 数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须要与添加的数据源名称保持一致。 | 
| collectionName | MongoDB的集合名。 | 
| hint | MongoDB支持hint参数,使查询优化器使用特定索引来完成查询,在某些情况下,可以提高查询性能。详情请参见hint参数。示例如下:  | 
| column | MongoDB的文档列名,配置为数组形式表示MongoDB的多个列。 
 | 
| batchSize | 批量获取的记录数,该参数为选填参数。默认值为 | 
| cursorTimeoutInMs | 游标超时时间,该参数为选填参数。默认值为 说明  
 | 
| query | 您可以通过该配置型来限制返回MongoDB数据范围,仅支持以下时间格式,不支持直接使用时间戳类型的格式。 说明  
 常用query示例如下: 
 说明  更多MongoDB的查询语法请参见MongoDB官方文档。 | 
| splitFactor | 如果存在比较严重的数据倾斜,可以考虑增加splitFactor,实现更小粒度的切分,无需增加并发数。 | 
Writer脚本Demo
配置写入MongoDB的数据同步作业,详情请参见下文的参数说明。
{
    "type": "job",
    "version": "2.0",//版本号。
    "steps": [
        {
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "mongodb",//插件名。
            "parameter": {
                "datasource": "",//数据源名。
                "column": [
                    {
                        "name": "_id",//列名。
                        "type": "ObjectId"//数据类型。如果replacekey为_id,则此处的type必须配置为ObjectID。如果配置为string,会无法进行替换。
                    },
                    {
                        "name": "age",
                        "type": "int"
                    },
                    {
                        "name": "id",
                        "type": "long"
                    },
                    {
                        "name": "wealth",
                        "type": "double"
                    },
                    {
                        "name": "hobby",
                        "type": "array",
                        "splitter": " "
                    },
                    {
                        "name": "valid",
                        "type": "boolean"
                    },
                    {
                        "name": "date_of_join",
                        "format": "yyyy-MM-dd HH:mm:ss",
                        "type": "date"
                    }
                ],
                "writeMode": {//写入模式。
                    "isReplace": "true",
                    "replaceKey": "_id"
                },
                "collectionName": "datax_test"//连接名称。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {//错误记录数。
            "record": "0"
        },
        "speed": {
            "throttle": true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent": 1,//作业并发数。
            "mbps": "1"//限流的速度,此处1mbps = 1MB/s。
        },
       "jvmOption": "-Xms1024m -Xmx1024m"
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}Writer脚本参数
| 参数 | 描述 | 是否必选 | 默认值 | 
| datasource | 数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。 | 是 | 无 | 
| collectionName | MongoDB的集合名。 | 是 | 无 | 
| column | MongoDB的文档列名,配置为数组形式表示MongoDB的多个列。 
 | 是 | 无 | 
| writeMode | 指定了传输数据时是否覆盖的信息,包括isReplace和replaceKey: 
 说明  当isReplace设置为true,且将非 原因是写入数据中,存在 | 否 | 无 | 
| preSql | 表示数据同步写出MongoDB前的前置操作,例如清理历史数据等。如果preSql为空,表示没有配置前置操作。配置preSql时,需要确保preSql符合JSON语法要求。 | 否 | 无 | 
执行数据集成作业时,会首先执行您已配置的preSql。完成preSql的执行后,才可以进入实际的数据写出阶段。preSql本身不会影响写出的数据内容。数据集成通过preSql参数,可以具备幂等执行特性。例如,您的preSql在每次任务执行前都会清理历史数据(根据您的业务规则进行清理)。此时,如果任务失败,您只需要重新执行数据集成作业即可。
preSql的格式要求如下:
- 需要配置type字段,表示前置操作类别,支持drop和remove,例如 - "preSql":{"type":"remove"}:- drop:表示删除集合和集合内的数据,collectionName参数配置的集合即是待删除的集合。 
- remove:表示根据条件删除数据。 
- json:您可以通过JSON控制待删除的数据条件,例如 - "preSql":{"type":"remove", "json":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"}。此处的- ${last_day}为DataWorks调度参数,格式为- $[yyyy-mm-dd]。您可以根据需要具体使用其它MongoDB支持的条件操作符号($gt、$lt、$gte和$lte等)、逻辑操作符(and和or等)或函数(max、min、sum、avg和ISODate等)。- 数据集成通过如下MongoDB标准API执行您的数据,删除query。 - query=(BasicDBObject) com.mongodb.util.JSON.parse(json); col.deleteMany(query);说明- 如果您需要条件删除数据,建议优先使用JSON配置形式。 
- item:您可以在item中配置数据过滤的列名(name)、条件(condition)和列值(value)。例如 - "preSql":{"type":"remove","item":[{"name":"pv","value":"100","condition":"$gt"},{"name":"pid","value":"10"}]}。- 数据集成会基于您配置的item条件项,构造查询query条件,进而通过MongoDB标准API执行删除。例如 - col.deleteMany(query);。
 
- 不识别的preSql,无需进行任何前置删除操作。