同步MaxCompute数据

更新时间:2025-02-28 02:15:01

如果需要将MaxCompute计算分析后的数据同步到表格存储中存储或者使用,您可以通过在DataWorks数据集成控制台新建和配置离线同步任务来实现全量数据导出。全量数据导出到表格存储后,您可以使用表格存储查询与分析数据。

背景信息

您可以通过DataWorks数据集成功能将MaxCompute计算分析后的数据同步到表格存储中,提升应用的读写和搜索效率。

tablestore

前提条件

  • 已确认和记录MaxCompute中要同步到表格存储的表信息。

  • 已创建RAM用户并为RAM用户授予管理表格存储权限(AliyunOTSFullAccess)和管理DataWorks权限(AliyunDataWorksFullAccess)。具体操作,请参见创建RAM用户RAM用户授权

  • 已为RAM用户创建AccessKey。具体操作,请参见创建AccessKey

  • 已开通表格存储服务,创建表格存储实例以及创建数据表。具体操作,请参见开通服务并创建实例创建数据表

步骤一:新增表格存储数据源

将表格存储添加为数据源,具体步骤如下:

  1. 进入数据集成页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据集成 > 数据集成,在下拉框中选择对应工作空间后单击进入数据集成

  2. 在左侧导航栏,单击数据源

  3. 数据源列表页面,单击新增数据源

  4. 新增数据源对话框,找到Tablestore区块,单击Tablestore

  5. 新增OTS数据源对话框,根据下表配置数据源参数。

    参数

    说明

    参数

    说明

    数据源名称

    数据源名称必须以字母、数字、下划线(_)组合,且不能以数字和下划线(_)开头。

    数据源描述

    对数据源进行简单描述,不得超过80个字符。

    地域

    选择Tablestore实例所属地域。

    Table Store实例名称

    Tablestore实例的名称。更多信息,请参见实例

    Endpoint

    Tablestore实例的服务地址,推荐使用VPC地址

    重要

    本文以Tablestore实例和DataWorks工作空间在同一阿里云主账号的同一地域下为例进行说明。更多场景信息,请参见各场景网络连通配置示例

    AccessKey ID

    阿里云账号或者RAM用户的AccessKey IDAccessKey Secret。获取方式请参见创建AccessKey

    AccessKey Secret

  6. 测试资源组连通性。

    创建数据源时,您需要测试资源组的连通性,以保证同步任务使用的资源组能够与数据源连通,否则将无法正常执行数据同步任务。

    1. (可选)购买并绑定资源组至当前DataWorks工作空间。具体操作,请参见新增和使用Serverless资源组

      不推荐使用旧版资源组(独享资源组和公共资源组),相较于旧版资源组,Serverless资源组支持的能力更丰富、售卖方式更统一、能有效利用资源碎片避免浪费,因此推荐您使用Serverless资源组。

      说明

      Serverless资源组默认不具备公网访问能力。需要为绑定的VPC配置公网NAT网关EIP后,才支持公网访问数据源。

    2. 待资源组启动成功后,在连接配置区域,单击相应资源组连通状态列的测试连通性

    3. 测试连通性通过后,连通状态显示可连通,单击完成

      在数据源列表中,可以查看新建的数据源。

      说明

      如果显示无法连通,表示资源组与数据源无法连通,后续相应数据源任务将无法正常执行。您可以参考以下思路排查处理。

      • 根据右侧弹出的连通性诊断工具窗口,自助解决连通性问题。

      • 如果连通性诊断工具未给出具体解决办法,请检查您设置的账号、密码、连接地址等参数,以及确保将资源组的IP地址加入到数据源的白名单中。更多信息,请参见网络连通方案

步骤二:新增MaxCompute数据源

当要使用当前工作空间默认生成的odps_first数据源时,请跳过此步骤。

具体操作与步骤一类似,只需在新增数据源对话框,找到MaxCompute区块,单击MaxCompute

重要
  • 数据同步时,一个任务只能使用一种资源组。资源组列表默认仅显示独享数据集成资源组,为确保数据同步的稳定性和性能要求,推荐使用独享数据集成资源组。

  • 如果未创建资源组,请单击新建独享数据集成资源组进行创建。具体操作,请参见新增和使用独享数据集成资源组

步骤三:新建同步任务节点

  1. 进入数据开发页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与运维 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. DataStudio控制台的数据开发页面,单击业务流程节点下的目标业务流程。

    如果需要新建业务流程,请参见创建业务流程

  3. 数据集成节点上右键选择新建节点 > 离线同步

  4. 新建节点对话框,选择路径并填写节点名称。

  5. 单击确认

    数据集成节点下会显示新建的离线同步节点。

步骤四:配置离线同步任务并启动

新建并配置MaxCompute到表格存储的同步任务,具体步骤如下:

  1. 数据集成节点下,双击打开新建的离线同步任务节点。

  2. 配置同步网络链接。

    选择离线同步任务的数据来源、数据去向以及用于执行同步任务的资源组,并测试连通性。

    重要

    数据同步任务的执行必须经过资源组来实现,请选择资源组并保证资源组与读写两端的数据源能联通访问。

    1. 网络与资源配置步骤,选择数据来源MaxCompute(ODPS),并选择数据源名称MaxCompute数据源。

    2. 选择资源组。

      选择资源组后,系统会显示资源组的地域、规格等信息以及自动测试资源组与所选数据源之间连通性。

      重要

      请与新增数据源时选择的资源组保持一致。

    3. 选择数据去向Tablestore,并选择数据源名称为表格存储数据源。

      系统会自动测试资源组与所选数据源之间连通性。

    4. 测试可连通后,单击下一步

    5. 提示对话框,单击确认使用脚本模式

      重要
      • 表格存储仅支持脚本模式。当存在不支持向导模式的数据源时,如果继续编辑任务,将强制使用脚本模式进行编辑。

      • 任务转为脚本模式后,将无法转为向导模式。

  3. 配置任务并保存。

    全量数据的同步需要使用到MaxCompute ReaderTablestore(OTS) Writer插件。脚本配置规则请参见MaxCompute数据源Tablestore数据源

    1. 在脚本配置页面,请根据如下示例完成配置。

      重要
      • 由于全量导出一般是一次性的,所以无需配置自动调度参数。

      • 为了便于理解,在配置示例中增加了注释内容,实际使用脚本时请删除所有注释内容。

      {
          "type": "job",
          "version": "2.0",
          "steps": [
              {
                  "stepType": "odps", #插件名称,不能修改。
                  "parameter": {
                      "partition": [], #如果表为分区表,则必填。如果表为非分区表,则不能填写。需要写入数据表的分区信息,必须指定到最后一级分区。
                      "datasource": "", #MaxCompute数据源名称,请根据实际填写。
                      "column": [ #需要导出的MaxCompute列名。
                          "*"
                      ],
                      "table": "" #MaxCompute中的表名。
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
                  "stepType": "ots", #插件名称,不能修改。
                  "parameter": {
                      "datasource": "", #表格存储数据源名称,请根据实际填写。
                      "column": [ #作为数据表属性列的列。
                          {
                              "name": "columnName1",
                              "type": "INT"
                          },
                          {
                              "name": "columnName2",
                              "type": "STRING"
                          },
                          {
                              "name": "columnName3",
                              "type": "DOUBLE"
                          },
                          {
                              "name": "columnName4",
                              "type": "BOOLEAN"
                          },
                          {
                              "name": "columnName5",
                              "type": "BINARY"
                          }
                      ],
                      "writeMode": "UpdateRow",  #数据写入表格存储的模式。取值范围为PutRow和UpdateRow。选择PutRow时表示覆盖写入数据,如果该行不存在,则新增一行;如果该行存在,则覆盖原有行。选择UpdateRow时表示更新写入数据,如果该行不存在,则新增一行;如果该行存在,则根据请求的内容在这一行中新增、修改或者删除指定列的值。
                      "table": "", #Tablestore中的数据表名称。
                      "primaryKey": [ #作为数据表主键的列。
                          {
                              "name": "pk1",
                              "type": "STRING"
                          },
                          {
                              "name": "pk2",
                              "type": "INT"
                          }
                      ]
                  },
                  "name": "Writer",
                  "category": "writer"
              },
              {
                  "name": "Processor",
                  "stepType": null,
                  "category": "processor",
                  "parameter": {}
              }
          ],
          "setting": {
              "executeMode": null,
              "errorLimit": {
                  "record": "0" #当错误个数超过record个数时,导入任务会失败。
              },
              "speed": {
                  "throttle":true, #当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true,表示限流。
                  "concurrent":1 #作业并发数。
                  "mbps":"12" #限流。
              }
          },
          "order": {
              "hops": [
                  {
                      "from": "Reader",
                      "to": "Writer"
                  }
              ]
          }
      }

    2. 单击image.png图标,保存脚本。

      说明

      执行后续操作时,如果未保存脚本,则系统会出现保存确认的提示,单击确认即可。

  4. 执行同步任务。

    说明

    全量数据一般只需要同步一次,无需配置调度属性。

    1. 单击1680170333627-a1e19a43-4e2a-4340-9564-f53f2fa6806e图标。

    2. 参数对话框,选择运行资源组的名称。

    3. 单击运行

      运行结束后,在同步任务的运行日志页签,单击Detail log url对应的链接后。在任务的详细运行日志页面,查看Current task status对应的状态。

      Current task status的值为FINISH时,表示任务运行完成。

步骤五:查看导入到表格存储中的数据

在表格存储控制台查看表格存储数据同步结果。

  1. 登录表格存储控制台

  2. 概览页面上方,选择地域。

  3. 单击实例名称。

  4. 实例详情页签下的数据表列表页签,单击目标数据表名称。

  5. 数据管理页签,即可查看同步到该数据表中的数据。

  • 本页导读 (1)
  • 背景信息
  • 前提条件
  • 步骤一:新增表格存储数据源
  • 步骤二:新增MaxCompute数据源
  • 步骤三:新建同步任务节点
  • 步骤四:配置离线同步任务并启动
  • 步骤五:查看导入到表格存储中的数据