DataWorks的数据集成为您提供MongoDB Reader插件,可从MongoDB中读取数据,并将数据同步至其他数据源。本文以一个具体的示例,为您演示如何通过数据集成将MongoDB的数据离线同步至MaxCompute。
背景信息
本实践的来源数据源为MongoDB,去向数据源为MaxCompute。在进行数据同步前,您需要参考下文的数据准备,将待同步的MongoDB数据准备好,并创建一个用于同步数据的MaxCompute表。
前提条件
本实践进行操作时,需满足以下条件。
- 已开通DataWorks并绑定MaxCompute引擎。
- 本实践使用独享数据集成资源组进行离线任务运行,因此您需先购买并配置独享数据集成资源组。操作详情请参见新增和使用独享数据集成资源组。
准备示例数据表
本实践需准备一个MongoDB数据集合、一个MaxCompute表,用于后续进行离线数据同步。
- 准备MongoDB数据集合。本实践使用阿里云云数据库MongoDB作为示例。以下为准备MongoDB数据集合的主要代码示例。
- 准备MaxCompute表。
离线任务配置
step1:添加MongoDB数据源
添加一个MongoDB数据源,保障数据源与独享数据集成资源组之间网络连通。操作详情请参见配置MongoDB数据源。
step2:创建离线同步节点,并配置离线同步任务
在DataWorks的DataStudio中创建一个离线同步节点,并配置离线同步的来源与去向等任务配置参数,核心配置要点如下,其他参数可保持默认值即可。详细操作请参见通过向导模式配置离线同步任务。
- 配置同步网络连接。
选择上述步骤中创建的数据源、绑定的MaxCompute引擎数据源和对应的独享数据集成资源组,测试完成连通性。
- 配置任务:选择数据源。选择上述准备数据步骤中准备的MongoDB数据集合和MaxCompute分区表。
- 配置任务:字段映射。数据源为MongoDB时,默认使用同行映射。您也可以单击
图标手动编辑源表字段,手动编辑的示例如下。
手动后,界面可展示来源字段与目标字段的映射关系。{"name":"_id","type":"string"} {"name":"col_string","type":"string"} {"name":"col_int32","type":"long"} {"name":"col_int32_min","type":"long"} {"name":"col_int32_max","type":"long"} {"name":"col_int64","type":"long"} {"name":"col_int64_min","type":"long"} {"name":"col_int64_max","type":"long"} {"name":"col_decimal","type":"double"} {"name":"col_double","type":"double"} {"name":"col_boolean","type":"boolean"} {"name":"col_timestamp","type":"date"} {"name":"col_date","type":"date"} {"name":"col_array_to_json","type":"string"} {"name":"col_array_to_join","type":"array","splitter":","} {"name":"col_doc.key_string","type":"document.string"} {"name":"col_doc.key_int32","type":"document.long"} {"name":"col_doc.key_int32_min","type":"document.long"} {"name":"col_doc.key_int32_max","type":"document.long"} {"name":"col_doc.key_int64","type":"document.long"} {"name":"col_doc.key_int64_min","type":"document.long"} {"name":"col_doc.key_int64_max","type":"document.long"} {"name":"col_doc.key_decimal","type":"document.double"} {"name":"col_doc.key_double","type":"document.double"} {"name":"col_doc.key_boolean","type":"document.boolean"} {"name":"col_doc.key_timestamp","type":"document.date"} {"name":"col_doc.key_date","type":"document.date"} {"name":"col_doc.key_array_to_json","type":"document"} {"name":"col_doc.key_array_to_join","type":"document.array","splitter":","} {"name":"col_doc","type":"string"} {"name":"col_combine","type":"combine"}
step3:提交发布离线同步节点
如果您使用的是标准模式的DataWorks工作空间,并且希望后续在生产环境中周期性调度此离线同步任务的话,您可以将离线同步节点提交发布到生产环境。操作详情请参见发布任务。
step4:运行离线同步节点,查看同步结果
完成上述配置后,您可以运行同步节点,待运行完成后,查看同步至MaxCompute表中的数据。
其中:

col_doc
字段内容如下。col_combine
字段内容如下。
说明 关于decimal类型输出问题,参考下文附录2:关于document中Decimal类型输出问题。
附录1:同步过程中的数据格式转换说明
数组类型数据转换为JSON格式输出:col_array_to_json
MongoDB原始数据 | 字段映射配置 | 输出至MaxCompute的结果 |
---|---|---|
| 字段映射配置时,type 为string ,同步任务运行时,会将原始数据序列化为JSON格式进行输出。 |
|
数组类型数据转换为拼接字符串格式输出:col_array_to_join
MongoDB原始数据 | 字段映射配置 | 输出至MaxCompute的结果 |
---|---|---|
| 字段映射配置时,type 为array ,splitter 为必填项。同步任务运行时,会将原始数据数组内容,使用splitter进行拼接,最终输出为拼接后的字符串。 |
|
Document数据存在多层嵌套时,读取嵌套中的指定字段同步
MongoDB原始数据 | 字段映射配置 | 输出至MaxCompute的结果 |
---|---|---|
|
name 为document 中待同步字段的path ,同步任务运行时,会按path 读取document ,并将数据输出。 |
|
Document数据作为json序列化输出
MongoDB原始数据 | 字段映射配置 | 输出至MaxCompute的结果 |
---|---|---|
| 字段映射配置时,type 为string ,同步任务运行时,会将col_doc 整体进行json序列化后输出。 |
|
Document数据除去已配置字段外,其他字段整体进行json序列化输出
MongoDB原始数据 | 字段映射配置 | 输出至MaxCompute的结果 |
---|---|---|
| document共有字段4个,任务已配置非combine类型字段2个(col_1、col_2),同步任务运行时,会将col_1、col_2两个字段外的其他字段按json序列化输出。 |
|
附录2:关于document中Decimal类型输出问题
关于document序列化为JSON格式输出时,Decimal128类型默认情况下,会输出为:
{
"key_decimal":
{
"finite": true,
"high": 3471149412795809792,
"infinite": false,
"low": 99999994999999999,
"naN": false,
"negative": false
}
}
如果需要按数字类型输出,则可以按如下步骤处理:- 在配置离线同步任务时,单击等不转脚本按钮,转换为脚本模式。
- 修改Reader任务配置,在parameter中增加参数
decimal128OutputType
,value固定填写bigDecimal
。 - 重新运行离线同步任务,查看结果。
{ "key_decimal": "9999999.4999999999" }