使用DataWorks工具将表格存储(Tablestore)时序表中的全量数据或者增量数据同步到另一个时序表。
背景信息
表格存储支持对时序数据进行存储与查询,在某些场景下,您可能希望将时序表中的时序数据迁移到另一个时序表中。
关于表格存储时序模型的更多信息,请参见时序模型概述。
具体数据迁移场景如下:
大数据开发治理平台DataWorks数据集成是稳定高效、弹性伸缩的数据同步平台,致力于提供复杂网络环境下丰富的异构数据源之间高速稳定的数据移动及同步能力。您可以在数据开发(DataStudio)界面直接创建离线同步节点用于离线(批量)数据周期性同步或者创建实时同步节点用于单表或整库增量数据实时同步。更多信息,请参见数据集成概述。
使用DataWorks数据集成的离线同步任务,您可以将表格存储(Tablestore)时序表中的全量数据或者增量数据同步到另一个时序表中。
前提条件
已在实例详情页面获取实例的服务地址(Endpoint)。
在概览页面,单击实例名称后,在实例详情页签的实例访问地址区域,即可查看实例的Endpoint,请根据实际选择。
已创建目标时序表,用于存放迁移数据。具体操作,请参见创建时序表。
已创建RAM用户,并授予RAM用户管理表格存储服务的权限(AliyunOTSFullAccess)。具体操作,请参见配置RAM用户权限。
重要由于配置数据同步任务时需要填写访问密钥AccessKey(AK)信息来执行授权,为避免阿里云账号泄露AccessKey带来的安全风险,建议您通过RAM用户来完成授权和AccessKey的创建。
已获取RAM用户的AccessKey(包括AccessKey ID和AccessKey Secret),用于进行签名认证。具体操作,请参见获取AccessKey。
已开通DataWorks服务并创建工作空间。具体操作,请参见开通DataWorks服务和创建工作空间。
已授予RAM用户DataWorks工作空间的开发角色权限,用于在数据开发(DataStudio)界面创建同步任务。具体操作,请参见添加空间成员并管理成员角色权限。
全量数据同步
步骤一:新增数据源
具体操作,请参见步骤一:新增表格存储数据源。
如果源时序表和目标时序表所在实例相同,则只需要创建一个数据源即可。
如果源时序表和目标时序表所在实例不同,则需要分别创建源数据源和目标数据源。
步骤二:新建离线同步任务
进入数据开发页面。
以项目管理员身份登录DataWorks控制台。
选择地域,在左侧导航栏,单击工作空间列表。
在工作空间列表页面,在目标工作空间操作列选择快速进入>数据开发。
在DataStudio控制台的数据开发页面,单击业务流程节点下的目标业务流程。
如果需要新建业务流程,请参见创建业务流程。
在数据集成节点上右键选择新建节点 > 离线同步。
在新建节点对话框,选择路径并填写节点名称。
单击确认。
在数据集成节点下会显示新建的离线同步节点。
步骤三:配置离线同步任务并启动
在数据集成节点下,双击打开新建的离线同步任务节点。
配置同步网络链接。
选择离线同步任务的数据来源、数据去向以及用于执行同步任务的资源组,并测试连通性。
重要数据同步任务的执行必须经过资源组来实现,请选择资源组并保证资源组与读写两端的数据源能联通访问。
在网络与资源配置步骤,选择数据来源为Tablestore,并选择数据源名称为源数据源。
选择资源组。
选择资源组后,系统会显示资源组的地域、规格等信息以及自动测试资源组与所选数据源之间连通性。
重要请与新增数据源时选择的资源组保持一致。
选择数据去向为Tablestore,并选择数据源名称为目标数据源。
系统会自动测试资源组与所选数据源之间连通性。
测试可连通后,单击下一步。
在提示对话框,单击确认使用脚本模式。
重要表格存储仅支持脚本模式。当存在不支持向导模式的数据源时,如果继续编辑任务,将强制使用脚本模式进行编辑。
任务转为脚本模式后,将无法转为向导模式。
配置任务并保存。
全量数据的同步需要使用到Tablestore(OTS) Reader与Tablestore Writer插件。脚本配置规则请参见Tablestore数据源。
在配置任务步骤,根据配置示例编辑脚本。
配置示例如下:
重要为了便于理解,在配置示例中增加了注释内容,实际使用脚本时请删除所有注释内容。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "ots", // 读数据使用的插件为Tablestore Reader。 "parameter": { "datasource": "instance01", // 已增加的Tablestore源数据源。 "table": "timeseriesReadTable", // 源时序表名称。 "newVersion": "true", // 使用Tablestore Reader的新版本。 "mode": "normal", // 使用Tablestore Reader的普通模式。 "isTimeseriesTable": "true", // 表示该表为时序表。 "envType": 1, "column": [ { "name": "_m_name" // 导出时序表的度量名称字段。 }, { "name": "_data_source" // 导出时序表的数据源字段。 }, { "name": "_tags" // 导出时序表的标签字段。 }, { "name": "_time" // 导出时序表的时间戳字段。 }, // 以下配置请根据时序表中的具体数据为准。 本示例中从源时序表中导出列名分别为string_1(string类型),bool_1(bool类型),int_1(int整型),double_1(double类型)的四列。 { "name": "string_1", "type": "string" }, { "name": "bool_1", "type": "BOOL" }, { "name": "int_1", "type": "int" }, { "name": "double_1", "type": "double" } ] }, "name": "Reader", // 表示以上配置为reader读数据的配置。 "category": "reader" }, { "stepType": "ots", // 写数据使用的插件为Tablestore Writer。 "parameter": { "datasource": "instance02", // 已增加的Tablestore目标数据源。 "table": "timeseriesWriteTable", // 目标时序表名称。 "newVersion": "true", // 使用Tablestore Writer的新版本。 "mode": "normal", // 使用Tablestore Writer的普通模式。 "isTimeseriesTable": "true", // 表示该表为时序表。 "envType": 1, "column": [ { "name": "_m_name" // 导入时序表的度量名称字段。 }, { "name": "_data_source" // 导入时序表的数据源字段。 }, { "name": "_tags" // 导入时序表的标签字段。 }, { "name": "_time" // 导入时序表的时间戳字段。 }, // 导入Tablestore Reader导出的四列字段,导入字段与导出字段的顺序必须一一对应。 { "name": "string_1", "type": "string" }, { "name": "bool_1", "type": "BOOL" }, { "name": "int_1", "type": "int" }, { "name": "double_1", "type": "double" } ] }, "name": "Writer", // 表示以上配置为writer写数据的配置。 "category": "writer" } ], "setting": { "errorLimit": { "record": "" }, "locale": "zh", "speed": { "throttle": false, "concurrent": 2 // 同步任务的并发度。 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
按【Ctrl+S】保存脚本。
说明执行后续操作时,如果未保存脚本,则系统会出现保存确认的提示,单击确认即可。
执行同步任务。
说明全量数据一般只需要同步一次,无需配置调度属性。
单击图标。
在参数对话框,选择运行资源组的名称。
单击运行。
步骤四:查看任务执行结果
执行同步任务后,在DataWorks控制台通过日志查看任务执行状态,在表格存储控制台查看目标时序表数据同步结果。
在DataWorks控制台通过日志查看同步任务运行状态。
在同步任务的运行日志页签,单击Detail log url对应的链接。
在任务的详细运行日志页面,查看
Current task status
对应的状态。当
Current task status
的值为FINISH时,表示任务运行完成。
在表格存储控制台查看目标时序表数据同步结果。
登录表格存储控制台。
在概览页面上方,选择地域。
单击实例名称。
在实例详情页签,单击时序表列表页签。
在时序表列表页签,单击目标时序表操作列的数据管理。
在数据管理页签,即可查看同步到该时序表中的数据。
增量数据同步
步骤一:新建离线同步任务
进入数据开发页面。
以项目管理员身份登录DataWorks控制台。
选择地域,在左侧导航栏,单击工作空间列表。
在工作空间列表页面,在目标工作空间操作列选择快速进入>数据开发。
在DataStudio控制台的数据开发页面,单击业务流程节点下的目标业务流程。
如果需要新建业务流程,请参见创建业务流程。
在数据集成节点上右键选择新建节点 > 离线同步。
在新建节点对话框,选择路径并填写节点名称。
单击确认。
在数据集成节点下会显示新建的离线同步节点。
步骤二:配置离线同步任务并启动
双击打开新建的离线同步任务节点。
配置同步网络链接。
选择离线同步任务的数据来源、数据去向以及用于执行同步任务的资源组,并测试连通性。
重要数据同步任务的执行必须经过资源组来实现,请选择资源组并保证资源组与读写两端的数据源能联通访问。
在网络与资源配置步骤,选择数据来源为Tablestore Stream,并选择数据源名称为源数据源。
选择资源组。
选择资源组后,系统会显示资源组的地域、规格等信息以及自动测试资源组与所选数据源之间连通性。
重要请与新增数据源时选择的资源组保持一致。
选择数据去向为Tablestore,并选择数据源名称为目标数据源。
系统会自动测试资源组与所选数据源之间连通性。
测试可连通后,单击下一步。
在提示对话框,单击确认使用脚本模式。
重要表格存储仅支持脚本模式。当存在不支持向导模式的数据源时,如果继续编辑任务,将强制使用脚本模式进行编辑。
任务转为脚本模式后,将无法转为向导模式,
配置同步任务并保存。
增量数据的同步需要使用到Tablestore Stream Reader与Tablestore Writer插件。脚本配置规则请参见Tablestore Stream数据源和Tablestore数据源。
在配置任务步骤,根据配置示例编辑脚本。
配置示例如下:
重要为了便于理解,在配置示例中增加了注释内容,实际使用脚本时请删除所有注释内容。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "otsstream", // 读数据使用的插件为Tablestore Stream Reader。 "parameter": { "datasource": "instance01", // 已增加的Tablestore源数据源。 // 源时序表名称。 "dataTable": "timeseriesReadTable", // 用于记录状态的表的名称。如果该表不存在,则Tablestore Stream Reader会自动创建。 "statusTable": "TableStoreStreamReaderStatusTable", "maxRetries": 30, // 最大重试次数。 "isExportSequenceInfo": false, // 不导出时序信息。 // 模式为行模式(single_version_and_update_only)。 "mode": "single_version_and_update_only", "isTimeseriesTable": "true", // 表示该表为时序表。 // 此处的${}用于注入参数,参数会在下一步配置。 "startTimeString": "${startTime}", // 增量数据的时间范围(左闭右开)的左边界。 "endTimeString": "${endTime}", // 增量数据的时间范围(左闭右开)的右边界。 "envType": 1, "column": [ { "name": "_m_name" }, { "name": "_data_source" }, { "name": "_tags" }, { "name": "_time" }, { "name": "string_1", "type": "string" }, { "name": "bool_1", "type": "BOOL" }, { "name": "int_1", "type": "int" }, { "name": "double_1", "type": "double" } ] }, "name": "Reader", "category": "reader" }, { "stepType": "ots", // 写数据使用的插件为Tablestore Writer。 "parameter": { "datasource": "instance02", // 已增加的Tablestore目标数据源。 "table": "timeseriesWriteTable", // 目标时序表名称。 "newVersion": "true", // 使用Tablestore Writer的新版本。 "mode": "normal", // 使用Tablestore Writer的普通模式。 "isTimeseriesTable": "true", // 表示该表为时序表。 "envType": 1, "column": [ { "name": "_m_name" // 导入时序表的度量名称字段。 }, { "name": "_data_source" // 导入时序表的数据源字段。 }, { "name": "_tags" // 导入时序表的标签字段。 }, { "name": "_time" // 导入时序表的时间戳字段。 }, // 导入Tablestore Steam Reader导出的四列字段。导入字段与导出字段的顺序必须一一对应。 { "name": "string_1", "type": "string" }, { "name": "bool_1", "type": "BOOL" }, { "name": "int_1", "type": "int" }, { "name": "double_1", "type": "double" } ] }, "name": "Writer", // 表示以上配置为writer写数据的配置。 "category": "writer" } ], "setting": { "errorLimit": { "record": "" }, "locale": "zh", "speed": { "throttle": false, "concurrent": 2 // 同步任务的并发度。 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
按【Ctrl+S】保存脚本。
说明执行后续操作时,如果未保存脚本,则系统会出现保存确认的提示,单击确认即可。
配置调度属性。
单击任务右侧的调度配置。
在配置调度面板的参数部分,单击新增参数,根据下表说明新增参数。更多信息,请参见调度参数支持的格式。
参数
参数值
startTime
$[yyyymmddhh24-2/24]$[miss-10/24/60]
endTime
$[yyyymmddhh24-1/24]$[miss-10/24/60]
配置示例如下图所示。
假如任务运行时的时间为2023年04月23日19:00:00点,则startTime为20230423175000,endTime为20230423185000。任务将会同步17:50到18:50时段内新增的数据。
在时间属性部分,配置时间属性。更多信息,请参见时间属性配置说明。
此处以任务整点每小时自动运行为例介绍配置,如下图所示。
在调度依赖部分,单击使用工作空间根节点,系统会自动生成依赖的上游节点信息。
使用工作空间根节点表示该任务无上游的依赖任务。
配置完成后,关闭配置调度面板。
(可选)根据需要调试脚本代码。
通过调试脚本代码,确保同步任务能成功同步源时序表的增量数据到目标时序表中。
重要调试脚本代码时配置的时间范围内的数据可能会多次导入到目标时序表,相同时间线会覆盖写入到目标时序表。
单击图标。
在参数对话框,选择运行资源组的名称,并配置自定义参数。
自定义参数的格式为
yyyyMMddHHmmss
,例如20230423175000。单击运行。
提交同步任务。
提交同步任务后,同步任务会按照配置的调度属性进行运行。
单击图标。
在提交对话框,根据需要填写变更描述。
单击确认。
步骤三:查看任务执行结果
在DataWorks控制台查看任务运行状态。
单击同步任务工具栏右侧的运维。
在周期实例页面的实例视角页签,查看实例的运行状态。
在表格存储控制台查看目标时序表数据同步结果。
登录表格存储控制台。
在概览页面上方,选择地域。
单击实例名称。
在实例详情页签,单击时序表列表页签。
在时序表列表页签,单击目标时序表操作列的数据管理。
在数据管理页签,即可查看同步到该时序表中的数据。