Tutorials

更新时间:
复制 MD 格式

This topic describes how to use Hive or HadoopMR to access tables in Tablestore.

Data preparation

Create a data table named pet in Tablestore. The name column is the only primary key column. The following table provides sample data.

Note

Do not write data to empty cells. Because Tablestore has a schema-free storage structure, you do not need to write NULL to a cell that has no value.

name

owner

species

sex

birth

death

Fluffy

Harold

cat

f

1993-02-04

Claws

Gwen

cat

m

1994-03-17

Buffy

Harold

dog

f

1989-05-13

Fang

Benny

dog

m

1990-08-27

Bowser

Diane

dog

m

1979-08-31

1995-07-29

Chirpy

Gwen

bird

f

1998-09-11

Whistler

Gwen

bird

1997-12-09

Slim

Benny

snake

m

1996-04-29

Puffball

Diane

hamster

f

1999-03-30

Prerequisites

Ensure that the Hadoop, Hive, and JDK environments are set up. You also need the Tablestore Java SDK and EMR SDK dependency packages. For more information, see Preparations.

Hive access example

  1. Add HADOOP_HOME and HADOOP_CLASSPATH to the /etc/profile file. For example:

    export HADOOP_HOME=${YourHadoopInstallationDirectory}
    export HADOOP_CLASSPATH=emr-tablestore-1.4.2.jar:tablestore-4.3.1-jar-with-dependencies.jar:joda-time-2.9.4.jar
  2. Run the bin/hive command to start Hive. Then, create an external table. For example:

    CREATE EXTERNAL TABLE pet
      (name STRING, owner STRING, species STRING, sex STRING, birth STRING, death STRING)
      STORED BY 'com.aliyun.openservices.tablestore.hive.TableStoreStorageHandler'
      WITH SERDEPROPERTIES(
        "tablestore.columns.mapping"="name,owner,species,sex,birth,death")
      TBLPROPERTIES (
        "tablestore.endpoint"="YourEndpoint",
        "tablestore.access_key_id"="YourAccessKeyId",
        "tablestore.access_key_secret"="YourAccessKeySecret",
        "tablestore.table.name"="pet");

    The following table describes the configuration items.

    Configuration item

    Description

    WITH SERDEPROPERTIES

    The field mapping configuration. This includes the tablestore.columns.mapping option.

    By default, the field names of the external table are the same as the column names in the Tablestore table. The column names can be primary key columns or attribute columns. If the field names are different from the column names, for example, due to case sensitivity or character set issues, you must specify tablestore.columns.mapping. This parameter is a comma-separated string of column names. Do not add spaces between the commas. The order of the column names must match the order of the fields in the external table.

    Note

    Tablestore column names support whitespace characters. Whitespace characters are considered part of the column name.

    TBLPROPERTIES

    The properties of the table. This includes the following options:

    • tablestore.endpoint (Required): The endpoint used to access Tablestore. You can view the endpoint of an instance in the Tablestore console. For more information about endpoints, see Endpoints.

    • tablestore.instance (Optional): The name of the Tablestore instance. If you do not specify this parameter, the first segment of the endpoint is used as the instance name. For more information about instances, see Instances.

    • tablestore.access_key_id (Required): The AccessKey ID of your Alibaba Cloud account or RAM user. For more information, see Obtain an AccessKey pair.

      If you use the Security Token Service (STS) to temporarily access resources, set this parameter to the AccessKey ID of the temporary access credential.

    • tablestore.access_key_secret (Required): The AccessKey secret of your Alibaba Cloud account or RAM user. For more information, see Obtain an AccessKey pair.

      If you use STS to temporarily access resources, set this parameter to the AccessKey secret of the temporary access credential.

    • tablestore.sts_token (Optional): The security token of the temporary access credential. This parameter is required only when you use STS to temporarily access resources. For more information, see Grant permissions to a RAM user using a RAM policy.

    • tablestore.table.name (Required): The name of the corresponding table in Tablestore.

  3. Query data in the table.

    • Run the SELECT * FROM pet; command to query all rows in the table.

      The command returns the following result:

      Bowser  Diane   dog     m       1979-08-31      1995-07-29
      Buffy   Harold  dog     f       1989-05-13      NULL
      Chirpy  Gwen    bird    f       1998-09-11      NULL
      Claws   Gwen    cat     m       1994-03-17      NULL
      Fang    Benny   dog     m       1990-08-27      NULL
      Fluffy  Harold  cat     f       1993-02-04      NULL
      Puffball        Diane   hamster f       1999-03-30      NULL
      Slim    Benny   snake   m       1996-04-29      NULL
      Whistler        Gwen    bird    NULL    1997-12-09      NULL
      Time taken: 5.045 seconds, Fetched 9 row(s)
    • Run the SELECT * FROM pet WHERE birth > "1995-01-01"; command to query rows where the value in the birth column is later than 1995-01-01.

      The command returns the following result:

      Chirpy  Gwen    bird    f       1998-09-11      NULL
      Puffball        Diane   hamster f       1999-03-30      NULL
      Slim    Benny   snake   m       1996-04-29      NULL
      Whistler        Gwen    bird    NULL    1997-12-09      NULL
      Time taken: 1.41 seconds, Fetched 4 row(s)

HadoopMR access example

The following example shows how to use a HadoopMR program to count the number of rows in the pet table.

  • Build mappers and reducers.

    public class RowCounter {
    public static class RowCounterMapper
    extends Mapper<PrimaryKeyWritable, RowWritable, Text, LongWritable> {
        private final static Text agg = new Text("TOTAL");
        private final static LongWritable one = new LongWritable(1);
    
        @Override
        public void map(
            PrimaryKeyWritable key, RowWritable value, Context context)
            throws IOException, InterruptedException {
            context.write(agg, one);
        }
    }
    
    public static class IntSumReducer
    extends Reducer<Text,LongWritable,Text,LongWritable> {
    
        @Override
        public void reduce(
            Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable val : values) {
                sum += val.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }
    }
                        

    The map() method of the mapper is called each time a row is read from the Tablestore data source. The PrimaryKeyWritable and RowWritable parameters correspond to the primary key and content of the row, respectively. You can call PrimaryKeyWritable.getPrimaryKey() and RowWritable.getRow() to obtain the primary key object and row object defined by the Tablestore Java SDK.

  • Configure Tablestore as the data source for the mapper.

    private static RangeRowQueryCriteria fetchCriteria() {
        RangeRowQueryCriteria res = new RangeRowQueryCriteria("YourTableName");
        res.setMaxVersions(1);
        List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
        List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
        lower.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MIN));
        upper.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MAX));
        res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
        res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
        return res;
    }
    
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "row count");
        job.addFileToClassPath(new Path("hadoop-connector.jar"));
        job.setJarByClass(RowCounter.class);
        job.setMapperClass(RowCounterMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setInputFormatClass(TableStoreInputFormat.class);
        TableStoreInputFormat.setEndpoint(job, "https://YourInstance.Region.ots.aliyuncs.com/");
        TableStoreInputFormat.setCredential(job, "YourAccessKeyId", "YourAccessKeySecret");
        TableStoreInputFormat.addCriteria(job, fetchCriteria());
        FileOutputFormat.setOutputPath(job, new Path("output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }                    

    In the example, job.setInputFormatClass(TableStoreInputFormat.class) is used to set Tablestore as the data source. You also need to:

    • Deploy the hadoop-connector.jar file to the cluster and add it to the classpath. The addFileToClassPath() method specifies the local path of the hadoop-connector.jar file. This example assumes that the hadoop-connector.jar file is in the current path.

    • Specify the endpoint and credentials to access Tablestore. Use TableStoreInputFormat.setEndpoint() and TableStoreInputFormat.setCredential() to set the required endpoint and AccessKey information.

    • Specify the table to count.

      Note
      • Each time you call addCriteria(), a RangeRowQueryCriteria object that is defined by the Java SDK is added to the data source. You can call addCriteria() multiple times. The RangeRowQueryCriteria object has the same conditions as the RangeRowQueryCriteria object used by the GetRange interface of the Tablestore Java SDK.

      • Use the setFilter() and addColumnsToGet() methods of RangeRowQueryCriteria to filter out unnecessary rows and columns on the Tablestore server. This reduces the amount of data accessed, which lowers costs and improves performance.

      • You can perform a union operation on multiple tables by adding multiple RangeRowQueryCriteria objects that correspond to the tables.

      • You can achieve more even chunking by adding multiple RangeRowQueryCriteria objects for the same table. The Tablestore-Hadoop connector chunks the range that you specify based on specific policies.

Program execution example

  1. Set HADOOP_CLASSPATH.

    HADOOP_CLASSPATH=hadoop-connector.jar bin/hadoop jar row-counter.jar
  2. Run the find output -type f command to find all files in the output folder.

    The command returns the following result:

    output/_SUCCESS
    output/part-r-00000
    output/._SUCCESS.crc
    output/.part-r-00000.crc
  3. Run the cat output/part-r-00000 command to view the row count.

    TOTAL   9

Type conversion notes

The data types supported by Tablestore are not exactly the same as those supported by Hive or Spark.

The following table shows how Tablestore data types (rows) are converted to Hive or Spark data types (columns).

Type conversion

TINYINT

SMALLINT

INT

BIGINT

FLOAT

DOUBLE

BOOLEAN

STRING

BINARY

INTEGER

Supported, with precision loss

Supported, with precision loss

Supported, with precision loss

Support

Supported, with precision loss

Supported, with precision loss

Not supported

Not supported

Not supported

DOUBLE

Supported, with precision loss

Supported, with precision loss

Supported, with precision loss

Supported, with precision loss

Supported, with precision loss

Support

Not supported

Not supported

Not supported

BOOLEAN

Not supported

Not supported

Not supported

Not supported

Not supported

Not supported

Supported

Not supported

Not supported

STRING

Not supported

Not supported

Not supported

Not supported

Not supported

Not supported

Not supported

Supported

Not supported

BINARY

Not supported

Not supported

Not supported

Not supported

Not supported

Not supported

Not supported

Not supported

Support