Read data from MongoDB using batch synchronization


DataWorks Data Integration provides the MongoDB Reader plugin to read data from MongoDB and synchronize it to other data sources. This topic demonstrates how to use Data Integration to batch synchronize data from MongoDB to MaxCompute.

Background

In this tutorial, the source is MongoDB and the destination is MaxCompute. Before you begin, prepare the MongoDB data and create a destination table in MaxCompute.

Prerequisites

Before you start, make sure you meet the following requirements:

  • You have activated DataWorks and created a MaxCompute data source.

  • This tutorial uses an exclusive resource group for Data Integration to run the batch task. You must purchase and configure an exclusive resource group for Data Integration. For more information, see Use an exclusive resource group for Data Integration.

    Note

    You can also use a serverless resource group. For more information, see Use a serverless resource group.

Prepare sample data and tables

This tutorial requires a MongoDB collection and a MaxCompute table for the batch synchronization.

  1. Prepare a MongoDB collection.

    This tutorial uses ApsaraDB for MongoDB as an example. The following code shows how to prepare the MongoDB collection.

    1. Create a collection named di_mongodb_conf_test.

      db.createCollection('di_mongodb_conf_test')
    2. Insert the sample data for this tutorial into the collection.

      db.di_mongodb_conf_test.insertOne({
          'col_string':'mock string value',
          'col_int32':NumberInt("1"),
          'col_int32_min':NumberInt("-2147483648"),
          'col_int32_max':NumberInt("2147483647"),
          'col_int64':NumberLong("1234567890123456"),
          'col_int64_min':NumberLong("-9223372036854775807"),
          'col_int64_max':NumberLong("9223372036854775807"),
          'col_decimal':NumberDecimal("9999999.4999999999"),
          'col_double':9999999.99,
          'col_boolean':true,
          'col_timestamp':ISODate(),
          'col_date':new Date(),
          'col_array_to_json':['a','b'],
          'col_array_to_join':['a','b'],
          'col_doc':{
              'key_string':'mock string value',
              'key_int32':NumberInt("1"),
              'key_int32_min':NumberInt("-2147483648"),
              'key_int32_max':NumberInt("2147483647"),
              'key_int64':NumberLong("1234567890123456"),
              'key_int64_min':NumberLong("-9223372036854775807"),
              'key_int64_max':NumberLong("9223372036854775807"),
              'key_decimal':NumberDecimal("9999999.4999999999"),
              'key_double':9999999.99,
              'key_boolean':true,
              'key_timestamp':ISODate(),
              'key_date':new Date(),
              'key_array_to_json':['a','b'],
              'key_array_to_join':['a','b'],
          },
          'col_extra_1':'this is extra 1',
          'col_extra_2':'this is extra 2',
      })
    3. Query the data inserted into MongoDB.

      db.getCollection("di_mongodb_conf_test").find({})

      The query result shows one test document in the collection, with an _id of 63dca714b8548a78e1dc3238. This document contains various data types: string (col_string), integer (col_int32/col_int64), floating-point (col_double/col_decimal), Boolean (col_boolean), date/time (col_date/col_timestamp), and array (col_array_to_join/col_array_to_json). It also includes a nested document, col_doc, and two extra fields, col_extra_1 and col_extra_2.

  2. Prepare a MaxCompute table.

    1. Create a partitioned table named di_mongodb_conf_test with pt as the partition field.

      CREATE TABLE IF NOT EXISTS di_mongodb_conf_test
      (
        `id`                 STRING
        ,`col_string`        STRING
        ,`col_int32`         INT
        ,`col_int32_min`     INT
        ,`col_int32_max`     INT
        ,`col_int64`         BIGINT
        ,`col_int64_min`     BIGINT
        ,`col_int64_max`     BIGINT
        ,`col_decimal`       DECIMAL(38,18)
        ,`col_double`        DOUBLE
        ,`col_boolean`       BOOLEAN
        ,`col_timestamp`     TIMESTAMP
        ,`col_date`          DATE
        ,`col_array_to_json` STRING
        ,`col_array_to_join` STRING
        ,`key_string`        STRING
        ,`key_int32`         INT
        ,`key_int32_min`     INT
        ,`key_int32_max`     INT
        ,`key_int64`         BIGINT
        ,`key_int64_min`     BIGINT
        ,`key_int64_max`     BIGINT
        ,`key_decimal`       DECIMAL(38,18)
        ,`key_double`        DOUBLE
        ,`key_boolean`       BOOLEAN
        ,`key_timestamp`     TIMESTAMP
        ,`key_date`          DATE
        ,`key_array_to_json` STRING
        ,`key_array_to_join` STRING
        ,`col_doc`           STRING
        ,`col_combine`       STRING
      )
      PARTITIONED BY
      (
        pt                   STRING
      )
      LIFECYCLE 36500
      ;
    2. Add a partition with the value 20230202.

      alter table di_mongodb_conf_test add if not exists partition (pt='20230202');
    3. Verify that the partitioned table is created correctly.

      SELECT * FROM di_mongodb_conf_test
      WHERE pt='20230202';

Configure the batch synchronization task

Step 1: Add a MongoDB data source

Add a MongoDB data source and ensure network connectivity between the data source and the exclusive resource group for Data Integration. For more information, see Add a MongoDB data source.

Step 2: Create and configure a batch synchronization node

In DataWorks DataStudio, create a batch synchronization node and configure its source and destination. This section highlights the key parameters; you can use the default values for others. For detailed instructions, see Configure a batch synchronization node using the Codeless UI.

  1. Configure the network connection.

    Select the MongoDB and MaxCompute data sources and the exclusive resource group for Data Integration, and then test the connectivity.

  2. Configure the task: Select the data sources.

    For the source and destination, select the MongoDB collection and the partitioned MaxCompute table.

  3. Configure the task: Map fields.

    When the source is MongoDB, Peer mapping is used by default. You can also click the edit icon to manually edit the source table fields. The following is an example of manually edited source fields:

    {"name":"_id","type":"string"}
    {"name":"col_string","type":"string"}
    {"name":"col_int32","type":"long"}
    {"name":"col_int32_min","type":"long"}
    {"name":"col_int32_max","type":"long"}
    {"name":"col_int64","type":"long"}
    {"name":"col_int64_min","type":"long"}
    {"name":"col_int64_max","type":"long"}
    {"name":"col_decimal","type":"double"}
    {"name":"col_double","type":"double"}
    {"name":"col_boolean","type":"boolean"}
    {"name":"col_timestamp","type":"date"}
    {"name":"col_date","type":"date"}
    {"name":"col_array_to_json","type":"string"}
    {"name":"col_array_to_join","type":"array","splitter":","}
    {"name":"col_doc.key_string","type":"document.string"}
    {"name":"col_doc.key_int32","type":"document.long"}
    {"name":"col_doc.key_int32_min","type":"document.long"}
    {"name":"col_doc.key_int32_max","type":"document.long"}
    {"name":"col_doc.key_int64","type":"document.long"}
    {"name":"col_doc.key_int64_min","type":"document.long"}
    {"name":"col_doc.key_int64_max","type":"document.long"}
    {"name":"col_doc.key_decimal","type":"document.double"}
    {"name":"col_doc.key_double","type":"document.double"}
    {"name":"col_doc.key_boolean","type":"document.boolean"}
    {"name":"col_doc.key_timestamp","type":"document.date"}
    {"name":"col_doc.key_date","type":"document.date"}
    {"name":"col_doc.key_array_to_json","type":"document"}
    {"name":"col_doc.key_array_to_join","type":"document.array","splitter":","}
    {"name":"col_doc","type":"string"}
    {"name":"col_combine","type":"combine"}

    After you edit the fields, the UI displays the mapping between the source and destination fields.
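Each line in the manually edited field list is a small JSON object with `name` and `type`. Array types also carry a `splitter`. The following minimal sketch is plain JavaScript and is not part of DataWorks. It parses such a list and flags entries that break the rules described in this topic, such as an `array` or `document.array` type without a `splitter`. The helper name `validateColumns` and the `KNOWN_TYPES` set are hypothetical. The set covers only the types used in the example above, not every type that MongoDB Reader supports.

```javascript
// Hypothetical helper (not part of DataWorks): sanity-checks manually edited
// MongoDB Reader column entries, one JSON object per line.
// KNOWN_TYPES lists only the types used in this topic's example (assumption).
const KNOWN_TYPES = new Set([
  "string", "long", "double", "boolean", "date", "array", "combine",
  "document", "document.string", "document.long", "document.double",
  "document.boolean", "document.date", "document.array",
]);

function validateColumns(text) {
  const columns = [];
  const errors = [];
  // Line numbers in messages count non-empty lines only.
  const lines = text.split("\n").map((l) => l.trim()).filter((l) => l.length > 0);
  lines.forEach((line, i) => {
    let col;
    try {
      col = JSON.parse(line);
    } catch (e) {
      errors.push(`line ${i + 1}: not valid JSON`);
      return;
    }
    if (col === null || typeof col !== "object" || typeof col.name !== "string") {
      errors.push(`line ${i + 1}: expected an object with a "name" field`);
      return;
    }
    if (!KNOWN_TYPES.has(col.type)) {
      errors.push(`line ${i + 1}: type "${col.type}" is not used in this topic`);
    }
    // array and document.array values are joined, so they need a splitter.
    if ((col.type === "array" || col.type === "document.array") && !col.splitter) {
      errors.push(`line ${i + 1}: ${col.name} has type ${col.type} but no splitter`);
    }
    columns.push(col);
  });
  return { columns, errors };
}

const sample = `
{"name":"_id","type":"string"}
{"name":"col_array_to_join","type":"array","splitter":","}
{"name":"col_doc.key_string","type":"document.string"}
{"name":"col_combine","type":"combine"}
`;
console.log(validateColumns(sample).errors); // []
```

A check like this only catches structural mistakes before you paste the list into the UI. The task itself still performs its own validation.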

Step 3: Commit and deploy the node

If you use a standard-mode workspace and want to schedule this task, commit and deploy the node to the production environment. For more information, see Deploy tasks.

Step 4: Run the node and view the result

After you configure the node, run it. When the task is complete, view the synchronized data in the MaxCompute table. The synchronized data includes the following fields and values:

  • col_string: mock string value
  • col_int32: 1
  • col_int32_min: -2147483648
  • col_int32_max: 2147483647
  • col_int64: 1234567890123456
  • col_decimal: 9999999.4999999999
  • col_double: 9999999.99
  • col_boolean: true
  • col_timestamp: 2023-02-03 14:17:56.554
  • col_date: 2023-02-03
  • col_array_to_json: [a, b]
  • col_array_to_join: a,b
  • pt: 20230202

Some numeric fields are stored as text. The values of the complex fields are as follows:

  • The content of the col_doc field is as follows.

    {
      "key_array_to_join": [
        "a",
        "b"
      ],
      "key_array_to_json": [
        "a",
        "b"
      ],
      "key_boolean": true,
      "key_date": "2023-02-03 14:17:56",
      "key_decimal": {
        "finite": true,
        "high": 3471149412795809792,
        "infinite": false,
        "low": 99999994999999999,
        "naN": false,
        "negative": false
      },
      "key_double": 9999999.99,
      "key_int32": 1,
      "key_int32_max": 2147483647,
      "key_int32_min": -2147483648,
      "key_int64": 1234567890123456,
      "key_int64_max": 9223372036854775807,
      "key_int64_min": -9223372036854775807,
      "key_string": "mock string value",
      "key_timestamp": "2023-02-03 14:17:56"
    }
  • The content of the col_combine field is as follows.

    {
      "col_extra_1": "this is extra 1",
      "col_extra_2": "this is extra 2"
    }
Note

For information about how Decimal values in documents are output, see Appendix 2: Decimal type output.

Appendix 1: Data format conversion

Convert array data to JSON format: col_array_to_json

Source MongoDB data:

    {
        "col_array_to_json": [
            "a",
            "b"
        ]
    }

Field mapping configuration:

    {"name":"col_array_to_json","type":"string"}

In the field mapping configuration, if type is string, the synchronization task serializes the original data into JSON format for output at runtime.

Output to MaxCompute:

    [a, b]
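The string-type conversion above can be sketched as follows. This is a minimal illustration, assuming `JSON.stringify` as a stand-in for the plugin's serializer. The exact quoting and spacing that the plugin writes may differ from this sketch, as the `[a, b]` output above suggests. The function name `toStringColumn` is hypothetical.

```javascript
// Sketch: a column mapped with "type":"string" whose source value is an array
// (or sub-document) is serialized to JSON text before it is written.
// JSON.stringify stands in for the plugin's own serializer (assumption).
function toStringColumn(value) {
  if (typeof value === "string") return value; // plain strings pass through
  return JSON.stringify(value); // arrays and documents become JSON text
}

const source = { col_array_to_json: ["a", "b"] };
console.log(toStringColumn(source.col_array_to_json)); // ["a","b"]
```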

Convert an array to a joined string: col_array_to_join

Source MongoDB data:

    {
        "col_array_to_join": [
            "a",
            "b"
        ]
    }

Field mapping configuration:

    {"name":"col_array_to_join","type":"array","splitter":","}

When you configure field mapping, the splitter parameter is required if the type is array. When the sync task runs, it joins the content of the source data array by using the splitter, and the final output is the concatenated string.

Output to MaxCompute:

    a,b
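The array-join behavior reduces to joining the elements with the configured `splitter`. The following minimal sketch illustrates that step. The function name `joinArrayColumn` is hypothetical. This topic does not describe how the plugin handles a non-array value, so the sketch simply rejects one.

```javascript
// Sketch of "type":"array" with a splitter: array elements are joined into one string.
function joinArrayColumn(value, splitter) {
  // Mirrors the rule that splitter is required when type is array.
  if (splitter === undefined || splitter === "") {
    throw new Error("splitter is required when type is array");
  }
  // This sketch rejects non-array input; the plugin's behavior is not covered here.
  if (!Array.isArray(value)) {
    throw new TypeError("expected an array value");
  }
  return value.map((item) => String(item)).join(splitter);
}

console.log(joinArrayColumn(["a", "b"], ",")); // a,b
```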

Synchronize nested document fields

Source MongoDB data:

    {
        "col_doc": {
            "key_string": "mock string value"
        }
    }

Field mapping configuration:

    {"name":"col_doc.key_string","type":"document.string"}

name specifies the path of the field to be synchronized in the document. When the synchronization task runs, it reads the document based on the path and outputs the data.

Output to MaxCompute:

    mock string value
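Reading a nested field comes down to walking the dotted path in `name` one key at a time. The following minimal sketch shows that walk. The function name `readPath` is hypothetical. The sketch returns `null` for a missing path, but this topic does not state what the plugin outputs in that case.

```javascript
// Sketch: resolve a dotted column name such as "col_doc.key_string" in a document.
function readPath(doc, path) {
  let current = doc;
  for (const key of path.split(".")) {
    if (current === null || typeof current !== "object" || !(key in current)) {
      return null; // assumption: a missing path yields null
    }
    current = current[key];
  }
  return current;
}

const doc = { col_doc: { key_string: "mock string value" } };
console.log(readPath(doc, "col_doc.key_string")); // mock string value
```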

Serialize a document as a JSON string

Source MongoDB data:

    {
        "col_doc": {
            "key_string": "mock string value",
            "key_int32": 1
        }
    }

Field mapping configuration:

    {"name":"col_doc","type":"string"}

When you configure field mapping, if type is string, the synchronization task serializes the entire col_doc document into a JSON string and outputs it at runtime.

Output to MaxCompute:

    {"key_string":"mock string value","key_int32":1}
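Mapping a whole sub-document as `string` keeps all of its keys in a single column. This differs from mapping individual `col_doc.*` paths. The following minimal sketch uses `JSON.stringify` as a stand-in for the plugin's serializer. The function name `serializeDocument` is hypothetical. Special BSON types such as Decimal128 are serialized differently by the plugin, as Appendix 2 describes.

```javascript
// Sketch: "type":"string" on a nested document serializes the whole
// sub-document into one JSON string (JSON.stringify is an assumption).
function serializeDocument(doc, field) {
  const value = doc[field];
  return value === undefined ? null : JSON.stringify(value);
}

const source = { col_doc: { key_string: "mock string value", key_int32: 1 } };
console.log(serializeDocument(source, "col_doc"));
// {"key_string":"mock string value","key_int32":1}
```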

Serialize unmapped fields as JSON

Source MongoDB data:

    {
        "col_1": "value1",
        "col_2": "value2",
        "col_3": "value3",
        "col_4": "value4"
    }

Field mapping configuration:

    {"name":"col_1","type":"string"}
    {"name":"col_2","type":"string"}
    {"name":"col_combine","type":"combine"}

This document has four fields, and only col_1 and col_2 are explicitly mapped. During synchronization, the remaining unmapped fields, col_3 and col_4, are serialized into a single JSON object and output.

Output to MaxCompute:

    {"col_3":"value3","col_4":"value4"}
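The `combine` type can be pictured as "everything not otherwise mapped, as one JSON object." The following minimal sketch reproduces the example above. The function name `combineUnmapped` is hypothetical. The sketch compares only exact top-level field names. This topic does not say how the plugin treats a field that is referenced only through a nested path such as `col_doc.key_string`.

```javascript
// Sketch of "type":"combine": top-level fields not named in the column list
// are collected into one object and serialized as JSON.
// Only exact top-level names count as mapped here (assumption).
function combineUnmapped(doc, columns) {
  const mapped = new Set(
    columns.filter((c) => c.type !== "combine").map((c) => c.name)
  );
  const rest = {};
  for (const [key, value] of Object.entries(doc)) {
    if (!mapped.has(key)) rest[key] = value;
  }
  return JSON.stringify(rest);
}

const doc = { col_1: "value1", col_2: "value2", col_3: "value3", col_4: "value4" };
const columns = [
  { name: "col_1", type: "string" },
  { name: "col_2", type: "string" },
  { name: "col_combine", type: "combine" },
];
console.log(combineUnmapped(doc, columns)); // {"col_3":"value3","col_4":"value4"}
```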

Appendix 2: Decimal type output

When a document is serialized to JSON format, Decimal128 data is, by default, output as follows:

{
    "key_decimal":
    {
        "finite": true,
        "high": 3471149412795809792,
        "infinite": false,
        "low": 99999994999999999,
        "naN": false,
        "negative": false
    }
}
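The `high` and `low` values above are the two 64-bit words of a BSON Decimal128, which uses the IEEE 754-2008 decimal128 binary integer decimal (BID) encoding. The two words are enough to recover the number. The following minimal sketch decodes them and handles only finite values in the common encoding. The function name `decimal128ToString` is hypothetical. The words are passed as strings because they exceed the 2^53 precision of JavaScript numbers, and the sketch uses `BigInt` to hold them exactly.

```javascript
// Sketch: decode Decimal128 high/low words (BID encoding) into a decimal string.
// Only the common form is handled (the two bits after the sign are not 11);
// large-coefficient forms, Infinity, and NaN are rejected.
function decimal128ToString(highStr, lowStr) {
  // Reinterpret possibly negative signed 64-bit words as unsigned.
  const high = BigInt.asUintN(64, BigInt(highStr));
  const low = BigInt.asUintN(64, BigInt(lowStr));
  const negative = (high >> 63n) === 1n;
  if (((high >> 61n) & 0b11n) === 0b11n) {
    throw new Error("special encoding (large coefficient, Infinity, or NaN) not handled");
  }
  // Bits 49-62 hold the biased exponent; the bias for decimal128 is 6176.
  const exponent = Number((high >> 49n) & 0x3fffn) - 6176;
  // The coefficient is the low 49 bits of high followed by all 64 bits of low.
  const coefficient = ((high & ((1n << 49n) - 1n)) << 64n) | low;
  let digits = coefficient.toString();
  if (exponent >= 0) {
    digits += "0".repeat(exponent);
  } else {
    const scale = -exponent;
    digits = digits.padStart(scale + 1, "0");
    digits = digits.slice(0, digits.length - scale) + "." + digits.slice(digits.length - scale);
  }
  return (negative ? "-" : "") + digits;
}

console.log(decimal128ToString("3471149412795809792", "99999994999999999")); // 9999999.4999999999
```

For the sample value, `high` equals 6166 × 2^49. The exponent is therefore 6166 - 6176 = -10, and the coefficient is `low` itself (99999994999999999). This gives 9999999.4999999999, the same number that the bigDecimal output type produces.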

To output the data as a numeric type, follow these steps:

  1. When configuring the batch synchronization task, switch to script mode.

  2. Modify the Reader step configuration: in its parameter section, add the decimal128OutputType parameter and set its value to bigDecimal. The following fragment shows the relevant fields of the Reader step:

    {
        "parameter": {
            "collectionName": "di_mongodb_conf_test",
            "decimal128OutputType": "bigDecimal"
        },
        "name": "Reader",
        "category": "reader"
    }
  3. Run the batch synchronization task again and view the result.

    {
        "key_decimal": "9999999.4999999999"
    }
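If you maintain script-mode configurations as files, you can apply the same change programmatically. The following minimal sketch assumes the script-mode JSON holds its steps in a `steps` array. That layout is an assumption; only `name`, `category`, and `parameter` appear in this topic. The helper name `enableBigDecimal` is hypothetical.

```javascript
// Hypothetical helper: sets decimal128OutputType on the reader step of a
// script-mode job object. The "steps" array layout is an assumption.
function enableBigDecimal(job) {
  const reader = (job.steps || []).find((step) => step.category === "reader");
  if (!reader) throw new Error("no reader step found");
  // Keep existing parameters and add or overwrite decimal128OutputType.
  reader.parameter = { ...(reader.parameter || {}), decimal128OutputType: "bigDecimal" };
  return job;
}

const job = {
  steps: [
    { name: "Reader", category: "reader", parameter: { collectionName: "di_mongodb_conf_test" } },
    { name: "Writer", category: "writer", parameter: {} },
  ],
};
console.log(JSON.stringify(enableBigDecimal(job).steps[0].parameter));
// {"collectionName":"di_mongodb_conf_test","decimal128OutputType":"bigDecimal"}
```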