DataWorks Data Integration provides the MongoDB Reader plugin to read data from MongoDB and synchronize it to other data sources. This topic demonstrates how to use Data Integration to batch synchronize data from MongoDB to MaxCompute.
Background
In this tutorial, the source is MongoDB and the destination is MaxCompute. Before you begin, prepare the MongoDB data and create a destination table in MaxCompute.
Prerequisites
Before you start, make sure you meet the following requirements:
- You have activated DataWorks and created a MaxCompute data source.
- This tutorial uses an exclusive resource group for Data Integration to run the batch task. You must purchase and configure an exclusive resource group for Data Integration. For more information, see Use an exclusive resource group for Data Integration.
  Note: You can also use a serverless resource group. For more information, see Use a serverless resource group.
Prepare sample data and tables
This tutorial requires a MongoDB collection and a MaxCompute table for the batch synchronization.
- Prepare a MongoDB collection.
  This tutorial uses ApsaraDB for MongoDB as an example. The following code shows how to prepare the MongoDB collection.
  - Create a collection named di_mongodb_conf_test.
    db.createCollection('di_mongodb_conf_test')
  - Insert the sample data for this tutorial into the collection.
    db.di_mongodb_conf_test.insertOne({
        'col_string':'mock string value',
        'col_int32':NumberInt("1"),
        'col_int32_min':NumberInt("-2147483648"),
        'col_int32_max':NumberInt("2147483647"),
        'col_int64':NumberLong("1234567890123456"),
        'col_int64_min':NumberLong("-9223372036854775807"),
        'col_int64_max':NumberLong("9223372036854775807"),
        'col_decimal':NumberDecimal("9999999.4999999999"),
        'col_double':9999999.99,
        'col_boolean':true,
        'col_timestamp':ISODate(),
        'col_date':new Date(),
        'col_array_to_json':['a','b'],
        'col_array_to_join':['a','b'],
        'col_doc':{
            'key_string':'mock string value',
            'key_int32':NumberInt("1"),
            'key_int32_min':NumberInt("-2147483648"),
            'key_int32_max':NumberInt("2147483647"),
            'key_int64':NumberLong("1234567890123456"),
            'key_int64_min':NumberLong("-9223372036854775807"),
            'key_int64_max':NumberLong("9223372036854775807"),
            'key_decimal':NumberDecimal("9999999.4999999999"),
            'key_double':9999999.99,
            'key_boolean':true,
            'key_timestamp':ISODate(),
            'key_date':new Date(),
            'key_array_to_json':['a','b'],
            'key_array_to_join':['a','b'],
        },
        'col_extra_1':'this is extra 1',
        'col_extra_2':'this is extra 2',
    })
  - Query the data inserted into MongoDB.
    db.getCollection("di_mongodb_conf_test").find({})
    The query result shows one test document in the collection, with an _id of 63dca714b8548a78e1dc3238. This document contains various data types: string (col_string), integer (col_int32/col_int64), floating-point (col_double/col_decimal), Boolean (col_boolean), date/time (col_date/col_timestamp), and array (col_array_to_join/col_array_to_json). It also includes a nested document, col_doc, and two extra fields, col_extra_1 and col_extra_2.
- Prepare a MaxCompute table.
  - Create a partitioned table named di_mongodb_conf_test with pt as the partition field.
    CREATE TABLE IF NOT EXISTS di_mongodb_conf_test
    (
      `id` STRING
      ,`col_string` STRING
      ,`col_int32` INT
      ,`col_int32_min` INT
      ,`col_int32_max` INT
      ,`col_int64` BIGINT
      ,`col_int64_min` BIGINT
      ,`col_int64_max` BIGINT
      ,`col_decimal` DECIMAL(38,18)
      ,`col_double` DOUBLE
      ,`col_boolean` BOOLEAN
      ,`col_timestamp` TIMESTAMP
      ,`col_date` DATE
      ,`col_array_to_json` STRING
      ,`col_array_to_join` STRING
      ,`key_string` STRING
      ,`key_int32` INT
      ,`key_int32_min` INT
      ,`key_int32_max` INT
      ,`key_int64` BIGINT
      ,`key_int64_min` BIGINT
      ,`key_int64_max` BIGINT
      ,`key_decimal` DECIMAL(38,18)
      ,`key_double` DOUBLE
      ,`key_boolean` BOOLEAN
      ,`key_timestamp` TIMESTAMP
      ,`key_date` DATE
      ,`key_array_to_json` STRING
      ,`key_array_to_join` STRING
      ,`col_doc` STRING
      ,`col_combine` STRING
    )
    PARTITIONED BY
    (
      pt STRING
    )
    LIFECYCLE 36500
    ;
  - Add a partition with the value 20230202.
    ALTER TABLE di_mongodb_conf_test ADD IF NOT EXISTS PARTITION (pt='20230202');
  - Verify that the partitioned table is created correctly.
    SELECT * FROM di_mongodb_conf_test WHERE pt='20230202';
Configure the batch synchronization task
Step 1: Add a MongoDB data source
Add a MongoDB data source and ensure network connectivity between the data source and the exclusive resource group for Data Integration. For more information, see Add a MongoDB data source.
Step 2: Create and configure a batch synchronization node
In DataWorks DataStudio, create a batch synchronization node and configure its source and destination. This section highlights the key parameters; you can use the default values for others. For detailed instructions, see Configure a batch synchronization node using the Codeless UI.
- Configure the network connection.
  Select the MongoDB and MaxCompute data sources and the corresponding exclusive resource group for Data Integration, and then test the connectivity.
- Configure the task: Select the data sources.
  For the source and destination, select the MongoDB collection and the partitioned MaxCompute table.
- Configure the task: Map fields.
  When the data source is MongoDB, Peer mapping is used by default. You can also click the icon to manually edit the source table fields. The following example shows manually edited source fields.
  {"name":"_id","type":"string"}
  {"name":"col_string","type":"string"}
  {"name":"col_int32","type":"long"}
  {"name":"col_int32_min","type":"long"}
  {"name":"col_int32_max","type":"long"}
  {"name":"col_int64","type":"long"}
  {"name":"col_int64_min","type":"long"}
  {"name":"col_int64_max","type":"long"}
  {"name":"col_decimal","type":"double"}
  {"name":"col_double","type":"double"}
  {"name":"col_boolean","type":"boolean"}
  {"name":"col_timestamp","type":"date"}
  {"name":"col_date","type":"date"}
  {"name":"col_array_to_json","type":"string"}
  {"name":"col_array_to_join","type":"array","splitter":","}
  {"name":"col_doc.key_string","type":"document.string"}
  {"name":"col_doc.key_int32","type":"document.long"}
  {"name":"col_doc.key_int32_min","type":"document.long"}
  {"name":"col_doc.key_int32_max","type":"document.long"}
  {"name":"col_doc.key_int64","type":"document.long"}
  {"name":"col_doc.key_int64_min","type":"document.long"}
  {"name":"col_doc.key_int64_max","type":"document.long"}
  {"name":"col_doc.key_decimal","type":"document.double"}
  {"name":"col_doc.key_double","type":"document.double"}
  {"name":"col_doc.key_boolean","type":"document.boolean"}
  {"name":"col_doc.key_timestamp","type":"document.date"}
  {"name":"col_doc.key_date","type":"document.date"}
  {"name":"col_doc.key_array_to_json","type":"document"}
  {"name":"col_doc.key_array_to_join","type":"document.array","splitter":","}
  {"name":"col_doc","type":"string"}
  {"name":"col_combine","type":"combine"}
  After you edit the fields, the UI displays the mapping between the source and destination fields.
Step 3: Commit and deploy the node
If you use a standard-mode workspace and want to schedule this task, commit and deploy the node to the production environment. For more information, see Deploy tasks.
Step 4: Run the node and view the result
After configuring the node, run it. When the task is complete, view the synchronized data in the MaxCompute table. The synchronized data includes the following fields and values: col_string (mock string value), col_int32 (1), col_int32_min (-2147483648), col_int32_max (2147483647), col_int64 (1234567890123456), col_decimal (9999999.4999999999), col_double (9999999.99), col_boolean (true), col_timestamp (2023-02-03 14:17:56.554), col_date (2023-02-03), col_array_to_json ([a, b]), col_array_to_join (a,b), and pt (20230202). Some numeric fields are stored as text. The values for the complex fields are as follows:
- The content of the col_doc field is as follows.
  {
      "key_array_to_join": [
          "a",
          "b"
      ],
      "key_array_to_json": [
          "a",
          "b"
      ],
      "key_boolean": true,
      "key_date": "2023-02-03 14:17:56",
      "key_decimal": {
          "finite": true,
          "high": 3471149412795809792,
          "infinite": false,
          "low": 99999994999999999,
          "naN": false,
          "negative": false
      },
      "key_double": 9999999.99,
      "key_int32": 1,
      "key_int32_max": 2147483647,
      "key_int32_min": -2147483648,
      "key_int64": 1234567890123456,
      "key_int64_max": 9223372036854775807,
      "key_int64_min": -9223372036854775807,
      "key_string": "mock string value",
      "key_timestamp": "2023-02-03 14:17:56"
  }
- The content of the col_combine field is as follows.
  {
      "col_extra_1": "this is extra 1",
      "col_extra_2": "this is extra 2"
  }
In col_doc, the key_decimal value is output as a structure of fields rather than as a number. For information about how to output Decimal type data as a numeric value, see Appendix 2: Decimal type output.
Appendix 1: Data format conversion
Convert array data to JSON format: col_array_to_json
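| Source MongoDB data | Field mapping configuration | Output to MaxCompute |
| 'col_array_to_json':['a','b'] | {"name":"col_array_to_json","type":"string"} In the field mapping configuration, if type is set to string, the array is converted to JSON format during synchronization. | [a, b] |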
Convert an array to a joined string: col_array_to_join
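| Source MongoDB data | Field mapping configuration | Output to MaxCompute |
| 'col_array_to_join':['a','b'] | {"name":"col_array_to_join","type":"array","splitter":","} When you configure field mapping, the type is set to array and splitter specifies the delimiter that joins the array elements into one string. | a,b |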
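The joined-string conversion can be pictured with a few lines of JavaScript. This is only a minimal sketch of the behavior described for type array with a splitter, not the plugin's implementation; the function name joinArray is hypothetical.

```javascript
// Sketch of the "array" + "splitter" mapping: join all elements into one string.
// joinArray is a hypothetical helper, not part of DataWorks or MongoDB.
function joinArray(values, splitter) {
  return values.map(String).join(splitter);
}

// The tutorial's sample array with the splitter from the field mapping.
console.log(joinArray(["a", "b"], ",")); // a,b

// Caveat: if an element already contains the splitter, the original
// element boundaries cannot be recovered from the joined string.
const ambiguous = joinArray(["a,b", "c"], ",");
console.log(ambiguous.split(",").length); // 3, although the array had 2 elements
```

If array elements may contain the delimiter, choosing a splitter that cannot occur in the data, or mapping the field as a JSON string instead, avoids this ambiguity.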
Synchronize nested document fields
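| Source MongoDB data | Field mapping configuration | Output to MaxCompute |
| 'col_doc':{'key_string':'mock string value'} | {"name":"col_doc.key_string","type":"document.string"} | mock string value |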
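In the tutorial's field mapping, nested fields are addressed with dotted names such as col_doc.key_string. The sketch below illustrates how such a name can be resolved against a document. It assumes plain dot-separated paths; the helper name resolvePath is hypothetical, and the code is not taken from the MongoDB Reader.

```javascript
// Resolve a dotted field name such as "col_doc.key_string" against a document.
// Returns undefined when any segment of the path is missing.
// resolvePath is a hypothetical helper used only for illustration.
function resolvePath(doc, name) {
  return name.split(".").reduce(
    (current, key) =>
      current !== null && typeof current === "object" ? current[key] : undefined,
    doc
  );
}

// A reduced version of the tutorial's sample document.
const nestedSample = {
  col_doc: { key_string: "mock string value", key_int32: 1 },
};

console.log(resolvePath(nestedSample, "col_doc.key_string")); // mock string value
console.log(resolvePath(nestedSample, "col_doc.missing"));    // undefined
```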
Serialize a document as a JSON string
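| Source MongoDB data | Field mapping configuration | Output to MaxCompute |
| The col_doc nested document from the sample data | {"name":"col_doc","type":"string"} When you configure field mapping, if type is set to string, the whole document is serialized as a JSON string. | The JSON string shown for the col_doc field in Step 4 |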
Serialize unmapped fields as JSON
| Source MongoDB data | Field mapping configuration | Output to MaxCompute |
| | This document has four fields. | |
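The combine behavior seen in the tutorial result, where col_combine holds only col_extra_1 and col_extra_2, can be sketched as follows. The sketch assumes that a top-level field counts as mapped when any other mapping entry names it, either directly or as the first segment of a dotted name. That rule is inferred from the tutorial output, and the function name combineUnmapped is hypothetical.

```javascript
// Serialize every top-level field that no other mapping entry refers to.
// Assumption (inferred from the tutorial result, not from plugin source):
// "col_doc.key_string" marks the top-level field "col_doc" as mapped.
function combineUnmapped(doc, columns) {
  const mapped = new Set(
    columns
      .filter((column) => column.type !== "combine")
      .map((column) => column.name.split(".")[0])
  );
  const rest = {};
  for (const [key, value] of Object.entries(doc)) {
    if (!mapped.has(key)) {
      rest[key] = value;
    }
  }
  return JSON.stringify(rest);
}

// A reduced version of the tutorial's document and field mapping.
const combineSample = {
  col_string: "mock string value",
  col_doc: { key_string: "mock string value" },
  col_extra_1: "this is extra 1",
  col_extra_2: "this is extra 2",
};
const combineColumns = [
  { name: "col_string", type: "string" },
  { name: "col_doc.key_string", type: "document.string" },
  { name: "col_combine", type: "combine" },
];

console.log(combineUnmapped(combineSample, combineColumns));
// {"col_extra_1":"this is extra 1","col_extra_2":"this is extra 2"}
```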
Appendix 2: Decimal type output
When a document is serialized to JSON format, Decimal128 data is, by default, output as follows:
{
"key_decimal":
{
"finite": true,
"high": 3471149412795809792,
"infinite": false,
"low": 99999994999999999,
"naN": false,
"negative": false
}
}
To output the data as a numeric type, follow these steps:
- When configuring the batch synchronization task, switch to script mode.
- Modify the Reader task configuration: in the parameter section, add the parameter decimal128OutputType and set its value to bigDecimal.
  "parameter": {
      "collectionName": "di_mongodb_conf_test",
      "decimal128OutputType": "bigDecimal"
  },
  "name": "Reader",
  "category": "reader"
- Run the batch synchronization task again and view the result.
  {
      "key_decimal": "9999999.4999999999"
  }
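For data that was already synchronized with the default output, the high and low fields still carry the full value. The following sketch decodes them, assuming that high and low are the upper and lower 64 bits of the decimal128 value in the binary integer decimal encoding that BSON uses. The function name decimal128ToString is hypothetical. NaN, Infinity, and the rarely used alternate bit layout are rejected rather than decoded. The values are passed as strings because high exceeds the range that a JavaScript number represents exactly.

```javascript
// Decode the { high, low } pair of a serialized Decimal128 into a decimal string.
// Assumption: high/low are the upper/lower 64 bits of the BID-encoded value.
// decimal128ToString is a hypothetical helper, not a DataWorks or MongoDB API.
function decimal128ToString(high, low) {
  const h = BigInt.asUintN(64, BigInt(high));
  const l = BigInt.asUintN(64, BigInt(low));

  // Bits 62-61 set to "11" mark special values or the alternate layout.
  if (((h >> 61n) & 3n) === 3n) {
    throw new Error("special value or unsupported decimal128 layout");
  }

  const negative = (h >> 63n) === 1n;
  const exponent = Number((h >> 49n) & 0x3fffn) - 6176; // remove the exponent bias
  const coefficient = ((h & 0x1ffffffffffffn) << 64n) | l; // 113-bit integer coefficient

  let digits = coefficient.toString();
  if (exponent >= 0) {
    digits += "0".repeat(exponent);
  } else {
    const scale = -exponent;
    digits = digits.padStart(scale + 1, "0");
    digits = digits.slice(0, -scale) + "." + digits.slice(-scale);
  }
  return (negative ? "-" : "") + digits;
}

// The high and low values from the default output shown in this appendix.
console.log(decimal128ToString("3471149412795809792", "99999994999999999"));
// 9999999.4999999999
```

Setting decimal128OutputType to bigDecimal as described above remains the simpler route for new synchronization runs; decoding is only useful for rows that were already written in the default form.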