同步增量数据到OSS

如果需要定期将表格存储中新增和变化的数据同步至OSS中进行备份或使用,您可以通过在DataWorks数据集成平台上配置离线同步任务,实现周期性增量数据同步。

注意事项

  • 此功能适用于表格存储的数据表和时序表,支持的任务配置模式及导出模式请参见下表。

    Tablestore源表类型

    离线同步任务配置模式

    Tablestore Stream Reader导出模式

    数据表

    向导模式

    列模式

    脚本模式

    时序表

    脚本模式

    行模式

    重要

    使用行模式导出的数据保留完整行结构,但要求写入Tablestore的数据必须采用整行写入方式,如果用户存在单独更新部分列的行为,则可能导致导出的行数据中未更新列的值为空。

    例如物联网时序数据,通常以整行写入方式写入(如每行记录设备的全部传感器数据),且后续无修改操作,符合行模式要求。

  • 增量同步采用周期调度机制,每隔5分钟进行一次调度,且插件存在5分钟的延迟,因此同步的总延迟为5~10分钟。

准备工作

重要

如果OSS的存储空间(Bucket)和Tablestore实例不在同一地域,请参考以下操作步骤创建VPC对等连接实现跨地域网络连通。

创建VPC对等连接实现跨地域网络连通

此处以DataWorks工作空间与OSS的存储空间(Bucket)位于同一地域(华东1(杭州)),而Tablestore实例位于华东2(上海)为例进行说明。

  1. Tablestore实例绑定VPC。

    1. 登录表格存储控制台,在页面上方选择地域。

    2. 单击实例别名进入实例管理页面。

    3. 切换到网络管理页签,单击绑定VPC,选择VPC和交换机并填写VPC名称,然后单击确定

    4. 请耐心等待一段时间,VPC绑定成功后页面将自动刷新,您可以在VPC列表查看绑定的VPC IDVPC访问地址

      说明

      后续在DataWorks控制台添加Tablestore数据源时,将使用该VPC访问地址。

      image

  2. 获取DataWorks工作空间资源组的VPC信息。

    1. 登录DataWorks控制台,在页面上方选择工作空间所在地域,然后单击左侧工作空间菜单,进入工作空间列表页面。

    2. 单击工作空间名称进入空间详情页面,单击左侧资源组菜单,查看工作空间绑定的资源组列表。

    3. 在目标资源组右侧单击网络设置,在资源调度 & 数据集成区域查看绑定的专有网络,即VPC ID

  3. 创建VPC对等连接并配置路由。

    1. 登录专有网络VPC控制台。在页面左侧单击专有网络菜单,依次选择Tablestore实例和DataWorks工作空间所在地域,并记录VPC ID对应的网段地址。

      image

    2. 在页面左侧单击VPC对等连接菜单,然后在VPC对等连接页面单击创建对等连接

    3. 创建对等连接页面,输入对等连接名称,选择发起端VPC实例、接收端账号类型、接收端地域和接收端VPC实例,单击确定

    4. VPC对等连接页面,找到已创建的VPC对等连接,分别在发起端VPC实例列和接收端VPC实例列单击配置路由条目

      目标网段需填写对端VPC的网段地址。即在发起端VPC实例配置路由条目时,填写接收端VPC实例的网段地址;在接收端VPC实例配置路由条目时,填写发起端VPC实例的网段地址。

操作步骤

步骤一:新增表格存储数据源

  1. 进入数据集成页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据集成 > 数据集成,在下拉框中选择对应工作空间后单击进入数据集成

  2. 在左侧导航栏,单击数据源

  3. 数据源列表页面,单击新增数据源

  4. 新增数据源对话框,搜索并选择数据源类型为Tablestore

  5. 新增OTS数据源对话框,根据下表配置数据源参数。

    参数

    说明

    数据源名称

    数据源名称必须以字母、数字、下划线(_)组合,且不能以数字和下划线(_)开头。

    数据源描述

    对数据源进行简单描述,不得超过80个字符。

    地域

    选择Tablestore实例所属地域。

    Table Store实例名称

    Tablestore实例的名称。

    Endpoint

    Tablestore实例的服务地址,推荐使用VPC地址

    AccessKey ID

    阿里云账号或者RAM用户的AccessKey IDAccessKey Secret。

    AccessKey Secret

  6. 测试资源组连通性。创建数据源时,您需要测试资源组的连通性,以保证同步任务使用的资源组能够与数据源连通,否则将无法正常执行数据同步任务。

    1. 连接配置区域,单击相应资源组连通状态列的测试连通性

    2. 测试连通性通过后,连通状态显示可连通,单击完成。您可以在数据源列表中查看新建的数据源。

      说明

      如果测试连通性结果为无法通过,您可使用连通性诊断工具自助解决。如仍无法连通资源组与数据源,请提交工单处理。

步骤二:新增OSS数据源

操作与步骤一相似,在新增数据源对话框中搜索并选择数据源类型为OSS,随后配置相关的数据源参数。

重要

OSS数据源的访问模式支持RAM角色授权模式Access Key模式,请根据实际需求选择适合的访问模式。

  • RAM角色授权模式:通过STS授权的方式允许云产品服务账号扮演相关角色来访问数据源,具备更高安全性。更多信息,请参见通过RAM角色授权模式配置数据源

    首次选择访问模式RAM角色授权模式时,系统会显示警告对话框,提示创建相关服务关联角色的信息,单击开启授权进行授权。开启授权后,选择角色为新建的服务关联角色。
  • Access Key模式:通过阿里云账号或者RAM用户的AccessKey IDAccessKey Secret访问数据源。

步骤三:配置离线同步任务

数据开发(Data Studio)旧版

一、新建任务节点

  1. 进入数据开发页面。

    1. 登录DataWorks控制台

    2. 在页面上方,选择资源组和地域。

    3. 在左侧导航栏,单击数据开发与运维 > 数据开发

    4. 数据开发页面的下拉框中,选择对应工作空间后单击进入数据开发

  2. DataStudio控制台的数据开发页面,单击业务流程节点下的目标业务流程。

    如果需要新建业务流程,请参见创建业务流程

  3. 数据集成节点上右键单击,然后选择新建节点 > 离线同步

  4. 新建节点对话框,选择路径并填写名称,然后单击确认

    数据集成节点下,将显示新建的离线同步节点。

二、配置同步任务

  1. 数据集成节点下,双击打开新建的离线同步任务节点。

  2. 配置网络与资源。

    选择离线同步任务的数据来源、数据去向以及用于执行同步任务的资源组,并测试连通性。

    1. 网络与资源配置步骤,选择数据来源Tablestore Stream,并选择数据源名称为新增的表格存储数据源。

    2. 选择资源组。

      选择资源组后,系统会显示资源组的地域、规格等信息以及自动测试资源组与所选数据源之间连通性。

      说明

      Serverless资源组支持为同步任务指定运行CU上限,如果您的同步任务因资源不足出现OOM现象,请适当调整资源组的CU占用取值。

    3. 选择数据去向OSS,并选择数据源名称为新增的OSS数据源。

      系统会自动测试资源组与所选数据源之间连通性。

    4. 测试可连通后,单击下一步

  3. 配置任务并保存。

    重要

    同步数据表的增量数据到OSS时,您可以根据实际需求选择向导模式脚本模式

    同步时序表的增量数据到OSS时,只支持脚本模式

    向导模式

    1. 配置任务步骤的配置数据来源与去向区域,根据实际情况配置数据来源和数据去向。

      数据来源

      参数

      说明

      数据源

      默认显示上一步选择的Tablestore数据源。

      源数据表。

      开始时间

      增量读取数据的开始时间和结束时间,分别配置为变量形式${startTime}${endTime},具体格式在后续调度属性中配置。增量数据的时间范围为左闭右开的区间。

      结束时间

      状态表

      用于记录状态的表名称,默认值为TableStoreStreamReaderStatusTable。

      最大重试次数

      TableStore中读取增量数据时,每次请求的最大重试次数。

      导出时序信息

      是否导出时序信息,时序信息包含了数据的写入时间等。

      数据去向

      参数

      说明

      数据源

      默认显示上一步选择的OSS数据源。

      文本类型

      写入OSS的文件类型,例如csvtext

      说明

      不同文件类型支持的配置有差异,请以实际界面为准。

      文件名(含路径)

      仅在文本类型csvtextorc时,需配置该参数。

      写入OSS的文件路径,支持使用星号(*)作为通配符。例如tablestore/fullData/myotsdata.csv

      文件路径

      仅在文本类型parquet时,需配置该参数。

      写入OSS的文件路径,例如tablestore/fullData

      文件名

      仅在文本类型parquet时,需配置该参数。

      写入OSS的文件名称。

      列分隔符

      仅在文本类型csvtext时,需配置该参数。

      写入OSS文件时,列之间使用的分隔符。例如配置为 \u001b

      行分隔符

      仅在文本类型text时,需配置该参数。

      自定义行分隔符,用来分隔不同数据行。例如配置为\\u0001

      说明

      建议您使用数据中不存在的分隔符作为行分隔符;如果要使用LinuxWindows平台的默认行分隔符(\n\r\n),建议置空此配置,平台将自动适配读取。

      编码

      仅在文本类型csvtext时,需配置该参数。

      写入文件的编码配置。

      null

      仅在文本类型csvtextorc时,需配置该参数。

      源数据源中可以表示为null的字符串。例如配置为null,如果源数据是null,则系统将视作null字段。

      时间格式

      仅在文本类型csvtext时,需配置该参数。

      日期类型的数据写入到OSS文件的时间格式,例如yyyy-MM-dd

      前缀冲突

      当设置的文件名与OSS中已有文件名冲突时的处理方法,取值范围如下:

      • 替换原有文件:删除原始文件,重建一个同名文件。

      • 保留原有文件:保留原始文件,重建一个新文件,名称为原文件名加随机后缀。

      • 退出报错:同步任务停止执行。

      切分文件

      仅在文本类型csvtext时,需配置该参数。

      OSS写出时单个Object文件的最大大小,默认为10000*10MB,类似log4j日志打印时根据日志文件大小轮转。

      OSS分片上传时,每个分片大小为10MB(也是轮转文件最小粒度,即小于10MB的分块大小会被作为10MB),OSS 分片上传支持的分块最大数量为10000。轮转发生时,object名称规则是:在原有object前缀加UUID随机数。

      写为一个文件

      仅在文本类型csvtext时,需配置该参数。

      在将数据写入OSS时,是否以单个文件的形式写入。

      • 默认写多个文件。当读不到任何数据时,如果配置了文件头,将输出只包含文件头的空文件,否则只输出空文件。

      • 如果需要以单个文件形式写入,请选择写为一个文件单选框。当都读不到任何数据时,不会产生空文件。

      首行输出表头

      仅在文本类型csvtext时,需配置该参数。

      写入文件时,是否在第一行输出表头。默认不输出表头。如需在第一行输出表头,请选中首行输出表头单选框。

    2. 配置字段映射。

      来源字段中包括了源表主键和增量变更信息,目标字段不支持配置。

      image

    3. 配置通道控制。

      您可以通过通道配置,控制数据同步过程相关属性。相关参数说明详情可参见离线同步并发和限流之间的关系

    4. 单击image.png图标,保存配置。

    脚本模式

    1. 配置任务步骤,单击image.png图标,然后在弹出的对话框中单击确定image

    2. 在脚本配置页面,编辑脚本。

      脚本配置示例如下,请根据您的同步信息和需求替换配置文件内的参数信息。

      • 同步数据表数据

        • 行模式导出

          脚本配置示例与参数说明

          {
            "type": "job",
            "version": "2.0",
            "steps": [
              {
                "stepType": "otsstream",
                "parameter": {
                  "datasource": "MyTablestoreDatasource",
                  "dataTable": "source_table",
                  "statusTable": "TableStoreStreamReaderStatusTable",
                  "newVersion": "true",
                  "mode": "single_version_and_update_only",
                  "startTimeString": "${startTime}",
                  "endTimeString": "${endTime}",
                  "envType": 1,
                  "column": [
                    {
                      "name": "pk"
                    },
                    {
                      "name": "col_string"
                    },
                    {
                      "name": "col_long"
                    },
                    {
                      "name": "col_double"
                    },
                    {
                      "name": "col_bool"
                    },
                    {
                      "name": "col_binary"
                    }
                  ],
                  "maxRetries": 30,
                  "isExportSequenceInfo": false
                },
                "name": "Reader",
                "category": "reader"
              },
              {
                "stepType": "oss",
                "parameter": {
                  "datasource": "MyOSSDatasource",
                  "fileFormat": "csv",
                  "object": "tablestore/incrementData/tablestoreData.csv",
                  "fieldDelimiter": ",",
                  "encoding": "UTF-8",
                  "nullFormat": "null",
                  "dateFormat": "yyyy-MM-dd HH:mm:ss",
                  "writeMode": "append",
                  "writeSingleObject": true,
                  "envType": 1
                },
                "name": "Writer",
                "category": "writer"
              },
              {
                "copies": 1,
                "parameter": {
                  "nodes": [],
                  "edges": [],
                  "groups": [],
                  "version": "2.0"
                },
                "name": "Processor",
                "category": "processor"
              }
            ],
            "setting": {
              "errorLimit": {
                "record": "0"
              },
              "locale": "zh",
              "speed": {
                "concurrent": 3,
                "throttle": false
              }
            },
            "order": {
              "hops": [
                {
                  "from": "Reader",
                  "to": "Writer"
                }
              ]
            }
          }
          • Tablestore Reader需要替换的参数说明如下:

            参数名称

            说明

            datasource

            Tablestore数据源名称。

            dataTable

            源表名称。

            column

            读取源表的列信息。

          • OSS Writer需要替换的参数说明如下:

            参数名称

            说明

            datasource

            OSS数据源名称。

            object

            OSS Writer写入的文件名(含路径)。

        • 列模式导出

          脚本配置示例与参数说明

          {
            "type": "job",
            "version": "2.0",
            "steps": [
              {
                "stepType": "otsstream",
                "parameter": {
                  "datasource": "MyTablestoreDatasource",
                  "dataTable": "source_table",
                  "statusTable": "TableStoreStreamReaderStatusTable",
                  "newVersion": "true",
                  "startTimeString": "${startTime}",
                  "endTimeString": "${endTime}",
                  "envType": 1,
                  "column": [
                    "pk",
                    "colName",
                    "version",
                    "colValue",
                    "opType",
                    "sequenceInfo"
                  ],
                  "maxRetries": 30,
                  "isExportSequenceInfo": false
                },
                "name": "Reader",
                "category": "reader"
              },
              {
                "stepType": "oss",
                "parameter": {
                  "datasource": "MyOSSDatasource",
                  "fileFormat": "csv",
                  "object": "tablestore/incrementData/tablestoreData.csv",
                  "fieldDelimiter": ",",
                  "encoding": "UTF-8",
                  "nullFormat": "null",
                  "dateFormat": "yyyy-MM-dd HH:mm:ss",
                  "writeMode": "append",
                  "writeSingleObject": true,
                  "envType": 1
                },
                "name": "Writer",
                "category": "writer"
              },
              {
                "copies": 1,
                "parameter": {
                  "nodes": [],
                  "edges": [],
                  "groups": [],
                  "version": "2.0"
                },
                "name": "Processor",
                "category": "processor"
              }
            ],
            "setting": {
              "errorLimit": {
                "record": "0"
              },
              "locale": "zh",
              "speed": {
                "concurrent": 3,
                "throttle": false
              }
            },
            "order": {
              "hops": [
                {
                  "from": "Reader",
                  "to": "Writer"
                }
              ]
            }
          }
          • Tablestore Reader需要替换的参数说明如下:

            参数名称

            说明

            datasource

            Tablestore数据源名称。

            dataTable

            源表名称。

            column

            源表主键和增量变更信息。

          • OSS Writer需要替换的参数说明如下:

            参数名称

            说明

            datasource

            OSS数据源名称。

            object

            OSS Writer写入的文件名(含路径)。

      • 同步时序表数据

        脚本配置示例与参数说明

        {
          "type": "job",
          "version": "2.0",
          "steps": [
            {
              "stepType": "otsstream",
              "parameter": {
                "datasource": "MyTablestoreDatasource",
                "dataTable": "source_table",
                "statusTable": "TableStoreStreamReaderStatusTable",
                "newVersion": "true",
                "isTimeseriesTable":"true",
                "mode": "single_version_and_update_only",
                "startTimeString": "${startTime}",
                "endTimeString": "${endTime}",
                "envType": 1,
                "column": [
                  {
                    "name": "_m_name"
                  },
                  {
                    "name": "_data_source"
                  },
                  {
                    "name": "_tags"
                  },
                  {
                    "name": "_time"
                  },
                  {
                    "name": "col_string",
                    "type": "STRING"
                  },
                  {
                    "name": "col_long",
                    "type": "INT"
                  },
                  {
                    "name": "col_double",
                    "type": "DOUBLE"
                  },
                  {
                    "name": "col_bool",
                    "type": "BOOL"
                  },
                  {
                    "name": "col_binary",
                    "type": "BINARY"
                  }
                ],
                "maxRetries": 30,
                "isExportSequenceInfo": false
              },
              "name": "Reader",
              "category": "reader"
            },
            {
              "stepType": "oss",
              "parameter": {
                "datasource": "MyOSSDatasource",
                "fileFormat": "csv",
                "object": "tablestore/incrementData/tablestoreData.csv",
                "fieldDelimiter": ",",
                "encoding": "UTF-8",
                "nullFormat": "null",
                "dateFormat": "yyyy-MM-dd HH:mm:ss",
                "writeMode": "append",
                "writeSingleObject": true,
                "envType": 1
              },
              "name": "Writer",
              "category": "writer"
            },
            {
              "name": "Processor",
              "stepType": null,
              "category": "processor",
              "copies": 1,
              "parameter": {
                "nodes": [],
                "edges": [],
                "groups": [],
                "version": "2.0"
              }
            }
          ],
          "setting": {
            "executeMode": null,
            "errorLimit": {
              "record": "0"
            },
            "speed": {
              "concurrent": 3,
              "throttle": false
            }
          },
          "order": {
            "hops": [
              {
                "from": "Reader",
                "to": "Writer"
              }
            ]
          }
        }
        • Tablestore Reader需要替换的参数说明如下:

          参数名称

          说明

          datasource

          Tablestore数据源名称。

          dataTable

          源表名称。

          column

          读取源表的列信息。

        • OSS数据源需要替换的参数说明如下:

          参数名称

          说明

          datasource

          OSS数据源名称。

          object

          OSS Writer写入的文件名(含路径)。

    3. 单击image.png图标,保存配置。

三、配置调度属性

  1. 单击任务右侧的调度配置

  2. 调度配置面板的调度参数部分,单击新增参数,根据下表说明新增参数。更多信息,请参见调度参数支持的格式

    参数

    参数值

    startTime

    $[yyyymmddhh24-2/24]$[miss-10/24/60]

    endTime

    $[yyyymmddhh24-1/24]$[miss-10/24/60]

    配置示例如下图所示。

    image

    假如任务运行时的时间为2023042319:00:00,startTime20230423175000,endTime20230423185000。任务将会同步在17:5018:50时段内新增的数据。

  3. 时间属性部分,配置时间属性。更多信息,请参见时间属性配置说明

    此处以任务整点每小时自动运行为例介绍配置,如下图所示。

    image

  4. 调度依赖部分,单击使用工作空间根节点,系统会自动生成依赖的上游节点信息。

    使用工作空间根节点表示该任务无上游的依赖任务。

    image

  5. 配置完成后,关闭配置调度面板。

  6. 单击image.png图标,保存配置。

四、(可选)调试脚本代码

通过调试脚本代码,确保同步任务能成功同步表格存储的增量数据到OSS中。

说明

调试脚本代码时配置的时间范围内的数据可能会被多次写入到OSS文件中。

  1. 单击1680170333627-a1e19a43-4e2a-4340-9564-f53f2fa6806e图标。

  2. 参数对话框,选择运行资源组的名称,并配置自定义参数。

    自定义参数的格式为yyyyMMddHHmmss,例如20250631170000。

    image

  3. 单击运行

    任务运行完成后,您可以在OSS管理控制台下载写入文件以查看同步的数据。

五、发布同步任务

提交同步任务后,同步任务会按照配置的调度属性运行。

  1. 单击image图标。

  2. 提交对话框,根据需要填写变更描述。

  3. 单击确认

数据开发(Data Studio)新版

一、新建任务节点

  1. 进入数据开发页面。

    1. 登录DataWorks控制台

    2. 在页面上方,选择资源组和地域。

    3. 在左侧导航栏,单击数据开发与运维 > 数据开发

    4. 数据开发页面的下拉框中,选择对应工作空间后单击进入Data Studio

  2. DataStudio控制台的数据开发页面,单击项目目录右侧的image图标,然后选择新建节点 > 数据集成 > 离线同步

    说明

    首次使用项目目录时,也可以直接单击新建节点按钮。

  3. 新建节点对话框,选择路径并填写名称,然后单击确认

    项目目录下,将显示新建的离线同步节点。

二、配置同步任务

  1. 项目目录下,单击打开新建的离线同步任务节点。

  2. 配置网络与资源。

    选择离线同步任务的数据来源、数据去向以及用于执行同步任务的资源组,并测试连通性。

    1. 网络与资源配置步骤,选择数据来源Tablestore Stream,并选择数据源名称为新增的表格存储数据源。

    2. 选择资源组。

      选择资源组后,系统会显示资源组的地域、规格等信息以及自动测试资源组与所选数据源之间连通性。

      说明

      Serverless资源组支持为同步任务指定运行CU上限,如果您的同步任务因资源不足出现OOM现象,请适当调整资源组的CU占用取值。

    3. 选择数据去向OSS,并选择数据源名称为新增的OSS数据源。

      系统会自动测试资源组与所选数据源之间连通性。

    4. 测试可连通后,单击下一步

  3. 配置任务并保存。

    重要

    同步数据表的增量数据到OSS时,您可以根据实际需求选择向导模式脚本模式

    同步时序表的增量数据到OSS时,只支持脚本模式

    向导模式

    1. 配置任务步骤的配置数据来源与去向区域,根据实际情况配置数据来源和数据去向。

      数据来源

      参数

      说明

      数据源

      默认显示上一步选择的Tablestore数据源。

      源数据表。

      开始时间

      增量读取数据的开始时间和结束时间,分别配置为变量形式${startTime}${endTime},具体格式在后续调度属性中配置。增量数据的时间范围为左闭右开的区间。

      结束时间

      状态表

      用于记录状态的表名称,默认值为TableStoreStreamReaderStatusTable。

      最大重试次数

      TableStore中读取增量数据时,每次请求的最大重试次数。

      导出时序信息

      是否导出时序信息,时序信息包含了数据的写入时间等。

      数据去向

      参数

      说明

      数据源

      默认显示上一步选择的OSS数据源。

      文本类型

      写入OSS的文件类型,例如csvtext

      说明

      不同文件类型支持的配置有差异,请以实际界面为准。

      文件名(含路径)

      仅在文本类型csvtextorc时,需配置该参数。

      写入OSS的文件路径,支持使用星号(*)作为通配符。例如tablestore/fullData/myotsdata.csv

      文件路径

      仅在文本类型parquet时,需配置该参数。

      写入OSS的文件路径,例如tablestore/fullData

      文件名

      仅在文本类型parquet时,需配置该参数。

      写入OSS的文件名称。

      列分隔符

      仅在文本类型csvtext时,需配置该参数。

      写入OSS文件时,列之间使用的分隔符。例如配置为 \u001b

      行分隔符

      仅在文本类型text时,需配置该参数。

      自定义行分隔符,用来分隔不同数据行。例如配置为\\u0001

      说明

      建议您使用数据中不存在的分隔符作为行分隔符;如果要使用LinuxWindows平台的默认行分隔符(\n\r\n),建议置空此配置,平台将自动适配读取。

      编码

      仅在文本类型csvtext时,需配置该参数。

      写入文件的编码配置。

      null

      仅在文本类型csvtextorc时,需配置该参数。

      源数据源中可以表示为null的字符串。例如配置为null,如果源数据是null,则系统将视作null字段。

      时间格式

      仅在文本类型csvtext时,需配置该参数。

      日期类型的数据写入到OSS文件的时间格式,例如yyyy-MM-dd

      前缀冲突

      当设置的文件名与OSS中已有文件名冲突时的处理方法,取值范围如下:

      • 替换原有文件:删除原始文件,重建一个同名文件。

      • 保留原有文件:保留原始文件,重建一个新文件,名称为原文件名加随机后缀。

      • 退出报错:同步任务停止执行。

      切分文件

      仅在文本类型csvtext时,需配置该参数。

      OSS写出时单个Object文件的最大大小,默认为10000*10MB,类似log4j日志打印时根据日志文件大小轮转。

      OSS分片上传时,每个分片大小为10MB(也是轮转文件最小粒度,即小于10MB的分块大小会被作为10MB),OSS 分片上传支持的分块最大数量为10000。轮转发生时,object名称规则是:在原有object前缀加UUID随机数。

      写为一个文件

      仅在文本类型csvtext时,需配置该参数。

      在将数据写入OSS时,是否以单个文件的形式写入。

      • 默认写多个文件。当读不到任何数据时,如果配置了文件头,将输出只包含文件头的空文件,否则只输出空文件。

      • 如果需要以单个文件形式写入,请选择写为一个文件单选框。当都读不到任何数据时,不会产生空文件。

      首行输出表头

      仅在文本类型csvtext时,需配置该参数。

      写入文件时,是否在第一行输出表头。默认不输出表头。如需在第一行输出表头,请选中首行输出表头单选框。

    2. 配置字段映射。

      来源字段中包括了源表主键和增量变更信息,目标字段不支持配置。

      image

    3. 配置通道控制。

      您可以通过通道配置,控制数据同步过程相关属性。相关参数说明详情可参见离线同步并发和限流之间的关系

    4. 单击保存,保存配置。

    脚本模式

    1. 配置任务步骤,单击脚本模式,然后在弹出的对话框中单击确定image

    2. 在脚本配置页面,编辑脚本。

      脚本配置示例如下,请根据您的同步信息和需求替换配置文件内的参数信息。

      • 同步数据表数据

        • 行模式导出

          脚本配置示例与参数说明

          {
            "type": "job",
            "version": "2.0",
            "steps": [
              {
                "stepType": "otsstream",
                "parameter": {
                  "datasource": "MyTablestoreDatasource",
                  "dataTable": "source_table",
                  "statusTable": "TableStoreStreamReaderStatusTable",
                  "newVersion": "true",
                  "mode": "single_version_and_update_only",
                  "startTimeString": "${startTime}",
                  "endTimeString": "${endTime}",
                  "envType": 1,
                  "column": [
                    {
                      "name": "pk"
                    },
                    {
                      "name": "col_string"
                    },
                    {
                      "name": "col_long"
                    },
                    {
                      "name": "col_double"
                    },
                    {
                      "name": "col_bool"
                    },
                    {
                      "name": "col_binary"
                    }
                  ],
                  "maxRetries": 30,
                  "isExportSequenceInfo": false
                },
                "name": "Reader",
                "category": "reader"
              },
              {
                "stepType": "oss",
                "parameter": {
                  "datasource": "MyOSSDatasource",
                  "fileFormat": "csv",
                  "object": "tablestore/incrementData/tablestoreData.csv",
                  "fieldDelimiter": ",",
                  "encoding": "UTF-8",
                  "nullFormat": "null",
                  "dateFormat": "yyyy-MM-dd HH:mm:ss",
                  "writeMode": "append",
                  "writeSingleObject": true,
                  "envType": 1
                },
                "name": "Writer",
                "category": "writer"
              },
              {
                "copies": 1,
                "parameter": {
                  "nodes": [],
                  "edges": [],
                  "groups": [],
                  "version": "2.0"
                },
                "name": "Processor",
                "category": "processor"
              }
            ],
            "setting": {
              "errorLimit": {
                "record": "0"
              },
              "locale": "zh",
              "speed": {
                "concurrent": 3,
                "throttle": false
              }
            },
            "order": {
              "hops": [
                {
                  "from": "Reader",
                  "to": "Writer"
                }
              ]
            }
          }
          • Tablestore Reader需要替换的参数说明如下:

            参数名称

            说明

            datasource

            Tablestore数据源名称。

            dataTable

            源表名称。

            column

            读取源表的列信息。

          • OSS Writer需要替换的参数说明如下:

            参数名称

            说明

            datasource

            OSS数据源名称。

            object

            OSS Writer写入的文件名(含路径)。

        • 列模式导出

          脚本配置示例与参数说明

          {
            "type": "job",
            "version": "2.0",
            "steps": [
              {
                "stepType": "otsstream",
                "parameter": {
                  "datasource": "MyTablestoreDatasource",
                  "dataTable": "source_table",
                  "statusTable": "TableStoreStreamReaderStatusTable",
                  "newVersion": "true",
                  "startTimeString": "${startTime}",
                  "endTimeString": "${endTime}",
                  "envType": 1,
                  "column": [
                    "pk",
                    "colName",
                    "version",
                    "colValue",
                    "opType",
                    "sequenceInfo"
                  ],
                  "maxRetries": 30,
                  "isExportSequenceInfo": false
                },
                "name": "Reader",
                "category": "reader"
              },
              {
                "stepType": "oss",
                "parameter": {
                  "datasource": "MyOSSDatasource",
                  "fileFormat": "csv",
                  "object": "tablestore/incrementData/tablestoreData.csv",
                  "fieldDelimiter": ",",
                  "encoding": "UTF-8",
                  "nullFormat": "null",
                  "dateFormat": "yyyy-MM-dd HH:mm:ss",
                  "writeMode": "append",
                  "writeSingleObject": true,
                  "envType": 1
                },
                "name": "Writer",
                "category": "writer"
              },
              {
                "copies": 1,
                "parameter": {
                  "nodes": [],
                  "edges": [],
                  "groups": [],
                  "version": "2.0"
                },
                "name": "Processor",
                "category": "processor"
              }
            ],
            "setting": {
              "errorLimit": {
                "record": "0"
              },
              "locale": "zh",
              "speed": {
                "concurrent": 3,
                "throttle": false
              }
            },
            "order": {
              "hops": [
                {
                  "from": "Reader",
                  "to": "Writer"
                }
              ]
            }
          }
          • Tablestore Reader需要替换的参数说明如下:

            参数名称

            说明

            datasource

            Tablestore数据源名称。

            dataTable

            源表名称。

            column

            源表主键和增量变更信息。

          • OSS Writer需要替换的参数说明如下:

            参数名称

            说明

            datasource

            OSS数据源名称。

            object

            OSS Writer写入的文件名(含路径)。

      • 同步时序表数据

        脚本配置示例与参数说明

        {
          "type": "job",
          "version": "2.0",
          "steps": [
            {
              "stepType": "otsstream",
              "parameter": {
                "datasource": "MyTablestoreDatasource",
                "dataTable": "source_table",
                "statusTable": "TableStoreStreamReaderStatusTable",
                "newVersion": "true",
                "isTimeseriesTable":"true",
                "mode": "single_version_and_update_only",
                "startTimeString": "${startTime}",
                "endTimeString": "${endTime}",
                "envType": 1,
                "column": [
                  {
                    "name": "_m_name"
                  },
                  {
                    "name": "_data_source"
                  },
                  {
                    "name": "_tags"
                  },
                  {
                    "name": "_time"
                  },
                  {
                    "name": "col_string",
                    "type": "STRING"
                  },
                  {
                    "name": "col_long",
                    "type": "INT"
                  },
                  {
                    "name": "col_double",
                    "type": "DOUBLE"
                  },
                  {
                    "name": "col_bool",
                    "type": "BOOL"
                  },
                  {
                    "name": "col_binary",
                    "type": "BINARY"
                  }
                ],
                "maxRetries": 30,
                "isExportSequenceInfo": false
              },
              "name": "Reader",
              "category": "reader"
            },
            {
              "stepType": "oss",
              "parameter": {
                "datasource": "MyOSSDatasource",
                "fileFormat": "csv",
                "object": "tablestore/incrementData/tablestoreData.csv",
                "fieldDelimiter": ",",
                "encoding": "UTF-8",
                "nullFormat": "null",
                "dateFormat": "yyyy-MM-dd HH:mm:ss",
                "writeMode": "append",
                "writeSingleObject": true,
                "envType": 1
              },
              "name": "Writer",
              "category": "writer"
            },
            {
              "name": "Processor",
              "stepType": null,
              "category": "processor",
              "copies": 1,
              "parameter": {
                "nodes": [],
                "edges": [],
                "groups": [],
                "version": "2.0"
              }
            }
          ],
          "setting": {
            "executeMode": null,
            "errorLimit": {
              "record": "0"
            },
            "speed": {
              "concurrent": 3,
              "throttle": false
            }
          },
          "order": {
            "hops": [
              {
                "from": "Reader",
                "to": "Writer"
              }
            ]
          }
        }
        • Tablestore Reader需要替换的参数说明如下:

          参数名称

          说明

          datasource

          Tablestore数据源名称。

          dataTable

          源表名称。

          column

          读取源表的列信息。

        • OSS数据源需要替换的参数说明如下:

          参数名称

          说明

          datasource

          OSS数据源名称。

          object

          OSS Writer写入的文件名(含路径)。

    3. 单击保存,保存配置。

三、配置调度属性

  1. 单击任务右侧的调度配置

  2. 调度配置面板的调度参数部分,单击添加参数,根据下表说明新增参数。更多信息,请参见调度参数支持格式

    参数

    参数值

    startTime

    $[yyyymmddhh24-2/24]$[miss-10/24/60]

    endTime

    $[yyyymmddhh24-1/24]$[miss-10/24/60]

    配置示例如下图所示。

    image

    假如任务运行时的时间为2023042319:00:00,startTime20230423175000,endTime20230423185000。任务将会同步在17:5018:50时段内新增的数据。

  3. 调度策略部分,配置调度策略。更多信息,请参见实例生成方式:发布后即时生成

    image

  4. 调度时间部分,配置调度时间。更多信息,请参见调度时间

    此处以任务整点每小时自动运行为例,如下图所示。

    image

  5. 调度依赖部分,单击使用工作空间根节点,系统会自动生成依赖的上游节点信息。

    说明

    使用工作空间根节点表示该任务无上游的依赖任务。

    image

  6. 配置完成后,关闭调度配置面板。

  7. 单击保存,保存配置。

四、(可选)调试脚本代码

通过调试脚本代码,确保同步任务能成功同步表格存储的增量数据到OSS中。

说明

调试脚本代码时配置的时间范围内的数据可能会被多次写入到OSS文件中。

  1. 单击任务右侧的调试配置,选择运行的资源组,并配置脚本参数。

    自定义参数的格式为yyyyMMddHHmmss,例如20250528160000。

    image

  2. 配置完成后,关闭调试配置面板。

  3. 单击运行

    任务运行完成后,您可以在OSS管理控制台下载写入文件以查看同步的数据。

五、发布同步任务

发布同步任务后,该同步任务将根据配置的调度属性运行。

  1. 单击发布

  2. 在同步任务的发布页签,根据需要输入发布描述,然后单击开始发布生产

  3. 根据发布流程引导,单击确认发布

步骤四:查看同步结果

  1. DataWorks控制台查看任务运行状态。

    数据开发(Data Studio)旧版

    1. 单击同步任务工具栏右侧的运维

    2. 周期实例页面的实例视角页签,查看实例的运行详情。更多信息,请参见周期实例视角

    数据开发(Data Studio)新版

    1. 单击前往运维查看周期实例

    2. 周期实例页面的实例视角页签,查看实例的运行详情。更多信息,请参见周期实例视角

  2. 查看同步数据。

    1. 登录OSS管理控制台

    2. 在左侧导航栏,单击Bucket列表

    3. Bucket列表页面,找到目标Bucket后,单击Bucket名称。

    4. 文件列表页签,选择相应文件,下载后可查看内容是否符合预期。

常见问题

OTSStreamReader常见问题

相关文档

简单下载OSS文件