本文将为您介绍如何通过DataWorks的数据集成功能,将从MongoDB提取的JSON字段迁移至MaxCompute。

准备工作

  1. 账号准备
    在数据库内新建用户,用于DataWorks添加数据源。本示例执行如下命令。
    db.createUser({user:"bookuser",pwd:"123456",roles:["root"]})

    新建用户名为bookuser,密码为123456,权限为root

  2. 数据准备
    首先您需要将数据上传至您的MongoDB数据库。本示例使用阿里云的云数据库MongoDB版,网络类型为VPC(需要申请公网地址,否则无法与DataWorks默认资源组互通),测试数据如下。
    {
                            "store": {
                            "book": [
                            {
                            "category": "reference",
                            "author": "Nigel Rees",
                            "title": "Sayings of the Century",
                            "price": 8.95
                            },
                            {
                            "category": "fiction",
                            "author": "Evelyn Waugh",
                            "title": "Sword of Honour",
                            "price": 12.99
                            },
                            {
                            "category": "fiction",
                            "author": "J. R. R. Tolkien",
                            "title": "The Lord of the Rings",
                            "isbn": "0-395-19395-8",
                            "price": 22.99
                            }
                            ],
                            "bicycle": {
                            "color": "red",
                            "price": 19.95
                            }
                            },
                            "expensive": 10
                            }
    登录MongoDB的DMS控制台,本示例使用的数据库为admin,集合为userlog。您可以在查询窗口执行如下命令,查看已上传的数据。
    db.userlog.find().limit(10)
    查看数据

通过DataWorks将JSON数据从MongoDB迁移至MaxCompute

  1. 新增MongoDB数据源
    1. 以项目管理员身份进入DataWorks控制台,单击对应工作空间操作栏中的进入数据集成
    2. 单击左侧导航栏上的数据源,进入数据源管理页面。
    3. 单击右上角新增数据源
    4. 新增数据源页面中,选择数据源类型为MongoDB
    5. 配置新增MongoDB数据源对话框中的参数。
      参数 描述
      数据源类型 由于本文中MongoDB处于VPC环境下,因此数据源类型需要选择连接串模式
      数据源名称 数据源名称必须以字母、数字、下划线组合,且不能以数字和下划线开头。
      数据源描述 对数据源进行简单描述,不得超过80个字符。
      适用环境 可以选择开发生产环境。
      说明 仅标准模式工作空间会显示此配置。
      访问地址 格式为host:port。如果此处您需要同时添加多个地址,请单击添加访问地址进行添加。
      说明 添加的访问地址必须全部为公网地址或全部为私网地址,不可以公网、私网地址混合。
      数据库名 该数据源对应的数据库名称。
      用户名 输入数据库对应的用户名。
      密码 输入数据库对应的密码。
    6. 单击测试连通性
    7. 测试连通性通过后,单击完成
  2. 新建数据同步任务

    在DataWorks上新建数据同步节点,详情请参见离线同步

    新建的同时,在DataWorks新建一个建表任务,用于存放JSON数据,本示例新建表名为mqdata。

    表参数可以通过图形化界面完成。本例中mqdata表仅有一列,类型为string,列名为MQ data。图形化界面新建表
  3. 配置同步任务参数
    完成上述新建后,您可以在图形化界面配置数据同步任务参数,如下图所示。选择数据来源类型为MongoDB,来源表为mongodb_userlog。目标数据源名称为odps_first,目标表为刚新建的mqdata。

    由于MongoDB数据源不支持向导模式开发,您直接点击转换为脚本,即可跳转至脚本模式进行配置。

    {
        "type": "job",
        "steps": [
        {
            "stepType": "mongodb",
            "parameter": {
                "datasource": "mongodb_userlog",//数据源名称。
                "column": [
                    {
                    "name": "store.bicycle.color", //JSON字段路径,本例中提取color值。
                    "type": "document.String" //非一层子属性以最终获取的类型为准。假如您选取的JSON字段为一级字段,如本例中的expensive,则直接填写string即可。
                    }
                  ],
                "collectionName //集合名称": "userlog"
                },
            "name": "Reader",
            "category": "reader"
            },
            {
                "stepType": "odps",
                "parameter": {
                "partition": "",
                "isCompress": false,
                "truncate": true,
                "datasource": "odps_first",
                "column": [
                "mqdata"  //MaxCompute表列名。
                ],
                "emptyAsNull": false,
                "table": "mqdata"
                },
                "name": "Writer",
                "category": "writer"
                }
                ],
                "version": "2.0",
                "order": {
                "hops": [
                {
                "from": "Reader",
                "to": "Writer"
                }
                ]
                },
                "setting": {
                "errorLimit": {
                "record": ""
                },
                "speed": {
                "concurrent": 2,
                "throttle": false,
                }
                }
            }
    完成上述配置后,单击运行即可。运行成功日志示例如下所示。

JSON数据从MongoDB迁移至MaxCompute结果验证

  1. 在您的业务流程中新建一个ODPS SQL节点。新建ODPS SQL节点
  2. 输入SELECT * from mqdata;语句,查看当前mqdata表中数据。结果验证
    说明 您也可以直接在MaxCompute客户端中输入命令运行。