通过在DataWorks数据集成平台上配置离线同步任务,可实现表格存储中新增和变化数据的周期性增量同步至OSS,满足数据备份和后续处理需求。
准备工作
获取Tablestore源表的实例名称、实例访问地址、地域ID等信息,并为源表开启Stream功能。
数据表可选择在创建时启用Stream功能,或通过修改操作启用;时序表默认已启用该功能。
为阿里云账号或RAM用户(具备Tablestore与OSS服务的权限)创建AccessKey。
开通DataWorks服务,并在OSS存储空间或Tablestore实例所在地域创建工作空间。
创建Serverless资源组并绑定到工作空间。有关计费信息,请参见Serverless资源组计费。
当DataWorks和Tablestore实例位于不同地域时,需要创建VPC对等连接实现跨地域网络连通。
操作步骤
步骤一:新增表格存储数据源
在DataWorks中配置表格存储数据源,建立与源数据的连接。
登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的,在下拉框中选择对应工作空间后单击进入数据集成。
在左侧导航栏,单击数据源。
在数据源列表页面,单击新增数据源。
在新增数据源对话框,搜索并选择数据源类型为Tablestore。
在新增OTS数据源对话框,根据下表配置数据源参数。
参数
说明
数据源名称
数据源名称必须以字母、数字、下划线(_)组合,且不能以数字和下划线(_)开头。
数据源描述
对数据源进行简单描述,不得超过80个字符。
地域
选择Tablestore实例所属地域。
Table Store实例名称
Tablestore实例的名称。
Endpoint
Tablestore实例的服务地址,推荐使用VPC地址。
AccessKey ID
阿里云账号或者RAM用户的AccessKey ID和AccessKey Secret。
AccessKey Secret
测试资源组连通性。
创建数据源时,需要测试资源组的连通性,确保同步任务使用的资源组能够与数据源正常连通,否则将无法正常执行数据同步任务。
在连接配置区域,单击相应资源组连通状态列的测试连通性。
测试连通性通过后,连通状态显示可连通,单击完成。可在数据源列表中查看新建的数据源。
如果测试连通性结果为无法通过,可使用连通性诊断工具自助解决。
步骤二:新增OSS数据源
配置OSS数据源作为数据导出的目标存储。
再次单击新增数据源,在对话框中搜索并选择数据源类型为OSS,并配置相关的数据源参数。
参数
说明
数据源名称
数据源名称必须以字母、数字、下划线(_)组合,且不能以数字和下划线(_)开头。
数据源描述
对数据源进行简单描述,不得超过80个字符。
访问模式
RAM角色授权模式:DataWorks服务账号以角色扮演的方式访问数据源。首次选择该模式时,请根据页面提示开启授权。
Access Key模式:通过阿里云账号或者RAM用户的AccessKey ID和AccessKey Secret访问数据源。
选择角色
仅当访问模式为RAM角色授权模式时需要选择RAM角色。
AccessKey ID
仅当访问模式为Access Key模式时需要填写。阿里云账号或者RAM用户的AccessKey ID和AccessKey Secret。
AccessKey Secret
地域
Bucket所在地域。
Endpoint
OSS访问域名,详见地域和Endpoint。
Bucket
Bucket名称。
完成参数和连通性测试后,单击完成添加数据源。
步骤三:配置离线同步任务
创建并配置数据同步任务,定义从表格存储到OSS的数据传输规则。
新建任务节点
进入数据开发页面。
登录DataWorks控制台。
在页面上方,选择资源组和地域。
在左侧导航栏,单击。
选择对应工作空间后单击进入Data Studio。
在Data Studio控制台的数据开发页面,单击项目目录右侧的
图标,然后选择。在新建节点对话框,选择路径,数据来源下拉选择Tablestore Stream,数据去向下拉选择OSS,填写名称,然后单击确认。
配置同步任务
在项目目录下,单击打开新建的离线同步任务节点,通过向导模式或脚本模式配置同步任务。
同步时序表时,只能使用脚本模式配置同步任务。
向导模式(默认)
配置以下内容:
数据源:选择来源数据源和去向数据源。
运行资源:选择资源组,选择后会自动检测数据源连通性。
数据来源:选择来源数据表,其它配置可保持默认,也可根据需求修改。
数据去向:选择目标文本类型,并配置相关参数。
文本类型:可选类型包括csv、text、orc和parquet。
文件名(含路径):OSS Bucket内包含路径的文件名称,如
tablestore/resouce_table.csv。列分隔符:默认为
,,如果分隔符不可见,请填写Unicode编码,比如\u001b、\u007c。文件路径:仅当文本类型为parquet时填写,OSS Bucket内的文件路径。
文件名:仅当文本类型为parquet时填写,OSS Bucket内的文件名。
去向字段映射:根据源表主键和增量变更信息自动配置,可根据需求修改。
配置完成后,单击页面上方的保存。
脚本模式
单击页面上方的脚本模式,在切换后的页面中编辑脚本。
数据表
以下示例配置以目标文件类型为csv为例,来源数据表主键包含1个int类型的主键列id和1个string类型的主键列name。配置时请替换示例脚本内的数据源datasource、表名称table以及写入文件名object。
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "otsstream",
"parameter": {
"statusTable": "TableStoreStreamReaderStatusTable",
"maxRetries": 31,
"isExportSequenceInfo": false,
"datasource": "source_data",
"column": [
"id",
"name",
"colName",
"version",
"colValue",
"opType",
"sequenceInfo"
],
"startTimeString": "${startTime}",
"table": "source_table",
"endTimeString": "${endTime}"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "oss",
"parameter": {
"dateFormat": "yyyy-MM-dd HH:mm:ss",
"datasource": "target_data",
"writeSingleObject": false,
"column": [
"0",
"1",
"2",
"3",
"4",
"5",
"6"
],
"writeMode": "truncate",
"encoding": "UTF-8",
"fieldDelimiter": ",",
"fileFormat": "csv",
"object": "tablestore/source_table.csv"
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": 2,
"throttle": false
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}时序表
以下示例配置以目标文件类型为csv为例,来源时序表的时间线数据包含1个int类型的属性列value。配置时请替换示例脚本内的数据源datasource、表名称table以及写入文件名object。
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "otsstream",
"parameter": {
"statusTable": "TableStoreStreamReaderStatusTable",
"maxRetries": 31,
"isExportSequenceInfo": false,
"datasource": "source_data",
"column": [
{
"name": "_m_name"
},
{
"name": "_data_source"
},
{
"name": "_tags"
},
{
"name": "_time"
},
{
"name": "value",
"type": "int"
}
],
"startTimeString": "${startTime}",
"table": "source_series",
"isTimeseriesTable":"true",
"mode": "single_version_and_update_only",
"endTimeString": "${endTime}"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "oss",
"parameter": {
"dateFormat": "yyyy-MM-dd HH:mm:ss",
"datasource": "target_data",
"writeSingleObject": false,
"writeMode": "truncate",
"encoding": "UTF-8",
"fieldDelimiter": ",",
"fileFormat": "csv",
"object": "tablestore/source_series.csv"
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": 2,
"throttle": false
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}脚本编辑完成后,单击页面上方的保存。
调试同步任务
单击页面右侧的调试配置,选择运行的资源组,并添加运行的脚本参数。
startTime:同步增量数据的开始时间(包含),如
20251119200000。endTime:同步增量数据的结束时间(不包含),如
20251119205000。增量同步采用周期调度机制,每隔5分钟进行一次调度,且插件存在5分钟的延迟,因此同步的总延迟为5~10分钟,配置结束时间时避免配置当前时间往前10分钟的时间段。
单击页面上方的运行,开始同步任务。
上述示例值表示同步
2025年11月19日20点到20点50分(不包含)的增量数据。
步骤四:查看同步结果
运行同步任务后,可通过日志查看任务的执行状态,并在OSS Bucket查看同步结果文件。
在页面下方查看任务运行状态和结果,出现以下信息时表示同步任务运行成功。
2025-11-18 11:16:23 INFO Shell run successfully! 2025-11-18 11:16:23 INFO Current task status: FINISH 2025-11-18 11:16:23 INFO Cost time is: 77.208s查看目标Bucket的文件。
前往Bucket列表,单击目标Bucket,查看或下载同步结果文件。
应用于生产环境
调试完成后,可在页面右侧调度配置中配置调度参数startTime、endTime和周期调度策略,并发布到生产环境。详细的配置规则请参见配置并使用调度参数、调度策略和调度时间。
