本文为您介绍如何通过DataWorks的数据集成功能,将从MongoDB提取的JSON字段迁移至MaxCompute。
前提条件
新增MaxCompute数据源。详情请参见创建MaxCompute数据源。
在DataWorks上完成创建业务流程,本例使用DataWorks简单模式。详情请参见创建业务流程。
在MongoDB上准备测试数据
账号准备。
在数据库内新建用户,用于DataWorks添加数据源。本示例执行如下命令。
db.createUser({user:"bookuser",pwd:"123456",roles:["user1"]})
新建用户名为bookuser,密码为123456,角色为任意拥有访问数据权限的角色。
数据准备。
将数据上传至MongoDB数据库。本示例使用阿里云的云数据库MongoDB版,网络类型为VPC(需申请公网地址,否则无法与DataWorks公共资源组互通),测试数据如下。
{ "store": { "book": [ { "category": "reference", "author": "Nigel Rees", "title": "Sayings of the Century", "price": 8.95 }, { "category": "fiction", "author": "Evelyn Waugh", "title": "Sword of Honour", "price": 12.99 }, { "category": "fiction", "author": "J. R. R. Tolkien", "title": "The Lord of the Rings", "isbn": "0-395-19395-8", "price": 22.99 } ], "bicycle": { "color": "red", "price": 19.95 } }, "expensive": 10 }
在MongoDB的DMS控制台,本示例使用的数据库为admin,集合为userlog。执行如下命令,查看已上传的数据。
db.userlog.find().limit(10)
通过DataWorks将JSON数据从MongoDB迁移至MaxCompute
登录DataWorks控制台。
在DataWorks上创建目标表。用以接收从MongoDB迁移的数据。
右键单击已创建的业务流程,选择 。
在新建表页面,选择引擎类型并输入表名。
在表的编辑页面,单击DDL。
在DDL模式对话框,输入建表语句,单击生成表结构。
重要建表语句中的表名称请与在新建表输入的表名一致。
create table mqdata (mqdata string);
单击提交到生产环境。
新增MongoDB数据源,详情请参见配置MongoDB数据源。
创建离线同步节点。
进入数据开发页面,右键单击指定业务流程,选择 。
在新建节点对话框中,输入节点名称,并单击确认。
在顶部菜单栏上,单击图标。
在脚本模式下,单击顶部菜单栏上的图标。
在导入模板对话框中选择来源类型、数据源、目标类型及数据源,并单击确定。
输入如下脚本。
{ "type": "job", "steps": [ { "stepType": "mongodb", "parameter": { "datasource": "mongodb_userlog",//数据源名称。 "column": [ { "name": "store.bicycle.color", //JSON字段路径,本例中提取color值。 "type": "document.String" //非一层子属性以最终获取的类型为准。假如您选取的JSON字段为一级字段,例如本例中的expensive,则直接填写string即可。 } ], "collectionName": "userlog" //集合名称。 }, "name": "Reader", "category": "reader" }, { "stepType": "odps", "parameter": { "partition": "", "isCompress": false, "truncate": true, "datasource": "odps_source",// MaxCompute数据源名称 "column": [ "mqdata" //MaxCompute表列名。 ], "emptyAsNull": false, "table": "mqdata" }, "name": "Writer", "category": "writer" } ], "version": "2.0", "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] }, "setting": { "errorLimit": { "record": "" }, "speed": { "concurrent": 2, "throttle": false, } } }
单击图标运行代码。
您可以在运行日志查看运行结果。
验证结果
右键单击业务流程,选择 。
在新建节点对话框中输入节点名称,并单击确认。
在ODPS SQL节点编辑页面输入如下语句。
SELECT * from mqdata;
单击图标运行代码。
您可以在运行日志查看运行结果。