DataWorks的数据集成为您提供MongoDB Reader插件,可从MongoDB中读取数据,并将数据同步至其他数据源。本文以一个具体的示例,为您演示如何通过数据集成将MongoDB的数据离线同步至MaxCompute。

背景信息

本实践的来源数据源为MongoDB,去向数据源为MaxCompute。在进行数据同步前,您需要参考下文的数据准备,将待同步的MongoDB数据准备好,并创建一个用于同步数据的MaxCompute表。

前提条件

本实践进行操作时,需满足以下条件。
  • 已开通DataWorks并绑定MaxCompute引擎。
  • 本实践使用独享数据集成资源组进行离线任务运行,因此您需先购买并配置独享数据集成资源组。操作详情请参见新增和使用独享数据集成资源组

准备示例数据表

本实践需准备一个MongoDB数据集合、一个MaxCompute表,用于后续进行离线数据同步。

  1. 准备MongoDB数据集合。
    本实践使用阿里云云数据库MongoDB作为示例。以下为准备MongoDB数据集合的主要代码示例。
    1. 创建一个名称为di_mongodb_conf_test的数据集合。
      db.createCollection('di_mongodb_conf_test')
    2. 向数据集合中插入本实践的示例数据。
      db.di_mongodb_conf_test.insertOne({
          'col_string':'mock string value',
          'col_int32':NumberInt("1"),
          'col_int32_min':NumberInt("-2147483648"),
          'col_int32_max':NumberInt("2147483647"),
          'col_int64':NumberLong("1234567890123456"),
          'col_int64_min':NumberLong("-9223372036854775807"),
          'col_int64_max':NumberLong("9223372036854775807"),
          'col_decimal':NumberDecimal("9999999.4999999999"),
          'col_double':9999999.99,
          'col_boolean':true,
          'col_timestamp':ISODate(),
          'col_date':new Date(),
          'col_array_to_json':['a','b'],
          'col_array_to_join':['a','b'],
          'col_doc':{
              'key_string':'mock string value',
              'key_int32':NumberInt("1"),
              'key_int32_min':NumberInt("-2147483648"),
              'key_int32_max':NumberInt("2147483647"),
              'key_int64':NumberLong("1234567890123456"),
              'key_int64_min':NumberLong("-9223372036854775807"),
              'key_int64_max':NumberLong("9223372036854775807"),
              'key_decimal':NumberDecimal("9999999.4999999999"),
              'key_double':9999999.99,
              'key_boolean':true,
              'key_timestamp':ISODate(),
              'key_date':new Date(),
              'key_array_to_json':['a','b'],
              'key_array_to_join':['a','b'],
          },
          'col_extra_1':'this is extra 1',
          'col_extra_2':'this is extra 2',
      })
    3. 查询插入MongoDB中的数据。
      db.getCollection("di_mongodb_conf_test").find({})
      查询结果为:mongodb
  2. 准备MaxCompute表。
    1. 创建一个名称为di_mongodb_conf_test的分区表,分区字段为pt
      CREATE TABLE IF NOT EXISTS di_mongodb_conf_test
      (
        `id`                 STRING
        ,`col_string`        STRING
        ,`col_int32`         INT
        ,`col_int32_min`     INT
        ,`col_int32_max`     INT
        ,`col_int64`         BIGINT
        ,`col_int64_min`     BIGINT
        ,`col_int64_max`     BIGINT
        ,`col_decimal`       DECIMAL(38,18)
        ,`col_double`        DOUBLE
        ,`col_boolean`       BOOLEAN
        ,`col_timestamp`     TIMESTAMP
        ,`col_date`          DATE
        ,`col_array_to_json` STRING
        ,`col_array_to_join` STRING
        ,`key_string`        STRING
        ,`key_int32`         INT
        ,`key_int32_min`     INT
        ,`key_int32_max`     INT
        ,`key_int64`         BIGINT
        ,`key_int64_min`     BIGINT
        ,`key_int64_max`     BIGINT
        ,`key_decimal`       DECIMAL(38,18)
        ,`key_double`        DOUBLE
        ,`key_boolean`       BOOLEAN
        ,`key_timestamp`     TIMESTAMP
        ,`key_date`          DATE
        ,`key_array_to_json` STRING
        ,`key_array_to_join` STRING
        ,`col_doc`           STRING
        ,`col_combine`       STRING
      )
      PARTITIONED BY
      (
        pt                   STRING
      )
      LIFECYCLE 36500
      ;
    2. 添加一个分区取值20230202
      alter table di_mongodb_conf_test add if not exists partition (pt='20230202');
    3. 检查分区表是否正确创建。
      SELECT  * FROM di_mongodb_conf_test
      WHERE   pt = '20230202';

离线任务配置

step1:添加MongoDB数据源

添加一个MongoDB数据源,保障数据源与独享数据集成资源组之间网络连通。操作详情请参见配置MongoDB数据源

step2:创建离线同步节点,并配置离线同步任务

在DataWorks的DataStudio中创建一个离线同步节点,并配置离线同步的来源与去向等任务配置参数,核心配置要点如下,其他参数可保持默认值即可。详细操作请参见通过向导模式配置离线同步任务
  1. 配置同步网络连接。

    选择上述步骤中创建的数据源、绑定的MaxCompute引擎数据源和对应的独享数据集成资源组,测试完成连通性。

  2. 配置任务:选择数据源。
    选择上述准备数据步骤中准备的MongoDB数据集合和MaxCompute分区表。选择数据源
  3. 配置任务:字段映射。
    数据源为MongoDB时,默认使用同行映射。您也可以单击图标图标手动编辑源表字段,手动编辑的示例如下。
    {"name":"_id","type":"string"}
    {"name":"col_string","type":"string"}
    {"name":"col_int32","type":"long"}
    {"name":"col_int32_min","type":"long"}
    {"name":"col_int32_max","type":"long"}
    {"name":"col_int64","type":"long"}
    {"name":"col_int64_min","type":"long"}
    {"name":"col_int64_max","type":"long"}
    {"name":"col_decimal","type":"double"}
    {"name":"col_double","type":"double"}
    {"name":"col_boolean","type":"boolean"}
    {"name":"col_timestamp","type":"date"}
    {"name":"col_date","type":"date"}
    {"name":"col_array_to_json","type":"string"}
    {"name":"col_array_to_join","type":"array","splitter":","}
    {"name":"col_doc.key_string","type":"document.string"}
    {"name":"col_doc.key_int32","type":"document.long"}
    {"name":"col_doc.key_int32_min","type":"document.long"}
    {"name":"col_doc.key_int32_max","type":"document.long"}
    {"name":"col_doc.key_int64","type":"document.long"}
    {"name":"col_doc.key_int64_min","type":"document.long"}
    {"name":"col_doc.key_int64_max","type":"document.long"}
    {"name":"col_doc.key_decimal","type":"document.double"}
    {"name":"col_doc.key_double","type":"document.double"}
    {"name":"col_doc.key_boolean","type":"document.boolean"}
    {"name":"col_doc.key_timestamp","type":"document.date"}
    {"name":"col_doc.key_date","type":"document.date"}
    {"name":"col_doc.key_array_to_json","type":"document"}
    {"name":"col_doc.key_array_to_join","type":"document.array","splitter":","}
    {"name":"col_doc","type":"string"}
    {"name":"col_combine","type":"combine"}
    手动后,界面可展示来源字段与目标字段的映射关系。字段映射

step3:提交发布离线同步节点

如果您使用的是标准模式的DataWorks工作空间,并且希望后续在生产环境中周期性调度此离线同步任务的话,您可以将离线同步节点提交发布到生产环境。操作详情请参见发布任务

step4:运行离线同步节点,查看同步结果

完成上述配置后,您可以运行同步节点,待运行完成后,查看同步至MaxCompute表中的数据。结果数据其中:
  • col_doc字段内容如下。字段信息
  • col_combine字段内容如下。字段2
说明 关于decimal类型输出问题,参考下文附录2:关于document中Decimal类型输出问题

附录1:同步过程中的数据格式转换说明

数组类型数据转换为JSON格式输出:col_array_to_json

MongoDB原始数据字段映射配置输出至MaxCompute的结果
{
    "col_array_to_json":
    [
        "a",
        "b"
    ]
}
{"name":"col_array_to_json","type":"string"}
字段映射配置时,typestring,同步任务运行时,会将原始数据序列化为JSON格式进行输出。
[a, b]

数组类型数据转换为拼接字符串格式输出:col_array_to_join

MongoDB原始数据字段映射配置输出至MaxCompute的结果
{
    "col_array_to_join":
    [
        "a",
        "b"
    ]
}
{"name":"col_array_to_join","type":"array","splitter":","}
字段映射配置时,typearraysplitter为必填项。同步任务运行时,会将原始数据数组内容,使用splitter进行拼接,最终输出为拼接后的字符串。
a,b

Document数据存在多层嵌套时,读取嵌套中的指定字段同步

MongoDB原始数据字段映射配置输出至MaxCompute的结果
{
    "col_doc":
    {
        "key_string": "mock string value"
    }
}
{"name":"col_doc.key_string","type":"document.string"}
namedocument中待同步字段的path,同步任务运行时,会按path读取document,并将数据输出。
mock string value

Document数据作为json序列化输出

MongoDB原始数据字段映射配置输出至MaxCompute的结果
{
    "col_doc":
    {
        "key_string": "mock string value",
        "key_int32": 1
    }
}
{"name":"col_doc","type":"string"}
字段映射配置时,typestring,同步任务运行时,会将col_doc整体进行json序列化后输出。
{"key_string":"mockstringvalue","key_int32":1}

Document数据除去已配置字段外,其他字段整体进行json序列化输出

MongoDB原始数据字段映射配置输出至MaxCompute的结果
{
    "col_1": "value1",
    "col_2": "value2",
    "col_3": "value3",
    "col_4": "value4"
}
{"name":"col_1","type":"string"}
{"name":"col_2","type":"string"}
{"name":"col_combine","type":"combine"}
document共有字段4个,任务已配置非combine类型字段2个(col_1、col_2),同步任务运行时,会将col_1、col_2两个字段外的其他字段按json序列化输出。
{"col_3":"value3","col_4":"value4"}

附录2:关于document中Decimal类型输出问题

关于document序列化为JSON格式输出时,Decimal128类型默认情况下,会输出为:
{
    "key_decimal":
    {
        "finite": true,
        "high": 3471149412795809792,
        "infinite": false,
        "low": 99999994999999999,
        "naN": false,
        "negative": false
    }
}
如果需要按数字类型输出,则可以按如下步骤处理:
  1. 在配置离线同步任务时,单击等不转脚本按钮,转换为脚本模式。
  2. 修改Reader任务配置,在parameter中增加参数decimal128OutputType,value固定填写bigDecimaldecimal
  3. 重新运行离线同步任务,查看结果。
    {
        "key_decimal": "9999999.4999999999"
    }