同步增量数据到MaxCompute

如果需要定期将表格存储(Tablestore)中的新增或变更数据同步至MaxCompute进行备份或业务处理,您可以通过在DataWorks数据集成平台上配置离线同步任务,实现周期性增量同步功能。

前提条件

重要

如果MaxCompute实例和Tablestore实例不在同一地域,请参考以下操作步骤创建VPC对等连接实现跨地域网络连通。

创建VPC对等连接实现跨地域网络连通

此处以DataWorks工作空间与MaxCompute实例位于同一地域(华东1(杭州)),而Tablestore实例位于华东2(上海)为例进行说明。

  1. Tablestore实例绑定VPC。

    1. 登录表格存储控制台,在页面上方选择地域。

    2. 单击实例别名进入实例管理页面。

    3. 切换到网络管理页签,单击绑定VPC,选择VPC和交换机并填写VPC名称,然后单击确定

    4. 请耐心等待一段时间,VPC绑定成功后页面将自动刷新,您可以在VPC列表查看绑定的VPC IDVPC访问地址

      说明

      后续在DataWorks控制台添加Tablestore数据源时,将使用该VPC访问地址。

      image

  2. 获取DataWorks工作空间资源组的VPC信息。

    1. 登录DataWorks控制台,在页面上方选择工作空间所在地域,然后单击左侧工作空间菜单,进入工作空间列表页面。

    2. 单击工作空间名称进入空间详情页面,单击左侧资源组菜单,查看工作空间绑定的资源组列表。

    3. 在目标资源组右侧单击网络设置,在资源调度 & 数据集成区域查看绑定的专有网络,即VPC ID

  3. 创建VPC对等连接并配置路由。

    1. 登录专有网络VPC控制台。在页面左侧单击专有网络菜单,依次选择Tablestore实例和DataWorks工作空间所在地域,并记录VPC ID对应的网段地址。

      image

    2. 在页面左侧单击VPC对等连接菜单,然后在VPC对等连接页面单击创建对等连接

    3. 创建对等连接页面,输入对等连接名称,选择发起端VPC实例、接收端账号类型、接收端地域和接收端VPC实例,单击确定

    4. VPC对等连接页面,找到已创建的VPC对等连接,分别在发起端VPC实例列和接收端VPC实例列单击配置路由条目

      目标网段需填写对端VPC的网段地址。即在发起端VPC实例配置路由条目时,填写接收端VPC实例的网段地址;在接收端VPC实例配置路由条目时,填写发起端VPC实例的网段地址。

操作步骤

步骤一:新增表格存储数据源

  1. 进入数据集成页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据集成 > 数据集成,在下拉框中选择对应工作空间后单击进入数据集成

  2. 在左侧导航栏,单击数据源

  3. 数据源列表页面,单击新增数据源

  4. 新增数据源对话框,搜索并选择数据源类型为Tablestore

  5. 新增OTS数据源对话框,根据下表配置数据源参数。

    参数

    说明

    数据源名称

    数据源名称必须以字母、数字、下划线(_)组合,且不能以数字和下划线(_)开头。

    数据源描述

    对数据源进行简单描述,不得超过80个字符。

    地域

    选择Tablestore实例所属地域。

    Table Store实例名称

    Tablestore实例的名称。

    Endpoint

    Tablestore实例的服务地址,推荐使用VPC地址

    AccessKey ID

    阿里云账号或者RAM用户的AccessKey IDAccessKey Secret。

    AccessKey Secret

  6. 测试资源组连通性。创建数据源时,您需要测试资源组的连通性,以保证同步任务使用的资源组能够与数据源连通,否则将无法正常执行数据同步任务。

    1. 连接配置区域,单击相应资源组连通状态列的测试连通性

    2. 测试连通性通过后,连通状态显示可连通,单击完成。您可以在数据源列表中查看新建的数据源。

      说明

      如果测试连通性结果为无法通过,您可使用连通性诊断工具自助解决。如仍无法连通资源组与数据源,请提交工单处理。

步骤二:新增MaxCompute数据源

操作与步骤一相似,在新增数据源对话框中搜索并选择数据源类型为MaxCompute,随后配置相关的数据源参数。

步骤三:配置离线同步任务

数据开发(Data Studio)旧版

一、新建任务节点

  1. 进入数据开发页面。

    1. 登录DataWorks控制台

    2. 在页面上方,选择资源组和地域。

    3. 在左侧导航栏,单击数据开发与运维 > 数据开发

    4. 数据开发页面的下拉框中,选择对应工作空间后单击进入数据开发

  2. DataStudio控制台的数据开发页面,单击业务流程节点下的目标业务流程。

    如果需要新建业务流程,请参见创建业务流程

  3. 数据集成节点上右键单击,然后选择新建节点 > 离线同步

  4. 新建节点对话框,选择路径并填写名称,然后单击确认

    数据集成节点下,将显示新建的离线同步节点。

二、配置同步任务

  1. 数据集成节点下,双击打开新建的离线同步任务节点。

  2. 配置网络与资源。

    选择离线同步任务的数据来源、数据去向以及用于执行同步任务的资源组,并测试连通性。

    1. 网络与资源配置步骤,选择数据来源Tablestore Stream,并选择数据源名称为新增的表格存储数据源。

    2. 选择资源组。

      选择资源组后,系统会显示资源组的地域、规格等信息以及自动测试资源组与所选数据源之间连通性。

      说明

      Serverless资源组支持为同步任务指定运行CU上限,如果您的同步任务因资源不足出现OOM现象,请适当调整资源组的CU占用取值。

    3. 选择数据去向MaxCompute(ODPS),并选择数据源名称为新增的MaxCompute数据源。

      系统会自动测试资源组与所选数据源之间连通性。

    4. 测试可连通后,单击下一步

  3. 配置任务并保存。

    向导模式

    1. 配置任务步骤的配置数据来源与去向区域,根据实际情况配置数据来源和数据去向。

      数据来源

      参数

      说明

      数据源

      默认显示上一步选择的Tablestore数据源。

      源数据表。

      开始时间

      增量读取数据的开始时间和结束时间,分别配置为变量形式${startTime}${endTime},具体格式在后续调度属性中配置。增量数据的时间范围为左闭右开的区间。

      结束时间

      状态表

      用于记录状态的表名称,默认值为TableStoreStreamReaderStatusTable。

      最大重试次数

      TableStore中读取增量数据时,每次请求的最大重试次数。

      导出时序信息

      是否导出时序信息,时序信息包含了数据的写入时间等。

      重要

      请勾选导出时序信息的单选框,在后续将增量数据转换为全量数据格式时,需要使用时序信息。

      数据去向

      参数

      说明

      数据源

      默认显示上一步选择的MaxCompute数据源。

      Tunnel资源组

      Tunnel Quota,默认值为公共传输资源,即MC的免费quota。

      MaxCompute的数据传输资源选择,具体请参见购买与使用独享数据传输服务资源组

      说明

      如果独享tunnel quota因欠费或到期不可用,任务在运行中将会自动切换为“公共传输资源”。

      目标MaxCompute表。

      分区信息

      如果您每日增量数据限定在对应日期的分区中,可以使用分区做每日增量,比如配置分区pt值为${bizdate}。

      写入模式

      数据写入表中的模式,支持以下两种写入模式:

      • 写入前保留已有数据(Insert Into):直接向表或静态分区中插入数据。

        重要

        后续将周期性执行离线同步任务,请选择写入前保留已有数据(Insert Into)。

      • 写入前清理已有数据(Insert Overwrite):先清空表中的原有数据,再向表或静态分区中插入数据。

      空字符串转为Null写入

      如果源头数据为空字符串,在向目标MaxCompute列写入时是否转为Null值写入。默认值为否。

      同步完成才可见

      单击高级配置后才会显示该参数。

      同步到MaxCompute中的数据是否在同步完成后才能被查询到。默认值为否。

    2. 配置字段映射。

      字段映射区域,系统自动进行同名映射,保持默认即可。更多信息,请参见配置字段映射关系

    3. 配置通道控制。

      您可以通过通道配置,控制数据同步过程相关属性。相关参数说明详情可参见离线同步并发和限流之间的关系

    4. 单击image.png图标,保存配置。

    脚本模式

    1. 配置任务步骤,单击image.png图标,然后在弹出的对话框中单击确定

    2. 在脚本配置页面,编辑脚本。

      脚本配置示例如下,请根据您的同步信息和需求替换配置文件内的参数信息。

      {
        "type": "job",
        "version": "2.0",
        "steps": [
          {
            "stepType": "otsstream",
            "parameter": {
              "statusTable": "TableStoreStreamReaderStatusTable",
              "maxRetries": 30,
              "isExportSequenceInfo": true,
              "datasource": "MyTablestoreDataSource",
              "dataTable": "target_table",
              "newVersion": "true",
              "column": [
                "pk",
                "colName",
                "version",
                "colValue",
                "opType",
                "sequenceInfo"
              ],
              "startTimeString": "${startTime}",
              "endTimeString": "${endTime}"
            },
            "name": "Reader",
            "category": "reader"
          },
          {
            "stepType": "odps",
            "parameter": {
              "partition": "",
              "truncate": false,
              "datasource": "MyMaxComputeDataSource",
              "table": "target_table",
              "column": [
                "pk",
                "colname",
                "version",
                "colvalue",
                "optype",
                "sequenceinfo"
              ],
              "emptyAsNull": false,
              "compress": false
            },
            "name": "Writer",
            "category": "writer"
          }
        ],
        "order": {
          "hops": [
            {
              "from": "Reader",
              "to": "Writer"
            }
          ]
        },
        "setting": {
          "errorLimit": {
            "record": "0"
          },
          "speed": {
            "throttle": false,
            "concurrent": 2
          }
        }
      }
      • Tablestore Stream Reader需要替换的参数说明如下:

        参数名称

        说明

        datasource

        源表的Tablestore数据源名称。

        dataTable

        源表名称。

        column

        源表主键和增量变更信息。

      • MaxCompute Writer需要替换的参数说明如下:

        参数名称

        说明

        datasource

        目标表的MaxCompute数据源名称。

        table

        目标表名称。

        column

        需要写入的属性列。

    3. 单击image.png图标,保存配置。

三、配置调度属性

  1. 单击任务右侧的调度配置

  2. 调度配置面板的调度参数部分,单击新增参数,根据下表说明新增参数。更多信息,请参见调度参数支持的格式

    参数

    参数值

    startTime

    $[yyyymmddhh24-2/24]$[miss-10/24/60]

    endTime

    $[yyyymmddhh24-1/24]$[miss-10/24/60]

    配置示例如下图所示。

    image

    假如任务运行时的时间为2023042319:00:00,startTime20230423175000,endTime20230423185000。任务将会同步在17:5018:50时段内新增的数据。

  3. 时间属性部分,配置时间属性。更多信息,请参见时间属性配置说明

    此处以任务整点每小时自动运行为例介绍配置,如下图所示。

    image

  4. 调度依赖部分,单击使用工作空间根节点,系统会自动生成依赖的上游节点信息。

    使用工作空间根节点表示该任务无上游的依赖任务。

    image

  5. 配置完成后,关闭配置调度面板。

  6. 单击image.png图标,保存配置。

四、(可选)调试脚本代码

通过调试脚本代码,确保同步任务能成功同步表格存储的增量数据到MaxCompute中。

说明

调试脚本代码时配置的时间范围内的数据可能会多次导入到MaxCompute中,相同数据行会覆盖写入到MaxCompute中。

  1. 单击1680170333627-a1e19a43-4e2a-4340-9564-f53f2fa6806e图标。

  2. 参数对话框,选择运行资源组的名称,并配置自定义参数。

    自定义参数的格式为yyyyMMddHHmmss,例如20250631170000。

    image

  3. 单击运行

    任务运行完成后,您可以查看目标表的同步结果。更多信息,请参见MaxCompute表数据

五、发布同步任务

提交同步任务后,同步任务会按照配置的调度属性运行。

  1. 单击image图标。

  2. 提交对话框,根据需要填写变更描述。

  3. 单击确认

数据开发(Data Studio)新版

一、新建任务节点

  1. 进入数据开发页面。

    1. 登录DataWorks控制台

    2. 在页面上方,选择资源组和地域。

    3. 在左侧导航栏,单击数据开发与运维 > 数据开发

    4. 数据开发页面的下拉框中,选择对应工作空间后单击进入Data Studio

  2. DataStudio控制台的数据开发页面,单击项目目录右侧的image图标,然后选择新建节点 > 数据集成 > 离线同步

    说明

    首次使用项目目录时,也可以直接单击新建节点按钮。

  3. 新建节点对话框,选择路径并填写名称,然后单击确认

    项目目录下,将显示新建的离线同步节点。

二、配置同步任务

  1. 项目目录下,单击打开新建的离线同步任务节点。

  2. 配置网络与资源。

    选择离线同步任务的数据来源、数据去向以及用于执行同步任务的资源组,并测试连通性。

    1. 网络与资源配置步骤,选择数据来源Tablestore Stream,并选择数据源名称为新增的表格存储数据源。

    2. 选择资源组。

      选择资源组后,系统会显示资源组的地域、规格等信息以及自动测试资源组与所选数据源之间连通性。

      说明

      Serverless资源组支持为同步任务指定运行CU上限,如果您的同步任务因资源不足出现OOM现象,请适当调整资源组的CU占用取值。

    3. 选择数据去向MaxCompute(ODPS),并选择数据源名称为新增的MaxCompute数据源。

      系统会自动测试资源组与所选数据源之间连通性。

    4. 测试可连通后,单击下一步

  3. 配置任务并保存。

    向导模式

    1. 配置任务步骤的配置数据来源与去向区域,根据实际情况配置数据来源和数据去向。

      数据来源

      参数

      说明

      数据源

      默认显示上一步选择的Tablestore数据源。

      源数据表。

      开始时间

      增量读取数据的开始时间和结束时间,分别配置为变量形式${startTime}${endTime},具体格式在后续调度属性中配置。增量数据的时间范围为左闭右开的区间。

      结束时间

      状态表

      用于记录状态的表名称,默认值为TableStoreStreamReaderStatusTable。

      最大重试次数

      TableStore中读取增量数据时,每次请求的最大重试次数。

      导出时序信息

      是否导出时序信息,时序信息包含了数据的写入时间等。

      重要

      请勾选导出时序信息的单选框,在后续将增量数据转换为全量数据格式时,需要使用时序信息。

      数据去向

      参数

      说明

      数据源

      默认显示上一步选择的MaxCompute数据源。

      Tunnel资源组

      Tunnel Quota,默认值为公共传输资源,即MC的免费quota。

      MaxCompute的数据传输资源选择,具体请参见购买与使用独享数据传输服务资源组

      说明

      如果独享tunnel quota因欠费或到期不可用,任务在运行中将会自动切换为“公共传输资源”。

      目标MaxCompute表。

      分区信息

      如果您每日增量数据限定在对应日期的分区中,可以使用分区做每日增量,比如配置分区pt值为${bizdate}。

      写入模式

      数据写入表中的模式,支持以下两种写入模式:

      • 写入前保留已有数据(Insert Into):直接向表或静态分区中插入数据。

        重要

        后续将周期性执行离线同步任务,请选择写入前保留已有数据(Insert Into)。

      • 写入前清理已有数据(Insert Overwrite):先清空表中的原有数据,再向表或静态分区中插入数据。

      空字符串转为Null写入

      如果源头数据为空字符串,在向目标MaxCompute列写入时是否转为Null值写入。默认值为否。

      同步完成才可见

      单击高级配置后才会显示该参数。

      同步到MaxCompute中的数据是否在同步完成后才能被查询到。默认值为否。

    2. 配置字段映射。

      字段映射区域,系统自动进行同名映射,保持默认即可。更多信息,请参见配置字段映射关系

    3. 配置通道控制。

      您可以通过通道配置,控制数据同步过程相关属性。相关参数说明详情可参见离线同步并发和限流之间的关系

    4. 单击保存,保存配置。

    脚本模式

    1. 配置任务步骤,单击脚本模式,然后在弹出的对话框中单击确定

      image

    2. 在脚本配置页面,编辑脚本。

      脚本配置示例如下,请根据您的同步信息和需求替换配置文件内的参数信息。

      {
        "type": "job",
        "version": "2.0",
        "steps": [
          {
            "stepType": "otsstream",
            "parameter": {
              "statusTable": "TableStoreStreamReaderStatusTable",
              "maxRetries": 30,
              "isExportSequenceInfo": true,
              "datasource": "MyTablestoreDataSource",
              "dataTable": "target_table",
              "newVersion": "true",
              "column": [
                "pk",
                "colName",
                "version",
                "colValue",
                "opType",
                "sequenceInfo"
              ],
              "startTimeString": "${startTime}",
              "endTimeString": "${endTime}"
            },
            "name": "Reader",
            "category": "reader"
          },
          {
            "stepType": "odps",
            "parameter": {
              "partition": "",
              "truncate": false,
              "datasource": "MyMaxComputeDataSource",
              "table": "target_table",
              "column": [
                "pk",
                "colname",
                "version",
                "colvalue",
                "optype",
                "sequenceinfo"
              ],
              "emptyAsNull": false,
              "compress": false
            },
            "name": "Writer",
            "category": "writer"
          }
        ],
        "order": {
          "hops": [
            {
              "from": "Reader",
              "to": "Writer"
            }
          ]
        },
        "setting": {
          "errorLimit": {
            "record": "0"
          },
          "speed": {
            "throttle": false,
            "concurrent": 2
          }
        }
      }
      • Tablestore Stream Reader需要替换的参数说明如下:

        参数名称

        说明

        datasource

        源表的Tablestore数据源名称。

        dataTable

        源表名称。

        column

        源表主键和增量变更信息。

      • MaxCompute Writer需要替换的参数说明如下:

        参数名称

        说明

        datasource

        目标表的MaxCompute数据源名称。

        table

        目标表名称。

        column

        需要写入的属性列。

    3. 单击保存,保存配置。

三、配置调度属性

  1. 单击任务右侧的调度配置

  2. 调度配置面板的调度参数部分,单击添加参数,根据下表说明新增参数。更多信息,请参见调度参数支持格式

    参数

    参数值

    startTime

    $[yyyymmddhh24-2/24]$[miss-10/24/60]

    endTime

    $[yyyymmddhh24-1/24]$[miss-10/24/60]

    配置示例如下图所示。

    image

    假如任务运行时的时间为2023042319:00:00,startTime20230423175000,endTime20230423185000。任务将会同步在17:5018:50时段内新增的数据。

  3. 调度策略部分,配置调度策略。更多信息,请参见实例生成方式:发布后即时生成

    image

  4. 调度时间部分,配置调度时间。更多信息,请参见调度时间

    此处以任务整点每小时自动运行为例,如下图所示。

    image

  5. 调度依赖部分,单击使用工作空间根节点,系统会自动生成依赖的上游节点信息。

    说明

    使用工作空间根节点表示该任务无上游的依赖任务。

    image

  6. 配置完成后,关闭调度配置面板。

  7. 单击保存,保存配置。

四、(可选)调试脚本代码

通过调试脚本代码,确保同步任务能成功同步表格存储的增量数据到MaxCompute中。

说明

调试脚本代码时配置的时间范围内的数据可能会多次导入到MaxCompute中,相同数据行会覆盖写入到MaxCompute中。

  1. 单击任务右侧的调试配置,选择运行的资源组,并配置脚本参数。

    自定义参数的格式为yyyyMMddHHmmss,例如20250528160000。

    image

  2. 配置完成后,关闭调试配置面板。

  3. 单击运行

    任务运行完成后,您可以查看目标表的同步结果。更多信息,请参见MaxCompute表数据

五、发布同步任务

发布同步任务后,该同步任务将根据配置的调度属性运行。

  1. 单击发布

  2. 在同步任务的发布页签,根据需要输入发布描述,然后单击开始发布生产

  3. 根据发布流程引导,单击确认发布

步骤四:查看同步结果

  1. DataWorks控制台查看任务运行状态。

    数据开发(Data Studio)旧版

    1. 单击同步任务工具栏右侧的运维

    2. 周期实例页面的实例视角页签,查看实例的运行详情。更多信息,请参见周期实例视角

    数据开发(Data Studio)新版

    1. 单击前往运维查看周期实例

    2. 周期实例页面的实例视角页签,查看实例的运行详情。更多信息,请参见周期实例视角

  2. 查看目标表的同步结果。

    DataWorks控制台的数据地图模块,您可以查看目标MaxCompute表的详细信息。更多信息,请参见MaxCompute表数据

后续操作

同步到MaxCompute的增量数据为Tablestore数据表的列数据变更记录,您可以根据业务需求将表格存储增量数据转换为全量数据格式

附录:MaxCompute表创建示例

以下示例用于创建目标MaxCompute表,以存储Tablestore数据表的列数据变更记录。

重要

目标MaxCompute表的主键结构(包括名称、数据类型及顺序)必须与Tablestore源表一致。

CREATE TABLE IF NOT EXISTS maxcompute_target_table(
    `pk` STRING COMMENT '主键字段', -- 以实际为准
    `colname` STRING COMMENT '列名',
    `version` BIGINT COMMENT '版本号',
    `colvalue` STRING COMMENT '列值',
    `optype` STRING COMMENT '操作类型',
    `sequenceinfo` STRING COMMENT '时序信息'
)
COMMENT '存储Tablestore的增量数据变更记录';