全部产品
表格存储

使用教程

更新时间:2017-07-07 09:54:02   分享:   

Hive/HadoopMR 访问表格存储教程

数据准备

在表格存储中准备一张数据表 pet,name 是唯一的一列主键,数据示例如下:

name owner species sex birth death
Fluffy Harold cat f 1993-02-04
Claws Gwen cat m 1994-03-17
Buffy Harold dog f 1989-05-13
Fang Benny dog m 1990-08-27
Bowser Diane dog m 1979-08-31 1995-07-29
Chirpy Gwen bird f 1998-09-11
Whistler Gwen bird 1997-12-09
Slim Benny snake m 1996-04-29
Puffball Diane hamster f 1999-03-30

注意:表格中空白的部分不需要写入,因为表格存储是一个 schema-free 的存储结构(数据模型),没有值也不需要写入NULL

Hive 访问示例

前提条件

按照准备工作准备好 Hadoop、Hive、JDK 环境以及表格存储 JAVA SDK 和 EMR SDK 依赖包。

示例

  1. # HADOOP_HOME 及 HADOOP_CLASSPATH 可以添加到 /etc/profile 中
  2. $ export HADOOP_HOME=${你的 Hadoop 安装目录}
  3. $ export HADOOP_CLASSPATH=emr-sdk_2.10-1.3.0.jar:tablestore-4.1.0-jar-with-dependencies.jar:joda-time-2.9.4.jar
  4. $ bin/hive
  5. hive> CREATE EXTERNAL TABLE pet
  6. (name STRING, owner STRING, species STRING, sex STRING, birth STRING, death STRING)
  7. STORED BY 'com.aliyun.openservices.tablestore.hive.TableStoreStorageHandler'
  8. WITH SERDEPROPERTIES(
  9. "tablestore.columns.mapping"="name,owner,species,sex,birth,death")
  10. TBLPROPERTIES (
  11. "tablestore.endpoint"="YourEndpoint",
  12. "tablestore.access_key_id"="YourAccessKeyId",
  13. "tablestore.access_key_secret"="YourAccessKeySecret",
  14. "tablestore.table.name"="pet");
  15. hive> SELECT * FROM pet;
  16. Bowser Diane dog m 1979-08-31 1995-07-29
  17. Buffy Harold dog f 1989-05-13 NULL
  18. Chirpy Gwen bird f 1998-09-11 NULL
  19. Claws Gwen cat m 1994-03-17 NULL
  20. Fang Benny dog m 1990-08-27 NULL
  21. Fluffy Harold cat f 1993-02-04 NULL
  22. Puffball Diane hamster f 1999-03-30 NULL
  23. Slim Benny snake m 1996-04-29 NULL
  24. Whistler Gwen bird NULL 1997-12-09 NULL
  25. Time taken: 5.045 seconds, Fetched 9 row(s)
  26. hive> SELECT * FROM pet WHERE birth > "1995-01-01";
  27. Chirpy Gwen bird f 1998-09-11 NULL
  28. Puffball Diane hamster f 1999-03-30 NULL
  29. Slim Benny snake m 1996-04-29 NULL
  30. Whistler Gwen bird NULL 1997-12-09 NULL
  31. Time taken: 1.41 seconds, Fetched 4 row(s)

参数说明如下:

  • WITH SERDEPROPERTIES

    tablestore.columns.mapping(可选):在默认的情况下,外表的字段名即为表格存储上表的列名(主键列名或属性列名)。但有时外表的字段名和表上列名并不一致(比如处理大小写或字符集相关的问题),这时候就需要指定 tablestore.columns.mapping。该参数为一个英文逗号分隔的字符串,每一项都是表上列名,顺序与外表字段一致。

    注意:空白也会被认为是表上列名的一部分。

  • TBLPROPERTIES

    • tablestore.endpoint(必填):访问表格存储的服务地址,也可以在表格存储控制台上查看这个实例的 endpoint 信息。

    • tablestore.instance(可选):表格存储的实例名。若不填,则为 tablestore.endpoint 的第一段。

    • tablestore.table.name(必填):表格存储上对应的表名。

    • tablestore.access_key_id、tablestore.access_key_secret(必填),请参见访问控制

    • tablestore.sts_token(可选),请参见授权管理

HadoopMR 访问示例

以下示例介绍如何使用 HadoopMR 程序统计数据表 pet 的行数。

示例代码

  • 构建 Mappers 和 Reducers

    1. public class RowCounter {
    2. public static class RowCounterMapper
    3. extends Mapper<PrimaryKeyWritable, RowWritable, Text, LongWritable> {
    4. private final static Text agg = new Text("TOTAL");
    5. private final static LongWritable one = new LongWritable(1);
    6. @Override
    7. public void map(
    8. PrimaryKeyWritable key, RowWritable value, Context context)
    9. throws IOException, InterruptedException {
    10. context.write(agg, one);
    11. }
    12. }
    13. public static class IntSumReducer
    14. extends Reducer<Text,LongWritable,Text,LongWritable> {
    15. @Override
    16. public void reduce(
    17. Text key, Iterable<LongWritable> values, Context context)
    18. throws IOException, InterruptedException {
    19. long sum = 0;
    20. for (LongWritable val : values) {
    21. sum += val.get();
    22. }
    23. context.write(key, new LongWritable(sum));
    24. }
    25. }
    26. }

    数据源每从表格存储上读出一行,都会调用一次 mapper 的 map()。前两个参数 PrimaryKeyWritable 和 RowWritable 分别对应这行的主键以及这行的内容。可以通过调用 PrimaryKeyWritable.getPrimaryKey() 和 RowWritable.getRow() 取得表格存储 JAVA SDK 定义的主键对象及行对象。

  • 配置表格存储作为 mapper 的数据源。

    1. private static RangeRowQueryCriteria fetchCriteria() {
    2. RangeRowQueryCriteria res = new RangeRowQueryCriteria("YourTableName");
    3. res.setMaxVersions(1);
    4. List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
    5. List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
    6. lower.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MIN));
    7. upper.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MAX));
    8. res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
    9. res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
    10. return res;
    11. }
    12. public static void main(String[] args) throws Exception {
    13. Configuration conf = new Configuration();
    14. Job job = Job.getInstance(conf, "row count");
    15. job.addFileToClassPath(new Path("hadoop-connector.jar"));
    16. job.setJarByClass(RowCounter.class);
    17. job.setMapperClass(RowCounterMapper.class);
    18. job.setCombinerClass(IntSumReducer.class);
    19. job.setReducerClass(IntSumReducer.class);
    20. job.setOutputKeyClass(Text.class);
    21. job.setOutputValueClass(LongWritable.class);
    22. job.setInputFormatClass(TableStoreInputFormat.class);
    23. TableStoreInputFormat.setEndpoint(job, "http://YourInstance.Region.ots.aliyuncs.com/");
    24. TableStoreInputFormat.setCredential(job, "YourAccessKeyId", "YourAccessKeySecret");
    25. TableStoreInputFormat.addCriteria(job, fetchCriteria());
    26. FileOutputFormat.setOutputPath(job, new Path("output"));
    27. System.exit(job.waitForCompletion(true) ? 0 : 1);
    28. }

    示例代码中使用 job.setInputFormatClass(TableStoreInputFormat.class) 把表格存储设为数据源,除此之外,还需要:

    • 把 hadoop-connector.jar 部署到集群上并添加到 classpath 里面。路径为 addFileToClassPath() 指定 hadoop-connector.jar 的本地路径。代码中假定 hadoop-connector.jar 在当前路径。

    • 访问表格存储需要指定入口和身份。通过 TableStoreInputFormat.setEndpoint() 和 TableStoreInputFormat.setCredential() 设置访问表格存储需要指定的 endpoint 和 access key 信息。

    • 指定一张表用来计数。

      注意:

      • 每调用一次 addCriteria() 可以在数据源里添加一个 JAVA SDK 定义的 RangeRowQueryCriteria 对象。可以多次调用addCriteria()。RangeRowQueryCriteria 对象与表格存储 JAVA SDK GetRange 接口所用的 RangeRowQueryCriteria 对象具有相同的限制条件。
      • 可以利用 RangeRowQueryCriteria 的 setFilter() 和 addColumnsToGet() 在表格存储的服务器端过滤掉不必要的行和列,减少访问数据的大小,降低成本,提高性能。
      • 通过添加对应多张表的多个 RangeRowQueryCriteria,可以实现多表的 union。
      • 通过添加同一张表的多个 RangeRowQueryCriteria,可以做到更均匀的切分。TableStore-Hadoop Connector 会根据一些策略将用户传入的范围切细。

程序运行示例

  1. $ HADOOP_CLASSPATH=hadoop-connector.jar bin/hadoop jar row-counter.jar
  2. ...
  3. $ find output -type f
  4. output/_SUCCESS
  5. output/part-r-00000
  6. output/._SUCCESS.crc
  7. output/.part-r-00000.crc
  8. $ cat out/part-r-00000
  9. TOTAL 9

类型转换说明

表格存储支持的数据类型和 Hive/Spark 支持的数据类型不完全相同。

下表列出了从表格存储的数据类型(行)转换到 Hive/Spark 数据类型(列)的支持情况。

TINYINT SMALLINT INT BIGINT FLOAT DOUBLE BOOLEAN STRING BINARY
INTEGER 可,损失精度 可,损失精度  可,损失精度 可,损失精度 可,损失精度
DOUBLE 可,损失精度 可,损失精度 可,损失精度 可,损失精度 可,损失精度
BOOLEAN
STRING
BINARY
本文导读目录
本文导读目录
以上内容是否对您有帮助?