表格存储Tablestore是构建在阿里云飞天分布式系统之上的NoSQL数据存储服务,Tablestore数据源为您提供读取和写入Tablestore双向通道的功能,本文为您介绍DataWorks的Tablestore数据同步的能力支持情况。
使用限制
Tablestore Reader和Writer插件实现了从Tablestore读取和写入数据,包含行模式、列模式两种数据读取与写入方式,可针对宽表与时序表进行数据读取与写入。
列模式:在Tablestore多版本模型下,表中的数据组织为 三级的模式, 一行可以有任意列,列名并不是固定的,每一列可以含有多个版本,每个版本都有一个特定的时间戳(版本号)。列模式会将数据导出为(主键值,列名,时间戳,列值)的四元组格式,列模式下导入的数据也是(主键值,列名,时间戳,列值)的四元组格式。
行模式:该模式将用户每次更新的记录,抽取成行的形式导出,即(主键值,列值)的格式。
行模式下每一行数据对应TableStore表中的一条数据。写入行模式的数据包含主键列列值、普通列列值两部分。
Tablestore列由主键列primaryKey+普通列column组成,源端列顺序需要和Tablestore目的端主键列+普通列保持一致,否则会产生列映射错误。
Tablestore Reader会根据一张表中待读取的数据的范围,按照数据同步并发的数目N,将范围等分为N份Task。每个Task都会有一个Tablestore Reader线程来执行。
支持的字段类型
目前Tablestore Reader和Tablestore Writer支持所有Tablestore类型,其针对Tablestore类型的转换列表,如下所示。
类型分类 | Tablestore数据类型 |
整数类 | INTEGER |
浮点类 | DOUBLE |
字符串类 | STRING |
布尔类 | BOOLEAN |
二进制类 | BINARY |
Tablestore本身不支持日期型类型。应用层通常使用Long保存时间的Unix TimeStamp。
您需要将INTEGER类型的数据,在脚本模式中配置为INT类型,DataWorks会将其转换为INTEGER类型。如果您直接配置为INTEGER类型,日志将会报错,导致任务无法顺利完成。
数据同步任务开发
Tablestore数据同步任务的配置入口和通用配置流程指导可参见下文的配置指导,详细的配置参数解释可在配置界面查看对应参数的文案提示。
创建数据源
在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源。
单表离线同步任务配置指导
操作流程请参见通过脚本模式配置离线同步任务。
脚本模式配置的全量参数和脚本Demo请参见下文的附录:Writer脚本Demo与参数说明。
附录:Reader脚本Demo与参数说明
如果您配置离线任务时使用脚本模式的方式进行配置,您需要在任务脚本中按照脚本的统一格式要求编写脚本中的reader参数和writer参数,脚本模式的统一要求请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下的数据源的Reader参数和Writer参数的指导详情。
Reader脚本Demo
行模式读取宽表配置
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//数据源。
"newVersion":"true",//使用新版otsreader
"mode": "normal",// 行模式读取数据
"isTimeseriesTable":"false",// 配置该表为宽表(非时序表)
"column":[//字段。
{
"name":"column1"//字段名。
},
{
"name":"column2"
},
{
"name":"column3"
},
{
"name":"column4"
},
{
"name":"column5"
}
],
"range":{
"split":[
{
"type":"STRING",
"value":"beginValue"
},
{
"type":"STRING",
"value":"splitPoint1"
},
{
"type":"STRING",
"value":"splitPoint2"
},
{
"type":"STRING",
"value":"splitPoint3"
},
{
"type":"STRING",
"value":"endValue"
}
],
"end":[
{
"type":"STRING",
"value":"endValue"
},
{
"type":"INT",
"value":"100"
},
{
"type":"INF_MAX"
},
{
"type":"INF_MAX"
}
],
"begin":[
{
"type":"STRING",
"value":"beginValue"
},
{
"type":"INT",
"value":"0"
},
{
"type":"INF_MIN"
},
{
"type":"INF_MIN"
}
]
},
"table":""//表名。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//false代表不限流,下面的限流的速度不生效,true代表限流。
"concurrent":1 //作业并发数。
"mbps":"12"//限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
行模式读时序表配置
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//数据源。
"table": "",//表名
// 读时序数据mode必须为normal
"mode": "normal",
// 读时序数据newVersion必须为true
"newVersion": "true",
// 配置该表为时序表
"isTimeseriesTable":"true",
// measurementName:配置需要读取时序数据的度量名称,非必需,为空则读取全表数据
"measurementName":"measurement_1",
"column": [
{
"name": "_m_name"
},
{
"name": "tagA",
"is_timeseries_tag":"true"
},
{
"name": "double_0",
"type":"DOUBLE"
},
{
"name": "string_0",
"type":"STRING"
},
{
"name": "long_0",
"type":"INT"
},
{
"name": "binary_0",
"type":"BINARY"
},
{
"name": "bool_0",
"type":"BOOL"
},
{
"type":"STRING",
"value":"testString"
}
]
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//false代表不限流,下面的限流的速度不生效,true代表限流。
"concurrent":1 //作业并发数。
"mbps":"12"//限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
列模式读取宽表配置
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//数据源。
"table":"",//表名
"newVersion":"true",//新版otsreader
"mode": "multiversion",//多版本模式
"column":[//配置需要导出的列名称(必须是非主键列)
{"name":"mobile"},
{"name":"name"},
{"name":"age"},
{"name":"salary"},
{"name":"marry"}
],
"range":{//导出的范围
"begin":[
{"type":"INF_MIN"},
{"type":"INF_MAX"}
],
"end":[
{"type":"INF_MAX"},
{"type":"INF_MIN"}
],
"split":[
]
},
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//false代表不限流,下面的限流的速度不生效,true代表限流。
"concurrent":1 //作业并发数。
"mbps":"12"//限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Reader脚本参数通用配置
参数 | 描述 | 是否必选 | 默认值 |
endpoint | Tablestore Server的EndPoint(服务地址),详情请参见服务地址。 | 是 | 无 |
accessId | Tablestore的AccessKey ID。 | 是 | 无 |
accessKey | Tablestore的AccessKey Secret。 | 是 | 无 |
instanceName | Tablestore的实例名称,实例是您使用和管理Tablestore服务的实体。 您在开通Tablestore服务后,需要通过管理控制台来创建实例,然后在实例内进行表的创建和管理。 实例是Tablestore资源管理的基础单元,Tablestore对应用程序的访问控制和资源计量都在实例级别完成。 | 是 | 无 |
table | 所选取的需要抽取的表名称,这里有且只能填写一张表。在Tablestore不存在多表同步的需求。 | 是 | 无 |
newVersion | 定义了使用的Tablestore Reader插件的版本。
新版Tablestore Reader除了支持新的功能,对系统资源的开销也相对较低,因此推荐使用新版Tablestore Reader。 新版本插件配置兼容了旧版本插件的配置,即旧任务增加newVersion=true配置之后,可以正常运行。 | 否 | false |
mode | 定义了读取数据的模式,当前支持两种模式:
本配置仅在新版Tablestore Reader(newVersion:true)配置下生效。 旧版Tablestore Reader会忽略mode配置,并仅支持行模式读取。 | 否 | normal |
isTimeseriesTable | 定义了操作的数据表是否为时序数据表:
本配置仅在newVersion:true & mode:normal配置下生效。 旧版Tablestore Reader不支持时序表,且时序表无法按列模式读取。 | 否 | false |
Reader脚本参数附加配置
Tablestore Reader支持行模式读取宽表、行模式读取时序表、列模式读取宽表。下面为您介绍各模式的附加配置。
行模式读宽表参数
参数 | 描述 | 是否必选 | 默认值 |
column | 所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。由于Tablestore本身是NoSQL系统,在Tablestore Reader抽取数据过程中,必须指定相应的字段名称。
| 是 | 无 |
begin和end | begin和end配置项用于表示抽取Tablestore表数据的范围。 begin和end描述的是Tablestore PrimaryKey的区间分布状态,对于无限大小的区间,您可以使用 说明
例如,对一张主键为
| 否 | (INF_MIN,INF_MAX) |
split | 该配置项属于高级配置项,是您自己定义切分配置信息,普通情况下不建议使用。 通过配置split参数可以自己定义分片的数据范围,通常在Tablestore 数据存储发生热点,可以使用自定义的切分规则,以下任务配置为例:
任务运行时,数据会切分成6个段,并发读取。分段数量建议比任务并发数大。
| 否 | 当split参数未配置时,使用自动切分逻辑。 自动切分逻辑会找出 Partition Key最大、最小值,并进行均匀分段。 Partition Key支持整型和字符串,整型使用整除来分段,字符串使用第一个字符的unicode码来分段。 |
行模式读时序表参数
参数 | 描述 | 是否必选 | 默认值 |
column | column是一个数组,每个元素表示一列,可配置常量列与普通列两种类型。 对于常量列,需要配置以下字段:
对于普通列,需要配置以下字段:
读出四列数据的脚本示例:
| 是 | 无 |
measurementName | 配置需要读取时间线的度量名称。若不配置则读取全表数据。 | 否 | 无 |
timeRange | 请求数据的时间范围,读取的范围是[begin,end),左闭右开的区间,且begin必须小于end,时间戳单位为毫秒。格式如下:
| 否 | 全部版本 |
列模式读宽表参数
参数 | 描述 | 是否必选 | 默认值 |
column | 指定要导出的列,在列模式下只支持普通列。 格式:
说明
| 是 | 所有列 |
range | 读取的数据范围,读取的范围为[begin,end),左闭右开的区间,并且:
type支持的类型有如下几类:
格式:
| 否 | 全部数据 |
timeRange | 请求数据的时间范围,读取的范围是[begin,end),左闭右开的区间,且begin必须小于end,时间戳单位为毫秒。 格式:
| 否 | 全部版本 |
maxVersion | 请求的最大数据版本数,取值范围是1~INT32_MAX。 | 否 | 全部版本 |
附录:Writer脚本Demo与参数说明
如果您配置离线任务时使用脚本模式的方式进行配置,您需要在任务脚本中按照脚本的统一格式要求编写脚本中的reader参数和writer参数,脚本模式的统一要求请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下的数据源的Reader参数和Writer参数的指导详情。
Writer脚本Demo
行模式写入宽表配置
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//数据源。
"table":"",//表名。
"newVersion":"true",//使用新版otswriter
"mode": "normal",// 行模式写入数据
"isTimeseriesTable":"false",// 配置该表为宽表(非时序表)
"primaryKey" : [//Tablestore的主键信息。
{"name":"gid", "type":"INT"},
{"name":"uid", "type":"STRING"}
],
"column" : [//字段。
{"name":"col1", "type":"INT"},
{"name":"col2", "type":"DOUBLE"},
{"name":"col3", "type":"STRING"},
{"name":"col4", "type":"STRING"},
{"name":"col5", "type":"BOOL"}
],
"writeMode" : "PutRow" //写入模式。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
行模式写时序表配置
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//数据源。
"table": "testTimeseriesTableName01",
"mode": "normal",
"newVersion": "true",
"isTimeseriesTable":"true",
"timeunit":"microseconds",
"column": [
{
"name": "_m_name"
},
{
"name": "_data_source",
},
{
"name": "_tags",
},
{
"name": "_time",
},
{
"name": "string_1",
"type":"string"
},
{
"name":"tag3",
"is_timeseries_tag":"true",
}
]
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
列模式写入宽表配置
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//数据源。
"table":"",
"newVersion":"true",
"mode":"multiVersion",
"primaryKey" : [
"gid",
"uid"
]
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},x`
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Writer脚本参数通用配置
参数 | 描述 | 是否必选 | 默认值 |
datasource | 数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。 | 是 | 无 |
endPoint | Tablestore Server的EndPoint(服务地址),详情请参见服务地址。 | 是 | 无 |
accessId | Tablestore的AccessKey ID。 | 是 | 无 |
accessKey | Tablestore的AccessKey Secret。 | 是 | 无 |
instanceName | Tablestore的实例名称,实例是您使用和管理Tablestore服务的实体。 您在开通Tablestore服务后,需要通过管理控制台来创建实例,然后在实例内进行表的创建和管理。实例是Tablestore资源管理的基础单元,Tablestore对应用程序的访问控制和资源计量都在实例级别完成。 | 是 | 无 |
table | 所选取的需要抽取的表名称,此处能且只能填写一张表。在Tablestore中不存在多表同步的需求。 | 是 | 无 |
newVersion | 定义了使用的Tablestore Writer插件版本。
新版Tablestore Writer除了支持新的功能,对系统资源的开销也相对较低,因此推荐使用新版Tablestore Writer。 新版插件兼容旧版本插件配置,即旧任务增加newVersion=true配置之后,可以正常运行。 | 是 | false |
mode | 定义了写入数据的模式,当前支持两种模式
本配置仅在newVersion:true配置下生效 旧版Tablestore Writer会忽略mode配置,并仅支持行模式读取。 | 否 | normal |
isTimeseriesTable | 定义了操作的数据表是否为时序数据表。
本配置仅在newVersion:true & mode:normal配置下生效(列模式不兼容时序表)。 | 否 | false |
Writer脚本参数附加配置
Tablestore Writer支持行模式写入宽表、行模式写入时序表、列模式写入宽表。下面介绍各模式的附加配置。
行模式写宽表参数
参数 | 描述 | 是否必选 | 默认值 |
primaryKey | Tablestore的主键信息,使用JSON的数组描述字段信息。Tablestore本身是NoSQL系统,在Tablestore Writer导入数据过程中,必须指定相应的字段名称。 数据同步系统本身支持类型转换的,因此对于源头数据非STRING/INT,Tablestore Writer会进行数据类型转换。配置示例如下。
说明 Tablestore的PrimaryKey仅支持STRING和INT类型,因此Tablestore Writer本身也限定填写STRING和INT两种类型。 | 是 | 无 |
column | 所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。 配置示例:
其中的name为写入的Tablestore列名称,type为写入的类型。Tablestore支持STRING、INT、DOUBLE、BOOL和BINARY类型。 说明 写入过程不支持常量、函数或者自定义表达式。 | 是 | 无 |
writeMode | 数据写入表格存储的模式,目前支持以下两种模式:
| 是 | 无 |
enableAutoIncrement | 是否允许向包含主键自增列的Tablestore表中写入数据。
| 否 | false |
requestTotalSizeLimitation | 该配置限制写入Tablestore时单行数据的大小,配置类型为数字。 | 否 | 1MB |
attributeColumnSizeLimitation | 该配置限制写入Tablestore时单个属性列的大小,配置类型为数字。 | 否 | 2MB |
primaryKeyColumnSizeLimitation | 该配置限制写入Tablestore时单个主键列的大小,配置类型为数字。 | 否 | 1KB |
attributeColumnMaxCount | 该配置限制写入Tablestore时属性列的个数,配置类型为数字。 | 否 | 1,024 |
行模式写时序表参数
参数 | 描述 | 是否必选 | 默认值 |
column | column中的每个元素对应时序数据中的一个字段,每个元素可配置以下参数。
由于时序数据的度量名称与时间戳不能为空,因此必须配置 示例:一条待写入数据如下,包含六个字段:
使用如下配置:
写入Tablestore后,控制台查看结果如下 | 是 | 无 |
timeunit | 所配置的时间戳_time字段的单位,支持NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS、MINUTES。 | 否 | MICROSECONDS |
列模式写宽表参数
参数 | 描述 | 是否必选 | 默认值 |
primaryKey | 表的主键列。 考虑到配置成本,并不需要配置primaryKey在Record(Line)中的位置,但Record的格式需要固定,要求primaryKey一定在行首,primaryKey之后是columnName。record的格式为: 例如,有以下9条数据:
配置示例如下:
写入宽行结果:
| 是 | 无 |
columnNamePrefixFilter | 列名前缀过滤。 Hbase导入的数据,cf和qulifier共同组成了columnName,但Tablestore不支持cf,所以需要将cf过滤掉。 配置示例: 说明
| 否 | 无 |
常见问题
问题一
Q:向包含主键自增列的目标表写入数据,需要如何配置Tablestore Writer?
Tablestore Writer的配置中必须包含以下两条:
"newVersion": "true", "enableAutoIncrement": "true",
Tablestore Writer中不需要配置主键自增列的列名。
Tablestore Writer中配置的primaryKey条数+column条数需要等于上游Tablestore Reader数据的列数。
问题二
Q:在时序模型的配置中,如何理解_tag
和is_timeseries_tag
两个字段?
示例:某条数据共有三个标签,标签为:【手机=小米,内存=8G,镜头=莱卡】。
数据导出示例(Tablestore Reader)
如果想将上述标签合并到一起作为一列导出,则配置为:
"column": [ { "name": "_tags", } ],
DataWorks会将标签导出为一列数据,形式如下:
["phone=xiaomi","camera=LEICA","RAM=8G"]
如果希望导出
phone
标签和camera
标签,并且每个标签单独作为一列导出,则配置为:"column": [ { "name": "phone", "is_timeseries_tag":"true", }, { "name": "camera", "is_timeseries_tag":"true", } ],
DataWorks会导出两列数据,形式如下:
xiaomi, LEICA
数据导入示例(Tablestore Writer)
现在上游数据源(Reader)有两列数据:
一列数据为:
["phone=xiaomi","camera=LEICA","RAM=8G"]
。另一列数据为:6499。
现希望将这两列数据都添加到标签里面,预期的写入后标签字段格式如下所示:则配置为:
"column": [ { "name": "_tags", }, { "name": "price", "is_timeseries_tag":"true", }, ],
第一列配置将
["phone=xiaomi","camera=LEICA","RAM=8G"]
整体导入标签字段。第二列配置将
price=6499
单独导入标签字段。