将MaxCompute数据同步到表格存储

如果需要将MaxCompute计算分析后的数据同步到表格存储中存储或者使用,您可以通过在DataWorks数据集成控制台新建和配置离线同步任务来实现全量数据导出。全量数据导出到表格存储后,您可以使用表格存储查询与分析数据。

背景信息

表格存储(Tablestore)面向海量结构化数据提供Serverless表存储服务,同时针对物联网场景深度优化提供一站式的IoTstore解决方案。适用于海量账单、IM消息、物联网、车联网、风控、推荐等场景中的结构化数据存储,提供海量数据低成本存储、毫秒级的在线数据查询和检索以及灵活的数据分析能力。更多信息,请参见什么是表格存储

云原生大数据计算服务(MaxCompute)是一种快速、完全托管的TB/PB级数据仓库解决方案。MaxCompute向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题,有效降低企业成本,并保障数据安全。更多信息,请参见什么是MaxCompute

DataWorks基于MaxCompute、Hologres、EMR、AnalyticDB、CDP等大数据引擎,为数据仓库、数据湖、湖仓一体等解决方案提供统一的全链路大数据开发治理平台。DataWorks数据集成是稳定高效、弹性伸缩的数据同步平台,致力于提供复杂网络环境下、丰富的异构数据源之间高速稳定的数据移动及同步能力。更多信息,请参见DataWorks数据集成

您可以通过DataWorks数据集成功能将MaxCompute计算分析后的数据同步到表格存储中,提升应用的读写和搜索效率。

tablestore

前提条件

  • 已确认和记录MaxCompute中要同步到表格存储的表信息。

  • 已创建RAM用户并为RAM用户授予管理表格存储权限(AliyunOTSFullAccess)和管理DataWorks权限(AliyunDataWorksFullAccess)。具体操作,请参见创建RAM用户RAM用户授权

  • 已为RAM用户创建AccessKey。具体操作,请参见创建AccessKey

  • 已开通表格存储服务,创建表格存储实例以及创建数据表。具体操作,请参见开通表格存储服务创建实例创建数据表

步骤一:新增表格存储数据源

将表格存储添加为数据源,具体步骤如下:

  1. 进入数据集成页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与治理 > 数据集成,在下拉框中选择对应工作空间后单击进入数据集成

  2. 在左侧导航栏,单击数据源

  3. 数据源列表页面,单击新增数据源

  4. 新增数据源对话框,找到Tablestore区块,单击Tablestore

  5. 新增OTS数据源对话框,根据下表配置数据源参数。

    参数

    说明

    数据源名称

    数据源名称必须以字母、数字、下划线(_)组合,且不能以数字和下划线(_)开头。

    数据源描述

    对数据源进行简单描述,不得超过80个字符。

    Endpoint

    Tablestore实例的服务地址。更多信息,请参见服务地址

    如果Tablestore实例和目标数据源的资源在同一个地域,填写VPC地址;如果Tablestore实例和目标数据源的资源不在同一个地域,填写公网地址。

    Table Store实例名称

    Tablestore实例的名称。更多信息,请参见实例

    AccessKey ID

    阿里云账号或者RAM用户的AccessKey IDAccessKey Secret。获取方式请参见创建AccessKey

    AccessKey Secret

  6. 测试资源组连通性。

    创建数据源时,您需要测试资源组的连通性,以保证同步任务使用的资源组能够与数据源连通,否则将无法正常执行数据同步任务。

    重要

    数据同步时,一个任务只能使用一种资源组。资源组列表默认显示仅数据集成公共资源组。为确保数据同步的稳定性和性能要求,推荐使用独享数据集成资源组。

    1. 单击前往购买进行全新创建并绑定资源组到工作空间或单击绑定已购资源组为工作空间绑定已有资源组。具体操作,请参见新增和使用独享数据集成资源组

    2. 待资源组启动成功后,单击相应资源组连通状态(生产环境)列的测试连通性

      当连通状态显示为可连通时,表示连通成功。

  7. 测试连通性通过后,单击完成

    在数据源列表中,可以查看新建的数据源。

步骤二:新增MaxCompute数据源

当要使用当前工作空间默认生成的odps_first数据源时,请跳过此步骤。

具体操作与步骤一类似,只需在新增数据源对话框,找到MaxCompute区块,单击MaxCompute

重要
  • 数据同步时,一个任务只能使用一种资源组。资源组列表默认仅显示独享数据集成资源组,为确保数据同步的稳定性和性能要求,推荐使用独享数据集成资源组。

  • 如果未创建资源组,请单击新建独享数据集成资源组进行创建。具体操作,请参见新增和使用独享数据集成资源组

步骤三:新建同步任务节点

  1. 进入数据开发页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与治理 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. DataStudio控制台的数据开发页面,单击业务流程节点下的目标业务流程。

    如果需要新建业务流程,请参见创建业务流程

  3. 数据集成节点上右键选择新建节点 > 离线同步

  4. 新建节点对话框,选择路径并填写节点名称。

  5. 单击确认

    数据集成节点下会显示新建的离线同步节点。

步骤四:配置离线同步任务并启动

新建并配置MaxCompute到表格存储的同步任务,具体步骤如下:

  1. 数据集成节点下,双击打开新建的离线同步任务节点。

  2. 配置同步网络链接。

    选择离线同步任务的数据来源、数据去向以及用于执行同步任务的资源组,并测试连通性。

    重要

    数据同步任务的执行必须经过资源组来实现,请选择资源组并保证资源组与读写两端的数据源能联通访问。

    1. 网络与资源配置步骤,选择数据来源MaxCompute(ODPS),并选择数据源名称MaxCompute数据源。

    2. 选择资源组。

      选择资源组后,系统会显示资源组的地域、规格等信息以及自动测试资源组与所选数据源之间连通性。

      重要

      请与新增数据源时选择的资源组保持一致。

    3. 选择数据去向Tablestore,并选择数据源名称为表格存储数据源。

      系统会自动测试资源组与所选数据源之间连通性。

    4. 测试可连通后,单击下一步

    5. 提示对话框,单击确认使用脚本模式

      重要
      • 表格存储仅支持脚本模式。当存在不支持向导模式的数据源时,如果继续编辑任务,将强制使用脚本模式进行编辑。

      • 任务转为脚本模式后,将无法转为向导模式。

  3. 配置任务并保存。

    全量数据的同步需要使用到MaxCompute ReaderTablestore(OTS) Writer插件。脚本配置规则请参见MaxCompute数据源Tablestore数据源

    1. 在脚本配置页面,请根据如下示例完成配置。

      重要
      • 由于全量导出一般是一次性的,所以无需配置自动调度参数。

      • 为了便于理解,在配置示例中增加了注释内容,实际使用脚本时请删除所有注释内容。

      {
          "type": "job",
          "version": "2.0",
          "steps": [
              {
                  "stepType": "odps", #插件名称,不能修改。
                  "parameter": {
                      "partition": [], #如果表为分区表,则必填。如果表为非分区表,则不能填写。需要写入数据表的分区信息,必须指定到最后一级分区。
                      "datasource": "", #MaxCompute数据源名称,请根据实际填写。
                      "column": [ #需要导出的MaxCompute列名。
                          "*"
                      ],
                      "table": "" #MaxCompute中的表名。
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
                  "stepType": "ots", #插件名称,不能修改。
                  "parameter": {
                      "datasource": "", #表格存储数据源名称,请根据实际填写。
                      "column": [ #作为数据表属性列的列。
                          {
                              "name": "columnName1",
                              "type": "INT"
                          },
                          {
                              "name": "columnName2",
                              "type": "STRING"
                          },
                          {
                              "name": "columnName3",
                              "type": "DOUBLE"
                          },
                          {
                              "name": "columnName4",
                              "type": "BOOLEAN"
                          },
                          {
                              "name": "columnName5",
                              "type": "BINARY"
                          }
                      ],
                      "writeMode": "UpdateRow",  #数据写入表格存储的模式。取值范围为PutRowUpdateRow。选择PutRow时表示覆盖写入数据,如果该行不存在,则新增一行;如果该行存在,则覆盖原有行。选择UpdateRow时表示更新写入数据,如果该行不存在,则新增一行;如果该行存在,则根据请求的内容在这一行中新增、修改或者删除指定列的值。
                      "table": "", #Tablestore中的数据表名称。
                      "primaryKey": [ #作为数据表主键的列。
                          {
                              "name": "pk1",
                              "type": "STRING"
                          },
                          {
                              "name": "pk2",
                              "type": "INT"
                          }
                      ]
                  },
                  "name": "Writer",
                  "category": "writer"
              },
              {
                  "name": "Processor",
                  "stepType": null,
                  "category": "processor",
                  "parameter": {}
              }
          ],
          "setting": {
              "executeMode": null,
              "errorLimit": {
                  "record": "0" #当错误个数超过record个数时,导入任务会失败。
              },
              "speed": {
                  "throttle":true, #当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
                  "concurrent":1 #作业并发数。
                  "mbps":"12" #限流。
              }
          },
          "order": {
              "hops": [
                  {
                      "from": "Reader",
                      "to": "Writer"
                  }
              ]
          }
      }

    2. 单击image.png图标,保存脚本。

      说明

      执行后续操作时,如果未保存脚本,则系统会出现保存确认的提示,单击确认即可。

  4. 执行同步任务。

    说明

    全量数据一般只需要同步一次,无需配置调度属性。

    1. 单击1680170333627-a1e19a43-4e2a-4340-9564-f53f2fa6806e图标。

    2. 参数对话框,选择运行资源组的名称。

    3. 单击运行

      运行结束后,在同步任务的运行日志页签,单击Detail log url对应的链接后。在任务的详细运行日志页面,查看Current task status对应的状态。

      Current task status的值为FINISH时,表示任务运行完成。

步骤五:查看导入到表格存储中的数据

在表格存储控制台查看表格存储数据同步结果。

  1. 登录表格存储控制台

  2. 概览页面上方,选择地域。

  3. 单击实例名称。

  4. 实例详情页签下的数据表列表页签,单击目标数据表名称。

  5. 数据管理页签,即可查看同步到该数据表中的数据。