JSON数据从MongoDB迁移至MaxCompute

本文为您介绍如何通过DataWorks的数据集成功能,将从MongoDB提取的JSON字段迁移至MaxCompute。

前提条件

在MongoDB上准备测试数据

  1. 账号准备。

    在数据库内新建用户,用于DataWorks添加数据源。本示例执行如下命令。

    db.createUser({user:"bookuser",pwd:"123456",roles:["user1"]})

    新建用户名为bookuser,密码为123456,角色为任意拥有访问数据权限的角色。

  2. 数据准备。

    将数据上传至MongoDB数据库。本示例使用阿里云的云数据库MongoDB版,网络类型为VPC(需申请公网地址,否则无法与DataWorks公共资源组互通),测试数据如下。

    {
        "store": {
            "book": [
                {
                    "category": "reference",
                    "author": "Nigel Rees",
                    "title": "Sayings of the Century",
                    "price": 8.95
                    },
                {
                    "category": "fiction",
                    "author": "Evelyn Waugh",
                    "title": "Sword of Honour",
                    "price": 12.99
                    },
                {
                    "category": "fiction",
                    "author": "J. R. R. Tolkien",
                    "title": "The Lord of the Rings",
                    "isbn": "0-395-19395-8",
                    "price": 22.99
                    }
                        ],
            "bicycle": {
                "color": "red",
                "price": 19.95
                    }
                        },
            "expensive": 10
                }
  3. 在MongoDB的DMS控制台,本示例使用的数据库为admin,集合为userlog。执行如下命令,查看已上传的数据。

    db.userlog.find().limit(10)

通过DataWorks将JSON数据从MongoDB迁移至MaxCompute

  1. 登录DataWorks控制台

  2. 在DataWorks上创建目标表。用以接收从MongoDB迁移的数据。

    1. 右键单击已创建的业务流程,选择新建表 > MaxCompute >

    2. 新建表页面,选择引擎类型并输入表名

    3. 在表的编辑页面,单击DDL

    4. DDL模式对话框,输入建表语句,单击生成表结构

      重要

      建表语句中的表名称请与在新建表输入的表名一致。

      create table mqdata (mqdata string);
    5. 单击提交到生产环境

  3. 新增MongoDB数据源,详情请参见配置MongoDB数据源

  4. 创建离线同步节点。

    1. 进入数据开发页面,右键单击指定业务流程,选择新建节点 > 数据集成 > 离线同步

    2. 新建节点对话框中,输入节点名称,并单击确认

    3. 在顶部菜单栏上,单击转化脚本图标。

    4. 在脚本模式下,单击顶部菜单栏上的**图标。

    5. 导入模板对话框中选择来源类型数据源目标类型数据源,并单击确定

    6. 输入如下脚本。

      {
          "type": "job",
          "steps": [
          {
              "stepType": "mongodb",
              "parameter": {
                  "datasource": "mongodb_userlog",//数据源名称。
                  "column": [
                      {
                      "name": "store.bicycle.color", //JSON字段路径,本例中提取color值。
                      "type": "document.String" //非一层子属性以最终获取的类型为准。假如您选取的JSON字段为一级字段,例如本例中的expensive,则直接填写string即可。
                      }
                    ],
                  "collectionName": "userlog"   //集合名称。
                  },
              "name": "Reader",
              "category": "reader"
              },
              {
                  "stepType": "odps",
                  "parameter": {
                  "partition": "",
                  "isCompress": false,
                  "truncate": true,
                  "datasource": "odps_source",// MaxCompute数据源名称
                  "column": [
                  "mqdata"  //MaxCompute表列名。
                  ],
                  "emptyAsNull": false,
                  "table": "mqdata"
                  },
                  "name": "Writer",
                  "category": "writer"
                  }
                  ],
                  "version": "2.0",
                  "order": {
                  "hops": [
                  {
                  "from": "Reader",
                  "to": "Writer"
                  }
                  ]
                  },
                  "setting": {
                  "errorLimit": {
                  "record": ""
                  },
                  "speed": {
                  "concurrent": 2,
                  "throttle": false,
                  }
                  }
              }
    7. 单击**图标运行代码。

    8. 您可以在运行日志查看运行结果。

验证结果

  1. 右键单击业务流程,选择新建节点 > MaxCompute > ODPS SQL

  2. 新建节点对话框中输入节点名称,并单击确认

  3. 在ODPS SQL节点编辑页面输入如下语句。

    SELECT * from mqdata;
  4. 单击**图标运行代码。

  5. 您可以在运行日志查看运行结果。