增量同步(脚本模式)

更新时间: 2023-06-21 16:26:25

如果需要将表格存储中新增和变化的数据定期同步到OSS中备份或者使用,您可以通过在DataWorks数据集成控制台新建和配置离线同步任务来实现周期性增量数据同步。

前提条件

步骤一:新建同步任务节点

  1. 进入数据开发页面。

    1. 以项目管理员身份登录DataWorks控制台

    2. 选择地域,在左侧导航栏,单击工作空间列表

    3. 工作空间列表页面,单击工作空间操作列的数据开发

  2. 在DataStudio控制台的数据开发页面,单击业务流程节点下的目标业务流程。

    如果需要新建业务流程,请参见创建业务流程

  3. 数据集成节点上右键选择新建节点 > 离线同步

  4. 新建节点对话框,选择路径并填写节点名称。

  5. 单击确认

    数据集成节点下会显示新建的离线同步节点。

步骤二:配置离线同步任务并启动

配置表格存储到OSS的增量数据同步任务,具体步骤如下:

  1. 数据集成节点下,双击打开新建的离线同步任务节点。

  2. 配置同步网络链接。

    1. 选择离线同步任务的数据来源、数据去向以及用于执行同步任务的资源组,并测试连通性。

      重要

      数据同步任务的执行必须经过资源组来实现,请选择资源组并保证资源组与读写两端的数据源能联通访问。

    2. 网络与资源配置步骤,选择数据来源OTSStream,并选择数据源名称为表格存储数据源。

    3. 选择资源组。

      选择资源组后,系统会显示资源组的地域、规格等信息以及自动测试资源组与所选数据源之间连通性。

      重要

      请与新增数据源时选择的资源组保持一致。

    4. 选择数据去向OSS,并选择数据源名称为OSS数据源。

      系统会自动测试资源组与所选数据源之间连通性。

    5. 测试可连通后,单击下一步

    6. 提示对话框,单击确认使用脚本模式

      重要
      • 表格存储仅支持脚本模式。当存在不支持向导模式的数据源时,如果继续编辑任务,将强制使用脚本模式进行编辑。

      • 任务转为脚本模式后,将无法转为向导模式。

  3. 配置任务并保存。

    增量数据的同步需要使用到OTSStream Reader和MaxCompute Writer插件。脚本配置规则请参见OTSStream数据源OSS数据源

    1. 在脚本配置页面,请根据如下示例完成配置。

      重要

      为了便于理解,在配置示例中增加了注释内容,实际使用脚本时请删除所有注释内容。

      {
          "type": "job",
          "version": "2.0",
          "steps": [
              {
                  "stepType": "otsstream", #插件名,不能修改。
                  "parameter": {
                      "statusTable": "TableStoreStreamReaderStatusTable", #存储Tablestore Stream状态的表,一般无需修改。
                      "maxRetries": 30, #最大重试次数。
                      "mode": "single_version_and_update_only", #Tablestore Stream导出数据的格式,目前需要设置为single_version_and_update_only。如果配置模板中无此项,则需要增加。
      				        "isExportSequenceInfo": false, #single_version_and_update_only模式下只能设置为false。
                      "datasource": "", #数据源名称,请根据实际填写。
                      "dataTable": "", #Tablestore中的数据表名称。
                      "startTimeString": "${startTime}", #开始导出的时间点,由于是增量导出,需要循环启动此任务,则此处每次启动时的时间都不同,因此需要设置一个变量,例如${startTime}。
                      "endTimeString": "${endTime}", #结束导出的时间点。此处也需要设置一个变量,例如${endTime}。
      				        "column":[  #设置数据表中需要导出到OSS中的列,如果配置模板中无此项则需要增加,具体列个数由用户自定义设置。如果不配置则导出全部列。
                        {
                           "name": "column1" 
                        },
                        {
                           "name": "column2"  
                        }
                      ]
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
                  "stepType": "oss", #Writer插件的名称,不能修改。
                  "parameter": {
                      "nullFormat": "null", #定义null值的字符串标识符方式,可以是空字符串。
                      "dateFormat": "yyyy-MM-dd HH:mm:ss", #时间格式。
                      "datasource": "osstest", #OSS数据源名称,请根据实际填写。
                      "writeMode": "truncate", #当同名文件存在时系统进行的操作,可选值包括truncate、append和nonConflict。truncate表示会清理已存在的同名文件,append表示会增加到已存在的同名文件内容后面,nonConflict表示当同名文件存在时会报错。
                      "encoding": "UTF-8", #编码类型。
                      "fieldDelimiter": ",", #每一列的分隔符。
                      "fileFormat": "csv", #文件类型,可选值包括csv、txt和parquet格式。
                      "object": "" #备份到OSS的文件名前缀,建议使用"Tablestore实例名/表名/date",例如"instance/table/{date}"。
                  },
                  "name": "Writer",
                  "category": "writer"
              },
              {
                  "parameter": {},
                  "name": "Processor",
                  "category": "processor"
              }
          ],
          "setting": {
              "errorLimit": {
                  "record": "0"  #允许出错的个数。当错误超过这个数目的时候同步任务会失败。
              },
              "locale": "zh",
              "speed": {
                  "throttle":true, #当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
                  "concurrent":1 #作业并发数。
                  "mbps":"12" #限流。
              }
          },
          "order": {
              "hops": [
                  {
                      "from": "Reader",
                      "to": "Writer"
                  }
              ]
          }
      }

    2. 单击image.png图标,保存脚本。

      说明

      执行后续操作时,如果未保存脚本,则系统会出现保存确认的提示,单击确认即可。

  4. 配置调度属性。

    通过调度配置,您可以配置同步任务的执行时间、重跑属性、调度依赖等。

    1. 单击任务右侧的调度配置

    2. 调度配置面板的参数部分,单击新增参数,根据下表说明新增参数。更多信息,请参见调度参数支持的格式

      参数

      参数值

      startTime

      $[yyyymmddhh24-2/24]$[miss-10/24/60]

      endTime

      $[yyyymmddhh24-1/24]$[miss-10/24/60]

      配置示例如下图所示。

      image..png

      假如任务运行时的时间为2023年04月23日19:00:00点,则startTime为20230423175000,endTime为20230423185000。任务将会同步17:50到18:50时段内新增的数据。

    3. 时间属性部分,配置时间属性。更多信息,请参见时间属性配置说明

      此处以任务整点每小时自动运行为例介绍配置,如下图所示。

      image..png
    4. 调度依赖部分,单击使用工作空间根节点,系统会自动生成依赖的上游节点信息。

      使用工作空间根节点表示该任务无上游的依赖任务。

      image..png
    5. 配置完成后,关闭配置调度面板。

  5. (可选)根据需要调试脚本代码。

    通过调试脚本代码,确保同步任务能成功同步表格存储的增量数据到OSS中。

    重要

    调试脚本代码时配置的时间范围内的数据可能会多次导入到OSS中,相同数据行会覆盖写入到OSS中。

    1. 单击1680170333627-a1e19a43-4e2a-4340-9564-f53f2fa6806e图标。

    2. 参数对话框,选择运行资源组的名称,并配置自定义参数。

      自定义参数的格式为yyyyMMddHHmmss,例如20230423175000。

      image
    3. 单击运行

  6. 提交同步任务。

    提交同步任务后,同步任务会按照配置的调度属性进行运行。

    1. 单击image图标。

    2. 提交对话框,根据需要填写变更描述。

    3. 单击确认

步骤三:查看任务执行结果

  1. 在DataWorks控制台查看任务运行状态。

    1. 单击同步任务工具栏右侧的运维

    2. 周期实例页面的实例视角页签,查看实例的运行状态。

  2. 在OSS管理控制台查看数据同步结果。

    1. 登录OSS管理控制台

    2. Bucket列表页面,找到目标Bucket后,单击Bucket名称。

    3. 文件列表页签,选择相应文件,下载后可查看内容是否符合预期。

阿里云首页 表格存储 相关技术圈