本文为您介绍MongoDB Reader支持的数据类型、字段映射和数据源等参数及配置示例。

MongoDB Reader插件通过MongoDB的Java客户端MongoClient,进行MongoDB的读操作。最新版本的Mongo已经将DB锁的粒度,从DB级别降低至document级别,配合MongoDB强大的索引功能,即可达到高性能读取MongoDB的需求。
说明
  • 如果您使用的是云数据库MongoDB版,MongoDB默认会有root账号。出于安全策略的考虑,数据集成仅支持使用MongoDB数据库对应账号进行连接。您添加使用MongoDB数据源时,也请避免使用root作为访问账号。
  • query不支持JS语法。

MongoDB Reader通过数据集成框架从MongoDB并行地读取数据,通过主控的Job程序,按照指定规则对MongoDB中的数据进行分片并行读取,然后将MongoDB支持的类型通过逐一判断转换为数据集成支持的类型。

类型转换列表

MongoDB Reader支持大部分MongoDB类型,但也存在部分没有支持的情况,请注意检查您的数据类型。

MongoDB Reader针对MongoDB类型的转换列表,如下所示。
类型分类 MongoDB数据类型
LONG INT、LONG、document.INT和document.LONG
DOUBLE DOUBLE和document.DOUBLE
STRING STRING、ARRAY、document.STRING、document.ARRAY和COMBINE
DATE DATE和document.DATE
BOOLEAN BOOL和document.BOOL
BYTES BYTES和document.BYTES
说明 document类型为嵌入文档类型,即OBJECT类型。

COMBINE类型的使用如下:

使用MongoDB Reader插件读出数据时,支持将MongoDB document中的多个字段合并成一个JSON串。

例如,导入MongoDB中的字段至MaxCompute,有字段如下(下文均省略了value使用key来代替整个字段)的三个document,其中a、b是所有document均有的公共字段,x_n是不固定字段。

doc1: a b x_1 x_2

doc2: a b x_2 x_3 x_4

doc3: a b x_5

配置文件中要明确指出需要一一对应的字段,需要合并的字段则需另取名称(不可以与document中已存在字段同名),并指定类型为COMBINE,如下所示。
"column": [
{
"name": "a",
"type": "string",
},
{
"name": "b",
"type": "string",
},
{
"name": "doc",
"type": "combine",
}
]
最终导出的MaxCompute结果如下所示。
odps_column1 odps_column2 odps_column3
a b {x_1,x_2}
a b {x_2,x_3,x_4}
a b {x_5}
说明

使用COMBINE类型合并MongoDB Document中的多个字段后,输出结果映射至MaxCompute时会自动删除公共字段,仅保留Document的特有字段。

例如,a、b为所有Document均有的公共字段,Document文件doc1: a b x_1 x_2使用COMBINE类型合并字段后,输出结果本应该为{a,b,x_1,x_2},该结果映射至MaxCompute后,会删除公共字段a和b,最终输出的结果为{x_1,x_2}

参数说明

参数 描述 是否必选 默认值
datasource 数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须要与添加的数据源名称保持一致。
collectionName MonogoDB的集合名。
column MongoDB的文档列名,配置为数组形式表示MongoDB的多个列。
  • namecolumn的名字。
  • type支持的类型包括:
    • string:表示字符串。
    • long:表示整型数。
    • double表示浮点数。
    • date表示日期。
    • bool表示布尔值。
    • bytes:表示二进制序列。
    • arrays以JSON字符串格式读出,例如["a","b","c"]。
    • array以分隔符splitter分隔的方式读出,例如a,b,c,推荐使用arrays格式。
    • combine使用MongoDB Reader插件读出数据时,支持合并MongoDB document中的多个字段为一个JSON串。
  • splitter:因为MongoDB支持数组类型,但数据集成框架本身不支持数组类型,所以MongoDB读出来的数组类型,需要通过该分隔符合并成字符串。
query 您可以通过该配置型来限制返回MongoDB数据范围,仅支持时间类型。例如您可以配置"query":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}",限制返回operationTime大于等于${last_day}零点的数据。此处${last_day}为DataWorks调度参数,其中last_day格式为yyyy-mm-dd。您可以根据需要具体使用其它MongoDB支持的条件操作符号($gt、$lt、$gte和$lte等),逻辑操作符(and和or等),函数(max、min、sum、avg和ISODate等)。

向导开发介绍

暂不支持向导开发模式。

脚本开发介绍

使用脚本模式开发的详情请参见通过脚本模式配置任务

配置一个从MongoDB抽取数据到本地的作业,详情请参见上述参数说明。
注意
  • 实际运行时,请删除下述代码中的注释。
  • 暂时不支持取出array中的指定元素。
{
    "type":"job",
    "version":"2.0",//版本号。
    "steps":[
        {
            "category": "reader",
            "name": "Reader",
            "parameter": {
                "datasource": "datasourceName", //数据源名称。
                "collectionName": "tag_data", //集合名称。
                "query": "", // 数据查询过滤。
                "column": [
                    {
                        "name": "unique_id", //字段名称。
                        "type": "string" //字段类型。
                    },
                    {
                        "name": "sid",
                        "type": "string"
                    },
                    {
                        "name": "user_id",
                        "type": "string"
                    },
                    {
                        "name": "auction_id",
                        "type": "string"
                    },
                    {
                        "name": "content_type",
                        "type": "string"
                    },
                    {
                        "name": "pool_type",
                        "type": "string"
                    },
                    {
                        "name": "frontcat_id",
                        "type": "array",
                        "splitter": ""
                    },
                    {
                        "name": "categoryid",
                        "type": "array",
                        "splitter": ""
                    },
                    {
                        "name": "gmt_create",
                        "type": "string"
                    },
                    {
                        "name": "taglist",
                        "type": "array",
                        "splitter": " "
                    },
                    {
                        "name": "property",
                        "type": "string"
                    },
                    {
                        "name": "scorea",
                        "type": "int"
                    },
                    {
                        "name": "scoreb",
                        "type": "int"
                    },
                    {
                        "name": "scorec",
                        "type": "int"
                    },
                    {
                        "name": "a.b",
                        "type": "document.int"
                    },
                    {
                        "name": "a.b.c",
                        "type": "document.array",
                        "splitter": " "
                    }
                ]
            },
            "stepType": "mongodb"
        },
        { 
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数。
        },
        "speed":{
            "throttle":true,//当throttle值为flase时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1 //作业并发数。
            "mbps":"12"//限流
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}