The Sort program uses MapReduce to sort input records by key in ascending order. This example walks you through setting up the tables and data, running the job, and verifying the output.
This approach routes all data through a single reducer for global sorting. It works well for small datasets. For large datasets, use TeraSort instead.
Prerequisites
Before you begin, ensure that you have:
Completed the environment setup described in Getting started
Prepare tables and data
Create the input and output tables.
CREATE TABLE ss_in(key BIGINT, value BIGINT);
CREATE TABLE ss_out(key BIGINT, value BIGINT);
Add the JAR package as a MaxCompute resource. The JAR package mapreduce-examples.jar is located in the bin\data\resources directory of your MaxCompute client installation. If this is the first time you're adding this JAR package, omit the -f flag.
add jar data\resources\mapreduce-examples.jar -f;
Import data.txt from the bin directory of the MaxCompute client into the ss_in table using Tunnel.
tunnel upload data.txt ss_in;
The ss_in table now contains:
2,1
1,1
3,1
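If data.txt needs to be recreated locally, the following minimal sketch writes the three sample rows shown above. The class name WriteSortSampleData is hypothetical, and the sketch assumes that the Tunnel upload is run with delimiters matching this file: a comma between columns and the platform line separator between rows.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of mapreduce-examples.jar.
public class WriteSortSampleData {
    // The three sample records, one "key,value" pair per row.
    static final List<String> ROWS = Arrays.asList("2,1", "1,1", "3,1");

    // Writes the rows to the target file, replacing any existing content.
    // Assumption: comma column delimiter, platform line separator.
    static void write(Path target) throws IOException {
        Files.write(target, ROWS, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        Path target = Paths.get(args.length > 0 ? args[0] : "data.txt");
        write(target);
        System.out.println("Wrote " + ROWS.size() + " rows to " + target.toAbsolutePath());
    }
}
```

Run it from the directory where you invoke tunnel upload, or pass the target path as the first argument.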
Run the Sort job
On the MaxCompute client, run the following command:
jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar com.aliyun.odps.mapred.open.example.Sort ss_in ss_out;
Verify the result
After the job completes, query the ss_out table. The records are sorted in ascending order by key:
+------------+------------+
| key | value |
+------------+------------+
| 1 | 1 |
| 2 | 1 |
| 3 | 1 |
+------------+------------+
How it works
The Sort program uses an identity mapper and an identity reducer, neither of which transforms the data. The sorting happens automatically when MapReduce shuffles key-value pairs between the map and reduce phases.
Map phase
IdentityMapper reads each input record and emits the first column as the map output key and the second column as the map output value:
| Input record | Map output key | Map output value |
|---|---|---|
| (2, 1) | 2 | 1 |
| (1, 1) | 1 | 1 |
| (3, 1) | 3 | 1 |
After all mappers finish, MapReduce sorts the key-value pairs by key before passing them to the reducer.
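The map-then-sort behavior can be imitated in plain Java without the MaxCompute SDK. This is only a sketch: the class ShuffleSortSketch and its methods are hypothetical, and an in-memory sort stands in for the framework's shuffle, which works differently internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical in-memory model of the map phase and the sort that follows it.
public class ShuffleSortSketch {
    // Identity map: emit (first column, second column) unchanged.
    static List<long[]> mapPhase(List<long[]> input) {
        List<long[]> out = new ArrayList<>();
        for (long[] record : input) {
            out.add(new long[] { record[0], record[1] });
        }
        return out;
    }

    // Stand-in for the shuffle: order the pairs by key, ascending.
    static List<long[]> shuffleSort(List<long[]> pairs) {
        List<long[]> sorted = new ArrayList<>(pairs);
        sorted.sort(Comparator.comparingLong((long[] pair) -> pair[0]));
        return sorted;
    }

    public static void main(String[] args) {
        List<long[]> input = Arrays.asList(
            new long[] { 2, 1 }, new long[] { 1, 1 }, new long[] { 3, 1 });
        for (long[] pair : shuffleSort(mapPhase(input))) {
            System.out.println(pair[0] + " -> " + pair[1]);
        }
    }
}
```

For the sample input, the pairs come out with keys in the order 1, 2, 3, which is the order the reducer receives them in.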
Reduce phase
IdentityReducer writes each key and its associated values directly to the output table without modification. Because numReduceTasks is set to 1, all sorted data flows through a single reducer, producing a globally sorted result.
Setting numReduceTasks to 1 transfers all data to a single reducer. This is suitable only for small datasets. For large datasets, use TeraSort.
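To see why the reducer count matters, the sketch below models reducers as in-memory partitions. It is a simplification under stated assumptions: the class ReducerCountSketch is hypothetical, and assigning keys by key modulo the reducer count is an illustrative partitioning rule, not necessarily the one MaxCompute uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model: each reducer sorts only the keys routed to it.
public class ReducerCountSketch {
    // Returns the reducers' outputs concatenated in reducer order.
    static List<Long> run(List<Long> keys, int numReducers) {
        List<List<Long>> partitions = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            partitions.add(new ArrayList<>());
        }
        // Assumed partitioning rule for illustration: key mod reducer count.
        for (long key : keys) {
            partitions.get((int) Math.floorMod(key, (long) numReducers)).add(key);
        }
        List<Long> output = new ArrayList<>();
        for (List<Long> partition : partitions) {
            Collections.sort(partition); // order holds within one reducer only
            output.addAll(partition);
        }
        return output;
    }

    public static void main(String[] args) {
        List<Long> keys = Arrays.asList(2L, 1L, 3L);
        System.out.println(run(keys, 1)); // prints [1, 2, 3]
        System.out.println(run(keys, 2)); // prints [2, 1, 3]
    }
}
```

With one reducer the combined output is globally ordered. With two, each reducer's output is ordered but the combined result is not, which is the problem that a range-partitioned approach such as TeraSort is designed to solve.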
Sample code
For Project Object Model (POM) dependency configuration, see Precautions.
package com.aliyun.odps.mapred.open.example;

import java.io.IOException;
import java.util.Date;
import java.util.Iterator;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

/**
 * This is the trivial map/reduce program that does absolutely nothing other
 * than use the framework to fragment and sort the input values.
 **/
public class Sort {

    static int printUsage() {
        System.out.println("sort <input> <output>");
        return -1;
    }

    /**
     * Implements the identity function, mapping the record's first two
     * columns to the map output key and value.
     **/
    public static class IdentityMapper extends MapperBase {
        private Record key;
        private Record value;

        @Override
        public void setup(TaskContext context) throws IOException {
            key = context.createMapOutputKeyRecord();
            value = context.createMapOutputValueRecord();
        }

        @Override
        public void map(long recordNum, Record record, TaskContext context)
                throws IOException {
            // Column 0 becomes the sort key; column 1 is carried as the value.
            key.set(new Object[] { (Long) record.get(0) });
            value.set(new Object[] { (Long) record.get(1) });
            context.write(key, value);
        }
    }

    public static class IdentityReducer extends ReducerBase {
        private Record result = null;

        @Override
        public void setup(TaskContext context) throws IOException {
            result = context.createOutputRecord();
        }

        /**
         * Writes all keys and values directly to output.
         */
        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context)
                throws IOException {
            result.set(0, key.get(0));
            // Emit one output row per value that shares this key.
            while (values.hasNext()) {
                Record val = values.next();
                result.set(1, val.get(0));
                context.write(result);
            }
        }
    }

    /**
     * The main driver for the sort program. Invoke this method to submit the
     * map/reduce job.
     *
     * @throws Exception
     *           When job submission or execution fails.
     **/
    public static void main(String[] args) throws Exception {
        JobConf jobConf = new JobConf();
        jobConf.setMapperClass(IdentityMapper.class);
        jobConf.setReducerClass(IdentityReducer.class);
        // For global sorting, the number of reducers is set to 1, so all
        // data is transferred to the same reducer.
        // This applies only when small amounts of data are processed. For
        // large amounts of data, use another method, such as TeraSort.
        jobConf.setNumReduceTasks(1);
        jobConf.setMapOutputKeySchema(SchemaUtils.fromString("key:bigint"));
        jobConf.setMapOutputValueSchema(SchemaUtils.fromString("value:bigint"));
        InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), jobConf);
        OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), jobConf);
        Date startTime = new Date();
        System.out.println("Job started: " + startTime);
        JobClient.runJob(jobConf);
        Date endTime = new Date();
        System.out.println("Job ended: " + endTime);
        System.out.println("The job took "
            + (endTime.getTime() - startTime.getTime()) / 1000 + " seconds.");
    }
}