如果需要定期将表格存储(Tablestore)中的新增或变更数据同步至MaxCompute进行备份或业务处理,您可以通过在DataWorks数据集成平台上配置离线同步任务,实现周期性增量同步功能。
前提条件
已获取Tablestore源表的实例名称、实例访问地址、地域ID等信息,并为源表开启Stream信息。
创建表的SQL语句,您可参见附录:MaxCompute表创建示例。
已为阿里云账号或RAM用户(具备Tablestore与MaxCompute服务的权限)创建AccessKey。
已开通DataWorks服务,并在MaxCompute实例或Tablestore实例所在地域创建工作空间。
已创建Serverless资源组并绑定到工作空间。有关计费信息,请参见Serverless资源组计费。
如果MaxCompute实例和Tablestore实例不在同一地域,请参考以下操作步骤创建VPC对等连接实现跨地域网络连通。
操作步骤
步骤一:新增表格存储数据源
进入数据集成页面。
登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的 ,在下拉框中选择对应工作空间后单击进入数据集成。
在左侧导航栏,单击数据源。
在数据源列表页面,单击新增数据源。
在新增数据源对话框,搜索并选择数据源类型为Tablestore。
在新增OTS数据源对话框,根据下表配置数据源参数。
参数
说明
数据源名称
数据源名称必须以字母、数字、下划线(_)组合,且不能以数字和下划线(_)开头。
数据源描述
对数据源进行简单描述,不得超过80个字符。
地域
选择Tablestore实例所属地域。
Table Store实例名称
Tablestore实例的名称。
Endpoint
Tablestore实例的服务地址,推荐使用VPC地址。
AccessKey ID
阿里云账号或者RAM用户的AccessKey ID和AccessKey Secret。
AccessKey Secret
测试资源组连通性。创建数据源时,您需要测试资源组的连通性,以保证同步任务使用的资源组能够与数据源连通,否则将无法正常执行数据同步任务。
在连接配置区域,单击相应资源组连通状态列的测试连通性。
测试连通性通过后,连通状态显示可连通,单击完成。您可以在数据源列表中查看新建的数据源。
说明如果测试连通性结果为无法通过,您可使用连通性诊断工具自助解决。如仍无法连通资源组与数据源,请提交工单处理。
步骤二:新增MaxCompute数据源
操作与步骤一相似,在新增数据源对话框中搜索并选择数据源类型为MaxCompute,随后配置相关的数据源参数。
步骤三:配置离线同步任务
数据开发(Data Studio)旧版
一、新建任务节点
进入数据开发页面。
登录DataWorks控制台。
在页面上方,选择资源组和地域。
在左侧导航栏,单击
。在数据开发页面的下拉框中,选择对应工作空间后单击进入数据开发。
在DataStudio控制台的数据开发页面,单击业务流程节点下的目标业务流程。
如果需要新建业务流程,请参见创建业务流程。
在数据集成节点上右键单击,然后选择新建节点 > 离线同步。
在新建节点对话框,选择路径并填写名称,然后单击确认。
在数据集成节点下,将显示新建的离线同步节点。
二、配置同步任务
在数据集成节点下,双击打开新建的离线同步任务节点。
配置网络与资源。
选择离线同步任务的数据来源、数据去向以及用于执行同步任务的资源组,并测试连通性。
在网络与资源配置步骤,选择数据来源为Tablestore Stream,并选择数据源名称为新增的表格存储数据源。
选择资源组。
选择资源组后,系统会显示资源组的地域、规格等信息以及自动测试资源组与所选数据源之间连通性。
说明Serverless资源组支持为同步任务指定运行CU上限,如果您的同步任务因资源不足出现OOM现象,请适当调整资源组的CU占用取值。
选择数据去向为MaxCompute(ODPS),并选择数据源名称为新增的MaxCompute数据源。
系统会自动测试资源组与所选数据源之间连通性。
测试可连通后,单击下一步。
配置任务并保存。
向导模式
在配置任务步骤的配置数据来源与去向区域,根据实际情况配置数据来源和数据去向。
数据来源
参数
说明
数据源
默认显示上一步选择的Tablestore数据源。
表
源数据表。
开始时间
增量读取数据的开始时间和结束时间,分别配置为变量形式
${startTime}
和${endTime}
,具体格式在后续调度属性中配置。增量数据的时间范围为左闭右开的区间。结束时间
状态表
用于记录状态的表名称,默认值为TableStoreStreamReaderStatusTable。
最大重试次数
从TableStore中读取增量数据时,每次请求的最大重试次数。
导出时序信息
是否导出时序信息,时序信息包含了数据的写入时间等。
重要请勾选导出时序信息的单选框,在后续将增量数据转换为全量数据格式时,需要使用时序信息。
数据去向
参数
说明
数据源
默认显示上一步选择的MaxCompute数据源。
Tunnel资源组
即Tunnel Quota,默认值为公共传输资源,即MC的免费quota。
MaxCompute的数据传输资源选择,具体请参见购买与使用独享数据传输服务资源组。
说明如果独享tunnel quota因欠费或到期不可用,任务在运行中将会自动切换为“公共传输资源”。
表
目标MaxCompute表。
分区信息
如果您每日增量数据限定在对应日期的分区中,可以使用分区做每日增量,比如配置分区pt值为${bizdate}。
写入模式
数据写入表中的模式,支持以下两种写入模式:
写入前保留已有数据(Insert Into):直接向表或静态分区中插入数据。
重要后续将周期性执行离线同步任务,请选择写入前保留已有数据(Insert Into)。
写入前清理已有数据(Insert Overwrite):先清空表中的原有数据,再向表或静态分区中插入数据。
空字符串转为Null写入
如果源头数据为空字符串,在向目标MaxCompute列写入时是否转为Null值写入。默认值为否。
同步完成才可见
单击高级配置后才会显示该参数。
同步到MaxCompute中的数据是否在同步完成后才能被查询到。默认值为否。
配置字段映射。
在字段映射区域,系统自动进行同名映射,保持默认即可。更多信息,请参见配置字段映射关系。
配置通道控制。
您可以通过通道配置,控制数据同步过程相关属性。相关参数说明详情可参见离线同步并发和限流之间的关系。
单击
图标,保存配置。
脚本模式
在配置任务步骤,单击
图标,然后在弹出的对话框中单击确定。
在脚本配置页面,编辑脚本。
脚本配置示例如下,请根据您的同步信息和需求替换配置文件内的参数信息。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "otsstream", "parameter": { "statusTable": "TableStoreStreamReaderStatusTable", "maxRetries": 30, "isExportSequenceInfo": true, "datasource": "MyTablestoreDataSource", "dataTable": "target_table", "newVersion": "true", "column": [ "pk", "colName", "version", "colValue", "opType", "sequenceInfo" ], "startTimeString": "${startTime}", "endTimeString": "${endTime}" }, "name": "Reader", "category": "reader" }, { "stepType": "odps", "parameter": { "partition": "", "truncate": false, "datasource": "MyMaxComputeDataSource", "table": "target_table", "column": [ "pk", "colname", "version", "colvalue", "optype", "sequenceinfo" ], "emptyAsNull": false, "compress": false }, "name": "Writer", "category": "writer" } ], "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] }, "setting": { "errorLimit": { "record": "0" }, "speed": { "throttle": false, "concurrent": 2 } } }
Tablestore Stream Reader需要替换的参数说明如下:
参数名称
说明
datasource
源表的Tablestore数据源名称。
dataTable
源表名称。
column
源表主键和增量变更信息。
MaxCompute Writer需要替换的参数说明如下:
参数名称
说明
datasource
目标表的MaxCompute数据源名称。
table
目标表名称。
column
需要写入的属性列。
单击
图标,保存配置。
三、配置调度属性
单击任务右侧的调度配置。
在调度配置面板的调度参数部分,单击新增参数,根据下表说明新增参数。更多信息,请参见调度参数支持的格式。
参数
参数值
startTime
$[yyyymmddhh24-2/24]$[miss-10/24/60]
endTime
$[yyyymmddhh24-1/24]$[miss-10/24/60]
配置示例如下图所示。
假如任务运行时的时间为2023年04月23日19:00:00,startTime为20230423175000,endTime为20230423185000。任务将会同步在17:50到18:50时段内新增的数据。
在时间属性部分,配置时间属性。更多信息,请参见时间属性配置说明。
此处以任务整点每小时自动运行为例介绍配置,如下图所示。
在调度依赖部分,单击使用工作空间根节点,系统会自动生成依赖的上游节点信息。
使用工作空间根节点表示该任务无上游的依赖任务。
配置完成后,关闭配置调度面板。
单击
图标,保存配置。
四、(可选)调试脚本代码
通过调试脚本代码,确保同步任务能成功同步表格存储的增量数据到MaxCompute中。
调试脚本代码时配置的时间范围内的数据可能会多次导入到MaxCompute中,相同数据行会覆盖写入到MaxCompute中。
单击
图标。
在参数对话框,选择运行资源组的名称,并配置自定义参数。
自定义参数的格式为
yyyyMMddHHmmss
,例如20250631170000。单击运行。
任务运行完成后,您可以查看目标表的同步结果。更多信息,请参见MaxCompute表数据。
五、发布同步任务
提交同步任务后,同步任务会按照配置的调度属性运行。
单击
图标。
在提交对话框,根据需要填写变更描述。
单击确认。
数据开发(Data Studio)新版
一、新建任务节点
进入数据开发页面。
登录DataWorks控制台。
在页面上方,选择资源组和地域。
在左侧导航栏,单击
。在数据开发页面的下拉框中,选择对应工作空间后单击进入Data Studio。
在DataStudio控制台的数据开发页面,单击项目目录右侧的
图标,然后选择 。
说明首次使用项目目录时,也可以直接单击新建节点按钮。
在新建节点对话框,选择路径并填写名称,然后单击确认。
在项目目录下,将显示新建的离线同步节点。
二、配置同步任务
在项目目录下,单击打开新建的离线同步任务节点。
配置网络与资源。
选择离线同步任务的数据来源、数据去向以及用于执行同步任务的资源组,并测试连通性。
在网络与资源配置步骤,选择数据来源为Tablestore Stream,并选择数据源名称为新增的表格存储数据源。
选择资源组。
选择资源组后,系统会显示资源组的地域、规格等信息以及自动测试资源组与所选数据源之间连通性。
说明Serverless资源组支持为同步任务指定运行CU上限,如果您的同步任务因资源不足出现OOM现象,请适当调整资源组的CU占用取值。
选择数据去向为MaxCompute(ODPS),并选择数据源名称为新增的MaxCompute数据源。
系统会自动测试资源组与所选数据源之间连通性。
测试可连通后,单击下一步。
配置任务并保存。
向导模式
在配置任务步骤的配置数据来源与去向区域,根据实际情况配置数据来源和数据去向。
数据来源
参数
说明
数据源
默认显示上一步选择的Tablestore数据源。
表
源数据表。
开始时间
增量读取数据的开始时间和结束时间,分别配置为变量形式
${startTime}
和${endTime}
,具体格式在后续调度属性中配置。增量数据的时间范围为左闭右开的区间。结束时间
状态表
用于记录状态的表名称,默认值为TableStoreStreamReaderStatusTable。
最大重试次数
从TableStore中读取增量数据时,每次请求的最大重试次数。
导出时序信息
是否导出时序信息,时序信息包含了数据的写入时间等。
重要请勾选导出时序信息的单选框,在后续将增量数据转换为全量数据格式时,需要使用时序信息。
数据去向
参数
说明
数据源
默认显示上一步选择的MaxCompute数据源。
Tunnel资源组
即Tunnel Quota,默认值为公共传输资源,即MC的免费quota。
MaxCompute的数据传输资源选择,具体请参见购买与使用独享数据传输服务资源组。
说明如果独享tunnel quota因欠费或到期不可用,任务在运行中将会自动切换为“公共传输资源”。
表
目标MaxCompute表。
分区信息
如果您每日增量数据限定在对应日期的分区中,可以使用分区做每日增量,比如配置分区pt值为${bizdate}。
写入模式
数据写入表中的模式,支持以下两种写入模式:
写入前保留已有数据(Insert Into):直接向表或静态分区中插入数据。
重要后续将周期性执行离线同步任务,请选择写入前保留已有数据(Insert Into)。
写入前清理已有数据(Insert Overwrite):先清空表中的原有数据,再向表或静态分区中插入数据。
空字符串转为Null写入
如果源头数据为空字符串,在向目标MaxCompute列写入时是否转为Null值写入。默认值为否。
同步完成才可见
单击高级配置后才会显示该参数。
同步到MaxCompute中的数据是否在同步完成后才能被查询到。默认值为否。
配置字段映射。
在字段映射区域,系统自动进行同名映射,保持默认即可。更多信息,请参见配置字段映射关系。
配置通道控制。
您可以通过通道配置,控制数据同步过程相关属性。相关参数说明详情可参见离线同步并发和限流之间的关系。
单击保存,保存配置。
脚本模式
在配置任务步骤,单击脚本模式,然后在弹出的对话框中单击确定。
在脚本配置页面,编辑脚本。
脚本配置示例如下,请根据您的同步信息和需求替换配置文件内的参数信息。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "otsstream", "parameter": { "statusTable": "TableStoreStreamReaderStatusTable", "maxRetries": 30, "isExportSequenceInfo": true, "datasource": "MyTablestoreDataSource", "dataTable": "target_table", "newVersion": "true", "column": [ "pk", "colName", "version", "colValue", "opType", "sequenceInfo" ], "startTimeString": "${startTime}", "endTimeString": "${endTime}" }, "name": "Reader", "category": "reader" }, { "stepType": "odps", "parameter": { "partition": "", "truncate": false, "datasource": "MyMaxComputeDataSource", "table": "target_table", "column": [ "pk", "colname", "version", "colvalue", "optype", "sequenceinfo" ], "emptyAsNull": false, "compress": false }, "name": "Writer", "category": "writer" } ], "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] }, "setting": { "errorLimit": { "record": "0" }, "speed": { "throttle": false, "concurrent": 2 } } }
Tablestore Stream Reader需要替换的参数说明如下:
参数名称
说明
datasource
源表的Tablestore数据源名称。
dataTable
源表名称。
column
源表主键和增量变更信息。
MaxCompute Writer需要替换的参数说明如下:
参数名称
说明
datasource
目标表的MaxCompute数据源名称。
table
目标表名称。
column
需要写入的属性列。
单击保存,保存配置。
三、配置调度属性
单击任务右侧的调度配置。
在调度配置面板的调度参数部分,单击添加参数,根据下表说明新增参数。更多信息,请参见调度参数支持格式。
参数
参数值
startTime
$[yyyymmddhh24-2/24]$[miss-10/24/60]
endTime
$[yyyymmddhh24-1/24]$[miss-10/24/60]
配置示例如下图所示。
假如任务运行时的时间为2023年04月23日19:00:00,startTime为20230423175000,endTime为20230423185000。任务将会同步在17:50到18:50时段内新增的数据。
在调度策略部分,配置调度策略。更多信息,请参见实例生成方式:发布后即时生成。
在调度时间部分,配置调度时间。更多信息,请参见调度时间。
此处以任务整点每小时自动运行为例,如下图所示。
在调度依赖部分,单击使用工作空间根节点,系统会自动生成依赖的上游节点信息。
说明使用工作空间根节点表示该任务无上游的依赖任务。
配置完成后,关闭调度配置面板。
单击保存,保存配置。
四、(可选)调试脚本代码
通过调试脚本代码,确保同步任务能成功同步表格存储的增量数据到MaxCompute中。
调试脚本代码时配置的时间范围内的数据可能会多次导入到MaxCompute中,相同数据行会覆盖写入到MaxCompute中。
单击任务右侧的调试配置,选择运行的资源组,并配置脚本参数。
自定义参数的格式为
yyyyMMddHHmmss
,例如20250528160000。配置完成后,关闭调试配置面板。
单击运行。
任务运行完成后,您可以查看目标表的同步结果。更多信息,请参见MaxCompute表数据。
五、发布同步任务
发布同步任务后,该同步任务将根据配置的调度属性运行。
单击发布。
在同步任务的发布页签,根据需要输入发布描述,然后单击开始发布生产。
根据发布流程引导,单击确认发布。
步骤四:查看同步结果
在DataWorks控制台查看任务运行状态。
数据开发(Data Studio)旧版
单击同步任务工具栏右侧的运维。
在周期实例页面的实例视角页签,查看实例的运行详情。更多信息,请参见周期实例视角。
数据开发(Data Studio)新版
单击前往运维查看周期实例。
在周期实例页面的实例视角页签,查看实例的运行详情。更多信息,请参见周期实例视角。
查看目标表的同步结果。
在DataWorks控制台的数据地图模块,您可以查看目标MaxCompute表的详细信息。更多信息,请参见MaxCompute表数据。
后续操作
同步到MaxCompute的增量数据为Tablestore数据表的列数据变更记录,您可以根据业务需求将表格存储增量数据转换为全量数据格式
附录:MaxCompute表创建示例
以下示例用于创建目标MaxCompute表,以存储Tablestore数据表的列数据变更记录。
目标MaxCompute表的主键结构(包括名称、数据类型及顺序)必须与Tablestore源表一致。
CREATE TABLE IF NOT EXISTS maxcompute_target_table(
`pk` STRING COMMENT '主键字段', -- 以实际为准
`colname` STRING COMMENT '列名',
`version` BIGINT COMMENT '版本号',
`colvalue` STRING COMMENT '列值',
`optype` STRING COMMENT '操作类型',
`sequenceinfo` STRING COMMENT '时序信息'
)
COMMENT '存储Tablestore的增量数据变更记录';