文档

MongoDB数据源

更新时间:

MongoDB数据源为您提供读取和写入MongoDB双向通道的功能,本文为您介绍DataWorks的MongoDB数据同步的能力支持情况。

支持的版本

仅支持4.x、5.x版本的MongoDB。

使用限制

  • 数据集成支持使用MongoDB数据库对应账号进行连接,如果您使用的是云数据库MongoDB版,默认会有一个root账号。出于安全策略的考虑,在添加使用MongoDB数据源时,请避免使用root作为访问账号。

  • 如果MongoDB为分片集群,则在配置数据源时,需要配置mongos地址,避免配置mongod/shard节点地址。否则同步任务在抽取MongoDB中数据时,可能会导致只查询到指定shard的数据,而非预期的全集。关于mongosmongod,详情请参考mongosmongod

  • 在并发大于1的情况下,同步任务配置的集合中所有_id字段类型必须一致(例如,_id字段都为string类型或者ObjectId类型),否则会出现部分数据无法同步的问题。

    说明

    并发大于1时,任务拆分会使用_id字段进行划分,因而在此场景下_id字段不支持混合类型。如果_id有多种字段类型,您可以使用单并发的形式进行数据同步,且不配置splitFactorsplitFactor配置为1。

  • 数据集成本身不支持数组类型,但MongoDB支持数组类型,并且数组类型具有强大的索引功能。您可以通过参数的特殊配置,将字符串转换为MongoDB中的数组。转换类型后,即可并行写入MongoDB。

  • 自建MongonDB数据库不支持公网访问,仅支持阿里云内网访问。

  • 数据集成目前不支持在数据查询(参数query)配置中读取指定列的数据。

支持的字段类型

MongoDB Reader支持的MongoDB数据类型

数据集成支持大部分MongoDB类型,但也存在部分没有支持的情况,请注意检查您的数据类型。

对于支持读取的数据类型,数据集成在读取时:

  • 基本类型的数据,会根据同步任务配置的读取字段(column,详见下文的附录:MongoDB脚本Demo与参数说明)中的name自动读取对应path下的数据,并根据数据类型做自动转换,您无需指定column的type属性。

    类型

    离线读(MongoDB Reader)

    说明

    ObjectId

    支持

    对象ID类型。

    Double

    支持

    64位浮点数类型。

    32-bit integer

    支持

    32位整数。

    64-bit integer

    支持

    64位整数。

    Decimal128

    支持

    Decimal128类型。

    说明

    如果配置为嵌套类型、Combine类型,JSON序列化时会被当做对象处理,需增加参数decimal128OutputTypebigDecimal,才能输出为decimal。

    String

    支持

    字符串类型。

    Boolean

    支持

    布尔类型。

    Timestamp

    支持

    时间戳类型。

    说明

    BsonTimestamp存储的是时间戳,无需考虑时区影响,详情请参见MongoDB中的时区问题

    Date

    支持

    日期类型。

  • 部分复杂类型的数据,您可通过配置column的type属性,进行自定义处理。

    类型

    离线读(MongoDB Reader)

    说明

    Document

    支持

    嵌入文档类型。

    • 如果没有配置type属性,则直接将Document转JSON序列化处理。

    • 如果配置了type属性为document,则属于嵌套类型,MongoDB Reader会按path读取Document属性。详细示例请参见下文的数据类型示例2:递归解析处理多层嵌套的Document

    Array

    支持

    数组类型。

    • 如果type配置为array.jsonarrays,直接JSON序列化处理。

    • 如果type配置为arraydocument.array,则拼接为字符串,分隔符(column中的splitter)默认为英文逗号。

    重要

    数据集成本身不支持数组类型,但MongoDB支持数组类型,并且数组类型具有强大的索引功能。您可以通过参数的特殊配置,将字符串转换为MongoDB中的数组。转换类型后,即可并行写入MongoDB。

数据集成特殊数据类型:combine

类型

离线读(MongoDB Reader)

说明

Combine

支持

数据集成自定义类型。

如果type配置为combine,MongoDB Reader会移除已配置的Column对应Key后,将整个Document其他所有信息进行JSON序列化输出,详细示例请参见下文数据类型示例1:Combine类型使用示例

MongoDB Reader数据类型转换

结合上文可见,MongoDB Reader针对MongoDB类型的转换列表,如下表所示。

转换后的类型分类

MongoDB数据类型

LONG

INT、LONG、document.INT和document.LONG

DOUBLE

DOUBLE和document.DOUBLE

STRING

STRING、ARRAY、document.STRING、document.ARRAY和COMBINE

DATE

DATE和document.DATE

BOOLEAN

BOOL和document.BOOL

BYTES

BYTES和document.BYTES

MongoDB Writer数据类型转换

类型分类

MongoDB数据类型

整数类

INT和LONG

浮点类

DOUBLE

字符串类

STRING和ARRAY

日期时间类

DATE

布尔型

BOOL

二进制类

BYTES

数据类型示例1:Combine类型使用示例

MongoDB Reader插件的Combine数据类型支持将MongoDB document中的多个字段合并成一个JSON串。例如,导入MongoDB中的字段至MaxCompute,有字段如下(下文均省略了value使用key来代替整个字段)的三个document,其中a、b是所有document均有的公共字段,x_n是不固定字段。

  • doc1: a b x_1 x_2

  • doc2: a b x_2 x_3 x_4

  • doc3: a b x_5

配置文件中要明确指出需要一一对应的字段,需要合并的字段则需另取名称(不可以与document中已存在字段同名),并指定类型为COMBINE,如下所示。

"column": [
{
"name": "a",
"type": "string",
},
{
"name": "b",
"type": "string",
},
{
"name": "doc",
"type": "combine",
}
]

最终导出的MaxCompute结果如下所示。

odps_column1

odps_column2

odps_column3

a

b

{x_1,x_2}

a

b

{x_2,x_3,x_4}

a

b

{x_5}

说明

使用COMBINE类型合并MongoDB Document中的多个字段后,输出结果映射至MaxCompute时会自动删除公共字段,仅保留Document的特有字段。

例如,a、b为所有Document均有的公共字段,Document文件doc1: a b x_1 x_2使用COMBINE类型合并字段后,输出结果本应该为{a,b,x_1,x_2},该结果映射至MaxCompute后,会删除公共字段a和b,最终输出的结果为{x_1,x_2}

数据类型示例2:递归解析处理多层嵌套的Document

当MongoDB中Document存在多层嵌套时,可通过配置document类型进行递归解析处理。示例如下:

  • MongoDB源端数据为:

    {
        "name": "name1",
        "a":
        {
            "b":
            {
                "c": "this is value"
            }
        }
    }
  • MongoDB列可配置为:

    {"name":"_id","type":"string"}
    {"name":"name","type":"string"}
    {"name":"a.b.c","type":"document"}

    eg

如上配置,可将源端嵌套字段a.b.c的值写入目标端c字段中,同步任务运行后,目标端写入数据为this is value

数据同步任务开发

MongoDB数据同步任务的配置入口和通用配置流程指导可参见下文的配置指导,详细的配置参数解释可在配置界面查看对应参数的文案提示。

创建数据源

在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源

单表离线同步任务配置指导

单表实时同步任务配置指导

操作流程请参见配置单表增量数据实时同步DataStudio侧实时同步任务配置

整库级别同步任务配置指导

整库离线、整库(实时)全增量、整库(实时)分库分表等整库级别同步任务的配置操作,请参见数据集成侧同步任务配置

最佳实践

常见问题

附录:MongoDB脚本Demo与参数说明

附录:离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要在任务脚本中按照脚本的统一格式要求编写脚本中的reader参数和writer参数,脚本模式的统一要求请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下的数据源的Reader参数和Writer参数的指导详情。

MongoDB Reader脚本Demo

配置一个从MongoDB抽取数据到本地的作业,详情请参见下文的参数说明。

重要
  • 实际运行时,请删除下述代码中的注释。

  • 暂时不支持取出array中的指定元素。

{
    "type":"job",
    "version":"2.0",//版本号。
    "steps":[
        {
            "category": "reader",
            "name": "Reader",
            "parameter": {
                "datasource": "datasourceName", //数据源名称。
                "collectionName": "tag_data", //集合名称。
                "query": "", // 数据查询过滤。
                "column": [
                    {
                        "name": "unique_id", //字段名称。
                        "type": "string" //字段类型。
                    },
                    {
                        "name": "sid",
                        "type": "string"
                    },
                    {
                        "name": "user_id",
                        "type": "string"
                    },
                    {
                        "name": "auction_id",
                        "type": "string"
                    },
                    {
                        "name": "content_type",
                        "type": "string"
                    },
                    {
                        "name": "pool_type",
                        "type": "string"
                    },
                    {
                        "name": "frontcat_id",
                        "type": "array",
                        "splitter": ""
                    },
                    {
                        "name": "categoryid",
                        "type": "array",
                        "splitter": ""
                    },
                    {
                        "name": "gmt_create",
                        "type": "string"
                    },
                    {
                        "name": "taglist",
                        "type": "array",
                        "splitter": " "
                    },
                    {
                        "name": "property",
                        "type": "string"
                    },
                    {
                        "name": "scorea",
                        "type": "int"
                    },
                    {
                        "name": "scoreb",
                        "type": "int"
                    },
                    {
                        "name": "scorec",
                        "type": "int"
                    },
                    {
                        "name": "a.b",
                        "type": "document.int"
                    },
                    {
                        "name": "a.b.c",
                        "type": "document.array",
                        "splitter": " "
                    }
                ]
            },
            "stepType": "mongodb"
        },
        { 
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数。
        },
        "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1 //作业并发数。
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

MongoDB Reader脚本参数

参数

描述

datasource

数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须要与添加的数据源名称保持一致。

collectionName

MonogoDB的集合名。

hint

MongoDB支持hint参数,使查询优化器使用特定索引来完成查询,在某些情况下,可以提高查询性能。详情请参见hint参数。示例如下:

{
"collectionName":"test_collection",
"hint":"{age:1}"
}

column

MongoDB的文档列名,配置为数组形式表示MongoDB的多个列。

  • namecolumn的名字。

  • type支持的类型包括:

    • string:表示字符串。

    • long:表示整型数。

    • double:表示浮点数。

    • date:表示日期。

    • bool:表示布尔值。

    • bytes:表示二进制序列。

    • arrays:以JSON字符串格式读出,例如["a","b","c"]。

    • array:以分隔符splitter分隔的方式读出,例如a,b,c,推荐使用arrays格式。

    • combine:使用MongoDB Reader插件读出数据时,支持合并MongoDB document中的多个字段为一个JSON串。

  • splitter:因为MongoDB支持数组类型,但数据集成框架本身不支持数组类型,所以MongoDB读出来的数组类型,需要通过该分隔符合并成字符串。

batchSize

批量获取的记录数,该参数为选填参数。默认值为1000条。

cursorTimeoutInMs

游标超时时间,该参数为选填参数。默认值为1000 * 60 * 10 = 600000。如果cursorTimeoutInMs配置为负值,则表示游标永不超时。

说明
  • 不推荐您设置游标永不超时。如果客户端程序意外退出,永不超时的游标将一直存在于MongoDB服务器中,直到服务重启。

  • 如果出现游标超时,您可以执行如下操作:

    • 减小批量获取的记录数batchSize

    • 增加游标超时时间cursorTimeoutInMs

query

您可以通过该配置型来限制返回MongoDB数据范围,仅支持以下时间格式,不支持直接使用时间戳类型的格式。

说明
  • query不支持JS语法。

  • 目前不支持读取指定列数据。

常用query示例如下:

  • 查询状态为normal的数据

    "query":"{ status: "normal"}"
  • status: "normal"

    "query":"{ status: { $in: [ "normal", "forbidden" ] }}"
  • AND语法,状态为正常,且年龄小于30

    "query":"{ status: "normal", age: { $lt: 30 }}"
  • 日期语法,创建时间大于等于2022-12-01 00:00:00.000,+0800表示东八时区

    "query":"{ createTime:{$gte:ISODate('2022-12-01T00:00:00.000+0800')}}"
  • 日期语法,使用调度参数占位符,查询创建时间大于等于某个时间点

    "query":"{ createTime:{$gte:ISODate('$[yyyy-mm-dd]T00:00:00.000+0800')}}"
    说明

    调度参数使用详情请参见:场景:调度参数在数据集成的典型应用场景,离线同步增量同步实现方式请参见:数据集成使用调度参数的相关说明

  • 非时间类型增量字段同步。

    可以通过赋值节点将字段处理为目标数据类型后,再传入数据集成进行数据同步。例如,当MongoDB存储的增量字段为时间戳,您可以通过赋值节点将时间类型字段通过引擎函数转换为时间戳,再传给离线同步任务使用,关于赋值节点的使用详情请参见:赋值节点

说明

更多MongoDB的查询语法请参见MongoDB官方文档

splitFactor

如果存在比较严重的数据倾斜,可以考虑增加splitFactor,实现更小粒度的切分,无需增加并发数。

MongoDB Writer脚本Demo

配置写入MongoDB的数据同步作业,详情请参见下文的参数说明。

{
    "type": "job",
    "version": "2.0",//版本号。
    "steps": [
        {
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "mongodb",//插件名。
            "parameter": {
                "datasource": "",//数据源名。
                "column": [
                    {
                        "name": "_id",//列名。
                        "type": "ObjectId"//数据类型。如果replacekey为_id,则此处的type必须配置为ObjectID。如果配置为string,会无法进行替换。
                    },
                    {
                        "name": "age",
                        "type": "int"
                    },
                    {
                        "name": "id",
                        "type": "long"
                    },
                    {
                        "name": "wealth",
                        "type": "double"
                    },
                    {
                        "name": "hobby",
                        "type": "array",
                        "splitter": " "
                    },
                    {
                        "name": "valid",
                        "type": "boolean"
                    },
                    {
                        "name": "date_of_join",
                        "format": "yyyy-MM-dd HH:mm:ss",
                        "type": "date"
                    }
                ],
                "writeMode": {//写入模式。
                    "isReplace": "true",
                    "replaceKey": "_id"
                },
                "collectionName": "datax_test"//连接名称。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {//错误记录数。
            "record": "0"
        },
        "speed": {
            "throttle": true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent": 1,//作业并发数。
            "mbps": "1"//限流的速度,此处1mbps = 1MB/s。
        },
       "jvmOption": "-Xms1024m -Xmx1024m"
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

MongoDB Writer脚本参数

参数

描述

是否必选

默认值

datasource

数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。

collectionName

MongoDB的集合名。

column

MongoDB的文档列名,配置为数组形式表示MongoDB的多个列。

  • name:Column的名字。

  • type:Column的类型。

    • int:表示32位整型数。

    • string:表示字符串。

    • array:splitter必须配置,用于分隔源端字符串,如:

      源端数据为a,b,csplitter配置英文逗号,则会将数据切分为数组["a","b","c"]写入MongoDB中。

      {"type":"array","name":"col_split_array","splitter":",","itemtype":"string"}
      说明

      array类型的itemtype参数支持的枚举类型包括doubleintlongboolbytesstring

    • json:表示JSON字符串格式。

    • long:表示长整型数。

    • date:表示日期。

    • double:表示浮点数。

    说明

    MongoDB Writer配置还支持写入嵌套类型,type配置增加document.前缀,表示写入嵌套类型,name则可以配置级联,如:

    {"type":"document.string","name":"col_nest.col_string"}
    {"type":"document.array","name":"col_nest.col_split_array","splitter":",","itemtype":"string"}
  • splitter:特殊分隔符,当且仅当要处理的字符串要用分隔符分隔为字符数组Array时,才使用此参数。通过此参数指定的分隔符,将字符串分隔存储到MongoDB的数组中。

writeMode

指定了传输数据时是否覆盖的信息,包括isReplacereplaceKey

  • isReplace:当设置为true时,表示针对相同的replaceKey做覆盖操作。当设置为false时,表示不覆盖。

  • replaceKey:replaceKey指定了每行记录的业务主键,用来做覆盖时使用(不支持replaceKey为多个键,通常指Mongo中的主键)。

说明

isReplace设置为true,且将非_id字段配置为replaceKey,后续运行时会出现类似以下的报错:

After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"

原因是写入数据中,存在_idreplaceKey不匹配的数据,详情请参见常见问题:报错:After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"

preSql

表示数据同步写出MongoDB前的前置操作,例如清理历史数据等。如果preSql为空,表示没有配置前置操作。配置preSql时,需要确保preSql符合JSON语法要求。

执行数据集成作业时,会首先执行您已配置的preSql。完成preSql的执行后,才可以进入实际的数据写出阶段。preSql本身不会影响写出的数据内容。数据集成通过preSql参数,可以具备幂等执行特性。例如,您的preSql在每次任务执行前都会清理历史数据(根据您的业务规则进行清理)。此时,如果任务失败,您只需要重新执行数据集成作业即可。

preSql的格式要求如下:

  • 需要配置type字段,表示前置操作类别,支持drop和remove,例如"preSql":{"type":"remove"}

    • drop:表示删除集合和集合内的数据,collectionName参数配置的集合即是待删除的集合。

    • remove:表示根据条件删除数据。

    • json:您可以通过JSON控制待删除的数据条件,例如"preSql":{"type":"remove", "json":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"}。此处的${last_day}为DataWorks调度参数,格式为$[yyyy-mm-dd]。您可以根据需要具体使用其它MongoDB支持的条件操作符号($gt、$lt、$gte和$lte等)、逻辑操作符(and和or等)或函数(max、min、sum、avg和ISODate等)。

      数据集成通过如下MongoDB标准API执行您的数据,删除query。

      query=(BasicDBObject) com.mongodb.util.JSON.parse(json);        
      col.deleteMany(query);
      说明

      如果您需要条件删除数据,建议优先使用JSON配置形式。

    • item:您可以在item中配置数据过滤的列名(name)、条件(condition)和列值(value)。例如"preSql":{"type":"remove","item":[{"name":"pv","value":"100","condition":"$gt"},{"name":"pid","value":"10"}]}

      数据集成会基于您配置的item条件项,构造查询query条件,进而通过MongoDB标准API执行删除。例如col.deleteMany(query);

  • 不识别的preSql,无需进行任何前置删除操作。

  • 本页导读 (1)
文档反馈