HBase数据源为您提供读取和写入HBase的双向通道,本文为您介绍DataWorks的HBase数据同步的能力支持情况。
支持的版本
支持HBase0.94.x、HBase1.1.x、HBase2.x和Phoenix5.x版本。
如果您的HBase版本为HBase0.94.x,Reader和Writer端的插件请选择094x。
"reader": { "plugin": "094x" }
"writer": { "hbaseVersion": "094x" }
如果您的HBase版本为HBase1.1.x或HBase2.x,Reader和Writer端的插件请选择11x。
"reader": { "plugin": "11x" }
"writer": { "hbaseVersion": "11x" }
说明HBase1.1.x插件当前可以兼容HBase 2.0。
HBase11xsql Writer插件实现了向Hbase中的SQL表(phoenix)批量导入数据。因为Phoenix对rowkey进行了数据编码,如果您直接使用HBaseAPI写入,需要手动转换数据,麻烦且易错。HBase11xsql Writer插件为您提供了单间的SQL表的数据导入方式。
说明通过Phoenix的JDBC驱动,执行UPSERT语句向表中批量写入数据。因为使用上层接口,所以可以同步更新索引表。
使用限制
HBase Reader | HBase20xsql Reader | HBase11xsql Writer |
|
|
|
支持的功能
HBase Reader
支持normal和multiVersionFixedColumn模式
normal模式:把HBase中的表当成普通二维表(横表)进行读取,获取最新版本数据。
hbase(main):017:0> scan 'users' ROW COLUMN+CELL lisi column=address:city, timestamp=1457101972764, value=beijing lisi column=address:contry, timestamp=1457102773908, value=china lisi column=address:province, timestamp=1457101972736, value=beijing lisi column=info:age, timestamp=1457101972548, value=27 lisi column=info:birthday, timestamp=1457101972604, value=1987-06-17 lisi column=info:company, timestamp=1457101972653, value=baidu xiaoming column=address:city, timestamp=1457082196082, value=hangzhou xiaoming column=address:contry, timestamp=1457082195729, value=china xiaoming column=address:province, timestamp=1457082195773, value=zhejiang xiaoming column=info:age, timestamp=1457082218735, value=29 xiaoming column=info:birthday, timestamp=1457082186830, value=1987-06-17 xiaoming column=info:company, timestamp=1457082189826, value=alibaba 2 row(s) in 0.0580 seconds }
读取后的数据如下所示。
rowKey
address:city
address:contry
address:province
info:age
info:birthday
info:company
lisi
beijing
china
beijing
27
1987-06-17
baidu
xiaoming
hangzhou
china
zhejiang
29
1987-06-17
alibaba
multiVersionFixedColumn模式:把HBase中的表当成竖表进行读取。读出的每条记录是四列形式,依次为rowKey、family:qualifier、timestamp和value。读取时需要明确指定要读取的列,把每一个cell中的值,作为一条记录(record),若有多个版本则存在多条记录。
hbase(main):018:0> scan 'users',{VERSIONS=>5} ROW COLUMN+CELL lisi column=address:city, timestamp=1457101972764, value=beijing lisi column=address:contry, timestamp=1457102773908, value=china lisi column=address:province, timestamp=1457101972736, value=beijing lisi column=info:age, timestamp=1457101972548, value=27 lisi column=info:birthday, timestamp=1457101972604, value=1987-06-17 lisi column=info:company, timestamp=1457101972653, value=baidu xiaoming column=address:city, timestamp=1457082196082, value=hangzhou xiaoming column=address:contry, timestamp=1457082195729, value=china xiaoming column=address:province, timestamp=1457082195773, value=zhejiang xiaoming column=info:age, timestamp=1457082218735, value=29 xiaoming column=info:age, timestamp=1457082178630, value=24 xiaoming column=info:birthday, timestamp=1457082186830, value=1987-06-17 xiaoming column=info:company, timestamp=1457082189826, value=alibaba 2 row(s) in 0.0260 seconds }
读取后的数据(4列) 如下所示。
rowKey
column:qualifier
timestamp
value
lisi
address:city
1457101972764
beijing
lisi
address:contry
1457102773908
china
lisi
address:province
1457101972736
beijing
lisi
info:age
1457101972548
27
lisi
info:birthday
1457101972604
1987-06-17
lisi
info:company
1457101972653
beijing
xiaoming
address:city
1457082196082
hangzhou
xiaoming
address:contry
1457082195729
china
xiaoming
address:province
1457082195773
zhejiang
xiaoming
info:age
1457082218735
29
xiaoming
info:age
1457082178630
24
xiaoming
info:birthday
1457082186830
1987-06-17
xiaoming
info:company
1457082189826
alibaba
HBase Writer
支持源端多个字段拼接作为rowkey
目前HBase Writer支持源端多个字段拼接作为HBase表的rowkey。
写入HBase的版本支持
写入HBase的时间戳(版本)支持:
当前时间作为版本。
指定源端列作为版本。
指定一个时间作为版本。
支持的字段类型
离线读
支持读取HBase数据类型及HBase Reader针对HBase类型的转换列表如下表所示。
类型分类 | 数据集成column配置类型 | 数据库数据类型 |
整数类 | long | short、int和long |
浮点类 | double | float和double |
字符串类 | string | binary_string和string |
日期时间类 | date | date |
字节类 | bytes | bytes |
布尔类 | boolean | boolean |
HBase20xsql Reader支持大部分Phoenix类型,但也存在个别类型没有支持的情况,请注意检查你的类型。
HBase20xsql Reader针对Phoenix类型的转换列表,如下所示。
DataX内部类型 | Phoenix数据类型 |
long | INTEGER、TINYINT、SMALLINT、BIGINT |
double | FLOAT、DECIMAL、DOUBLE |
string | CHAR、VARCHAR |
date | DATE、TIME、TIMESTAMP |
bytes | BINARY、VARBINARY |
boolean | BOOLEAN |
离线写
支持读取HBase数据类型,HBase Writer针对HBase类型的转换列表,如下表所示。
column的配置需要和HBase表对应的列类型保持一致。
除下表中罗列的字段类型外,其它类型均不支持。
类型分类 | 数据库数据类型 |
整数类 | INT、LONG和SHORT |
浮点类 | FLOAT和DOUBLE |
布尔类 | BOOLEAN |
字符串类 | STRING |
注意事项
如果您在测试连通性时遇到 "tried to access method com.google.common.base.Stopwatch" 的错误信息,请在新增数据源页面的JSON配置项内添加 "hbaseVersion": "" 字段,指定您的hbase版本,例如, "hbaseVersion:"2.0.14"。
数据同步任务开发
HBase数据同步任务的配置入口和通用配置流程指导可参见下文的配置指导,详细的配置参数解释可在配置界面查看对应参数的文案提示。
创建数据源
在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建与管理数据源。
单表离线同步任务配置指导
操作流程请参见通过向导模式配置离线同步任务、通过脚本模式配置离线同步任务。
脚本模式配置的全量参数和脚本Demo请参见下文的附录:脚本Demo与参数说明。
常见问题
Q:并发设置多少比较合适?速度慢时增加并发有用吗?
A:数据导入进程默认JVM的堆大小是2GB,并发(channel数)是通过多线程实现的,开过多的线程有时并不能提高导入速度,反而可能因为过于频繁的GC导致性能下降。一般建议并发数(channel)为5-10。
Q:batchSize设置多少比较合适?
A:默认是256,但应根据每行的大小来计算最合适的batchSize。通常一次操作的数据量在2MB~4MB左右,用该值除以行大小,即可得到batchSize。
附录:脚本Demo与参数说明
附录:离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要在任务脚本中按照脚本的统一格式要求编写脚本中的reader参数和writer参数,脚本模式的统一要求请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下的数据源的Reader参数和Writer参数的指导详情。
HBase Reader脚本Demo
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"hbase",//插件名。
"parameter":{
"mode":"normal",//读取HBase的模式,支持normal模式、multiVersionFixedColumn模式。
"scanCacheSize":"256",//HBase client每次RPC从服务器端读取的行数。
"scanBatchSize":"100",//HBase client每次RPC从服务器端读取的列数。
"hbaseVersion":"094x/11x",//HBase版本。
"column":[//字段。
{
"name":"rowkey",//字段名。
"type":"string"//数据类型。
},
{
"name":"columnFamilyName1:columnName1",
"type":"string"
},
{
"name":"columnFamilyName2:columnName2",
"format":"yyyy-MM-dd",
"type":"date"
},
{
"name":"columnFamilyName3:columnName3",
"type":"long"
}
],
"range":{//指定HBase Reader读取的rowkey范围。
"endRowkey":"",//指定结束rowkey。
"isBinaryRowkey":true,//指定配置的startRowkey和endRowkey转换为byte[]时的方式,默认值为false。
"startRowkey":""//指定开始rowkey。
},
"maxVersion":"",//指定在多版本模式下的HBase Reader读取的版本数。
"encoding":"UTF-8",//编码格式。
"table":"",//表名。
"hbaseConfig":{//连接HBase集群需要的配置信息,JSON格式。
"hbase.zookeeper.quorum":"hostname",
"hbase.rootdir":"hdfs://ip:port/database",
"hbase.cluster.distributed":"true"
}
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1,//作业并发数。
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
HBase Reader脚本参数
参数 | 描述 | 是否必选 | 默认值 |
haveKerberos | haveKerberos值为true时,表示HBase集群需要kerberos认证。 说明
| 否 | false |
hbaseConfig | 连接HBase集群需要的配置信息,JSON格式。必填的配置为hbase.zookeeper.quorum,表示HBase的ZK链接地址。同时可以补充更多HBase client的配置,例如设置scan的cache、batch来优化与服务器的交互。 说明 如果是云HBase的数据库,需要使用内网地址连接访问。 | 是 | 无 |
mode | 读取HBase的模式,支持normal模式和multiVersionFixedColumn模式。 | 是 | 无 |
table | 读取的HBase表名(大小写敏感) 。 | 是 | 无 |
encoding | 编码方式,UTF-8或GBK,用于对二进制存储的HBase byte[]转为String时的编码。 | 否 | utf-8 |
column | 要读取的HBase字段,normal模式与multiVersionFixedColumn模式下必填。
| 是 | 无 |
maxVersion | 指定在多版本模式下的HBase Reader读取的版本数,取值只能为-1或大于1的数字,-1表示读取所有版本。 | multiVersionFixedColumn模式下必填项 | 无 |
range | 指定HBase Reader读取的rowkey范围。
| 否 | 无 |
scanCacheSize | HBase Reader每次从HBase中读取的行数。 | 否 | 256 |
scanBatchSize | HBase Reader每次从HBase中读取的列数。 | 否 | 100 |
HBase Writer脚本Demo
{
"type":"job",
"version":"2.0",//版本号
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"hbase",//插件名。
"parameter":{
"mode":"normal",//写入HBase的模式。
"walFlag":"false",//关闭(false)放弃写WAL日志。
"hbaseVersion":"094x",//Hbase版本。
"rowkeyColumn":[//要写入的HBase的rowkey列。
{
"index":"0",//序列号。
"type":"string"//数据类型。
},
{
"index":"-1",
"type":"string",
"value":"_"
}
],
"nullMode":"skip",//读取的为null值时,如何处理。
"column":[//要写入的HBase字段。
{
"name":"columnFamilyName1:columnName1",//字段名。
"index":"0",//索引号。
"type":"string"//数据类型。
},
{
"name":"columnFamilyName2:columnName2",
"index":"1",
"type":"string"
},
{
"name":"columnFamilyName3:columnName3",
"index":"2",
"type":"string"
}
],
"encoding":"utf-8",//编码格式。
"table":"",//表名。
"hbaseConfig":{//连接HBase集群需要的配置信息,JSON格式。
"hbase.zookeeper.quorum":"hostname",
"hbase.rootdir":"hdfs: //ip:port/database",
"hbase.cluster.distributed":"true"
}
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
HBase Writer脚本参数
参数 | 描述 | 是否必选 | 默认值 |
haveKerberos | haveKerberos值为true时,表示HBase集群需要kerberos认证。 说明
| 否 | false |
hbaseConfig | 连接HBase集群需要的配置信息,JSON格式。必填的配置为hbase.zookeeper.quorum,表示HBase的ZK链接地址。同时可以补充更多HBase client的配置,例如设置scan的cache、batch来优化与服务器的交互。 说明 如果是云HBase的数据库,需要使用内网地址连接访问。 | 是 | 无 |
mode | 写入HBase的模式,目前仅支持normal模式,后续考虑动态列模式。 | 是 | 无 |
table | 要写入的HBase表名(大小写敏感) 。 | 是 | 无 |
encoding | 编码方式,UTF-8或GBK,用于STRING转HBase byte[]时的编码。 | 否 | utf-8 |
column | 要写入的HBase字段:
| 是 | 无 |
rowkeyColumn | 要写入的HBase的rowkey列:
配置格式如下所示。
| 是 | 无 |
versionColumn | 指定写入HBase的时间戳。支持当前时间、指定时间列或指定时间(三者选一),如果不配置则表示用当前时间。
配置格式如下所示。
| 否 | 无 |
nullMode | 读取的数据为null值时,您可以通过以下两种方式解决:
| 否 | skip |
walFlag | HBase Client向集群中的RegionServer提交数据时(Put/Delete操作),首先会先写WAL(Write Ahead Log)日志(即HLog,一个RegionServer上的所有Region共享一个HLog),只有当WAL日志写成功后,才会接着写MemStore,最后客户端被通知提交数据成功。 如果写WAL日志失败,客户端则被通知提交失败。关闭(false)放弃写WAL日志,从而提高数据写入的性能。 | 否 | false |
writeBufferSize | 设置HBase Client的写Buffer大小,单位字节,配合autoflush使用。 autoflush(默认处于关闭状态):
| 否 | 8M |
HBase20xsql Reader脚本Demo
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"hbase20xsql",//插件名。
"parameter":{
"queryServerAddress": "http://127.0.0.1:8765", //填写连接Phoenix QueryServer地址。
"serialization": "PROTOBUF", //QueryServer序列化格式。
"table": "TEST", //读取表名。
"column": ["ID", "NAME"], //所要读取列名。
"splitKey": "ID" //切分列,必须是表主键。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1,//作业并发数。
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
HBase20xsql Reader脚本参数
参数 | 描述 | 是否必选 | 默认值 |
queryServerAddress | HBase20xsql Reader插件需要通过Phoenix轻客户端去连接Phoenix QueryServer,因此,您需要在此处填写对应的QueryServer地址。如果HBase增强版(Lindorm)用户需要透传user、password参数,可以在queryServerAddress后增加对应的可选属性。格式为: | 是 | 无 |
serialization | QueryServer使用的序列化协议。 | 否 | PROTOBUF |
table | 所要读取的表名(大小写敏感)。 | 是 | 无 |
schema | 表所在的schema。 | 否 | 无 |
column | 所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息,空值表示读取所有列。默认值为空值。 | 否 | 全部列 |
splitKey | 读取表时对表进行切分,如果指定splitKey,表示您希望使用splitKey代表的字段进行数据分片,数据同步因此会启动并发任务进行数据同步,提高了数据同步的效能。您可以选择两种不同的切分方式,如果splitPoint为空,默认根据方法一自动切分:
| 是 | 无 |
splitPoints | 根据切分列的最大值和最小值切分时不能保证避免数据热点,因此,建议切分点根据Region的startkey和endkey进行设置,保证每个查询对应单个Region。 | 否 | 无 |
where | 筛选条件,支持对表查询增加过滤条件。HBase20xsql Reader根据指定的column、table、where条件拼接SQL,并根据该SQL进行数据抽取。 | 否 | 无 |
querySql | 在部分业务场景中,where配置项不足以描述所筛选的条件,您可以通过该配置型来自定义筛选SQL。配置该项后,除queryserverAddress参数必须设置外,HBase20xsql Reader会直接忽略column、table、where和splitKey条件的配置,使用该项配置的内容对数据进行筛选。 | 否 | 无 |
HBase11xsql Writer脚本Demo
{
"type": "job",
"version": "1.0",
"configuration": {
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"1"//限流,此处1mbps = 1MB/s。
}
},
"reader": {
"plugin": "odps",
"parameter": {
"datasource": "",
"table": "",
"column": [],
"partition": ""
}
},
"plugin": "hbase11xsql",
"parameter": {
"table": "目标hbase表名,大小写有关",
"hbaseConfig": {
"hbase.zookeeper.quorum": "目标hbase集群的ZK服务器地址,向PE咨询",
"zookeeper.znode.parent": "目标hbase集群的znode,向PE咨询"
},
"column": [
"columnName"
],
"batchSize": 256,
"nullMode": "skip"
}
}
}
HBase11xsql Writer脚本参数
参数 | 描述 | 是否必选 | 默认值 |
plugin | 插件名字,必须是hbase11xsql。 | 是 | 无 |
table | 要导入的表名,大小写敏感,通常phoenix表都是大写表名。 | 是 | 无 |
column | 列名,大小写敏感。通常phoenix的列名都是大写。 说明
| 是 | 无 |
hbaseConfig | hbase集群地址,zk为必填项,格式为ip1, ip2, ip3。 说明
| 是 | 无 |
batchSize | 批量写入的最大行数。 | 否 | 256 |
nullMode | 读取到的列值为null时,您可以通过以下两种方式进行处理:
| 否 | skip |