同步增量数据到MaxCompute

如果需要将表格存储中新增和变化的数据定期同步到MaxCompute中备份或者使用,您可以通过在DataWorks数据集成控制台新建和配置离线同步任务来实现周期性增量数据同步。

前提条件

步骤一:新建同步任务节点

  1. 进入数据开发页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与治理 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. DataStudio控制台的数据开发页面,单击业务流程节点下的目标业务流程。

    如果需要新建业务流程,请参见创建业务流程

  3. 数据集成节点上右键选择新建节点 > 离线同步

  4. 新建节点对话框,选择路径并填写节点名称。

  5. 单击确认

    数据集成节点下会显示新建的离线同步节点。

步骤二:配置离线同步任务并启动

配置表格存储MaxCompute的增量数据同步任务,具体步骤如下:

  1. 数据集成节点下,双击打开新建的离线同步任务节点。

  2. 配置同步网络链接。

    选择离线同步任务的数据来源、数据去向以及用于执行同步任务的资源组,并测试连通性。

    重要

    数据同步任务的执行必须经过资源组来实现,请选择资源组并保证资源组与读写两端的数据源能联通访问。

    1. 网络与资源配置步骤,选择数据来源Tablestore Stream,并选择数据源名称表格存储数据源。

    2. 选择资源组。

      选择资源组后,系统会显示资源组的地域、规格等信息以及自动测试资源组与所选数据源之间连通性。

      重要

      请与新增数据源时选择的资源组保持一致。

    3. 选择数据去向MaxCompute(ODPS),并选择数据源名称MaxCompute数据源。

      系统会自动测试资源组与所选数据源之间连通性。

    4. 测试可连通后,单击下一步

  3. 配置任务并保存。

    您可以通过向导模式或者脚本模式配置任务,请根据实际需要选择。

    (推荐)向导模式

    1. 配置任务步骤的配置数据来源与去向区域,根据实际配置数据来源和数据去向。

      数据来源配置

      参数

      说明

      表格存储中的数据表名称。

      开始时间

      增量读取数据的开始时间和结束时间,分别配置为变量形式${startTime}${endTime},具体格式在后续调度属性中配置。增量数据的时间范围为左闭右开的区间。

      结束时间

      状态表

      用于记录状态的表名称,默认值为TableStoreStreamReaderStatusTable。

      最大重试次数

      TableStore中读取增量数据时,每次请求的最大重试次数。

      导出时序信息

      是否导出时序信息,时序信息包含了数据的写入时间等。

      数据去向配置

      参数

      说明

      Tunnel资源组

      Tunnel Quota,默认选择“公共传输资源”,即MC的免费quota。

      MaxCompute的数据传输资源选择,具体请购买与使用独享数据传输服务资源组

      说明

      如果独享tunnel quota因欠费或到期不可用,任务在运行中将会自动切换为“公共传输资源”。

      MaxCompute中的表名称。

      重要

      请确保目标表中字段与源表中字段的数量和类型相匹配。

      如果未创建与源表匹配的目标表,请执行如下操作:

      1. 单击一键生成目标表结构

      2. 新建表对话框,根据实际修改目标表的字段类型和源表的字段类型相匹配。

      3. 单击新建表

      分区信息

      如果您每日增量数据限定在对应日期的分区中,可以使用分区做每日增量,比如配置分区pt值为${bizdate}

      写入模式

      数据写入表中的模式。取值范围如下:

      • 写入前保留已有数据(Insert Into):直接向表或静态分区中插入数据。

      • 写入前清理已有数据(Insert Overwrite):先清空表中的原有数据,再向表或静态分区中插入数据。

      空字符串转为Null写入

      如果源头数据为空字符串,在向目标MaxCompute列写入时是否转为Null值写入。

      同步完成才可见

      单击高级配置后才会显示该参数。

      同步到MaxCompute中的数据是否在同步完成后才能被查询到。

    2. 字段映射区域,系统自动进行同名映射,保持默认即可。

      image.png

    3. 通道控制区域,配置任务运行参数,例如任务期望最大并发数、同步速率、脏数据策略、分布式处理能力等。关于参数配置的更多信息,请参见配置通道

    4. 单击image.png图标,保存配置。

      说明

      执行后续操作时,如果未保存脚本,则系统会出现保存确认的提示,单击确认即可。

    脚本模式

    增量数据的同步需要使用到Tablestore Stream ReaderMaxCompute Writer插件。脚本配置规则请参见Tablestore Stream数据源MaxCompute数据源

    重要

    任务转为脚本模式后,将无法转为向导模式。

    1. 配置任务步骤,单击image.png图标,然后在弹出的对话框中单击确认

    2. 在脚本配置页面,请根据如下示例完成配置。

      重要

      为了便于理解,在配置示例中增加了注释内容,实际使用脚本时请删除所有注释内容。

      {
          "type": "job",
          "steps": [
              {
                  "stepType": "otsstream",    # Reader插件的名称。
                  "parameter": {
                      "mode": "single_version_and_update_only",    # 配置导出模式,默认不设置,为列模式。
                      "statusTable": "TableStoreStreamReaderStatusTable",    # 存储TableStore Stream状态的表,一般无需修改。
                      "maxRetries": 30,    # 从TableStore中读取增量数据时,每次请求的最大重试次数,默认为30。重试之间有间隔,重试30次的总时间约为5分钟,一般无需修改。
                      "isExportSequenceInfo": false,    # 是否导出时序信息,时序信息包含了数据的写入时间等。默认该值为false,即不导出。single_version_and_update_only模式下只能为false。
                      "datasource": "",    # Tablestore的数据源名称,如果有此项则无需配置endpoint、accessId、accessKeyinstanceName。
                      "column": [    # 按照需求添加需要导出TableStore中的列,您可以自定义个数。
                          {
                              "name": "h"    # 列名示例,可以是主键或属性列。
                          },
                          {
                              "name": "n"
                          },
                          {
                              "name": "s"
                          }
                      ],
                      "startTimeString": "${startTime}",    # 增量数据的时间范围(左闭右开)的左边界,格式为yyyymmddhh24miss,单位毫秒。输入$[yyyymmddhh24miss-10/24/60],表示调度时间减去10分钟。
                      "table": "",    # TableStore中的表名。
                      "endTimeString": "${endTime}"    # 增量数据的时间范围(左闭右开)的右边界,格式为yyyymmddhh24miss,单位为毫秒。输入$[yyyymmddhh24miss-5/24/60],表示调度时间减去5分钟。注意,endTime需要比调度时间提前5分钟及以上。
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
                  "stepType": "odps",    # Writer插件的名称。
                  "parameter": {
                      "partition": "",    # 需要写入数据表的分区信息。
                      "truncate": false,    # 清理规则,写入前是否清理已有数据。
                      "compress": false,    # 是否压缩。
                      "datasource": "",    # 数据源名。
                      "column": [    # 需要导入的字段列表。
                          "h",
                          "n",
                          "s"
                      ],
                      "emptyAsNull": false,    # 空字符串是否作为null,默认是。
                      "table": ""    # 表名。
                  },
                  "name": "Writer",
                  "category": "writer"
              }
          ],
          "version": "2.0",
          "order": {
              "hops": [
                  {
                      "from": "Reader",
                      "to": "Writer"
                  }
              ]
          },
          "setting": {
              "errorLimit": {
                  "record": "0"    # 允许出错的个数,当错误超过这个数目的时候同步任务会失败。
              },
              "speed": {
                  "throttle": false,    #同步速率不限流。
                  "concurrent": 2    # 每次同步任务的并发度。
              }
          }
      }
    3. 单击image.png图标,保存配置。

      说明

      执行后续操作时,如果未保存脚本,则系统会出现保存确认的提示,单击确认即可。

  4. 配置调度属性。

    通过调度配置,您可以配置同步任务的执行时间、重跑属性、调度依赖等。

    1. 单击任务右侧的调度配置

    2. 调度配置面板的调度参数区域,单击新增参数,根据下表说明新增参数。更多信息,请参见调度参数支持的格式

      参数

      参数值

      bizdate

      该变量名无特殊含义,您可根据业务需求自定义代码中的变量名。

      startTime

      $[yyyymmddhh24-2/24]$[miss-10/24/60]

      endTime

      $[yyyymmddhh24-1/24]$[miss-10/24/60]

      配置示例如下图所示。

      image

      假如任务运行时的时间为2023042319:00:00点,则startTime20230423175000,endTime20230423185000。任务将会同步17:5018:50时段内新增的数据。

    3. 时间属性部分,配置时间属性。更多信息,请参见时间属性配置说明

      此处以任务整点每小时自动运行为例介绍配置,如下图所示。

      image

    4. 调度依赖部分,单击使用工作空间根节点,系统会自动生成依赖的上游节点信息。

      使用工作空间根节点表示该任务无上游的依赖任务。

      image

    5. 配置完成后,关闭配置调度面板。

    6. 单击image.png图标,保存配置。

      说明

      执行后续操作时,如果未保存脚本,则系统会出现保存确认的提示,单击确认即可。

  5. (可选)根据需要调试脚本代码。

    通过调试脚本代码,确保同步任务能成功同步表格存储的增量数据到MaxCompute中。

    重要

    调试脚本代码时配置的时间范围内的数据可能会多次导入到MaxCompute中,相同数据行会覆盖写入到MaxCompute中。

    1. 单击1680170333627-a1e19a43-4e2a-4340-9564-f53f2fa6806e图标。

    2. 参数对话框,选择运行资源组的名称,并配置自定义参数。

      自定义参数的格式为yyyyMMddHHmmss,例如20230423175000。

      image

    3. 单击运行

  6. 提交同步任务。

    提交同步任务后,同步任务会按照配置的调度属性进行运行。

    1. 单击image图标。

    2. 提交对话框,根据需要填写变更描述。

    3. 单击确认

步骤三:查看任务执行结果

  1. DataWorks控制台查看任务运行状态。

    1. 单击同步任务工具栏右侧的运维

    2. 周期实例页面的实例视角页签,查看实例的运行状态。

  2. 查看数据同步结果。

    您可使用开发ODPS SQL任务创建临时查询功能,通过表操作查询MaxCompute表的数据。

后续操作

同步到MaxCompute中的增量数据为列数据的变化记录,您可以根据需要将表格存储的增量数据转换为全量数据格式。具体操作,请参见将表格存储的增量数据转换为全量数据格式

相关文档