数据准备

在表格存储中准备一张数据表 pet,其中name是唯一的一列主键。数据示例如下:

name owner species sex birth death
Fluffy Harold cat f 1993-02-04
Claws Gwen cat m 1994-03-17
Buffy Harold dog f 1989-05-13
Fang Benny dog m 1990-08-27
Bowser Diane dog m 1979-08-31 1995-07-29
Chirpy Gwen bird f 1998-09-11
Whistler Gwen bird 1997-12-09
Slim Benny snake m 1996-04-29
Puffball Diane hamster f 1999-03-30
说明 表格中空白的部分不需要写入,因为表格存储是一个 schema-free 的存储结构(数据模型),没有值也不需要写入 NULL

Spark SQL 访问示例

前提条件

按照准备工作中的步骤准备好 Spark、JDK 环境以及表格存储 Java SDK 和 EMR SDK 的依赖包。

示例

$ bin/spark-sql --master local --jars tablestore-4.3.1-jar-with-dependencies.jar,emr-tablestore-1.4.2.jar
spark-sql> CREATE EXTERNAL TABLE pet
  (name STRING, owner STRING, species STRING, sex STRING, birth STRING, death STRING)
  STORED BY 'com.aliyun.openservices.tablestore.hive.TableStoreStorageHandler'
  WITH SERDEPROPERTIES(
    "tablestore.columns.mapping"="name,owner,species,sex,birth,death")
  TBLPROPERTIES (
    "tablestore.endpoint"="YourEndpoint",
    "tablestore.access_key_id"="YourAccessKeyId",
    "tablestore.access_key_secret"="YourAccessKeySecret",
    "tablestore.table.name"="pet");
spark-sql> SELECT * FROM pet;
Bowser  Diane   dog     m       1979-08-31      1995-07-29
Buffy   Harold  dog     f       1989-05-13      NULL
Chirpy  Gwen    bird    f       1998-09-11      NULL
Claws   Gwen    cat     m       1994-03-17      NULL
Fang    Benny   dog     m       1990-08-27      NULL
Fluffy  Harold  cat     f       1993-02-04      NULL
Puffball        Diane   hamster f       1999-03-30      NULL
Slim    Benny   snake   m       1996-04-29      NULL
Whistler        Gwen    bird    NULL    1997-12-09      NULL
Time taken: 5.045 seconds, Fetched 9 row(s)
spark-sql> SELECT * FROM pet WHERE birth > "1995-01-01";
Chirpy  Gwen    bird    f       1998-09-11      NULL
Puffball        Diane   hamster f       1999-03-30      NULL
Slim    Benny   snake   m       1996-04-29      NULL
Whistler        Gwen    bird    NULL    1997-12-09      NULL
Time taken: 1.41 seconds, Fetched 4 row(s)

参数说明如下:

  • WITH SERDEPROPERTIES
    • tablestore.columns.mapping(可选):在默认情况下,外表的字段名即为表格存储上表的列名(主键列名或属性列名)。但有时外表的字段名和表上列名并不一致(比如处理大小写或字符集相关的问题),这时候就需要指定 tablestore.columns.mapping。该参数为一个英文逗号分隔的字符串,每个分隔之间不能添加空格,每一项都是表上列名,顺序与外表字段一致。
      说明 表格存储的列名支持空白字符,所以空白也会被认为是表上列名的一部分。
  • TBLPROPERTIES
    • tablestore.endpoint(必填):访问表格存储的服务地址,也可以在表格存储控制台上查看这个实例的 endpoint 信息。

    • tablestore.instance(可选):表格存储的实例名。若不填,则为 tablestore.endpoint 的第一段。

    • tablestore.table.name(必填):表格存储上对应的表名。

    • tablestore.access_key_id、tablestore.access_key_secret(必填) ,请参见访问控制

    • tablestore.sts_token(可选),请参见授权管理

Spark 访问示例

以下示例介绍如何使用 Spark 程序统计数据表 pet 的行数。

private static RangeRowQueryCriteria fetchCriteria() {
    RangeRowQueryCriteria res = new RangeRowQueryCriteria("YourTableName");
    res.setMaxVersions(1);
    List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
    List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
    lower.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MIN));
    upper.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MAX));
    res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
    res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
    return res;
}

public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("RowCounter");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);

    Configuration hadoopConf = new Configuration();
    TableStoreInputFormat.setCredential(
        hadoopConf,
        new Credential("YourAccessKeyId", "YourAccessKeySecret"));
    TableStoreInputFormat.setEndpoint(
        hadoopConf,
        new Endpoint("https://YourInstance.Region.ots.aliyuncs.com/"));
    TableStoreInputFormat.addCriteria(hadoopConf, fetchCriteria());

    try {
        JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd = sc.newAPIHadoopRDD(
            hadoopConf,
            TableStoreInputFormat.class,
            PrimaryKeyWritable.class,
            RowWritable.class);
        System.out.println(
            new Formatter().format("TOTAL: %d", rdd.count()).toString());
    } finally {
        sc.close();
    }
}
说明 如果使用 scala,只需把 JavaSparkContext 换成 SparkContext,JavaPairRDD 换成 PairRDD 即可。或者更简单,交给编译器自行做类型推断

运行程序

$ bin/spark-submit --master local --jars hadoop-connector.jar row-counter.jar
TOTAL: 9

类型转换说明

表格存储支持的数据类型和 Hive/Spark 支持的数据类型不完全相同。

下表列出了从表格存储的数据类型(行)转换到 Hive/Spark 数据类型(列)时所支持的情况。

TINYINT SMALLINT INT BIGINT FLOAT DOUBLE BOOLEAN STRING BINARY
INTEGER 可,损失精度 可,损失精度  可,损失精度 可,损失精度 可,损失精度
DOUBLE 可,损失精度 可,损失精度 可,损失精度 可,损失精度 可,损失精度
BOOLEAN
STRING
BINARY