表格存储Tablestore是构建在阿里云飞天分布式系统之上的NoSQL数据存储服务,Tablestore数据源为您提供读取和写入Tablestore双向通道的功能,本文为您介绍DataWorks的Tablestore数据同步的能力支持情况。
使用限制
-
Tablestore Reader和Writer插件实现了从Tablestore读取和写入数据,包含行模式、列模式两种数据读取与写入方式,可针对宽表与时序表进行数据读取与写入。
-
列模式:在Tablestore多版本模型下,表中的数据组织为三级的模式, 一行可以有任意列,列名并不是固定的,每一列可以含有多个版本,每个版本都有一个特定的时间戳(版本号)。列模式会将数据导出为(主键值,列名,时间戳,列值)的四元组格式,列模式下导入的数据也是(主键值,列名,时间戳,列值)的四元组格式。
-
行模式:该模式将用户每次更新的记录,抽取成行的形式导出,即(主键值,列值)的格式。
行模式下每一行数据对应TableStore表中的一条数据。写入行模式的数据包含主键列列值、普通列列值两部分。
-
-
Tablestore列由主键列primaryKey和普通列column组成,源端列顺序需要和Tablestore目的端主键列和普通列保持一致,否则会产生列映射错误。
-
Tablestore Reader会根据一张表中待读取的数据的范围,按照数据同步并发的数目N,将范围等分为N份Task。每个Task都会有一个Tablestore Reader线程来执行。
支持的字段类型
目前Tablestore Reader和Tablestore Writer支持所有Tablestore类型,其针对Tablestore类型的转换列表,如下所示。
|
类型分类 |
Tablestore数据类型 |
|
整数类 |
INTEGER |
|
浮点类 |
DOUBLE |
|
字符串类 |
STRING |
|
布尔类 |
BOOLEAN |
|
二进制类 |
BINARY |
-
Tablestore本身不支持日期型类型。应用层通常使用Long保存时间的Unix TimeStamp。
-
您需要将INTEGER类型的数据,在脚本模式中配置为INT类型,DataWorks会将其转换为INTEGER类型。如果您直接配置为INTEGER类型,日志将会报错,导致任务无法顺利完成。
创建数据源
在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见数据源管理,详细的配置参数解释可在配置界面查看对应参数的文案提示。
数据同步任务开发
数据同步任务的配置入口和通用配置流程可参见下文的配置指导。
单表离线同步任务配置指导
-
操作流程请参见脚本模式配置。
-
脚本模式配置的全量参数和脚本Demo请参见下文的附录二:Writer脚本Demo与参数说明。
附录一:Reader脚本Demo与参数说明
离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见脚本模式配置,以下为您介绍脚本模式下数据源的参数配置详情。
Reader脚本Demo
行模式读取宽表配置
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//数据源。
"newVersion":"true",//使用新版otsreader
"mode": "normal",// 行模式读取数据
"isTimeseriesTable":"false",// 配置该表为宽表(非时序表)
"column":[//字段。
{
"name":"column1"//字段名。
},
{
"name":"column2"
},
{
"name":"column3"
},
{
"name":"column4"
},
{
"name":"column5"
}
],
"range":{
"split":[
{
"type":"STRING",
"value":"beginValue"
},
{
"type":"STRING",
"value":"splitPoint1"
},
{
"type":"STRING",
"value":"splitPoint2"
},
{
"type":"STRING",
"value":"splitPoint3"
},
{
"type":"STRING",
"value":"endValue"
}
],
"end":[
{
"type":"STRING",
"value":"endValue"
},
{
"type":"INT",
"value":"100"
},
{
"type":"INF_MAX"
},
{
"type":"INF_MAX"
}
],
"begin":[
{
"type":"STRING",
"value":"beginValue"
},
{
"type":"INT",
"value":"0"
},
{
"type":"INF_MIN"
},
{
"type":"INF_MIN"
}
]
},
"table":""//表名。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//false代表不限流,下面的限流的速度不生效,true代表限流。
"concurrent":1 //作业并发数。
"mbps":"12"//限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
行模式读时序表配置
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//数据源。
"table": "",//表名
// 读时序数据mode必须为normal
"mode": "normal",
// 读时序数据newVersion必须为true
"newVersion": "true",
// 配置该表为时序表
"isTimeseriesTable":"true",
// measurementName:配置需要读取时序数据的度量名称,非必需,为空则读取全表数据
"measurementName":"measurement_1",
"column": [
{
"name": "_m_name"
},
{
"name": "tagA",
"is_timeseries_tag":"true"
},
{
"name": "double_0",
"type":"DOUBLE"
},
{
"name": "string_0",
"type":"STRING"
},
{
"name": "long_0",
"type":"INT"
},
{
"name": "binary_0",
"type":"BINARY"
},
{
"name": "bool_0",
"type":"BOOL"
},
{
"type":"STRING",
"value":"testString"
}
]
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//false代表不限流,下面的限流的速度不生效,true代表限流。
"concurrent":1 //作业并发数。
"mbps":"12"//限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
列模式读取宽表配置
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//数据源。
"table":"",//表名
"newVersion":"true",//新版otsreader
"mode": "multiversion",//多版本模式
"column":[//配置需要导出的列名称(必须是非主键列)
{"name":"mobile"},
{"name":"name"},
{"name":"age"},
{"name":"salary"},
{"name":"marry"}
],
"range":{//导出的范围
"begin":[
{"type":"INF_MIN"},
{"type":"INF_MAX"}
],
"end":[
{"type":"INF_MAX"},
{"type":"INF_MIN"}
],
"split":[
]
},
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//false代表不限流,下面的限流的速度不生效,true代表限流。
"concurrent":1 //作业并发数。
"mbps":"12"//限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Reader脚本参数通用配置
|
参数 |
描述 |
是否必选 |
默认值 |
|
endpoint |
Tablestore Server的EndPoint(服务地址),详情请参见服务地址。 |
是 |
无 |
|
accessId |
Tablestore的AccessKey ID。 |
是 |
无 |
|
accessKey |
Tablestore的AccessKey Secret。 |
是 |
无 |
|
instanceName |
Tablestore的实例名称,实例是您使用和管理Tablestore服务的实体。 您在开通Tablestore服务后,需要通过管理控制台来创建实例,然后在实例内进行表的创建和管理。 实例是Tablestore资源管理的基础单元,Tablestore对应用程序的访问控制和资源计量都在实例级别完成。 |
是 |
无 |
|
table |
所选取的需要抽取的表名称,这里有且只能填写一张表。在Tablestore不存在多表同步的需求。 |
是 |
无 |
|
newVersion |
定义了使用的Tablestore Reader插件的版本。
新版Tablestore Reader除了支持新的功能,对系统资源的开销也相对较低,因此推荐使用新版Tablestore Reader。 新版本插件配置兼容了旧版本插件的配置,即旧任务增加newVersion=true配置之后,可以正常运行。 |
否 |
false |
|
mode |
定义了读取数据的模式,当前支持两种模式:
本配置仅在新版Tablestore Reader(newVersion:true)配置下生效。 旧版Tablestore Reader会忽略mode配置,并仅支持行模式读取。 |
否 |
normal |
|
isTimeseriesTable |
定义了操作的数据表是否为时序数据表:
本配置仅在newVersion:true & mode:normal配置下生效。 旧版Tablestore Reader不支持时序表,且时序表无法按列模式读取。 |
否 |
false |
Reader脚本参数附加配置
Tablestore Reader支持行模式读取宽表、行模式读取时序表、列模式读取宽表。下面为您介绍各模式的附加配置。
行模式读宽表参数
|
参数 |
描述 |
是否必选 |
默认值 |
|
column |
所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。由于Tablestore本身是NoSQL系统,在Tablestore Reader抽取数据过程中,必须指定相应的字段名称。
|
是 |
无 |
|
begin和end |
begin和end配置项用于表示抽取Tablestore表数据的范围。 begin和end描述的是Tablestore PrimaryKey的区间分布状态,对于无限大小的区间,您可以使用 说明
例如,对一张主键为
|
否 |
(INF_MIN,INF_MAX) |
|
split |
该配置项属于高级配置项,是您自己定义切分配置信息,普通情况下不建议使用。 通过配置split参数可以自己定义分片的数据范围,通常在Tablestore 数据存储发生热点,可以使用自定义的切分规则,以下任务配置为例:
任务运行时,数据会切分成6个段,并发读取。分段数量建议比任务并发数大。
|
否 |
当split参数未配置时,使用自动切分逻辑。 自动切分逻辑会找出 Partition Key最大、最小值,并进行均匀分段。 Partition Key支持整型和字符串,整型使用整除来分段,字符串使用第一个字符的unicode码来分段。 |
行模式读时序表参数
|
参数 |
描述 |
是否必选 |
默认值 |
|
column |
column是一个数组,每个元素表示一列,可配置常量列与普通列两种类型。 对于常量列,需要配置以下字段:
对于普通列,需要配置以下字段:
读出四列数据的脚本示例:
|
是 |
无 |
|
measurementName |
配置需要读取时间线的度量名称。若不配置则读取全表数据。 |
否 |
无 |
|
timeRange |
请求数据的时间范围,读取的范围是[begin,end),左闭右开的区间,且begin必须小于end,时间戳单位为微秒。格式如下:
|
否 |
全部版本 |
列模式读宽表参数
|
参数 |
描述 |
是否必选 |
默认值 |
|
column |
指定要导出的列,在列模式下只支持普通列。 格式:
说明
|
是 |
所有列 |
|
range |
读取的数据范围,读取的范围为[begin,end),左闭右开的区间,并且:
type支持的类型有如下几类:
格式:
|
否 |
全部数据 |
|
timeRange |
请求数据的时间范围,读取的范围是[begin,end),左闭右开的区间,且begin必须小于end,时间戳单位为微秒。 格式:
|
否 |
全部版本 |
|
maxVersion |
请求的最大数据版本数,取值范围是1~INT32_MAX。 |
否 |
全部版本 |
附录二:Writer脚本Demo与参数说明
离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见脚本模式配置,以下为您介绍脚本模式下数据源的参数配置详情。
Writer脚本Demo
行模式写入宽表配置
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//数据源。
"table":"",//表名。
"newVersion":"true",//使用新版otswriter
"mode": "normal",// 行模式写入数据
"isTimeseriesTable":"false",// 配置该表为宽表(非时序表)
"primaryKey" : [//Tablestore的主键信息。
{"name":"gid", "type":"INT"},
{"name":"uid", "type":"STRING"}
],
"column" : [//字段。
{"name":"col1", "type":"INT"},
{"name":"col2", "type":"DOUBLE"},
{"name":"col3", "type":"STRING"},
{"name":"col4", "type":"STRING"},
{"name":"col5", "type":"BOOL"}
],
"writeMode" : "PutRow" //写入模式。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
行模式写时序表配置
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//数据源。
"table": "testTimeseriesTableName01",
"mode": "normal",
"newVersion": "true",
"isTimeseriesTable":"true",
"timeunit":"microseconds",
"column": [
{
"name": "_m_name"
},
{
"name": "_data_source",
},
{
"name": "_tags",
},
{
"name": "_time",
},
{
"name": "string_1",
"type":"string"
},
{
"name":"tag3",
"is_timeseries_tag":"true",
}
]
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
列模式写入宽表配置
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//数据源。
"table":"",
"newVersion":"true",
"mode":"multiVersion",
"primaryKey" : [
"gid",
"uid"
]
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},x`
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Writer脚本参数通用配置
|
参数 |
描述 |
是否必选 |
默认值 |
|
datasource |
数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。 |
是 |
无 |
|
endPoint |
Tablestore Server的EndPoint(服务地址),详情请参见服务地址。 |
是 |
无 |
|
accessId |
Tablestore的AccessKey ID。 |
是 |
无 |
|
accessKey |
Tablestore的AccessKey Secret。 |
是 |
无 |
|
instanceName |
Tablestore的实例名称,实例是您使用和管理Tablestore服务的实体。 您在开通Tablestore服务后,需要通过管理控制台来创建实例,然后在实例内进行表的创建和管理。实例是Tablestore资源管理的基础单元,Tablestore对应用程序的访问控制和资源计量都在实例级别完成。 |
是 |
无 |
|
table |
所选取的需要抽取的表名称,此处能且只能填写一张表。在Tablestore中不存在多表同步的需求。 |
是 |
无 |
|
newVersion |
定义了使用的Tablestore Writer插件版本。
新版Tablestore Writer除了支持新的功能,对系统资源的开销也相对较低,因此推荐使用新版Tablestore Writer。 新版插件兼容旧版本插件配置,即旧任务增加newVersion=true配置之后,可以正常运行。 |
是 |
false |
|
mode |
定义了写入数据的模式,当前支持两种模式
本配置仅在newVersion:true配置下生效 旧版Tablestore Writer会忽略mode配置,并仅支持行模式读取。 |
否 |
normal |
|
isTimeseriesTable |
定义了操作的数据表是否为时序数据表。
本配置仅在newVersion:true & mode:normal配置下生效(列模式不兼容时序表)。 |
否 |
false |
Writer脚本参数附加配置
Tablestore Writer支持行模式写入宽表、行模式写入时序表、列模式写入宽表。下面介绍各模式的附加配置。
行模式写宽表参数
|
参数 |
描述 |
是否必选 |
默认值 |
|
primaryKey |
Tablestore的主键信息,使用JSON的数组描述字段信息。Tablestore本身是NoSQL系统,在Tablestore Writer导入数据过程中,必须指定相应的字段名称。 数据同步系统本身支持类型转换的,因此对于源头数据非STRING/INT,Tablestore Writer会进行数据类型转换。配置示例如下。
说明
Tablestore的PrimaryKey仅支持STRING和INT类型,因此Tablestore Writer本身也限定填写STRING和INT两种类型。 |
是 |
无 |
|
column |
所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。 配置示例:
其中的name为写入的Tablestore列名称,type为写入的类型。Tablestore支持STRING、INT、DOUBLE、BOOL和BINARY类型。 说明
写入过程不支持常量、函数或者自定义表达式。 |
是 |
无 |
|
writeMode |
数据写入表格存储的模式,目前支持以下两种模式:
|
是 |
无 |
|
enableAutoIncrement |
是否允许向包含主键自增列的Tablestore表中写入数据。
|
否 |
false |
|
requestTotalSizeLimitation |
该配置限制写入Tablestore时单行数据的大小,配置类型为数字。 |
否 |
1MB |
|
attributeColumnSizeLimitation |
该配置限制写入Tablestore时单个属性列的大小,配置类型为数字。 |
否 |
2MB |
|
primaryKeyColumnSizeLimitation |
该配置限制写入Tablestore时单个主键列的大小,配置类型为数字。 |
否 |
1KB |
|
attributeColumnMaxCount |
该配置限制写入Tablestore时属性列的个数,配置类型为数字。 |
否 |
1,024 |
行模式写时序表参数
|
参数 |
描述 |
是否必选 |
默认值 |
|
column |
column中的每个元素对应时序数据中的一个字段,每个元素可配置以下参数。
由于时序数据的度量名称与时间戳不能为空,因此必须配置 示例:一条待写入数据如下,包含六个字段:
使用如下配置:
|
是 |
无 |
|
timeunit |
所配置的时间戳_time字段的单位,支持NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS、MINUTES。 |
否 |
MICROSECONDS |
列模式写宽表参数
|
参数 |
描述 |
是否必选 |
默认值 |
|
primaryKey |
表的主键列。 考虑到配置成本,并不需要配置primaryKey在Record(Line)中的位置,但Record的格式需要固定,要求primaryKey一定在行首,primaryKey之后是columnName。record的格式为: 例如,有以下9条数据:
配置示例如下:
写入宽行结果:
|
是 |
无 |
|
columnNamePrefixFilter |
列名前缀过滤。 Hbase导入的数据,cf和qulifier共同组成了columnName,但Tablestore不支持cf,所以需要将cf过滤掉。 配置示例: 说明
|
否 |
无 |
常见问题
-
Q:向包含主键自增列的目标表写入数据,需要如何配置Tablestore Writer?
-
Tablestore Writer的配置中必须包含以下两条:
"newVersion": "true", "enableAutoIncrement": "true", -
Tablestore Writer中不需要配置主键自增列的列名。
-
Tablestore Writer中配置的primaryKey条数+column条数需要等于上游Tablestore Reader数据的列数。
-
-
Q:在时序模型的配置中,如何理解
_tag和is_timeseries_tag两个字段?示例:某条数据共有三个标签,标签为:【手机=小米,内存=8G,镜头=莱卡】。

-
数据导出示例(Tablestore Reader)
-
如果想将上述标签合并到一起作为一列导出,则配置为:
"column": [ { "name": "_tags", } ],DataWorks会将标签导出为一列数据,形式如下:
["phone=xiaomi","camera=LEICA","RAM=8G"] -
如果希望导出
phone标签和camera标签,并且每个标签单独作为一列导出,则配置为:"column": [ { "name": "phone", "is_timeseries_tag":"true", }, { "name": "camera", "is_timeseries_tag":"true", } ],DataWorks会导出两列数据,形式如下:
xiaomi, LEICA
-
-
数据导入示例(Tablestore Writer)
现在上游数据源(Reader)有两列数据:
-
一列数据为:
["phone=xiaomi","camera=LEICA","RAM=8G"]。 -
另一列数据为:6499。
现希望将这两列数据都添加到标签里面,预期的写入后标签字段格式如下所示:
则配置为:"column": [ { "name": "_tags", }, { "name": "price", "is_timeseries_tag":"true", }, ],-
第一列配置将
["phone=xiaomi","camera=LEICA","RAM=8G"]整体导入标签字段。 -
第二列配置将
price=6499单独导入标签字段。
-
-