使用教程

本文主要为您介绍如何使用Hive或者HadoopMR访问表格存储中的表。

数据准备

在表格存储中准备一张数据表pet,name是唯一的一列主键,数据示例请参见下表。

说明

表中空白部分无需写入,因为表格存储是schema-free的存储结构,没有值也无需写入NULL。

name

owner

species

sex

birth

death

Fluffy

Harold

cat

f

1993-02-04

Claws

Gwen

cat

m

1994-03-17

Buffy

Harold

dog

f

1989-05-13

Fang

Benny

dog

m

1990-08-27

Bowser

Diane

dog

m

1979-08-31

1995-07-29

Chirpy

Gwen

bird

f

1998-09-11

Whistler

Gwen

bird

1997-12-09

Slim

Benny

snake

m

1996-04-29

Puffball

Diane

hamster

f

1999-03-30

前提条件

已准备好Hadoop、Hive、JDK环境以及表格存储Java SDKEMR SDK依赖包。更多信息,请参见准备工作

Hive访问示例

  1. HADOOP_HOMEHADOOP_CLASSPATH可以添加到/etc/profile中,示例如下:

    export HADOOP_HOME=${您的Hadoop安装目录}
    export HADOOP_CLASSPATH=emr-tablestore-1.4.2.jar:tablestore-4.3.1-jar-with-dependencies.jar:joda-time-2.9.4.jar
  2. 执行bin/hive命令进入Hive后,创建外表。示例如下:

    CREATE EXTERNAL TABLE pet
      (name STRING, owner STRING, species STRING, sex STRING, birth STRING, death STRING)
      STORED BY 'com.aliyun.openservices.tablestore.hive.TableStoreStorageHandler'
      WITH SERDEPROPERTIES(
        "tablestore.columns.mapping"="name,owner,species,sex,birth,death")
      TBLPROPERTIES (
        "tablestore.endpoint"="YourEndpoint",
        "tablestore.access_key_id"="YourAccessKeyId",
        "tablestore.access_key_secret"="YourAccessKeySecret",
        "tablestore.table.name"="pet");

    具体配置项说明请参见下表。

    配置项

    说明

    WITH SERDEPROPERTIES

    字段映射配置,包括tablestore.columns.mapping选项配置。

    在默认情况下,外表的字段名即为表格存储上表的列名(主键列名或属性列名)。但有时外表的字段名和表上列名并不一致(例如处理大小写或字符集相关的问题),此时需要指定tablestore.columns.mapping。该参数为一个英文逗号分隔的字符串,每个分隔之间不能添加空格,每一项都是表中列名,顺序与外表字段保持一致。

    说明

    表格存储的列名支持空白字符,所以空白也会被认为是表中列名的一部分。

    TBLPROPERTIES

    表的属性配置。包括如下选项:

    • tablestore.endpoint(必选):访问表格存储的服务地址,您可以在表格存储控制台上查看实例的Endpoint信息。关于服务地址的更多信息,请参见服务地址

    • tablestore.instance(可选):表格存储的实例名称。如果不填写,则为tablestore.endpoint的第一段。关于实例的更多信息,请参见实例

    • tablestore.access_key_id(必选):阿里云账号或者RAM用户的AccessKey ID。更多信息,请参见获取AccessKey

      当要使用STS服务临时访问资源时,请设置此参数为临时访问凭证的AccessKey ID。

    • tablestore.access_key_secret(必选):阿里云账号或者RAM用户的AccessKey Secret。更多信息,请参见获取AccessKey

      当要使用STS服务临时访问资源时,请设置此参数为临时访问凭证的AccessKey Secret。

    • tablestore.sts_token(可选):临时访问凭证的安全令牌。当要使用STS服务临时访问资源时,才需要设置此参数。更多信息,请参见通过RAM PolicyRAM用户授权

    • tablestore.table.name(必选):表格存储中对应的表名。

  3. 查询表中数据。

    • 执行SELECT * FROM pet;命令查询表中所有行数据。

      返回结果示例如下:

      Bowser  Diane   dog     m       1979-08-31      1995-07-29
      Buffy   Harold  dog     f       1989-05-13      NULL
      Chirpy  Gwen    bird    f       1998-09-11      NULL
      Claws   Gwen    cat     m       1994-03-17      NULL
      Fang    Benny   dog     m       1990-08-27      NULL
      Fluffy  Harold  cat     f       1993-02-04      NULL
      Puffball        Diane   hamster f       1999-03-30      NULL
      Slim    Benny   snake   m       1996-04-29      NULL
      Whistler        Gwen    bird    NULL    1997-12-09      NULL
      Time taken: 5.045 seconds, Fetched 9 row(s)
    • 执行SELECT * FROM pet WHERE birth > "1995-01-01";命令查询表中birth列值大于1995-01-01的行数据。

      返回结果示例如下:

      Chirpy  Gwen    bird    f       1998-09-11      NULL
      Puffball        Diane   hamster f       1999-03-30      NULL
      Slim    Benny   snake   m       1996-04-29      NULL
      Whistler        Gwen    bird    NULL    1997-12-09      NULL
      Time taken: 1.41 seconds, Fetched 4 row(s)

HadoopMR访问示例

以下示例介绍如何使用HadoopMR程序统计数据表pet的行数。

  • 构建MappersReducers

    public class RowCounter {
    public static class RowCounterMapper
    extends Mapper<PrimaryKeyWritable, RowWritable, Text, LongWritable> {
        private final static Text agg = new Text("TOTAL");
        private final static LongWritable one = new LongWritable(1);
    
        @Override
        public void map(
            PrimaryKeyWritable key, RowWritable value, Context context)
            throws IOException, InterruptedException {
            context.write(agg, one);
        }
    }
    
    public static class IntSumReducer
    extends Reducer<Text,LongWritable,Text,LongWritable> {
    
        @Override
        public void reduce(
            Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable val : values) {
                sum += val.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }
    }
                        

    数据源每从表格存储上读出一行,都会调用一次mappermap()。PrimaryKeyWritableRowWritable两个参数分别对应这行的主键以及该行的内容。您可以通过调用PrimaryKeyWritable.getPrimaryKey()和RowWritable.getRow()取得表格存储Java SDK定义的主键对象及行对象。

  • 配置表格存储作为mapper的数据源。

    private static RangeRowQueryCriteria fetchCriteria() {
        RangeRowQueryCriteria res = new RangeRowQueryCriteria("YourTableName");
        res.setMaxVersions(1);
        List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
        List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
        lower.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MIN));
        upper.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MAX));
        res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
        res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
        return res;
    }
    
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "row count");
        job.addFileToClassPath(new Path("hadoop-connector.jar"));
        job.setJarByClass(RowCounter.class);
        job.setMapperClass(RowCounterMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setInputFormatClass(TableStoreInputFormat.class);
        TableStoreInputFormat.setEndpoint(job, "https://YourInstance.Region.ots.aliyuncs.com/");
        TableStoreInputFormat.setCredential(job, "YourAccessKeyId", "YourAccessKeySecret");
        TableStoreInputFormat.addCriteria(job, fetchCriteria());
        FileOutputFormat.setOutputPath(job, new Path("output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }                    

    示例中使用job.setInputFormatClass(TableStoreInputFormat.class)将表格存储设置为数据源,除此之外,还需要:

    • hadoop-connector.jar部署到集群上并添加到classpath中。路径为addFileToClassPath()指定hadoop-connector.jar的本地路径。代码中假定hadoop-connector.jar在当前路径。

    • 访问表格存储需要指定入口和身份。通过TableStoreInputFormat.setEndpoint()和TableStoreInputFormat.setCredential()设置访问表格存储需要指定的EndpointAccessKey信息。

    • 指定一张表用来计数。

      说明
      • 每调用一次addCriteria()可以在数据源里添加一个Java SDK定义的RangeRowQueryCriteria对象。可以多次调用addCriteria()。RangeRowQueryCriteria对象与表格存储Java SDK GetRange接口所用的RangeRowQueryCriteria对象具有相同的限制条件。

      • 使用RangeRowQueryCriteriasetFilter()和addColumnsToGet()可以在表格存储的服务器端过滤掉不必要的行和列,减少访问数据的大小,降低成本,提高性能。

      • 通过添加对应多张表的多个RangeRowQueryCriteria,可以实现多表的union。

      • 通过添加同一张表的多个RangeRowQueryCriteria,可以做到更均匀的切分。TableStore-Hadoop Connector会根据一些策略将用户传入的范围切细。

程序运行示例

  1. 设置HADOOP_CLASSPATH。

    HADOOP_CLASSPATH=hadoop-connector.jar bin/hadoop jar row-counter.jar
  2. 执行find output -type f命令查找output目录下的所有文件。

    返回结果示例如下:

    output/_SUCCESS
    output/part-r-00000
    output/._SUCCESS.crc
    output/.part-r-00000.crc
  3. 执行cat output/part-r-00000命令统计运行结果中的行数。

    TOTAL   9

类型转换说明

表格存储支持的数据类型与Hive或者Spark支持的数据类型不完全相同。

下表列出了从表格存储的数据类型(行)转换到HiveSpark数据类型(列)的支持情况。

类型转换

TINYINT

SMALLINT

INT

BIGINT

FLOAT

DOUBLE

BOOLEAN

STRING

BINARY

INTEGER

支持,损失精度

支持,损失精度

支持,损失精度

支持

支持,损失精度

支持,损失精度

不支持

不支持

不支持

DOUBLE

支持,损失精度

支持,损失精度

支持,损失精度

支持,损失精度

支持,损失精度

支持

不支持

不支持

不支持

BOOLEAN

不支持

不支持

不支持

不支持

不支持

不支持

支持

不支持

不支持

STRING

不支持

不支持

不支持

不支持

不支持

不支持

不支持

支持

不支持

BINARY

不支持

不支持

不支持

不支持

不支持

不支持

不支持

不支持

支持