时序表同步到时序表

更新时间:2025-02-28 02:02:04

本文介绍如何使用DataWorks将表格存储中时序表的全量数据或者增量数据同步到另一个时序表。

背景信息

表格存储支持对时序数据进行存储与查询,在某些场景下,您可能希望将时序表中的时序数据迁移到另一个时序表中。

具体数据迁移场景如下:

  • 全量数据同步:将时序表中现有的全部时序数据迁移到另一个时序表中。

  • 增量数据同步:定时(例如每隔一天)将时序表中新增的数据迁移到另一个时序表中。

数据集成是一个稳定高效、弹性伸缩的数据同步平台,致力于提供在复杂网络环境下、丰富的异构数据源之间高速稳定的数据移动及同步能力。您可以在数据开发(DataStudio)界面直接创建离线同步节点用于离线(批量)数据周期性同步或者创建实时同步节点用于单表或整库增量数据实时同步。更多信息,请参见数据集成

前提条件

  • 已获取源时序表和目标时序表的实例名称、实例访问地址、地域ID等信息。

  • 已创建目标时序表。具体操作,请参见创建时序表

  • 已开通DataWorks服务并创建工作空间。具体操作,请参见开通DataWorks服务创建工作空间

  • 已获取 AccessKey 信息。请使用阿里云账号或RAM用户的 AccessKey 进行配置。获取AccessKey的具体操作,请参见如何获取AccessKey

    重要

    出于安全考虑,强烈建议您通过RAM用户使用表格存储功能。您可以创建RAM用户、授予该用户管理表格存储权限( AliyunOTSFullAccess )并为该RAM用户创建AccessKey。具体操作,请参见使用RAM用户访问密钥访问表格存储

全量数据同步

步骤一:新增数据源

具体操作,请参见新增表格存储数据源

  • 如果源时序表和目标时序表所在实例相同,则只需要创建一个数据源即可。

  • 如果源时序表和目标时序表所在实例不同,则需要分别创建源数据源和目标数据源。

步骤二:新建离线同步任务

  1. 进入数据开发页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与运维 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. DataStudio控制台的数据开发页面,单击业务流程节点下的目标业务流程。

    如果需要新建业务流程,请参见创建业务流程

  3. 数据集成节点上右键选择新建节点 > 离线同步

  4. 新建节点对话框,选择路径并填写节点名称。

  5. 单击确认

    数据集成节点下会显示新建的离线同步节点。

步骤三:配置离线同步任务并启动

  1. 数据集成节点下,双击打开新建的离线同步任务节点。

  2. 配置同步网络连接。

    选择离线同步任务的数据来源、数据去向以及用于执行同步任务的资源组,并测试连通性。

    重要

    数据同步任务的执行必须经过资源组来实现,请选择资源组并保证资源组与读写两端的数据源能联通访问。

    1. 网络与资源配置步骤,选择数据来源Tablestore,并选择数据源名称为源数据源。

    2. 选择资源组。

      选择资源组后,系统会显示资源组的地域、规格等信息以及自动测试资源组与所选数据源之间连通性。

      重要

      请与新增数据源时选择的资源组保持一致。

    3. 选择数据去向Tablestore,并选择数据源名称为目标数据源。

      系统会自动测试资源组与所选数据源之间连通性。

    4. 测试可连通后,单击下一步

  3. 配置任务并保存。

    全量数据的同步需要使用到Tablestore(OTS) ReaderTablestore Writer插件。脚本配置规则请参见Reader脚本Demo与参数说明Writer脚本Demo与参数说明

    1. 配置任务步骤,单击image.png图标,然后在弹出的对话框中单击确认以进入脚本模式。

      image

    2. 根据配置示例编辑脚本。

      配置示例如下:

      重要

      为了便于理解,在配置示例中增加了注释内容,实际使用脚本时请删除所有注释内容。

      {
        "type": "job",
        "version": "2.0",
        "steps": [
          {
            "stepType": "ots", //读数据使用的插件为Tablestore Reader。
            "parameter": {
              "datasource": "instance01", //已增加的Tablestore源数据源。
              "table": "timeseriesReadTable", //源时序表名称。
              "newVersion": "true", //使用Tablestore Reader的新版本。
              "mode": "normal", //读时序数据mode必须为normal。
              "isTimeseriesTable": "true", //表示该表为时序表。
              "envType": 1,
              "column": [
                {
                  "name": "_m_name" //导出时序表的度量名称字段。
                },
                {
                  "name": "_data_source" //导出时序表的数据源字段。
                },
                {
                  "name": "_tags" //导出时序表的标签字段。
                },
                {
                  "name": "_time" //导出时序表的时间戳字段。
                },
                //以下配置请根据时序表中的具体数据为准。 本示例中从源时序表中导出列名分别为string_1(string类型),bool_1(bool类型),int_1(int整型),double_1(double类型)的四列。
                {
                  "name": "string_1",
                  "type": "string"
                },
                {
                  "name": "bool_1",
                  "type": "BOOL"
                },
                {
                  "name": "int_1",
                  "type": "int"
                },
                {
                  "name": "double_1",
                  "type": "double"
                }
              ]
            },
            "name": "Reader", //表示以上配置为reader读数据的配置。
            "category": "reader"
          },
          {
            "stepType": "ots", //写数据使用的插件为Tablestore Writer。
            "parameter": {
              "datasource": "instance02", //已增加的Tablestore目标数据源。
              "table": "timeseriesWriteTable", //目标时序表名称。
              "newVersion": "true", //使用Tablestore Writer的新版本。
              "mode": "normal", //写时序数据mode必须为normal。
              "isTimeseriesTable": "true", //表示该表为时序表。
              "envType": 1,
              "column": [
                {
                  "name": "_m_name" //导入时序表的度量名称字段。
                },
                {
                  "name": "_data_source" //导入时序表的数据源字段。
                },
                {
                  "name": "_tags" //导入时序表的标签字段。
                },
                {
                  "name": "_time" //导入时序表的时间戳字段。
                },
                //导入Tablestore Reader导出的四列字段,导入字段与导出字段的顺序必须一一对应。
                {
                  "name": "string_1",
                  "type": "string"
                },
                {
                  "name": "bool_1",
                  "type": "BOOL"
                },
                {
                  "name": "int_1",
                  "type": "int"
                },
                {
                  "name": "double_1",
                  "type": "double"
                }
              ]
            },
            "name": "Writer", //表示以上配置为writer写数据的配置。
            "category": "writer"
          }
        ],
        "setting": {
          "errorLimit": {
            "record": ""
          },
          "locale": "zh",
          "speed": {
            "throttle": false,
            "concurrent": 2 //同步任务的并发数。
          }
        },
        "order": {
          "hops": [
            {
              "from": "Reader",
              "to": "Writer"
            }
          ]
        }
      }
    3. 单击image.png图标,保存配置。

  4. 执行同步任务。

    说明

    全量数据一般只需要同步一次,无需配置调度属性。

    1. 单击1680170333627-a1e19a43-4e2a-4340-9564-f53f2fa6806e图标。

    2. 参数对话框,选择运行资源组的名称。

    3. 单击运行

步骤四:查看任务执行结果

执行同步任务后,在DataWorks控制台通过日志查看任务执行状态,在表格存储控制台查看目标时序表数据同步结果。

  1. DataWorks控制台通过日志查看同步任务运行状态。

    1. 在同步任务的结果页签,单击Detail log url对应的链接。

    2. 在任务的详细运行日志页面,查看Current task status对应的状态。

      Current task status的值为FINISH时,表示任务运行完成。

  2. 在表格存储控制台查看目标时序表数据同步结果。

    1. 登录表格存储控制台

    2. 概览页面上方,选择地域。

    3. 单击实例名称。

    4. 实例详情页签,单击时序表列表页签。

    5. 时序表列表页签,单击目标时序表操作列的数据管理

    6. 数据管理页签,即可查看同步到该时序表中的数据。

增量数据同步

步骤一:新增数据源

具体操作,请参见新增表格存储数据源

步骤二:新建离线同步任务

  1. 进入数据开发页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与运维 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. DataStudio控制台的数据开发页面,单击业务流程节点下的目标业务流程。

    如果需要新建业务流程,请参见创建业务流程

  3. 数据集成节点上右键选择新建节点 > 离线同步

  4. 新建节点对话框,选择路径并填写节点名称。

  5. 单击确认

    数据集成节点下会显示新建的离线同步节点。

步骤三:配置离线同步任务并启动

  1. 双击打开新建的离线同步任务节点。

  2. 配置同步网络连接。

    选择离线同步任务的数据来源、数据去向以及用于执行同步任务的资源组,并测试连通性。

    重要

    数据同步任务的执行必须经过资源组来实现,请选择资源组并保证资源组与读写两端的数据源能联通访问。

    1. 网络与资源配置步骤,选择数据来源Tablestore Stream,并选择数据源名称为源数据源。

      说明

      Tablestore Stream插件主要用于导出Tablestore增量数据,更多信息请参见Tablestore Stream配置同步任务

    2. 选择资源组。

      选择资源组后,系统会显示资源组的地域、规格等信息以及自动测试资源组与所选数据源之间连通性。

      重要

      请与新增数据源时选择的资源组保持一致。

    3. 选择数据去向Tablestore,并选择数据源名称为目标数据源。

      系统会自动测试资源组与所选数据源之间连通性。

    4. 测试可连通后,单击下一步

  3. 配置同步任务并保存。

    增量数据的同步需要使用到Tablestore Stream ReaderTablestore Writer插件。脚本配置规则请参见Reader脚本Demo与参数说明Writer脚本Demo与参数说明

    1. 配置任务步骤,单击image.png图标,然后在弹出的对话框中单击确认以进入脚本模式。

      image

    2. 根据配置示例编辑脚本。

      配置示例如下:

      重要

      为了便于理解,在配置示例中增加了注释内容,实际使用脚本时请删除所有注释内容。

      {
        "type": "job",
        "version": "2.0",
        "steps": [
          {
            "stepType": "otsstream", // 读数据使用的插件为Tablestore Stream Reader。
            "parameter": {
              "datasource": "instance01", // 已增加的Tablestore源数据源。
              "dataTable": "timeseriesReadTable", // 源时序表名称。
              "statusTable": "TableStoreStreamReaderStatusTable", // 用于记录状态的表的名称。如果该表不存在,则Tablestore Stream Reader会自动创建。
              "maxRetries": 30, // 最大重试次数。
              "isExportSequenceInfo": false,  // 不导出时序信息。
              "mode": "single_version_and_update_only", // 模式为行模式(single_version_and_update_only)。
              "isTimeseriesTable": "true", // 表示该表为时序表。
              // 此处的${}用于注入参数,参数会在下一步配置。
              "startTimeString": "${startTime}", // 增量数据的时间范围(左闭右开)的左边界。
              "endTimeString": "${endTime}", // 增量数据的时间范围(左闭右开)的右边界。
              "envType": 1,
              "column": [
                {
                  "name": "_m_name"
                },
                {
                  "name": "_data_source"
                },
                {
                  "name": "_tags"
                },
                {
                  "name": "_time"
                },
                {
                  "name": "string_1",
                  "type": "string"
                },
                {
                  "name": "bool_1",
                  "type": "BOOL"
                },
                {
                  "name": "int_1",
                  "type": "int"
                },
                {
                  "name": "double_1",
                  "type": "double"
                }
              ]
            },
            "name": "Reader",
            "category": "reader"
          },
          {
            "stepType": "ots", // 写数据使用的插件为Tablestore Writer。
            "parameter": {
              "datasource": "instance02", // 已增加的Tablestore目标数据源。
              "table": "timeseriesWriteTable", // 目标时序表名称。
              "newVersion": "true", // 使用Tablestore Writer的新版本。
              "mode": "normal", 
              "isTimeseriesTable": "true", // 表示该表为时序表。
              "envType": 1,
              "column": [
                {
                  "name": "_m_name" // 导入时序表的度量名称字段。
                },
                {
                  "name": "_data_source" // 导入时序表的数据源字段。
                },
                {
                  "name": "_tags" // 导入时序表的标签字段。
                },
                {
                  "name": "_time" // 导入时序表的时间戳字段。
                },
                // 导入Tablestore Steam Reader导出的四列字段。导入字段与导出字段的顺序必须一一对应。
                {
                  "name": "string_1",
                  "type": "string"
                },
                {
                  "name": "bool_1",
                  "type": "BOOL"
                },
                {
                  "name": "int_1",
                  "type": "int"
                },
                {
                  "name": "double_1",
                  "type": "double"
                }
              ]
            },
            "name": "Writer", // 表示以上配置为writer写数据的配置。
            "category": "writer"
          }
        ],
        "setting": {
          "errorLimit": {
            "record": ""
          },
          "locale": "zh",
          "speed": {
            "throttle": false,
            "concurrent": 2  // 同步任务的并发度。
          }
        },
        "order": {
          "hops": [
            {
              "from": "Reader",
              "to": "Writer"
            }
          ]
        }
      }
    3. 单击image.png图标,保存配置。

  4. 配置调度属性。

    1. 单击任务右侧的调度配置

    2. 调度配置面板的调度参数部分,单击新增参数,根据下表说明新增参数。更多信息,请参见调度参数支持的格式

      参数

      参数值

      参数

      参数值

      startTime

      $[yyyymmddhh24-2/24]$[miss-10/24/60]

      endTime

      $[yyyymmddhh24-1/24]$[miss-10/24/60]

      配置示例如下图所示。

      image

      假如任务运行时的时间为2023042319:00:00,则startTime20230423175000,endTime20230423185000。任务将会同步17:5018:50时段内新增的数据。

    3. 时间属性部分,配置时间属性。更多信息,请参见时间属性配置说明

      此处以任务整点每小时自动运行为例介绍配置,如下图所示。

      image

    4. 调度依赖部分,单击使用工作空间根节点,系统会自动生成依赖的上游节点信息。

      使用工作空间根节点表示该任务无上游的依赖任务。

      image

    5. 配置完成后,关闭配置调度面板。

  5. (可选)根据需要调试脚本代码。

    通过调试脚本代码,确保同步任务能成功同步源时序表的增量数据到目标时序表中。

    重要

    调试脚本代码时配置的时间范围内的数据可能会多次导入到目标时序表,相同时间线会覆盖写入到目标时序表。

    1. 单击1680170333627-a1e19a43-4e2a-4340-9564-f53f2fa6806e图标。

    2. 参数对话框,选择运行资源组的名称,并配置自定义参数。

      自定义参数的格式为yyyyMMddHHmmss,例如20230423175000。

      image

    3. 单击运行

  6. 提交同步任务。

    提交同步任务后,同步任务会按照配置的调度属性进行运行。

    1. 单击image图标。

    2. 提交对话框,根据需要填写变更描述。

    3. 单击确认

步骤四:查看任务执行结果

  1. DataWorks控制台查看任务运行状态。

    1. 单击同步任务工具栏右侧的运维

    2. 周期实例页面的实例视角页签,查看实例的运行详情。

      image

  2. 在表格存储控制台查看目标时序表数据同步结果。

    1. 登录表格存储控制台

    2. 概览页面上方,选择地域。

    3. 单击实例名称。

    4. 实例详情页签,单击时序表列表页签。

    5. 时序表列表页签,单击目标时序表操作列的数据管理

    6. 数据管理页签,即可查看同步到该时序表中的数据。

  • 本页导读 (1)
  • 背景信息
  • 前提条件
  • 全量数据同步
  • 步骤一:新增数据源
  • 步骤二:新建离线同步任务
  • 步骤三:配置离线同步任务并启动
  • 步骤四:查看任务执行结果
  • 增量数据同步
  • 步骤一:新增数据源
  • 步骤二:新建离线同步任务
  • 步骤三:配置离线同步任务并启动
  • 步骤四:查看任务执行结果