DataWorks数据集成支持使用Tablestore Stream Reader读取Tablestore的增量数据,本文为您介绍DataWorks的Tablestore Stream数据读取能力。
数据同步前准备:Tablestore Stream环境准备
使用Tablestore Stream插件前,您必须确保Tablestore表上已经开启Stream功能。时序表已默认开启stream功能。您可以在建表时指定开启,也可以使用SDK的UpdateTable接口开启。开启Stream的方法,如下所示。
SyncClient client = new SyncClient("", "", "", "");
#方法1:建表的时候开启:
CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta);
createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // 24代表增量数据保留24小时。
client.createTable(createTableRequest);
#方法2:如果建表时未开启,您可以通过UpdateTable开启:
UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName");
updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24));
client.updateTable(updateTableRequest);
使用SDK的UpdateTable接口开启时:
指定开启Stream并设置过期时间,即开启了Tablestore增量数据导出功能。开启stream功能后,Tablestore服务端就会将您的操作日志额外保存起来,每个分区有一个有序的操作日志队列,每条操作日志会在一定时间后被垃圾回收,该时间即为您指定的过期时间。
Tablestore的SDK提供了几个Stream相关的API用于读取这部分的操作日志,增量插件也是通过Tablestore SDK的接口获取到增量数据。列模式下会将增量数据转化为多个6元组的形式(pk、colName、version、colValue、opType和sequenceInfo),行模式则会以普通行的形式导出增量数据。
支持的同步模式与字段类型
Tablestore Stream Reader插件支持使用列模式或行模式同步Tablestore的增量数据。两种模式下的同步过程和字段类型要求如下。
列模式
在Tablestore多版本模式下,表中的数据组织为行>列>版本
三级的模式, 一行可以有任意列,列名并不是固定的,每一列可以含有多个版本,每个版本都有一个特定的时间戳(版本号)。
您可以通过Tablestore的API进行一系列读写操作,Tablestore通过记录您最近对表的一系列写操作(或数据更改操作)来实现记录增量数据的目的,所以您也可以把增量数据看作一批操作记录。
Tablestore支持PutRow、UpdateRow和DeleteRow操作:
PutRow:写入一行,如果该行已存在即覆盖该行。
UpdateRow:更新一行,不更改原行的其它数据。更新包括新增或覆盖(如果对应列的对应版本已存在)一些列值、删除某一列的全部版本、删除某一列的某个版本。
DeleteRow:删除一行。
Tablestore会根据每种操作生成对应的增量数据记录,Reader插件会读出这些记录,并导出为数据集成的数据格式。
同时,由于Tablestore具有动态列、多版本的特性,所以Reader插件导出的一行不对应Tablestore中的一行,而是对应Tablestore中的一列的一个版本。即Tablestore中的一行可能会导出很多行,每行包含主键值、该列的列名、该列下该版本的时间戳(版本号)、该版本的值、操作类型。如果设置isExportSequenceInfo为true,还会包括时序信息。
转换为数据集成的数据格式后,定义了以下四种操作类型:
U(UPDATE):写入一列的一个版本。
DO(DELETE_ONE_VERSION):删除某一列的某个版本。
DA(DELETE_ALL_VERSION):删除某一列的全部版本,此时需要根据主键和列名,删除对应列的全部版本。
DR(DELETE_ROW):删除某一行,此时需要根据主键,删除该行数据。
假设该表有两个主键列,主键列名分别为pkName1, pkName2,示例如下。
pkName1 | pkName2 | columnName | timestamp | columnValue | opType |
pkName1 | pkName2 | columnName | timestamp | columnValue | opType |
pk1_V1 | pk2_V1 | col_a | 1441803688001 | col_val1 | U |
pk1_V1 | pk2_V1 | col_a | 1441803688002 | col_val2 | U |
pk1_V1 | pk2_V1 | col_b | 1441803688003 | col_val3 | U |
pk1_V2 | pk2_V2 | col_a | 1441803688000 | — | DO |
pk1_V2 | pk2_V2 | col_b | — | — | DA |
pk1_V3 | pk2_V3 | — | — | — | DR |
pk1_V3 | pk2_V3 | col_a | 1441803688005 | col_val1 | U |
假设导出的数据如上,共7行,对应Tablestore表内的3行,主键分别是(pk1_V1,pk2_V1),(pk1_V2, pk2_V2),(pk1_V3, pk2_V3):
对于主键为(pk1_V1,pk2_V1)的一行,包括写入col_a列的两个版本和col_b列的一个版本等操作。
对于主键为(pk1_V2,pk2_V2)的一行,包括删除col_a列的一个版本和删除col_b列的全部版本等操作。
对于主键为(pk1_V3,pk2_V3)的一行,包括删除整行和写入col_a列的一个版本等操作。
行模式
宽行表
您可以通过行模式导出数据,该模式将用户每次更新的记录,抽取成行的形式导出,需要设置mode属性并配置列名。
"parameter": { #parameter中配置下面三项配置(例如datasource、table等其它配置项照常配置)。 "mode": "single_version_and_update_only", # 配置导出模式。 "column":[ #按照需求添加需要导出TableStore中的列,您可以自定义设置配置个数。 { "name": "uid" #列名示例,可以是主键或属性列。 }, { "name": "name" #列名示例,可以是主键或属性列。 }, ], "isExportSequenceInfo": false, #single_version_and_update_only模式下只能是false。 }
时序表
时序表在创建时会自动开启Stream,因此不需要手动开启Stream功能。
Tablestore Stream Reader支持导出时序表中的增量数据,当表为时序表时,需要配置的信息如下:
"parameter": { #parameter中配置下面四项配置(例如datasource、table等其它配置项照常配置)。 "mode": "single_version_and_update_only", # 配置导出模式。 "isTimeseriesTable":"true", # 配置导出为时序表。 "column":[ #按照需求添加需要导出TableStore中的列,您可以自定义设置配置个数。 { "name": "_m_name" #度量名称字段。 }, { "name": "_data_source" #数据源字段。 }, { "name": "_tags" #标签字段,将tags转换为string类型。 }, { "name": "tag1", #标签内部字段键名称。 "is_timeseries_tag":"true" #表明该字段为tags内部字段。 }, { "name": "time" #时间戳字段。 }, { "name": "name" #属性列名称。 }, ], "isExportSequenceInfo": false, #single_version_and_update_only模式下只能是false。 }
行模式导出的数据更接近于原始的行,易于后续处理,但需要注意以下问题:
每次导出的行是从用户每次更新的记录中抽取,每一行数据与用户的写入或更新操作一一对应。如果用户存在单独更新某些列的行为,则会出现有一些记录只有被更新的部分列,其它列为空的情况。
行模式不会导出数据的版本号(即每列的时间戳),也无法进行删除操作。
数据类型转换列表
目前Tablestore Stream Reader支持所有的Tablestore类型,其针对Tablestore类型的转换列表,如下所示。
类型分类 | Tablestore Stream数据类型 |
类型分类 | Tablestore Stream数据类型 |
整数类 | INTEGER |
浮点类 | DOUBLE |
字符串类 | STRING |
布尔类 | BOOLEAN |
二进制类 | BINARY |
数据同步任务开发
数据同步任务的配置入口和通用配置流程可参见下文的配置指导。
操作流程请参见通过向导模式配置离线同步任务、通过脚本模式配置离线同步任务。
脚本模式配置的全量参数和脚本Demo请参见下文的附录:脚本Demo与参数说明。
附录:脚本Demo与参数说明
离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。
- 本页导读 (1)
- 数据同步前准备:Tablestore Stream环境准备
- 支持的同步模式与字段类型
- 列模式
- 行模式
- 数据类型转换列表
- 数据同步任务开发
- 附录:脚本Demo与参数说明
- 离线任务脚本配置方式
- Reader脚本Demo
- Reader脚本参数