表格存储Tablestore是构建在阿里云飞天分布式系统之上的NoSQL数据存储服务,Tablestore数据源为您提供读取和写入Tablestore双向通道的功能,本文为您介绍DataWorks的Tablestore数据同步的能力支持情况。
使用限制
Tablestore Reader和Writer插件实现了从Tablestore读取和写入数据,包含行模式、列模式两种数据读取与写入方式,可针对宽表与时序表进行数据读取与写入。
列模式:在Tablestore多版本模型下,表中的数据组织为行 > 列 > 版本三级的模式, 一行可以有任意列,列名并不是固定的,每一列可以含有多个版本,每个版本都有一个特定的时间戳(版本号)。列模式会将数据导出为(主键值,列名,时间戳,列值)的四元组格式,列模式下导入的数据也是(主键值,列名,时间戳,列值)的四元组格式。
行模式:该模式将用户每次更新的记录,抽取成行的形式导出,即(主键值,列值)的格式。
行模式下每一行数据对应TableStore表中的一条数据。写入行模式的数据包含主键列列值、普通列列值两部分。
Tablestore列由主键列primaryKey+普通列column组成,源端列顺序需要和Tablestore目的端主键列+普通列保持一致,否则会产生列映射错误。
Tablestore Reader会根据一张表中待读取的数据的范围,按照数据同步并发的数目N,将范围等分为N份Task。每个Task都会有一个Tablestore Reader线程来执行。
支持的字段类型
目前Tablestore Reader和Tablestore Writer支持所有Tablestore类型,其针对Tablestore类型的转换列表,如下所示。
类型分类 | Tablestore数据类型 |
整数类 | INTEGER |
浮点类 | DOUBLE |
字符串类 | STRING |
布尔类 | BOOLEAN |
二进制类 | BINARY |
Tablestore本身不支持日期型类型。应用层通常使用Long保存时间的Unix TimeStamp。
您需要将INTEGER类型的数据,在脚本模式中配置为INT类型,DataWorks会将其转换为INTEGER类型。如果您直接配置为INTEGER类型,日志将会报错,导致任务无法顺利完成。
创建数据源
在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源,详细的配置参数解释可在配置界面查看对应参数的文案提示。
数据同步任务开发
数据同步任务的配置入口和通用配置流程可参见下文的配置指导。
单表离线同步任务配置指导
操作流程请参见通过脚本模式配置离线同步任务。
脚本模式配置的全量参数和脚本Demo请参见下文的附录二:Writer脚本Demo与参数说明。
附录一:Reader脚本Demo与参数说明
离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。
Reader脚本Demo
行模式读取宽表配置
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//数据源。
"newVersion":"true",//使用新版otsreader
"mode": "normal",// 行模式读取数据
"isTimeseriesTable":"false",// 配置该表为宽表(非时序表)
"column":[//字段。
{
"name":"column1"//字段名。
},
{
"name":"column2"
},
{
"name":"column3"
},
{
"name":"column4"
},
{
"name":"column5"
}
],
"range":{
"split":[
{
"type":"STRING",
"value":"beginValue"
},
{
"type":"STRING",
"value":"splitPoint1"
},
{
"type":"STRING",
"value":"splitPoint2"
},
{
"type":"STRING",
"value":"splitPoint3"
},
{
"type":"STRING",
"value":"endValue"
}
],
"end":[
{
"type":"STRING",
"value":"endValue"
},
{
"type":"INT",
"value":"100"
},
{
"type":"INF_MAX"
},
{
"type":"INF_MAX"
}
],
"begin":[
{
"type":"STRING",
"value":"beginValue"
},
{
"type":"INT",
"value":"0"
},
{
"type":"INF_MIN"
},
{
"type":"INF_MIN"
}
]
},
"table":""//表名。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//false代表不限流,下面的限流的速度不生效,true代表限流。
"concurrent":1 //作业并发数。
"mbps":"12"//限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
行模式读时序表配置
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//数据源。
"table": "",//表名
// 读时序数据mode必须为normal
"mode": "normal",
// 读时序数据newVersion必须为true
"newVersion": "true",
// 配置该表为时序表
"isTimeseriesTable":"true",
// measurementName:配置需要读取时序数据的度量名称,非必需,为空则读取全表数据
"measurementName":"measurement_1",
"column": [
{
"name": "_m_name"
},
{
"name": "tagA",
"is_timeseries_tag":"true"
},
{
"name": "double_0",
"type":"DOUBLE"
},
{
"name": "string_0",
"type":"STRING"
},
{
"name": "long_0",
"type":"INT"
},
{
"name": "binary_0",
"type":"BINARY"
},
{
"name": "bool_0",
"type":"BOOL"
},
{
"type":"STRING",
"value":"testString"
}
]
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//false代表不限流,下面的限流的速度不生效,true代表限流。
"concurrent":1 //作业并发数。
"mbps":"12"//限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
列模式读取宽表配置
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//数据源。
"table":"",//表名
"newVersion":"true",//新版otsreader
"mode": "multiversion",//多版本模式
"column":[//配置需要导出的列名称(必须是非主键列)
{"name":"mobile"},
{"name":"name"},
{"name":"age"},
{"name":"salary"},
{"name":"marry"}
],
"range":{//导出的范围
"begin":[
{"type":"INF_MIN"},
{"type":"INF_MAX"}
],
"end":[
{"type":"INF_MAX"},
{"type":"INF_MIN"}
],
"split":[
]
},
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//false代表不限流,下面的限流的速度不生效,true代表限流。
"concurrent":1 //作业并发数。
"mbps":"12"//限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Reader脚本参数通用配置
参数 | 描述 | 是否必选 | 默认值 |
endpoint | Tablestore Server的EndPoint(服务地址),详情请参见服务地址。 | 是 | 无 |
accessId | Tablestore的AccessKey ID。 | 是 | 无 |
accessKey | Tablestore的AccessKey Secret。 | 是 | 无 |
instanceName | Tablestore的实例名称,实例是您使用和管理Tablestore服务的实体。 您在开通Tablestore服务后,需要通过管理控制台来创建实例,然后在实例内进行表的创建和管理。 实例是Tablestore资源管理的基础单元,Tablestore对应用程序的访问控制和资源计量都在实例级别完成。 | 是 | 无 |
table | 所选取的需要抽取的表名称,这里有且只能填写一张表。在Tablestore不存在多表同步的需求。 | 是 | 无 |
newVersion | 定义了使用的Tablestore Reader插件的版本。
新版Tablestore Reader除了支持新的功能,对系统资源的开销也相对较低,因此推荐使用新版Tablestore Reader。 新版本插件配置兼容了旧版本插件的配置,即旧任务增加newVersion=true配置之后,可以正常运行。 | 否 | false |
mode | 定义了读取数据的模式,当前支持两种模式:
本配置仅在新版Tablestore Reader(newVersion:true)配置下生效。 旧版Tablestore Reader会忽略mode配置,并仅支持行模式读取。 | 否 | normal |
isTimeseriesTable | 定义了操作的数据表是否为时序数据表:
本配置仅在newVersion:true & mode:normal配置下生效。 旧版Tablestore Reader不支持时序表,且时序表无法按列模式读取。 | 否 | false |
Reader脚本参数附加配置
Tablestore Reader支持行模式读取宽表、行模式读取时序表、列模式读取宽表。下面为您介绍各模式的附加配置。
行模式读宽表参数
参数 | 描述 | 是否必选 | 默认值 |
column | 所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。由于Tablestore本身是NoSQL系统,在Tablestore Reader抽取数据过程中,必须指定相应的字段名称。
| 是 | 无 |
begin和end | begin和end配置项用于表示抽取Tablestore表数据的范围。 begin和end描述的是Tablestore PrimaryKey的区间分布状态,对于无限大小的区间,您可以使用 说明
例如,对一张主键为
| 否 | (INF_MIN,INF_MAX) |
split | 该配置项属于高级配置项,是您自己定义切分配置信息,普通情况下不建议使用。 通过配置split参数可以自己定义分片的数据范围,通常在Tablestore 数据存储发生热点,可以使用自定义的切分规则,以下任务配置为例:
任务运行时,数据会切分成6个段,并发读取。分段数量建议比任务并发数大。
| 否 | 当split参数未配置时,使用自动切分逻辑。 自动切分逻辑会找出 Partition Key最大、最小值,并进行均匀分段。 Partition Key支持整型和字符串,整型使用整除来分段,字符串使用第一个字符的unicode码来分段。 |
行模式读时序表参数
参数 | 描述 | 是否必选 | 默认值 |
column | column是一个数组,每个元素表示一列,可配置常量列与普通列两种类型。 对于常量列,需要配置以下字段:
对于普通列,需要配置以下字段:
读出四列数据的脚本示例:
| 是 | 无 |
measurementName | 配置需要读取时间线的度量名称。若不配置则读取全表数据。 | 否 | 无 |
timeRange | 请求数据的时间范围,读取的范围是[begin,end),左闭右开的区间,且begin必须小于end,时间戳单位为毫秒。格式如下:
| 否 | 全部版本 |
列模式读宽表参数
参数 | 描述 | 是否必选 | 默认值 |
column | 指定要导出的列,在列模式下只支持普通列。 格式:
说明
| 是 | 所有列 |
range | 读取的数据范围,读取的范围为[begin,end),左闭右开的区间,并且:
type支持的类型有如下几类:
格式:
| 否 | 全部数据 |
timeRange | 请求数据的时间范围,读取的范围是[begin,end),左闭右开的区间,且begin必须小于end,时间戳单位为毫秒。 格式:
| 否 | 全部版本 |
maxVersion | 请求的最大数据版本数,取值范围是1~INT32_MAX。 | 否 | 全部版本 |
附录二:Writer脚本Demo与参数说明
离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。
Writer脚本Demo
行模式写入宽表配置
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//数据源。
"table":"",//表名。
"newVersion":"true",//使用新版otswriter
"mode": "normal",// 行模式写入数据
"isTimeseriesTable":"false",// 配置该表为宽表(非时序表)
"primaryKey" : [//Tablestore的主键信息。
{"name":"gid", "type":"INT"},
{"name":"uid", "type":"STRING"}
],
"column" : [//字段。
{"name":"col1", "type":"INT"},
{"name":"col2", "type":"DOUBLE"},
{"name":"col3", "type":"STRING"},
{"name":"col4", "type":"STRING"},
{"name":"col5", "type":"BOOL"}
],
"writeMode" : "PutRow" //写入模式。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
行模式写时序表配置
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//数据源。
"table": "testTimeseriesTableName01",
"mode": "normal",
"newVersion": "true",
"isTimeseriesTable":"true",
"timeunit":"microseconds",
"column": [
{
"name": "_m_name"
},
{
"name": "_data_source",
},
{
"name": "_tags",
},
{
"name": "_time",
},
{
"name": "string_1",
"type":"string"
},
{
"name":"tag3",
"is_timeseries_tag":"true",
}
]
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
列模式写入宽表配置
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//数据源。
"table":"",
"newVersion":"true",
"mode":"multiVersion",
"primaryKey" : [
"gid",
"uid"
]
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},x`
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Writer脚本参数通用配置
参数 | 描述 | 是否必选 | 默认值 |
datasource | 数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。 | 是 | 无 |
endPoint | Tablestore Server的EndPoint(服务地址),详情请参见服务地址。 | 是 | 无 |
accessId | Tablestore的AccessKey ID。 | 是 | 无 |
accessKey | Tablestore的AccessKey Secret。 | 是 | 无 |
instanceName | Tablestore的实例名称,实例是您使用和管理Tablestore服务的实体。 您在开通Tablestore服务后,需要通过管理控制台来创建实例,然后在实例内进行表的创建和管理。实例是Tablestore资源管理的基础单元,Tablestore对应用程序的访问控制和资源计量都在实例级别完成。 | 是 | 无 |
table | 所选取的需要抽取的表名称,此处能且只能填写一张表。在Tablestore中不存在多表同步的需求。 | 是 | 无 |
newVersion | 定义了使用的Tablestore Writer插件版本。
新版Tablestore Writer除了支持新的功能,对系统资源的开销也相对较低,因此推荐使用新版Tablestore Writer。 新版插件兼容旧版本插件配置,即旧任务增加newVersion=true配置之后,可以正常运行。 | 是 | false |
mode | 定义了写入数据的模式,当前支持两种模式
本配置仅在newVersion:true配置下生效 旧版Tablestore Writer会忽略mode配置,并仅支持行模式读取。 | 否 | normal |
isTimeseriesTable | 定义了操作的数据表是否为时序数据表。
本配置仅在newVersion:true & mode:normal配置下生效(列模式不兼容时序表)。 | 否 | false |
Writer脚本参数附加配置
Tablestore Writer支持行模式写入宽表、行模式写入时序表、列模式写入宽表。下面介绍各模式的附加配置。
行模式写宽表参数
参数 | 描述 | 是否必选 | 默认值 |
primaryKey | Tablestore的主键信息,使用JSON的数组描述字段信息。Tablestore本身是NoSQL系统,在Tablestore Writer导入数据过程中,必须指定相应的字段名称。 数据同步系统本身支持类型转换的,因此对于源头数据非STRING/INT,Tablestore Writer会进行数据类型转换。配置示例如下。
说明 Tablestore的PrimaryKey仅支持STRING和INT类型,因此Tablestore Writer本身也限定填写STRING和INT两种类型。 | 是 | 无 |
column | 所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。 配置示例:
其中的name为写入的Tablestore列名称,type为写入的类型。Tablestore支持STRING、INT、DOUBLE、BOOL和BINARY类型。 说明 写入过程不支持常量、函数或者自定义表达式。 | 是 | 无 |
writeMode | 数据写入表格存储的模式,目前支持以下两种模式:
| 是 | 无 |
enableAutoIncrement | 是否允许向包含主键自增列的Tablestore表中写入数据。
| 否 | false |
requestTotalSizeLimitation | 该配置限制写入Tablestore时单行数据的大小,配置类型为数字。 | 否 | 1MB |
attributeColumnSizeLimitation | 该配置限制写入Tablestore时单个属性列的大小,配置类型为数字。 | 否 | 2MB |
primaryKeyColumnSizeLimitation | 该配置限制写入Tablestore时单个主键列的大小,配置类型为数字。 | 否 | 1KB |
attributeColumnMaxCount | 该配置限制写入Tablestore时属性列的个数,配置类型为数字。 | 否 | 1,024 |
行模式写时序表参数
参数 | 描述 | 是否必选 | 默认值 |
column | column中的每个元素对应时序数据中的一个字段,每个元素可配置以下参数。
由于时序数据的度量名称与时间戳不能为空,因此必须配置 示例:一条待写入数据如下,包含六个字段:
使用如下配置:
写入Tablestore后,控制台查看结果如下 | 是 | 无 |
timeunit | 所配置的时间戳_time字段的单位,支持NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS、MINUTES。 | 否 | MICROSECONDS |
列模式写宽表参数
参数 | 描述 | 是否必选 | 默认值 |
primaryKey | 表的主键列。 考虑到配置成本,并不需要配置primaryKey在Record(Line)中的位置,但Record的格式需要固定,要求primaryKey一定在行首,primaryKey之后是columnName。record的格式为: 例如,有以下9条数据:
配置示例如下:
写入宽行结果:
| 是 | 无 |
columnNamePrefixFilter | 列名前缀过滤。 Hbase导入的数据,cf和qulifier共同组成了columnName,但Tablestore不支持cf,所以需要将cf过滤掉。 配置示例: 说明
| 否 | 无 |
常见问题
问题一
Q:向包含主键自增列的目标表写入数据,需要如何配置Tablestore Writer?
Tablestore Writer的配置中必须包含以下两条:
"newVersion": "true", "enableAutoIncrement": "true",
Tablestore Writer中不需要配置主键自增列的列名。
Tablestore Writer中配置的primaryKey条数+column条数需要等于上游Tablestore Reader数据的列数。
问题二
Q:在时序模型的配置中,如何理解_tag
和is_timeseries_tag
两个字段?
示例:某条数据共有三个标签,标签为:【手机=小米,内存=8G,镜头=莱卡】。
数据导出示例(Tablestore Reader)
如果想将上述标签合并到一起作为一列导出,则配置为:
"column": [ { "name": "_tags", } ],
DataWorks会将标签导出为一列数据,形式如下:
["phone=xiaomi","camera=LEICA","RAM=8G"]
如果希望导出
phone
标签和camera
标签,并且每个标签单独作为一列导出,则配置为:"column": [ { "name": "phone", "is_timeseries_tag":"true", }, { "name": "camera", "is_timeseries_tag":"true", } ],
DataWorks会导出两列数据,形式如下:
xiaomi, LEICA
数据导入示例(Tablestore Writer)
现在上游数据源(Reader)有两列数据:
一列数据为:
["phone=xiaomi","camera=LEICA","RAM=8G"]
。另一列数据为:6499。
现希望将这两列数据都添加到标签里面,预期的写入后标签字段格式如下所示:则配置为:
"column": [ { "name": "_tags", }, { "name": "price", "is_timeseries_tag":"true", }, ],
第一列配置将
["phone=xiaomi","camera=LEICA","RAM=8G"]
整体导入标签字段。第二列配置将
price=6499
单独导入标签字段。