本文为您介绍如何通过DataWorks的数据集成功能,将JSON数据从OSS迁移至MaxCompute,并使用MaxCompute内置字符串函数GET_JSON_OBJECT提取JSON信息的最佳实践。

前提条件

开始将JSON数据从OSS迁移至MaxCompute的操作前,您需要首先重命名JSON文件为后缀是txt的文件,并上传至OSS。
本文以上传名为applog.txt的JSON文件至OSS为例,OSS Bucket所在区域为华东2(上海)。
{
    "store": {
        "book": [
             {
                "category": "reference",
                "author": "Nigel Rees",
                "title": "Sayings of the Century",
                "price": 8.95
             },
             {
                "category": "fiction",
                "author": "Evelyn Waugh",
                "title": "Sword of Honour",
                "price": 12.99
             },
             {
                 "category": "fiction",
                 "author": "J. R. R. Tolkien",
                 "title": "The Lord of the Rings",
                 "isbn": "0-395-19395-8",
                 "price": 22.99
             }
          ],
          "bicycle": {
              "color": "red",
              "price": 19.95
          }
    },
    "expensive": 10
}

操作步骤

  1. 进入数据源管理页面。
    1. 登录DataWorks控制台
    2. 在左侧导航栏,单击工作空间列表
    3. 单击相应工作空间后的进入数据集成
    4. 在左侧导航栏,单击数据源,进入工作空间管理 > 数据源管理
  2. 新增OSS数据源。
    1. 数据源管理页面,单击新增数据源
    2. 新增数据源对话框中,选择数据源类型为OSS
    3. 新增OSS数据源对话框中,配置各项参数。
      新增OSS数据源
      参数 描述
      数据源名称 数据源名称必须以字母、数字、下划线组合,且不能以数字和下划线开头。
      数据源描述 对数据源进行简单描述,不得超过80个字符。
      适用环境 可以选择开发生产环境。
      说明 仅标准模式工作空间会显示该配置。
      Endpoint OSS Endpoint信息,本示例为http://oss-cn-shanghai.aliyuncs.comhttp://oss-cn-shanghai-internal.aliyuncs.com。OSS各地域的外网、内网地址请参见OSS开通Region和Endpoint对照表
      说明 本文的OSS和DataWorks工作空间处于同一个区域,可以通过内网连接。
      Bucket 相应的OSS Bucket信息,指存储空间,是用于存储对象的容器。

      您可以创建一个或多个存储空间,每个存储空间可添加一个或多个文件。

      您可以在数据同步任务中查找此处填写的存储空间中相应的文件,没有添加的存储空间,则不能查找其中的文件。

      AccessKey ID 访问密钥中的AccessKey ID,您可以进入用户信息管理页面进行复制。
      AceessKey Secret 访问密钥中的AccessKey Secret,相当于登录密码。
    4. 单击测试连通性
    5. 测试连通性通过后,单击完成
  3. 新建表(mqdata),用于存放JSON数据。
    1. 单击当前页面左上角的图标图标,选择全部产品 > DataStudio(数据开发)
    2. 鼠标悬停至新建图标,单击MaxCompute >
      您也可以打开相应的业务流程,右键单击MaxCompute,选择新建 >
    3. 新建表对话框中,输入表名(示例为mqdata)。
      注意 表名不能超过64个字符。
    4. 单击提交
    5. 使用图形界面创建表。
      本示例的mqdata表仅有一列,字段类型为STRING,列名为MQ data图形化界面新建表
    6. 分别单击提交到开发环境提交到生产环境
      说明 如果您使用的是简单模式的工作空间,仅需要单击提交到生产环境
  4. 新建离线同步节点。
    1. DataStudio的左侧导航栏,单击数据开发
    2. 鼠标悬停至新建,单击数据集成 > 离线同步
      您也可以找到相应的业务流程,右键单击数据集成,选择新建 > 离线同步
    3. 新建节点对话框中,输入节点名称,并选择目标文件夹
      注意 节点名称的长度不能超过128个字符。
    4. 单击提交
  5. 配置离线同步节点。
    1. 打开离线同步节点的编辑页面。
    2. 选择数据来源数据去向
      选择数据来源为新建的OSS数据源,Object前缀可以输入文件路径及名称。选择数据去向ODPS > odps_first,选择目标表为新建的表(mqdata)。数据来源
      说明 列分隔符使用TXT文件中不存在的字符即可,本文使用(^)。对于OSS中的TXT格式数据源,Dataworks支持多字符分隔符,您可以使用以下字符串作为列分隔符:

      %&%#^$$^%

    3. 配置字段的映射关系,选择默认的同行映射即可。
    4. 单击工具栏中的转换脚本
      修改fileFormat参数为"fileFormat":"binary",代码示例如下。
      {
          "type": "job",
          "steps": [
              {
                  "stepType": "oss",
                  "parameter": {
                      "fieldDelimiterOrigin": "^",
                      "nullFormat": "",
                      "compress": "",
                      "datasource": "OSS_userlog",
                      "column": [
                          {
                              "name": 0,
                              "type": "string",
                              "index": 0
                          }
                      ],
                      "skipHeader": "false",
                      "encoding": "UTF-8",
                      "fieldDelimiter": "^",
                      "fileFormat": "binary",
                      "object": [
                          "applog.txt"
                      ]
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
                  "stepType": "odps",
                  "parameter": {
                      "partition": "",
                      "isCompress": false,
                      "truncate": true,
                      "datasource": "odps_first",
                      "column": [
                          "mqdata"
                      ],
                      "emptyAsNull": false,
                      "table": "mqdata"
                  },
                  "name": "Writer",
                  "category": "writer"
              }
          ],
          "version": "2.0",
          "order": {
              "hops": [
                  {
                      "from": "Reader",
                      "to": "Writer"
                  }
              ]
          },
          "setting": {
              "errorLimit": {
                  "record": ""
              },
              "speed": {
                  "concurrent": 2,
                  "throttle": false,
              }
          }
      }
      说明 该步骤可以保证OSS中的JSON文件同步至MaxCompute之后存在同一行数据中,即为一个字段,其它参数保持不变。
    5. 完成上述配置后,单击运行。运行成功日志示例如下所示。
      运行日志
  6. 验证JSON数据从OSS迁移至MaxCompute的结果。
    1. 可以打开相应的业务流程,右键单击MaxCompute,选择新建 > ODPS SQL
      注意 您在工作空间配置页面添加MaxCompute计算引擎实例后,当前页面才会显示MaxCompute目录。
    2. 新建节点对话框中,输入节点名称,并选择目标文件夹
      注意 节点名称的长度不能超过128个字符。
    3. 单击提交
    4. 在节点的编辑页面输入SELECT * from mqdata;,单击运行图标,查看当前表(mqdata)中的数据。
      您也可以直接在MaxCompute客户端中输入命令运行。
    5. 确认导入表中的数据结果无误后,执行SELECT GET_JSON_OBJECT(mqdata.MQdata,'$.expensive') FROM mqdata;获取JSON文件中的expensive值。
      在验证迁移后的结果时,您可以使用MaxCompute内建字符串函数GET_JSON_OBJECT获取您需要的JSON数据。