本文主要为您介绍如何使用Hive或者HadoopMR访问表格存储中的表。
数据准备
在表格存储中准备一张数据表pet,name是唯一的一列主键,数据示例请参见下表。
表中空白部分无需写入,因为表格存储是schema-free的存储结构,没有值也无需写入NULL。
name | owner | species | sex | birth | death |
Fluffy | Harold | cat | f | 1993-02-04 | |
Claws | Gwen | cat | m | 1994-03-17 | |
Buffy | Harold | dog | f | 1989-05-13 | |
Fang | Benny | dog | m | 1990-08-27 | |
Bowser | Diane | dog | m | 1979-08-31 | 1995-07-29 |
Chirpy | Gwen | bird | f | 1998-09-11 | |
Whistler | Gwen | bird | 1997-12-09 | ||
Slim | Benny | snake | m | 1996-04-29 | |
Puffball | Diane | hamster | f | 1999-03-30 |
前提条件
已准备好Hadoop、Hive、JDK环境以及表格存储Java SDK和EMR SDK依赖包。更多信息,请参见准备工作。
Hive访问示例
HADOOP_HOME及HADOOP_CLASSPATH可以添加到/etc/profile中,示例如下:
export HADOOP_HOME=${您的Hadoop安装目录} export HADOOP_CLASSPATH=emr-tablestore-1.4.2.jar:tablestore-4.3.1-jar-with-dependencies.jar:joda-time-2.9.4.jar
执行
bin/hive
命令进入Hive后,创建外表。示例如下:CREATE EXTERNAL TABLE pet (name STRING, owner STRING, species STRING, sex STRING, birth STRING, death STRING) STORED BY 'com.aliyun.openservices.tablestore.hive.TableStoreStorageHandler' WITH SERDEPROPERTIES( "tablestore.columns.mapping"="name,owner,species,sex,birth,death") TBLPROPERTIES ( "tablestore.endpoint"="YourEndpoint", "tablestore.access_key_id"="YourAccessKeyId", "tablestore.access_key_secret"="YourAccessKeySecret", "tablestore.table.name"="pet");
具体配置项说明请参见下表。
配置项
说明
WITH SERDEPROPERTIES
字段映射配置,包括tablestore.columns.mapping选项配置。
在默认情况下,外表的字段名即为表格存储上表的列名(主键列名或属性列名)。但有时外表的字段名和表上列名并不一致(例如处理大小写或字符集相关的问题),此时需要指定tablestore.columns.mapping。该参数为一个英文逗号分隔的字符串,每个分隔之间不能添加空格,每一项都是表中列名,顺序与外表字段保持一致。
说明表格存储的列名支持空白字符,所以空白也会被认为是表中列名的一部分。
TBLPROPERTIES
表的属性配置。包括如下选项:
tablestore.endpoint(必选):访问表格存储的服务地址,您可以在表格存储控制台上查看实例的Endpoint信息。关于服务地址的更多信息,请参见服务地址。
tablestore.instance(可选):表格存储的实例名称。如果不填写,则为tablestore.endpoint的第一段。关于实例的更多信息,请参见实例。
tablestore.access_key_id(必选):阿里云账号或者RAM用户的AccessKey ID。更多信息,请参见获取AccessKey。
当要使用STS服务临时访问资源时,请设置此参数为临时访问凭证的AccessKey ID。
tablestore.access_key_secret(必选):阿里云账号或者RAM用户的AccessKey Secret。更多信息,请参见获取AccessKey。
当要使用STS服务临时访问资源时,请设置此参数为临时访问凭证的AccessKey Secret。
tablestore.sts_token(可选):临时访问凭证的安全令牌。当要使用STS服务临时访问资源时,才需要设置此参数。更多信息,请参见通过RAM Policy为RAM用户授权。
tablestore.table.name(必选):表格存储中对应的表名。
查询表中数据。
执行
SELECT * FROM pet;
命令查询表中所有行数据。返回结果示例如下:
Bowser Diane dog m 1979-08-31 1995-07-29 Buffy Harold dog f 1989-05-13 NULL Chirpy Gwen bird f 1998-09-11 NULL Claws Gwen cat m 1994-03-17 NULL Fang Benny dog m 1990-08-27 NULL Fluffy Harold cat f 1993-02-04 NULL Puffball Diane hamster f 1999-03-30 NULL Slim Benny snake m 1996-04-29 NULL Whistler Gwen bird NULL 1997-12-09 NULL Time taken: 5.045 seconds, Fetched 9 row(s)
执行
SELECT * FROM pet WHERE birth > "1995-01-01";
命令查询表中birth列值大于1995-01-01的行数据。返回结果示例如下:
Chirpy Gwen bird f 1998-09-11 NULL Puffball Diane hamster f 1999-03-30 NULL Slim Benny snake m 1996-04-29 NULL Whistler Gwen bird NULL 1997-12-09 NULL Time taken: 1.41 seconds, Fetched 4 row(s)
HadoopMR访问示例
以下示例介绍如何使用HadoopMR程序统计数据表pet的行数。
构建Mappers和Reducers
public class RowCounter { public static class RowCounterMapper extends Mapper<PrimaryKeyWritable, RowWritable, Text, LongWritable> { private final static Text agg = new Text("TOTAL"); private final static LongWritable one = new LongWritable(1); @Override public void map( PrimaryKeyWritable key, RowWritable value, Context context) throws IOException, InterruptedException { context.write(agg, one); } } public static class IntSumReducer extends Reducer<Text,LongWritable,Text,LongWritable> { @Override public void reduce( Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long sum = 0; for (LongWritable val : values) { sum += val.get(); } context.write(key, new LongWritable(sum)); } } }
数据源每从表格存储上读出一行,都会调用一次mapper的map()。PrimaryKeyWritable和RowWritable两个参数分别对应这行的主键以及该行的内容。您可以通过调用PrimaryKeyWritable.getPrimaryKey()和RowWritable.getRow()取得表格存储Java SDK定义的主键对象及行对象。
配置表格存储作为mapper的数据源。
private static RangeRowQueryCriteria fetchCriteria() { RangeRowQueryCriteria res = new RangeRowQueryCriteria("YourTableName"); res.setMaxVersions(1); List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>(); List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>(); lower.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MIN)); upper.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MAX)); res.setInclusiveStartPrimaryKey(new PrimaryKey(lower)); res.setExclusiveEndPrimaryKey(new PrimaryKey(upper)); return res; } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "row count"); job.addFileToClassPath(new Path("hadoop-connector.jar")); job.setJarByClass(RowCounter.class); job.setMapperClass(RowCounterMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setInputFormatClass(TableStoreInputFormat.class); TableStoreInputFormat.setEndpoint(job, "https://YourInstance.Region.ots.aliyuncs.com/"); TableStoreInputFormat.setCredential(job, "YourAccessKeyId", "YourAccessKeySecret"); TableStoreInputFormat.addCriteria(job, fetchCriteria()); FileOutputFormat.setOutputPath(job, new Path("output")); System.exit(job.waitForCompletion(true) ? 0 : 1); }
示例中使用job.setInputFormatClass(TableStoreInputFormat.class)将表格存储设置为数据源,除此之外,还需要:
把hadoop-connector.jar部署到集群上并添加到classpath中。路径为addFileToClassPath()指定hadoop-connector.jar的本地路径。代码中假定hadoop-connector.jar在当前路径。
访问表格存储需要指定入口和身份。通过TableStoreInputFormat.setEndpoint()和TableStoreInputFormat.setCredential()设置访问表格存储需要指定的Endpoint和AccessKey信息。
指定一张表用来计数。
说明每调用一次addCriteria()可以在数据源里添加一个Java SDK定义的RangeRowQueryCriteria对象。可以多次调用addCriteria()。RangeRowQueryCriteria对象与表格存储Java SDK GetRange接口所用的RangeRowQueryCriteria对象具有相同的限制条件。
使用RangeRowQueryCriteria的setFilter()和addColumnsToGet()可以在表格存储的服务器端过滤掉不必要的行和列,减少访问数据的大小,降低成本,提高性能。
通过添加对应多张表的多个RangeRowQueryCriteria,可以实现多表的union。
通过添加同一张表的多个RangeRowQueryCriteria,可以做到更均匀的切分。TableStore-Hadoop Connector会根据一些策略将用户传入的范围切细。
程序运行示例
设置HADOOP_CLASSPATH。
HADOOP_CLASSPATH=hadoop-connector.jar bin/hadoop jar row-counter.jar
执行
find output -type f
命令查找output目录下的所有文件。返回结果示例如下:
output/_SUCCESS output/part-r-00000 output/._SUCCESS.crc output/.part-r-00000.crc
执行
cat output/part-r-00000
命令统计运行结果中的行数。TOTAL 9
类型转换说明
表格存储支持的数据类型与Hive或者Spark支持的数据类型不完全相同。
下表列出了从表格存储的数据类型(行)转换到Hive或Spark数据类型(列)的支持情况。
类型转换 | TINYINT | SMALLINT | INT | BIGINT | FLOAT | DOUBLE | BOOLEAN | STRING | BINARY |
INTEGER | 支持,损失精度 | 支持,损失精度 | 支持,损失精度 | 支持 | 支持,损失精度 | 支持,损失精度 | 不支持 | 不支持 | 不支持 |
DOUBLE | 支持,损失精度 | 支持,损失精度 | 支持,损失精度 | 支持,损失精度 | 支持,损失精度 | 支持 | 不支持 | 不支持 | 不支持 |
BOOLEAN | 不支持 | 不支持 | 不支持 | 不支持 | 不支持 | 不支持 | 支持 | 不支持 | 不支持 |
STRING | 不支持 | 不支持 | 不支持 | 不支持 | 不支持 | 不支持 | 不支持 | 支持 | 不支持 |
BINARY | 不支持 | 不支持 | 不支持 | 不支持 | 不支持 | 不支持 | 不支持 | 不支持 | 支持 |