本文为您介绍如何通过DataWorks的数据集成功能,将从MongoDB提取的JSON字段迁移至MaxCompute。

前提条件

在MongoDB上准备测试数据

  1. 账号准备。
    在数据库内新建用户,用于DataWorks添加数据源。本示例执行如下命令。
    db.createUser({user:"bookuser",pwd:"123456",roles:["root"]})

    新建用户名为bookuser,密码为123456,权限为root

  2. 数据准备。
    将数据上传至MongoDB数据库。本示例使用阿里云的云数据库MongoDB版,网络类型为VPC(需申请公网地址,否则无法与DataWorks默认资源组互通),测试数据如下。
    {
                            "store": {
                            "book": [
                            {
                            "category": "reference",
                            "author": "Nigel Rees",
                            "title": "Sayings of the Century",
                            "price": 8.95
                            },
                            {
                            "category": "fiction",
                            "author": "Evelyn Waugh",
                            "title": "Sword of Honour",
                            "price": 12.99
                            },
                            {
                            "category": "fiction",
                            "author": "J. R. R. Tolkien",
                            "title": "The Lord of the Rings",
                            "isbn": "0-395-19395-8",
                            "price": 22.99
                            }
                            ],
                            "bicycle": {
                            "color": "red",
                            "price": 19.95
                            }
                            },
                            "expensive": 10
                            }
  3. 在MongoDB的DMS控制台,本示例使用的数据库为admin,集合为userlog。执行如下命令,查看已上传的数据。
    db.userlog.find().limit(10)

通过DataWorks将JSON数据从MongoDB迁移至MaxCompute

  1. 登录DataWorks控制台
  2. 在DataWorks上创建目标表。用以接收从MangoDB迁移的数据。
    1. 右键单击业务流程,选择新建 > MaxCompute >
    2. 新建表页面,选择引擎类型并输入表名
    3. 在表的编辑页面,单击DDL模式
    4. DDL模式对话框,输入建表语句,单击生成表结构
      create table mqdata (MQ data string);
    5. 单击提交到生产环境
  3. 新增MongoDB数据源,详情请参见配置MongoDB数据源
  4. 创建离线同步节点。
    1. 进入数据开发页面,右键单击指定业务流程,选择新建 > 数据集成 > 离线同步
    2. 新建节点对话框中,输入节点名称,并单击提交
    3. 在顶部菜单栏上,单击转化脚本图标。
    4. 在脚本模式下,单击顶部菜单栏上的**图标。
    5. 导入模板对话框中选择来源类型数据源目标类型数据源,并单击确定
    6. 输入如下脚本。
      {
          "type": "job",
          "steps": [
          {
              "stepType": "mongodb",
              "parameter": {
                  "datasource": "mongodb_userlog",//数据源名称。
                  "column": [
                      {
                      "name": "store.bicycle.color", //JSON字段路径,本例中提取color值。
                      "type": "document.String" //非一层子属性以最终获取的类型为准。假如您选取的JSON字段为一级字段,例如本例中的expensive,则直接填写string即可。
                      }
                    ],
                  "collectionName": "userlog"   //集合名称。
                  },
              "name": "Reader",
              "category": "reader"
              },
              {
                  "stepType": "odps",
                  "parameter": {
                  "partition": "",
                  "isCompress": false,
                  "truncate": true,
                  "datasource": "odps_first",
                  "column": [
                  "mqdata"  //MaxCompute表列名。
                  ],
                  "emptyAsNull": false,
                  "table": "mqdata"
                  },
                  "name": "Writer",
                  "category": "writer"
                  }
                  ],
                  "version": "2.0",
                  "order": {
                  "hops": [
                  {
                  "from": "Reader",
                  "to": "Writer"
                  }
                  ]
                  },
                  "setting": {
                  "errorLimit": {
                  "record": ""
                  },
                  "speed": {
                  "concurrent": 2,
                  "throttle": false,
                  }
                  }
              }
    7. 单击**图标运行代码。
    8. 您可以在运行日志查看运行结果。

验证结果

  1. 右键单击业务流程,选择新建 > MaxCompute > ODPS SQL
  2. 新建节点对话框中输入节点名称,并单击提交
  3. 在ODPS SQL节点编辑页面输入如下语句。
    SELECT * from mqdata;
  4. 单击**图标运行代码。
  5. 您可以在运行日志查看运行结果。