This topic describes how to use Hive or HadoopMR to access tables in Tablestore.
Data preparation
Create a data table named pet in Tablestore. The name column is the only primary key column. The following table provides sample data.
Do not write data to empty cells. Because Tablestore has a schema-free storage structure, you do not need to write NULL to a cell that has no value.
name | owner | species | sex | birth | death |
Fluffy | Harold | cat | f | 1993-02-04 | |
Claws | Gwen | cat | m | 1994-03-17 | |
Buffy | Harold | dog | f | 1989-05-13 | |
Fang | Benny | dog | m | 1990-08-27 | |
Bowser | Diane | dog | m | 1979-08-31 | 1995-07-29 |
Chirpy | Gwen | bird | f | 1998-09-11 | |
Whistler | Gwen | bird | | 1997-12-09 | |
Slim | Benny | snake | m | 1996-04-29 | |
Puffball | Diane | hamster | f | 1999-03-30 | |
Prerequisites
Ensure that the Hadoop, Hive, and JDK environments are set up. You also need the Tablestore Java SDK and EMR SDK dependency packages. For more information, see Preparations.
Hive access example
Add HADOOP_HOME and HADOOP_CLASSPATH to the /etc/profile file. For example:
    export HADOOP_HOME=${YourHadoopInstallationDirectory}
    export HADOOP_CLASSPATH=emr-tablestore-1.4.2.jar:tablestore-4.3.1-jar-with-dependencies.jar:joda-time-2.9.4.jar
Run the bin/hive command to start Hive. Then, create an external table. For example:
    CREATE EXTERNAL TABLE pet
      (name STRING, owner STRING, species STRING, sex STRING, birth STRING, death STRING)
      STORED BY 'com.aliyun.openservices.tablestore.hive.TableStoreStorageHandler'
      WITH SERDEPROPERTIES(
        "tablestore.columns.mapping"="name,owner,species,sex,birth,death")
      TBLPROPERTIES (
        "tablestore.endpoint"="YourEndpoint",
        "tablestore.access_key_id"="YourAccessKeyId",
        "tablestore.access_key_secret"="YourAccessKeySecret",
        "tablestore.table.name"="pet");
The following table describes the configuration items.
Configuration item
Description
WITH SERDEPROPERTIES
The field mapping configuration. This includes the tablestore.columns.mapping option.
By default, the field names of the external table are the same as the column names in the Tablestore table. The column names can be primary key columns or attribute columns. If the field names differ from the column names, for example, because of case sensitivity or character set issues, you must specify tablestore.columns.mapping. This parameter is a comma-separated string of column names. Do not add spaces before or after the commas. The order of the column names must match the order of the fields in the external table.
Note: Tablestore column names support whitespace characters. Whitespace characters are considered part of the column name.
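The mapping rules above (bare commas, one Tablestore column per external-table field, same order) can be checked before the value is pasted into a CREATE EXTERNAL TABLE statement. The following is a minimal sketch in plain Java. The ColumnsMapping class and its build method are hypothetical helpers written for illustration and are not part of the Tablestore connector.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of the Tablestore connector): builds a value
// for "tablestore.columns.mapping" from a list of Tablestore column names.
final class ColumnsMapping {

    // fieldCount is the number of fields declared in the Hive external table.
    // The mapping must name exactly one Tablestore column per field, in order.
    static String build(List<String> tablestoreColumns, int fieldCount) {
        if (tablestoreColumns.size() != fieldCount) {
            throw new IllegalArgumentException(
                "expected " + fieldCount + " columns but got " + tablestoreColumns.size());
        }
        // Join with bare commas: any padding would become part of a column name.
        return String.join(",", tablestoreColumns);
    }

    public static void main(String[] args) {
        List<String> columns =
            Arrays.asList("name", "owner", "species", "sex", "birth", "death");
        // prints name,owner,species,sex,birth,death
        System.out.println(build(columns, 6));
    }
}
```

The count check catches the most common mistake, a mapping that is shorter or longer than the field list of the external table.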
TBLPROPERTIES
The properties of the table. This includes the following options:
tablestore.endpoint (Required): The endpoint used to access Tablestore. You can view the endpoint of an instance in the Tablestore console. For more information about endpoints, see Endpoints.
tablestore.instance (Optional): The name of the Tablestore instance. If you do not specify this parameter, the first segment of the endpoint is used as the instance name. For more information about instances, see Instances.
tablestore.access_key_id (Required): The AccessKey ID of your Alibaba Cloud account or RAM user. For more information, see Obtain an AccessKey pair.
If you use the Security Token Service (STS) to temporarily access resources, set this parameter to the AccessKey ID of the temporary access credential.
tablestore.access_key_secret (Required): The AccessKey secret of your Alibaba Cloud account or RAM user. For more information, see Obtain an AccessKey pair.
If you use STS to temporarily access resources, set this parameter to the AccessKey secret of the temporary access credential.
tablestore.sts_token (Optional): The security token of the temporary access credential. This parameter is required only when you use STS to temporarily access resources. For more information, see Grant permissions to a RAM user using a RAM policy.
tablestore.table.name (Required): The name of the corresponding table in Tablestore.
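The default for tablestore.instance described above (the first segment of the endpoint) can be illustrated with a short Java sketch. How the connector derives the name internally is not shown here, so this only mirrors the documented behavior. The EndpointInstance class and the sample endpoint are hypothetical.

```java
import java.net.URI;

// Hypothetical helper: mirrors the documented default for tablestore.instance,
// which is the first dot-separated segment of the endpoint host name.
final class EndpointInstance {

    static String instanceFromEndpoint(String endpoint) {
        String host = URI.create(endpoint).getHost();
        if (host == null) {
            throw new IllegalArgumentException("endpoint has no host: " + endpoint);
        }
        int dot = host.indexOf('.');
        // A host without dots is returned unchanged.
        return dot < 0 ? host : host.substring(0, dot);
    }

    public static void main(String[] args) {
        // The endpoint below is a made-up example, not a real instance.
        // prints myinstance
        System.out.println(
            instanceFromEndpoint("https://myinstance.cn-hangzhou.ots.aliyuncs.com/"));
    }
}
```

If the first segment of your endpoint is not the instance name, set tablestore.instance explicitly instead of relying on the default.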
Query data in the table.
Run the SELECT * FROM pet; command to query all rows in the table. The command returns the following result:
    Bowser    Diane   dog      m     1979-08-31  1995-07-29
    Buffy     Harold  dog      f     1989-05-13  NULL
    Chirpy    Gwen    bird     f     1998-09-11  NULL
    Claws     Gwen    cat      m     1994-03-17  NULL
    Fang      Benny   dog      m     1990-08-27  NULL
    Fluffy    Harold  cat      f     1993-02-04  NULL
    Puffball  Diane   hamster  f     1999-03-30  NULL
    Slim      Benny   snake    m     1996-04-29  NULL
    Whistler  Gwen    bird     NULL  1997-12-09  NULL
    Time taken: 5.045 seconds, Fetched 9 row(s)
Run the SELECT * FROM pet WHERE birth > "1995-01-01"; command to query rows where the value in the birth column is later than 1995-01-01. The command returns the following result:
    Chirpy    Gwen    bird     f     1998-09-11  NULL
    Puffball  Diane   hamster  f     1999-03-30  NULL
    Slim      Benny   snake    m     1996-04-29  NULL
    Whistler  Gwen    bird     NULL  1997-12-09  NULL
    Time taken: 1.41 seconds, Fetched 4 row(s)
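Because birth is declared as STRING in the external table, the filter in the second query compares text, not dates. It still behaves like a date filter because yyyy-MM-dd strings sort lexicographically in chronological order. The following plain-Java sketch reproduces the filter over the sample data to show this. It does not call Hive, and the BirthFilter class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration: string comparison of yyyy-MM-dd values.
final class BirthFilter {

    // Sample data from the pet table, keyed by name. A TreeMap keeps the
    // names sorted, similar to the primary key order of the query results.
    static Map<String, String> sampleBirths() {
        Map<String, String> births = new TreeMap<>();
        births.put("Fluffy", "1993-02-04");
        births.put("Claws", "1994-03-17");
        births.put("Buffy", "1989-05-13");
        births.put("Fang", "1990-08-27");
        births.put("Bowser", "1979-08-31");
        births.put("Chirpy", "1998-09-11");
        births.put("Whistler", "1997-12-09");
        births.put("Slim", "1996-04-29");
        births.put("Puffball", "1999-03-30");
        return births;
    }

    // Keeps the names whose birth string sorts after the cutoff string.
    static List<String> bornAfter(Map<String, String> births, String cutoff) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, String> entry : births.entrySet()) {
            if (entry.getValue().compareTo(cutoff) > 0) {
                names.add(entry.getKey());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // prints [Chirpy, Puffball, Slim, Whistler]
        System.out.println(bornAfter(sampleBirths(), "1995-01-01"));
    }
}
```

The same comparison would give wrong results for dates stored in other layouts, such as dd/MM/yyyy, so keep date strings in a sortable format if you filter on them.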
HadoopMR access example
The following example shows how to use a HadoopMR program to count the number of rows in the pet table.
Build mappers and reducers.
    public class RowCounter {
        public static class RowCounterMapper
                extends Mapper<PrimaryKeyWritable, RowWritable, Text, LongWritable> {
            private final static Text agg = new Text("TOTAL");
            private final static LongWritable one = new LongWritable(1);

            @Override
            public void map(PrimaryKeyWritable key, RowWritable value, Context context)
                    throws IOException, InterruptedException {
                context.write(agg, one);
            }
        }

        public static class IntSumReducer
                extends Reducer<Text, LongWritable, Text, LongWritable> {
            @Override
            public void reduce(Text key, Iterable<LongWritable> values, Context context)
                    throws IOException, InterruptedException {
                long sum = 0;
                for (LongWritable val : values) {
                    sum += val.get();
                }
                context.write(key, new LongWritable(sum));
            }
        }
    }
The map() method of the mapper is called each time a row is read from the Tablestore data source. The PrimaryKeyWritable and RowWritable parameters correspond to the primary key and content of the row, respectively. You can call PrimaryKeyWritable.getPrimaryKey() and RowWritable.getRow() to obtain the primary key object and row object defined by the Tablestore Java SDK.
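The data flow of the mapper and reducer can be traced without a cluster. The following sketch uses only the Java standard library. It emits one ("TOTAL", 1) pair per input row and sums the values per key, which is what RowCounterMapper and IntSumReducer do together. The RowCountSketch class is hypothetical and does not use Hadoop or Tablestore classes.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Hadoop-free illustration of the row-count data flow.
final class RowCountSketch {

    static Map<String, Long> countRows(List<String> rowKeys) {
        Map<String, Long> totals = new HashMap<>();
        for (String ignoredKey : rowKeys) {
            // "Map" step: every row contributes the pair ("TOTAL", 1).
            // "Reduce" step: values that share a key are summed.
            totals.merge("TOTAL", 1L, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> petNames = Arrays.asList(
            "Fluffy", "Claws", "Buffy", "Fang", "Bowser",
            "Chirpy", "Whistler", "Slim", "Puffball");
        // prints 9
        System.out.println(countRows(petNames).get("TOTAL"));
    }
}
```

Because every row maps to the same key, the job produces a single output record no matter how many rows the table holds.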
Configure Tablestore as the data source for the mapper.
    private static RangeRowQueryCriteria fetchCriteria() {
        RangeRowQueryCriteria res = new RangeRowQueryCriteria("YourTableName");
        res.setMaxVersions(1);
        List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
        List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
        lower.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MIN));
        upper.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MAX));
        res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
        res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
        return res;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "row count");
        job.addFileToClassPath(new Path("hadoop-connector.jar"));
        job.setJarByClass(RowCounter.class);
        job.setMapperClass(RowCounterMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setInputFormatClass(TableStoreInputFormat.class);
        TableStoreInputFormat.setEndpoint(job, "https://YourInstance.Region.ots.aliyuncs.com/");
        TableStoreInputFormat.setCredential(job, "YourAccessKeyId", "YourAccessKeySecret");
        TableStoreInputFormat.addCriteria(job, fetchCriteria());
        FileOutputFormat.setOutputPath(job, new Path("output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
In the example, job.setInputFormatClass(TableStoreInputFormat.class) is used to set Tablestore as the data source. You also need to:
Deploy the hadoop-connector.jar file to the cluster and add it to the classpath. The addFileToClassPath() method specifies the local path of the hadoop-connector.jar file. This example assumes that the hadoop-connector.jar file is in the current path.
Specify the endpoint and credentials to access Tablestore. Use TableStoreInputFormat.setEndpoint() and TableStoreInputFormat.setCredential() to set the required endpoint and AccessKey information.
Specify the table to count.
Note: Each time you call addCriteria(), a RangeRowQueryCriteria object that is defined by the Java SDK is added to the data source. You can call addCriteria() multiple times. The RangeRowQueryCriteria object accepts the same conditions as the RangeRowQueryCriteria object used by the GetRange interface of the Tablestore Java SDK.
Use the setFilter() and addColumnsToGet() methods of RangeRowQueryCriteria to filter out unnecessary rows and columns on the Tablestore server. This reduces the amount of data accessed, which lowers costs and improves performance.
You can perform a union operation on multiple tables by adding multiple RangeRowQueryCriteria objects that correspond to the tables.
You can achieve more even chunking by adding multiple RangeRowQueryCriteria objects for the same table. The Tablestore-Hadoop connector chunks the range that you specify based on specific policies.
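One way to prepare multiple RangeRowQueryCriteria objects for the same table is to cut the primary key range at a few chosen boundaries. The following sketch only computes the left-closed, right-open sub-ranges. Each pair would then be turned into one RangeRowQueryCriteria with setInclusiveStartPrimaryKey() and setExclusiveEndPrimaryKey(), as in fetchCriteria(), and passed to addCriteria(). The KeyRangeSplitter class, the boundary values, and the "-inf" and "+inf" placeholders (standing in for PrimaryKeyValue.INF_MIN and PrimaryKeyValue.INF_MAX) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: splits the full key range into [start, end) pairs.
final class KeyRangeSplitter {

    // Placeholders for PrimaryKeyValue.INF_MIN and PrimaryKeyValue.INF_MAX.
    static final String INF_MIN = "-inf";
    static final String INF_MAX = "+inf";

    // boundaries must be sorted in ascending order and contain no duplicates.
    static List<String[]> split(List<String> boundaries) {
        List<String[]> ranges = new ArrayList<>();
        String start = INF_MIN;
        for (String boundary : boundaries) {
            // Each boundary ends one range (exclusive) and starts the next.
            ranges.add(new String[] {start, boundary});
            start = boundary;
        }
        ranges.add(new String[] {start, INF_MAX});
        return ranges;
    }

    public static void main(String[] args) {
        // Prints one line per sub-range:
        // [-inf, D)
        // [D, G)
        // [G, +inf)
        for (String[] range : split(Arrays.asList("D", "G"))) {
            System.out.println("[" + range[0] + ", " + range[1] + ")");
        }
    }
}
```

Because each sub-range ends where the next one begins, no row is read twice and no row is skipped.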
Program execution example
Set HADOOP_CLASSPATH.
    HADOOP_CLASSPATH=hadoop-connector.jar bin/hadoop jar row-counter.jar
Run the find output -type f command to find all files in the output folder. The command returns the following result:
    output/_SUCCESS
    output/part-r-00000
    output/._SUCCESS.crc
    output/.part-r-00000.crc
Run the cat output/part-r-00000 command to view the row count.
    TOTAL 9
Type conversion notes
The data types supported by Tablestore are not exactly the same as those supported by Hive or Spark.
The following table shows how Tablestore data types (rows) are converted to Hive or Spark data types (columns).
Tablestore type | TINYINT | SMALLINT | INT | BIGINT | FLOAT | DOUBLE | BOOLEAN | STRING | BINARY |
INTEGER | Supported, with precision loss | Supported, with precision loss | Supported, with precision loss | Supported | Supported, with precision loss | Supported, with precision loss | Not supported | Not supported | Not supported |
DOUBLE | Supported, with precision loss | Supported, with precision loss | Supported, with precision loss | Supported, with precision loss | Supported, with precision loss | Supported | Not supported | Not supported | Not supported |
BOOLEAN | Not supported | Not supported | Not supported | Not supported | Not supported | Not supported | Supported | Not supported | Not supported |
STRING | Not supported | Not supported | Not supported | Not supported | Not supported | Not supported | Not supported | Supported | Not supported |
BINARY | Not supported | Not supported | Not supported | Not supported | Not supported | Not supported | Not supported | Not supported | Supported |
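The "with precision loss" entries can be made concrete with Java's own narrowing conversions. This topic does not state how the connector performs the conversions, so the sketch below only shows what plain Java casts do with a 64-bit integer or a double, as an indication of the kind of loss to expect. The NarrowingDemo class is hypothetical.

```java
// Hypothetical illustration of lossy numeric conversions in plain Java.
final class NarrowingDemo {

    static int toInt(long value) { return (int) value; }          // keeps the low 32 bits
    static float toFloat(long value) { return (float) value; }    // 24-bit significand
    static double toDouble(long value) { return (double) value; } // 53-bit significand
    static long toLong(double value) { return (long) value; }     // drops the fraction

    public static void main(String[] args) {
        // A 64-bit integer that does not fit into 32 bits wraps around.
        System.out.println(toInt(3_000_000_000L));                 // prints -1294967296

        // 2^53 + 1 cannot be represented exactly as a double.
        System.out.println((long) toDouble(9_007_199_254_740_993L)); // prints 9007199254740992

        // 2^24 + 1 cannot be represented exactly as a float.
        System.out.println((long) toFloat(16_777_217L));           // prints 16777216

        // Converting a double to an integer type truncates toward zero.
        System.out.println(toLong(3.99));                          // prints 3
    }
}
```

To avoid these effects, map Tablestore INTEGER columns to BIGINT and Tablestore DOUBLE columns to DOUBLE, the two combinations listed as supported without precision loss.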