本文为您介绍MongoDB Writer支持的数据类型、写入方式、字段映射和数据源等参数和配置示例。

背景信息

MongoDB Writer插件利用MongoDB的Java客户端MongoClient进行MongoDB的写操作。最新版本的Mongo已经将DB锁的粒度从DB级别降低到Document级别,配合MongoDB强大的索引功能,基本可以满足数据源向MongoDB写入数据的需求。针对数据更新的需求,也可以通过配置业务主键的方式实现。
说明
  • 在开始配置MongoDB Writer插件前,请首先配置好数据源,详情请参见配置MongoDB数据源
  • 如果您使用的是云数据库MongoDB版,MongoDB默认会有用root账号。
  • 出于安全策略的考虑,数据集成仅支持使用 MongoDB数据库对应账号进行连接。您在添加使用MongoDB数据源时,请避免使用root作为访问账号。

MongoDB Writer通过数据集成框架获取Reader生成的协议数据,然后将支持的类型通过逐一判断转换为MongoDB支持的类型。数据集成本身不支持数组类型,但MongoDB支持数组类型,并且数组类型具有强大的索引功能。

您可以通过参数的特殊配置,将字符串转换为MongoDB中的数组。转换类型后,即可并行写入MongoDB。

类型转换列表

MongoDB Writer支持大部分MongoDB类型,但也存在部分没有支持的情况,请注意检查您的数据类型。

MongoDB Writer针对MongoDB类型的转换列表,如下所示。
类型分类 MongoDB数据类型
整数类 INT和LONG
浮点类 DOUBLE
字符串类 STRING和ARRAY
日期时间类 DATE
布尔型 BOOL
二进制类 BYTES
说明 此处DATE类型,写入至MongoDB后即为DATETIME类型。

参数说明

参数 描述 是否必选 默认值
datasource 数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。
collectionName MongoDB的集合名。
column MongoDB的文档列名,配置为数组形式表示MongoDB的多个列。
  • name:Column的名字。
  • type:Column的类型。
  • splitter:特殊分隔符,当且仅当要处理的字符串要用分隔符分隔为字符数组Array时,才使用此参数。通过此参数指定的分隔符,将字符串分隔存储到MongoDB的数组中。
writeMode 指定了传输数据时是否覆盖的信息,包括isReplacereplaceKey
  • isReplace:当设置为true时,表示针对相同的replaceKey做覆盖操作。当设置为false时,表示不覆盖。
  • replaceKey:replaceKey指定了每行记录的业务主键,用来做覆盖时使用(不支持replaceKey为多个键,通常指Mongo中的主键)。
preSql 表示数据同步写出MongoDB前的前置操作,例如清理历史数据等。如果preSql为空,表示没有配置前置操作。配置preSql时,需要确保preSql符合JSON语法要求。

执行数据集成作业时,会首先执行您已配置的preSql。完成preSql的执行后,才可以进入实际的数据写出阶段。preSql本身不会影响写出的数据内容。数据集成通过preSql参数,可以具备幂等执行特性。例如,您的preSql在每次任务执行前都会清理历史数据(根据您的业务规则进行清理)。此时,如果任务失败,您只需要重新执行数据集成作业即可。

preSql的格式要求如下:
  • 需要配置type字段,表示前置操作类别,支持drop和remove,例如"preSql":{"type":"remove"}
    • drop:表示删除集合和集合内的数据,collectionName参数配置的集合即是待删除的集合。
    • remove:表示根据条件删除数据。
    • json:您可以通过JSON控制待删除的数据条件,例如"preSql":{"type":"remove", "json":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"}。此处的${last_day}为DataWorks调度参数,格式为$[yyyy-mm-dd]。您可以根据需要具体使用其它MongoDB支持的条件操作符号($gt、$lt、$gte和$lte等)、逻辑操作符(and和or等)或函数(max、min、sum、avg和ISODate等)。
      数据集成通过如下MongoDB标准API执行您的数据,删除query。
      query=(BasicDBObject) com.mongodb.util.JSON.parse(json);                
      col.deleteMany(query);
      说明 如果您需要条件删除数据,建议优先使用JSON配置形式。
    • item:您可以在item中配置数据过滤的列名(name)、条件(condition)和列值(value)。例如"preSql":{"type":"remove","item":[{"name":"pv","value":"100","condition":"$gt"},{"name":"pid","value":"10"}]}

      数据集成会基于您配置的item条件项,构造查询query条件,进而通过MongoDB标准API执行删除。例如col.deleteMany(query);

  • 不识别的preSql,无需进行任何前置删除操作。

向导开发介绍

  1. 选择数据源。
    配置同步任务的数据来源数据去向选择数据源
    参数 描述
    数据源 即上述参数说明中的datasource,通常输入您配置的数据源名称。
    集合名称 即上述参数说明中的collectionName
    写入模式(是否覆盖) 即上述参数说明中的writeMode
    前置条件 即上述参数说明中的preSql。表示数据同步写出MongoDB前的前置操作,例如清理历史数据等。如果preSql为空,表示没有配置前置操作。配置preSql时,需要确保preSql符合JSON语法要求。
  2. 字段映射,即上述参数说明中的column。默认使用同行映射。您可以单击图标图标手动编辑目标表字段。字段映射
  3. 通道控制。通道配置
    参数 描述
    任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数 错误记录数,表示脏数据的最大容忍条数。
    分布式处理能力

    数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组新增和使用独享数据集成资源组

脚本开发介绍

使用脚本模式开发的详情请参见通过脚本模式配置任务

配置写入MongoDB的数据同步作业,详情请参见上述参数说明。
{
    "type": "job",
    "version": "2.0",//版本号。
    "steps": [
        {
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "mongodb",//插件名。
            "parameter": {
                "datasource": "",//数据源名。
                "column": [
                    {
                        "name": "_id",//列名。
                        "type": "ObjectId"//数据类型。如果replacekey为_id,则此处的type必须配置为ObjectID。如果配置为string,会无法进行替换。
                    },
                    {
                        "name": "age",
                        "type": "int"
                    },
                    {
                        "name": "id",
                        "type": "long"
                    },
                    {
                        "name": "wealth",
                        "type": "double"
                    },
                    {
                        "name": "hobby",
                        "type": "array",
                        "splitter": " "
                    },
                    {
                        "name": "valid",
                        "type": "boolean"
                    },
                    {
                        "name": "date_of_join",
                        "format": "yyyy-MM-dd HH:mm:ss",
                        "type": "date"
                    }
                ],
                "writeMode": {//写入模式。
                    "isReplace": "true",
                    "replaceKey": "_id"
                },
                "collectionName": "datax_test"//连接名称。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {//错误记录数。
            "record": "0"
        },
        "speed": {
            "throttle": true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent": 1,//作业并发数。
            "mbps": "1"//限流的速度。
        },
       "jvmOption": "-Xms1024m -Xmx1024m"
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}