同步MaxCompute数据

通过DataWorks大数据开发治理平台的数据集成,您可以将MaxCompute的数据全量导出到表格存储(Tablestore)

背景信息

大数据开发治理平台(DataWorks)基于MaxCompute、Hologres、EMR、AnalyticDB、CDP等大数据引擎,为数据仓库、数据湖、湖仓一体等解决方案提供统一的全链路大数据开发治理平台。

数据集成是一个稳定高效、弹性伸缩的数据同步平台,致力于提供在复杂网络环境下、丰富的异构数据源之间高速稳定的数据移动及同步能力。

通过DataWorksMaxCompute计算分析后的数据同步至表格存储后,您可以充分利用Tablestore高并发吞吐与复杂查询能力,以满足实时数据分析与多样化检索需求。

image

前提条件

重要

如果MaxCompute实例和Tablestore实例不在同一地域,请参考以下操作步骤创建VPC对等连接实现跨地域网络连通。

创建VPC对等连接实现跨地域网络连通

此处以DataWorks工作空间与MaxCompute实例位于同一地域(华东1(杭州)),而Tablestore实例位于华东2(上海)为例进行说明。

  1. Tablestore实例绑定VPC。

    1. 登录表格存储控制台,在页面上方选择地域。

    2. 单击实例别名进入实例管理页面。

    3. 切换到网络管理页签,单击绑定VPC,选择VPC和交换机并填写VPC名称,然后单击确定

    4. 请耐心等待一段时间,VPC绑定成功后页面将自动刷新,您可以在VPC列表查看绑定的VPC IDVPC访问地址

      说明

      后续在DataWorks控制台添加Tablestore数据源时,将使用该VPC访问地址。

      image

  2. 获取DataWorks工作空间资源组的VPC信息。

    1. 登录DataWorks控制台,在页面上方选择工作空间所在地域,然后单击左侧工作空间菜单,进入工作空间列表页面。

    2. 单击工作空间名称进入空间详情页面,单击左侧资源组菜单,查看工作空间绑定的资源组列表。

    3. 在目标资源组右侧单击网络设置,在资源调度 & 数据集成区域查看绑定的专有网络,即VPC ID

  3. 创建VPC对等连接并配置路由。

    1. 登录专有网络VPC控制台。在页面左侧单击专有网络菜单,依次选择Tablestore实例和DataWorks工作空间所在地域,并记录VPC ID对应的网段地址。

      image

    2. 在页面左侧单击VPC对等连接菜单,然后在VPC对等连接页面单击创建对等连接

    3. 创建对等连接页面,输入对等连接名称,选择发起端VPC实例、接收端账号类型、接收端地域和接收端VPC实例,单击确定

    4. VPC对等连接页面,找到已创建的VPC对等连接,分别在发起端VPC实例列和接收端VPC实例列单击配置路由条目

      目标网段需填写对端VPC的网段地址。即在发起端VPC实例配置路由条目时,填写接收端VPC实例的网段地址;在接收端VPC实例配置路由条目时,填写发起端VPC实例的网段地址。

操作步骤

步骤一:新增表格存储数据源

  1. 进入数据集成页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据集成 > 数据集成,在下拉框中选择对应工作空间后单击进入数据集成

  2. 在左侧导航栏,单击数据源

  3. 数据源列表页面,单击新增数据源

  4. 新增数据源对话框,搜索并选择数据源类型为Tablestore

  5. 新增OTS数据源对话框,根据下表配置数据源参数。

    参数

    说明

    数据源名称

    数据源名称必须以字母、数字、下划线(_)组合,且不能以数字和下划线(_)开头。

    数据源描述

    对数据源进行简单描述,不得超过80个字符。

    地域

    选择Tablestore实例所属地域。

    Table Store实例名称

    Tablestore实例的名称。

    Endpoint

    Tablestore实例的服务地址,推荐使用VPC地址

    AccessKey ID

    阿里云账号或者RAM用户的AccessKey IDAccessKey Secret。

    AccessKey Secret

  6. 测试资源组连通性。创建数据源时,您需要测试资源组的连通性,以保证同步任务使用的资源组能够与数据源连通,否则将无法正常执行数据同步任务。

    1. 连接配置区域,单击相应资源组连通状态列的测试连通性

    2. 测试连通性通过后,连通状态显示可连通,单击完成。您可以在数据源列表中查看新建的数据源。

      说明

      如果测试连通性结果为无法通过,您可使用连通性诊断工具自助解决。如仍无法连通资源组与数据源,请提交工单处理。

步骤二:新增MaxCompute数据源

操作与步骤一相似,在新增数据源对话框中搜索并选择数据源类型为MaxCompute,随后配置相关的数据源参数。

步骤三:配置离线同步任务

数据开发(Data Studio)旧版

一、新建任务节点

  1. 进入数据开发页面。

    1. 登录DataWorks控制台

    2. 在页面上方,选择资源组和地域。

    3. 在左侧导航栏,单击数据开发与运维 > 数据开发

    4. 数据开发页面的下拉框中,选择对应工作空间后单击进入数据开发

  2. DataStudio控制台的数据开发页面,单击业务流程节点下的目标业务流程。

    如果需要新建业务流程,请参见创建业务流程

  3. 数据集成节点上右键单击,然后选择新建节点 > 离线同步

  4. 新建节点对话框,选择路径并填写名称,然后单击确认

    数据集成节点下,将显示新建的离线同步节点。

二、配置同步任务

  1. 数据集成节点下,双击打开新建的离线同步任务节点。

  2. 配置网络与资源。

    选择离线同步任务的数据来源、数据去向以及用于执行同步任务的资源组,并测试连通性。

    1. 网络与资源配置步骤,选择数据来源MaxCompute(ODPS),并选择数据源名称为新增的MaxCompute数据源。

    2. 选择资源组。

      选择资源组后,系统会显示资源组的地域、规格等信息以及自动测试资源组与所选数据源之间连通性。

      说明

      Serverless资源组支持为同步任务指定运行CU上限,如果您的同步任务因资源不足出现OOM现象,请适当调整资源组的CU占用取值。

    3. 选择数据去向Tablestore,并选择数据源名称为新增的表格存储数据源。

      系统会自动测试资源组与所选数据源之间连通性。

    4. 测试可连通后,单击下一步

  3. 配置任务并保存。

    向导模式

    1. 配置任务步骤的配置数据来源与去向区域,根据实际情况配置数据来源和数据去向。

      数据来源

      参数

      说明

      数据源

      默认显示上一步选择的MaxCompute数据源。

      Tunnel资源组

      Tunnel Quota,默认选择“公共传输资源”,即MC的免费quota。

      MaxCompute的数据传输资源选择,具体请参见购买与使用独享数据传输服务资源组

      说明

      如果独享tunnel quota因欠费或到期不可用,任务在运行中将会自动切换为“公共传输资源”。

      源表。

      过滤方式

      同步数据的过滤逻辑,支持以下两种方式 :

      • 分区过滤:通过分区表达式过滤指定来源数据的同步范围,选择此方式时还需配置分区信息分区不存在时参数。

      • 数据过滤:通过SQLWHERE过滤语句指定来源数据的同步范围(不需要填写WHERE关键字)。

      分区信息

      说明

      过滤方式选择为分区过滤时需要配置。

      指定分区列的取值。

      • 取值可以是固定值,如ds=20220101

      • 取值可以是调度系统参数,如ds=${bizdate},当任务运行时,会自动替换调度系统参数。

      分区不存在

      说明

      过滤方式选择为分区过滤时需要配置。

      分区不存在时,同步任务的处理策略。

      • 出错

      • 忽略不存在分区,任务正常执行

      数据去向

      参数

      说明

      数据源

      默认显示上一步选择的Tablestore数据源。

      目标数据表。

      主键信息

      目标数据表的主键信息。

      写入模式

      数据写入表格存储的模式,支持以下两种模式:

      • PutRow:对应于Tablestore API PutRow,插入数据到指定的行。如果该行不存在,则新增一行。如果该行存在,则覆盖原有行。

      • UpdateRow:对应于Tablestore API UpdateRow,更新指定行的数据。如果该行不存在,则新增一行。如果该行存在,则根据请求的内容在这一行中新增、修改或者删除指定列的值。

    2. 配置字段映射。

      配置数据来源与数据去向后,需要指定来源字段目标字段的映射关系。配置字段映射关系后,任务将根据字段映射关系,将源表字段写入目标表对应类型的字段中。更多信息,请参见配置字段映射关系

      重要
      • 来源字段中必须指定主键信息,以便读取主键数据。

      • 由于在上一步的数据去向下已配置了目标表的主键信息,因此在目标字段中不可再配置主键信息。

      • 当字段的数据类型为INTEGER时,您需要将其配置为INT,DataWorks会自动将其转换为INTEGER类型。如果直接配置为INTEGER类型,日志将会出现错误,导致任务无法顺利完成。

    3. 配置通道控制。

      您可以通过通道配置,控制数据同步过程相关属性。相关参数说明详情可参见离线同步并发和限流之间的关系

    4. 单击image.png图标,保存配置。

    脚本模式

    1. 配置任务步骤,单击image.png图标,然后在弹出的对话框中单击确定image

    2. 在脚本配置页面,编辑脚本。

      脚本配置示例如下,请根据您的同步信息和需求替换配置文件内的参数信息。

      {
        "type": "job",
        "version": "2.0",
        "steps": [
          {
            "stepType": "odps",
            "parameter": {
              "partition": [],
              "datasource": "MyMaxComputeDataSource",
              "table": "source_table",
              "column": [
                "pk",
                "col_string",
                "col_long",
                "col_double",
                "col_bool"
              ]
            },
            "name": "Reader",
            "category": "reader"
          },
          {
            "stepType": "ots",
            "parameter": {
              "datasource": "MyTablestoreDataSource",
              "table": "target_table",
              "newVersion": "true",
              "mode": "normal",
              "writeMode": "updateRow",
              "column": [
                {
                  "name": "col_string",
                  "type": "STRING"
                },
                {
                  "name": "col_long",
                  "type": "INT"
                },
                {
                  "name": "col_double",
                  "type": "DOUBLE"
                },
                {
                  "name": "col_bool",
                  "type": "BOOL"
                }
              ],
              "primaryKey": [
                {
                  "name": "pk",
                  "type": "STRING"
                }
              ]
            },
            "name": "Writer",
            "category": "writer"
          },
          {
            "name": "Processor",
            "stepType": null,
            "category": "processor",
            "parameter": {}
          }
        ],
        "setting": {
          "executeMode": null,
          "errorLimit": {
            "record": "0"
          },
          "speed": {
            "throttle": true,
            "concurrent": 2,
            "mbps": "12"
          }
        },
        "order": {
          "hops": [
            {
              "from": "Reader",
              "to": "Writer"
            }
          ]
        }
      }
      重要
      • MaxCompute Readercolumn中必须指定主键信息,以便读取主键数据。

      • 当字段的数据类型为INTEGER时,您需要将其配置为INT,DataWorks会自动将其转换为INTEGER类型。如果直接配置为INTEGER类型,日志将会出现错误,导致任务无法顺利完成。

      • MaxCompute Reader需要替换的参数说明如下:

        参数名称

        说明

        partition

        读取数据所在的分区。

        说明

        如果表为分区表,则必填;如果表为非分区表,则不可填写。

        datasource

        MaxCompute数据源名称。

        table

        源表名称。

        column

        读取MaxCompute源表的列信息。

      • Tablestore Write需要替换的参数说明如下:

        参数名称

        说明

        datasource

        Tablestore数据源名称。

        table

        目标表名称。

        primaryKey

        目标表的主键列。

        column

        需要写入的属性列。

        重要

        写入目标表的列顺序必须与读取源表的列顺序保持一致。

    3. 单击image.png图标,保存配置。

三、运行同步任务

  1. 单击1680170333627-a1e19a43-4e2a-4340-9564-f53f2fa6806e图标。

  2. 参数对话框,选择运行资源组的名称。

  3. 单击运行

数据开发(Data Studio)新版

一、新建任务节点

  1. 进入数据开发页面。

    1. 登录DataWorks控制台

    2. 在页面上方,选择资源组和地域。

    3. 在左侧导航栏,单击数据开发与运维 > 数据开发

    4. 数据开发页面的下拉框中,选择对应工作空间后单击进入Data Studio

  2. DataStudio控制台的数据开发页面,单击项目目录右侧的image图标,然后选择新建节点 > 数据集成 > 离线同步

    说明

    首次使用项目目录时,也可以直接单击新建节点按钮。

  3. 新建节点对话框,选择路径并填写名称,然后单击确认

    项目目录下,将显示新建的离线同步节点。

二、配置同步任务

  1. 项目目录下,单击打开新建的离线同步任务节点。

  2. 配置网络与资源。

    选择离线同步任务的数据来源、数据去向以及用于执行同步任务的资源组,并测试连通性。

    1. 网络与资源配置步骤,选择数据来源MaxCompute(ODPS),并选择数据源名称为新增的MaxCompute数据源。

    2. 选择资源组。

      选择资源组后,系统会显示资源组的地域、规格等信息以及自动测试资源组与所选数据源之间连通性。

      说明

      Serverless资源组支持为同步任务指定运行CU上限,如果您的同步任务因资源不足出现OOM现象,请适当调整资源组的CU占用取值。

    3. 选择数据去向Tablestore,并选择数据源名称为新增的表格存储数据源。

      系统会自动测试资源组与所选数据源之间连通性。

    4. 测试可连通后,单击下一步

  3. 配置任务并保存。

    向导模式

    1. 配置任务步骤的配置数据来源与去向区域,根据实际情况配置数据来源和数据去向。

      数据来源

      参数

      说明

      数据源

      默认显示上一步选择的MaxCompute数据源。

      Tunnel资源组

      Tunnel Quota,默认选择“公共传输资源”,即MC的免费quota。

      MaxCompute的数据传输资源选择,具体请参见购买与使用独享数据传输服务资源组

      说明

      如果独享tunnel quota因欠费或到期不可用,任务在运行中将会自动切换为“公共传输资源”。

      源表。

      过滤方式

      同步数据的过滤逻辑,支持以下两种方式 :

      • 分区过滤:通过分区表达式过滤指定来源数据的同步范围,选择此方式时还需配置分区信息分区不存在时参数。

      • 数据过滤:通过SQLWHERE过滤语句指定来源数据的同步范围(不需要填写WHERE关键字)。

      分区信息

      说明

      过滤方式选择为分区过滤时需要配置。

      指定分区列的取值。

      • 取值可以是固定值,如ds=20220101

      • 取值可以是调度系统参数,如ds=${bizdate},当任务运行时,会自动替换调度系统参数。

      分区不存在

      说明

      过滤方式选择为分区过滤时需要配置。

      分区不存在时,同步任务的处理策略。

      • 出错

      • 忽略不存在分区,任务正常执行

      数据去向

      参数

      说明

      数据源

      默认显示上一步选择的Tablestore数据源。

      目标数据表。

      主键信息

      目标数据表的主键信息。

      写入模式

      数据写入表格存储的模式,支持以下两种模式:

      • PutRow:对应于Tablestore API PutRow,插入数据到指定的行。如果该行不存在,则新增一行。如果该行存在,则覆盖原有行。

      • UpdateRow:对应于Tablestore API UpdateRow,更新指定行的数据。如果该行不存在,则新增一行。如果该行存在,则根据请求的内容在这一行中新增、修改或者删除指定列的值。

    2. 配置字段映射。

      配置数据来源与数据去向后,需要指定来源字段目标字段的映射关系。配置字段映射关系后,任务将根据字段映射关系,将源表字段写入目标表对应类型的字段中。更多信息,请参见配置字段映射关系

      重要
      • 来源字段中必须指定主键信息,以便读取主键数据。

      • 由于在上一步的数据去向下已配置了目标表的主键信息,因此在目标字段中不可再配置主键信息。

      • 当字段的数据类型为INTEGER时,您需要将其配置为INT,DataWorks会自动将其转换为INTEGER类型。如果直接配置为INTEGER类型,日志将会出现错误,导致任务无法顺利完成。

    3. 配置通道控制。

      您可以通过通道配置,控制数据同步过程相关属性。相关参数说明详情可参见离线同步并发和限流之间的关系

    4. 单击保存,保存配置。

    脚本模式

    1. 配置任务步骤,单击脚本模式,然后在弹出的对话框中单击确定image

    2. 在脚本配置页面,编辑脚本。

      脚本配置示例如下,请根据您的同步信息和需求替换配置文件内的参数信息。

      {
        "type": "job",
        "version": "2.0",
        "steps": [
          {
            "stepType": "odps",
            "parameter": {
              "partition": [],
              "datasource": "MyMaxComputeDataSource",
              "table": "source_table",
              "column": [
                "pk",
                "col_string",
                "col_long",
                "col_double",
                "col_bool"
              ]
            },
            "name": "Reader",
            "category": "reader"
          },
          {
            "stepType": "ots",
            "parameter": {
              "datasource": "MyTablestoreDataSource",
              "table": "target_table",
              "newVersion": "true",
              "mode": "normal",
              "writeMode": "updateRow",
              "column": [
                {
                  "name": "col_string",
                  "type": "STRING"
                },
                {
                  "name": "col_long",
                  "type": "INT"
                },
                {
                  "name": "col_double",
                  "type": "DOUBLE"
                },
                {
                  "name": "col_bool",
                  "type": "BOOL"
                }
              ],
              "primaryKey": [
                {
                  "name": "pk",
                  "type": "STRING"
                }
              ]
            },
            "name": "Writer",
            "category": "writer"
          },
          {
            "name": "Processor",
            "stepType": null,
            "category": "processor",
            "parameter": {}
          }
        ],
        "setting": {
          "executeMode": null,
          "errorLimit": {
            "record": "0"
          },
          "speed": {
            "throttle": true,
            "concurrent": 2,
            "mbps": "12"
          }
        },
        "order": {
          "hops": [
            {
              "from": "Reader",
              "to": "Writer"
            }
          ]
        }
      }
      重要
      • MaxCompute Readercolumn中必须指定主键信息,以便读取主键数据。

      • 当字段的数据类型为INTEGER时,您需要将其配置为INT,DataWorks会自动将其转换为INTEGER类型。如果直接配置为INTEGER类型,日志将会出现错误,导致任务无法顺利完成。

      • MaxCompute Reader需要替换的参数说明如下:

        参数名称

        说明

        partition

        读取数据所在的分区。

        说明

        如果表为分区表,则必填;如果表为非分区表,则不可填写。

        datasource

        MaxCompute数据源名称。

        table

        源表名称。

        column

        读取MaxCompute源表的列信息。

      • Tablestore Write需要替换的参数说明如下:

        参数名称

        说明

        datasource

        Tablestore数据源名称。

        table

        目标表名称。

        primaryKey

        目标表的主键列。

        column

        需要写入的属性列。

        重要

        写入目标表的列顺序必须与读取源表的列顺序保持一致。

    3. 单击保存,保存配置。

三、运行同步任务

  1. 单击任务右侧的调试配置,选择运行的资源组。

  2. 单击运行

步骤四:查看同步结果

运行同步任务后,您可以通过日志查看任务的执行状态,并在表格存储控制台查看目标数据表的同步结果。

  1. 查看任务执行状态。

    1. 在同步任务的结果页签,查看Current task status对应的状态。

      Current task status的值为FINISH时,表示任务运行完成。

    2. 如需查看更详细的运行日志,您可以单击Detail log url对应的链接。

  2. 查看目标数据表的同步结果。

    1. 进入实例管理页面。

      1. 登录表格存储控制台

      2. 在页面上方,选择资源组和地域。

      3. 概览页面,单击实例别名或在实例操作列单击实例管理

    2. 实例详情页签,单击数据表列表页签。

    3. 数据表列表页签,单击目标数据表名。

    4. 数据管理页签,即可查看同步到该数据表中的数据。