Migrate JSON data from MongoDB to MaxCompute

This topic describes how to use the Data Integration feature of DataWorks to migrate JSON fields from MongoDB to MaxCompute.

Prerequisites

Prepare test data in MongoDB

  1. Prepare a database account.

    Create a user in the database. DataWorks uses this account to add the data source. In this example, run the following command.

    db.createUser({user:"bookuser",pwd:"123456",roles:["user1"]})

    This command creates a user named bookuser with the password 123456 and grants the user a role with data access permissions.

  2. Prepare the data.

    Upload your data to the MongoDB database. This example uses an ApsaraDB for MongoDB instance that runs in a virtual private cloud (VPC). You must apply for a public endpoint for the instance to communicate with the shared resource group for DataWorks. This example uses the following sample data.

    {
        "store": {
            "book": [
                {
                    "category": "reference",
                    "author": "Nigel Rees",
                    "title": "Sayings of the Century",
                    "price": 8.95
                },
                {
                    "category": "fiction",
                    "author": "Evelyn Waugh",
                    "title": "Sword of Honour",
                    "price": 12.99
                },
                {
                    "category": "fiction",
                    "author": "J. R. R. Tolkien",
                    "title": "The Lord of the Rings",
                    "isbn": "0-395-19395-8",
                    "price": 22.99
                }
            ],
            "bicycle": {
                "color": "red",
                "price": 19.95
            }
        },
        "expensive": 10
    }
  3. In the Data Management Service (DMS) console for MongoDB, run the following command to view the uploaded data. This example uses the admin database and the userlog collection.

    db.userlog.find().limit(10)
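Before uploading, it can help to confirm that the sample parses as a single JSON object. The following is a minimal Python sketch, not part of the DataWorks procedure. The helper names `load_sample` and `upload` are hypothetical, and `upload` only assumes that the object passed to it exposes an `insert_one(document)` method, as a pymongo collection does.

```python
import json

# The sample document from this topic, written compactly.
SAMPLE_JSON = """
{
  "store": {
    "book": [
      {"category": "reference", "author": "Nigel Rees",
       "title": "Sayings of the Century", "price": 8.95},
      {"category": "fiction", "author": "Evelyn Waugh",
       "title": "Sword of Honour", "price": 12.99},
      {"category": "fiction", "author": "J. R. R. Tolkien",
       "title": "The Lord of the Rings", "isbn": "0-395-19395-8", "price": 22.99}
    ],
    "bicycle": {"color": "red", "price": 19.95}
  },
  "expensive": 10
}
"""


def load_sample(text):
    """Parse the sample and fail early if it is not a single JSON object."""
    doc = json.loads(text)
    if not isinstance(doc, dict):
        raise ValueError("expected one JSON object per MongoDB document")
    return doc


def upload(collection, doc):
    """Insert one document.

    `collection` is assumed to behave like a pymongo collection,
    that is, to expose insert_one(document).
    """
    return collection.insert_one(doc)


if __name__ == "__main__":
    doc = load_sample(SAMPLE_JSON)
    print(sorted(doc))                 # top-level keys
    print(len(doc["store"]["book"]))   # number of book entries
```

If pymongo is installed, a collection object such as `MongoClient(uri)["admin"]["userlog"]` could be passed to `upload`, where `uri` stands for your own connection string.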

Migrate JSON data with DataWorks

  1. Log on to the DataWorks console.

  2. Create a destination table in DataWorks to store the migrated data.

    1. Right-click an existing workflow and choose Create Table > Table.

    2. In the Create Table dialog box, select the engine type and enter a name in the Name field.

    3. On the table editing page, click DDL Statement.

    4. In the DDL Statement dialog box, enter the table creation statement and click Generate Table Schema.

      Important

      The table name in the CREATE TABLE statement must match the Table Name you entered in the Create Table dialog box.

      create table mqdata (mqdata string);
    5. Click Commit to Production Environment.

  3. Add a MongoDB data source. For more information, see Configure a MongoDB data source.

  4. Create a batch synchronization node.

    1. Go to the data analytics page. Right-click the specified workflow and choose Create Node > Data Integration > Offline synchronization.

    2. In the Create Node dialog box, enter a name in the Name field and click Confirm.

    3. In the top toolbar, click the Conversion script icon to switch to script mode.

    4. In script mode, click the import template icon.

    5. In the Import Template dialog box, select the source type, the source data source, the target type, and the target data source, and then click Confirm.

    6. Enter the following script.

      {
          "type": "job",
          "steps": [
              {
                  "stepType": "mongodb",
                  "parameter": {
                      "datasource": "mongodb_userlog", // The name of the source MongoDB data source.
                      "column": [
                          {
                              "name": "store.bicycle.color", // The path of the JSON field. In this example, the value of the color field is extracted.
                              "type": "document.String" // For nested fields, specify the data type of the final extracted value. If you select a top-level field, such as expensive in this example, you can simply specify string.
                          }
                      ],
                      "collectionName": "userlog" // The name of the collection.
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
                  "stepType": "odps",
                  "parameter": {
                      "partition": "",
                      "isCompress": false,
                      "truncate": true,
                      "datasource": "odps_source", // The name of the destination MaxCompute data source.
                      "column": [
                          "mqdata" // The column name in the MaxCompute table.
                      ],
                      "emptyAsNull": false,
                      "table": "mqdata"
                  },
                  "name": "Writer",
                  "category": "writer"
              }
          ],
          "version": "2.0",
          "order": {
              "hops": [
                  {
                      "from": "Reader",
                      "to": "Writer"
                  }
              ]
          },
          "setting": {
              "errorLimit": {
                  "record": ""
              },
              "speed": {
                  "concurrent": 2,
                  "throttle": false
              }
          }
      }
    7. Click the run icon to run the code.

    8. View the results in the operation log.
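A script like the one in this procedure can be sanity-checked locally before it is pasted into script mode. The following Python sketch is an illustration only. The helpers `strip_comments`, `parse_job`, and `check_job` are hypothetical names, the job text is a shortened version of the script in this topic, and the rule that the reader and writer must list the same number of columns is an assumption of this sketch rather than a statement from this topic.

```python
import json
import re

# Matches either a JSON string literal (kept) or a // comment (dropped).
_TOKEN = re.compile(r'"(?:\\.|[^"\\])*"|//[^\n]*')


def strip_comments(text):
    """Remove // comments that sit outside string literals."""
    return _TOKEN.sub(
        lambda m: m.group(0) if m.group(0).startswith('"') else "", text
    )


def parse_job(text):
    """Parse a job script after dropping comments and trailing commas."""
    cleaned = strip_comments(text)
    # Simplification: assumes no string value contains a comma
    # directly before a closing brace or bracket.
    cleaned = re.sub(r",\s*([}\]])", r"\1", cleaned)
    return json.loads(cleaned)


def check_job(job):
    """Return a list of problems; an empty list means the checks passed."""
    steps = {s["category"]: s for s in job["steps"]}
    reader, writer = steps.get("reader"), steps.get("writer")
    if reader is None or writer is None:
        return ["job needs one reader step and one writer step"]
    problems = []
    # Assumption: source and destination columns are paired by position.
    if len(reader["parameter"]["column"]) != len(writer["parameter"]["column"]):
        problems.append("reader and writer column counts differ")
    names = {s["name"] for s in job["steps"]}
    for hop in job["order"]["hops"]:
        if hop["from"] not in names or hop["to"] not in names:
            problems.append("hop refers to an unknown step name")
    return problems


# Shortened job text; it keeps one comment and one trailing comma on purpose.
JOB_TEXT = """
{
  "type": "job",
  "steps": [
    {"stepType": "mongodb", "name": "Reader", "category": "reader",
     "parameter": {
       "datasource": "mongodb_userlog", // source data source
       "column": [{"name": "store.bicycle.color", "type": "document.String"}],
       "collectionName": "userlog"
     }},
    {"stepType": "odps", "name": "Writer", "category": "writer",
     "parameter": {"datasource": "odps_source", "column": ["mqdata"], "table": "mqdata"}}
  ],
  "order": {"hops": [{"from": "Reader", "to": "Writer"}]},
  "setting": {"speed": {"concurrent": 2, "throttle": false,}}
}
"""

if __name__ == "__main__":
    print(check_job(parse_job(JOB_TEXT)))
```

The string-aware pattern keeps values such as URLs intact, because a `//` inside a quoted string is matched as part of the string and left in place.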

Verify the results

  1. Right-click the workflow and choose New > MaxCompute > ODPS SQL.

  2. In the Create Node dialog box, enter a node name and click Submit.

  3. On the ODPS SQL node editing page, enter the following statement.

    SELECT * from mqdata;
  4. Click the run icon to run the code.

  5. View the results in the runtime log.
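To reason about what the query should return, the dotted column name from the synchronization script can be resolved by hand against the sample document. The following Python sketch does that for `store.bicycle.color`. The helpers `extract` and `expected_rows` are hypothetical names, the document is trimmed to the fields used here, and returning `None` for a missing key is an assumption of this sketch, not documented reader behavior.

```python
# Sample document from this topic, trimmed to the fields used here.
SAMPLE = {
    "store": {"bicycle": {"color": "red", "price": 19.95}},
    "expensive": 10,
}


def extract(document, path):
    """Follow a dot-separated path through nested dicts.

    Returns None when a key is missing (an assumption of this sketch).
    """
    current = document
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def expected_rows(documents, path):
    """Build one single-column row per source document."""
    return [(extract(doc, path),) for doc in documents]


if __name__ == "__main__":
    print(expected_rows([SAMPLE], "store.bicycle.color"))
```

With only the sample document in the collection, the sketch suggests that the mqdata table would hold one row whose mqdata value is red.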