Unique is a MapReduce example that reads records from a MaxCompute table and writes the deduplicated records to an output table. It uses setOutputGroupingColumns to control which fields determine uniqueness: key only, value only, or both key and value together. This example walks through setting up the tables, running the job in each mode, and understanding how the Mapper and Reducer implement deduplication.
Prerequisites
Before you begin, ensure that you have:
Completed the environment setup described in Getting started, including configuring the MaxCompute client
The mapreduce-examples.jar JAR package built and placed at bin\data\resources\mapreduce-examples.jar in the MaxCompute client installation directory
Set up tables and resources
Create the input and output tables.
CREATE TABLE ss_in(key BIGINT, value BIGINT);
CREATE TABLE ss_out(key BIGINT, value BIGINT);
Add the JAR package as a resource. Omit the -f flag when adding the JAR package for the first time.
add jar data\resources\mapreduce-examples.jar -f;
Upload test data from data.txt (located in the bin directory of the MaxCompute client) to the ss_in table using Tunnel.
tunnel upload data.txt ss_in;
The ss_in table now contains the following records, including duplicates:
1,1
1,1
2,2
2,2
How it works
The Unique example uses a Mapper and a Reducer together to remove duplicate records:
Mapper (OutputSchemaMapper): Reads each record and writes both the key and value fields to the map output. All records pass through; no filtering happens here.
Reducer (OutputSchemaReducer): Receives records grouped by the columns specified in setOutputGroupingColumns. For each group, it writes only one output record, which removes the duplicates.
The deduplication behavior depends on which columns are used for grouping:
| Mode | setOutputGroupingColumns | Effect |
|---|---|---|
| key | ["key"] | Keeps one record per unique key value |
| all | ["key", "value"] | Keeps one record per unique key+value combination (default) |
| value | ["value"] | Keeps one record per unique value |
setPartitionColumns and setOutputKeySortColumns work together to route records to the correct Reducer partition and sort them before grouping takes effect.
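The routing step can be pictured with a small plain-Java sketch. It does not use the MaxCompute SDK: the class PartitionSketch, the method partitionOf, and the hash-and-modulo rule are hypothetical stand-ins, and the hash function that MaxCompute actually uses may differ. The sketch only shows why records that agree on the partition columns reach the same Reducer.

```java
import java.util.Arrays;

/** Sketch only: picks a reducer index from the partition columns of a record. */
final class PartitionSketch {
    // Column positions in the map output key schema "key:bigint,value:bigint".
    static final int KEY = 0;
    static final int VALUE = 1;

    /**
     * Hypothetical partitioner: hashes only the selected columns and maps the
     * hash onto the range [0, numReducers). This is an assumption for
     * illustration, not the MaxCompute implementation.
     */
    static int partitionOf(long[] record, int[] partitionColumns, int numReducers) {
        long[] selected = new long[partitionColumns.length];
        for (int i = 0; i < partitionColumns.length; i++) {
            selected[i] = record[partitionColumns[i]];
        }
        return Math.floorMod(Arrays.hashCode(selected), numReducers);
    }

    public static void main(String[] args) {
        int[] byKey = { KEY };
        long[] a = { 1L, 10L };
        long[] b = { 1L, 20L };
        // Same key, different value: the partition columns match, so both
        // records get the same reducer index.
        System.out.println(partitionOf(a, byKey, 4) == partitionOf(b, byKey, 4)); // prints "true"
    }
}
```

Because only the partition columns feed the hash, every record of a group ends up on one Reducer as long as the partition columns are drawn from the grouping columns. The sample job sets setNumReduceTasks(1), so all records go to a single Reducer regardless.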
Run the Unique job
Run the Unique job from the MaxCompute client. The command takes the form:
jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.Unique <input_table> <output_table> [key|value|all]
The third argument selects the deduplication mode. If omitted, the mode defaults to all.
Key mode (deduplicate by key column only):
jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.Unique ss_in ss_out key;
All mode (deduplicate by both key and value; the default):
jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.Unique ss_in ss_out all;
Value mode (deduplicate by value column only):
jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.Unique ss_in ss_out value;
Expected results
All three modes produce the same output for this dataset because the key and value columns always share the same values in the test data:
+------------+------------+
| key | value |
+------------+------------+
| 1 | 1 |
| 2 | 2 |
+------------+------------+
To see the difference between modes in practice, use a dataset where the key and value columns have independent duplicate patterns.
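As a rough illustration of such a dataset, the following plain-Java sketch simulates the sort-then-group step on five made-up rows and lists the groups that each mode would form. It does not call the MaxCompute SDK; the class UniqueModesSketch, its methods, and the sample rows are assumptions chosen for this example. It reports only the grouping-column values of each group, because which non-grouping column value the real job emits depends on the sort order inside a group and is not modeled here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch only: simulates sort-then-group to show how many groups each mode forms. */
final class UniqueModesSketch {
    static final int KEY = 0;
    static final int VALUE = 1;

    /** Compares two rows on the given columns, in order. */
    static int compare(long[] a, long[] b, int[] columns) {
        for (int c : columns) {
            int cmp = Long.compare(a[c], b[c]);
            if (cmp != 0) {
                return cmp;
            }
        }
        return 0;
    }

    /** Sorts a copy of the rows, then records the grouping columns of each run of equal rows. */
    static List<String> distinctGroups(long[][] rows, int[] sortColumns, int[] groupColumns) {
        long[][] sorted = rows.clone();
        Arrays.sort(sorted, (a, b) -> compare(a, b, sortColumns));
        List<String> groups = new ArrayList<>();
        long[] previous = null;
        for (long[] row : sorted) {
            // A new group starts whenever the grouping columns change.
            if (previous == null || compare(previous, row, groupColumns) != 0) {
                long[] groupKey = new long[groupColumns.length];
                for (int i = 0; i < groupColumns.length; i++) {
                    groupKey[i] = row[groupColumns[i]];
                }
                groups.add(Arrays.toString(groupKey));
            }
            previous = row;
        }
        return groups;
    }

    public static void main(String[] args) {
        // Made-up rows (key, value) with independent duplicate patterns.
        long[][] rows = { { 1, 1 }, { 1, 2 }, { 2, 2 }, { 1, 1 }, { 3, 2 } };
        int[] k = { KEY };
        int[] v = { VALUE };
        int[] kv = { KEY, VALUE };
        System.out.println("key:   " + distinctGroups(rows, kv, k));  // [[1], [2], [3]]
        System.out.println("all:   " + distinctGroups(rows, kv, kv)); // [[1, 1], [1, 2], [2, 2], [3, 2]]
        System.out.println("value: " + distinctGroups(rows, v, v));   // [[1], [2]]
    }
}
```

With these rows, key mode forms three groups, all mode four, and value mode two, so each mode would write a different number of records. The sort columns in each call mirror the ones the sample job passes to setOutputKeySortColumns; they begin with the grouping columns, which keeps every group contiguous after sorting.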
Sample code
For Project Object Model (POM) dependency configuration, see the Precautions section of the Getting started guide.
package com.aliyun.odps.mapred.open.example;

import java.io.IOException;
import java.util.Iterator;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

/**
 * Unique: removes duplicate records from the input table.
 */
public class Unique {

  public static class OutputSchemaMapper extends MapperBase {
    private Record key;
    private Record value;

    @Override
    public void setup(TaskContext context) throws IOException {
      key = context.createMapOutputKeyRecord();
      value = context.createMapOutputValueRecord();
    }

    @Override
    public void map(long recordNum, Record record, TaskContext context)
        throws IOException {
      // A missing column falls back to 0.
      long left = 0;
      long right = 0;
      if (record.getColumnCount() > 0) {
        left = (Long) record.get(0);
        if (record.getColumnCount() > 1) {
          right = (Long) record.get(1);
        }
        // Write both columns as the map output key and as the map output value.
        key.set(new Object[] { (Long) left, (Long) right });
        value.set(new Object[] { (Long) left, (Long) right });
        context.write(key, value);
      }
    }
  }

  public static class OutputSchemaReducer extends ReducerBase {
    private Record result = null;

    @Override
    public void setup(TaskContext context) throws IOException {
      result = context.createOutputRecord();
    }

    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context)
        throws IOException {
      // Called once per group; exactly one record is written for the group.
      result.set(0, key.get(0));
      while (values.hasNext()) {
        Record value = values.next();
        // Column 1 is overwritten on each iteration, so the last value in the group is kept.
        result.set(1, value.get(1));
      }
      context.write(result);
    }
  }

  public static void main(String[] args) throws Exception {
    if (args.length > 3 || args.length < 2) {
      System.err.println("Usage: unique <in> <out> [key|value|all]");
      System.exit(2);
    }
    String ops = "all";
    if (args.length == 3) {
      ops = args[2];
    }

    /**
     * The input group of Reduce is determined by the value of the
     * setOutputGroupingColumns parameter. If this parameter is not specified,
     * the default value MapOutputKeySchema is used.
     */

    // Key Unique
    if (ops.equals("key")) {
      JobConf job = new JobConf();
      job.setMapperClass(OutputSchemaMapper.class);
      job.setReducerClass(OutputSchemaReducer.class);
      job.setMapOutputKeySchema(SchemaUtils.fromString("key:bigint,value:bigint"));
      job.setMapOutputValueSchema(SchemaUtils.fromString("key:bigint,value:bigint"));
      job.setPartitionColumns(new String[] { "key" });
      job.setOutputKeySortColumns(new String[] { "key", "value" });
      job.setOutputGroupingColumns(new String[] { "key" });
      job.set("tablename2", args[1]);
      job.setNumReduceTasks(1);
      job.setInt("table.counter", 0);
      InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
      OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
      JobClient.runJob(job);
    }

    // Key&Value Unique
    if (ops.equals("all")) {
      JobConf job = new JobConf();
      job.setMapperClass(OutputSchemaMapper.class);
      job.setReducerClass(OutputSchemaReducer.class);
      job.setMapOutputKeySchema(SchemaUtils.fromString("key:bigint,value:bigint"));
      job.setMapOutputValueSchema(SchemaUtils.fromString("key:bigint,value:bigint"));
      job.setPartitionColumns(new String[] { "key" });
      job.setOutputKeySortColumns(new String[] { "key", "value" });
      job.setOutputGroupingColumns(new String[] { "key", "value" });
      job.set("tablename2", args[1]);
      job.setNumReduceTasks(1);
      job.setInt("table.counter", 0);
      InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
      OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
      JobClient.runJob(job);
    }

    // Value Unique
    if (ops.equals("value")) {
      JobConf job = new JobConf();
      job.setMapperClass(OutputSchemaMapper.class);
      job.setReducerClass(OutputSchemaReducer.class);
      job.setMapOutputKeySchema(SchemaUtils.fromString("key:bigint,value:bigint"));
      job.setMapOutputValueSchema(SchemaUtils.fromString("key:bigint,value:bigint"));
      job.setPartitionColumns(new String[] { "value" });
      job.setOutputKeySortColumns(new String[] { "value" });
      job.setOutputGroupingColumns(new String[] { "value" });
      job.set("tablename2", args[1]);
      job.setNumReduceTasks(1);
      job.setInt("table.counter", 0);
      InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
      OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
      JobClient.runJob(job);
    }
  }
}
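
The three branches in main differ only in the column arrays passed to setPartitionColumns, setOutputKeySortColumns, and setOutputGroupingColumns, and an unrecognized third argument matches none of them, so no job runs. One possible way to make that explicit is a small lookup from mode to column arrays. The sketch below is plain Java with no SDK dependency; the class UniqueModeColumns and the method forMode are hypothetical names introduced here and are not part of the sample or the MaxCompute SDK.

```java
import java.util.Arrays;

/** Sketch only: maps a Unique mode name to the column arrays used by the sample job. */
final class UniqueModeColumns {
    final String[] partition; // would be passed to job.setPartitionColumns
    final String[] sort;      // would be passed to job.setOutputKeySortColumns
    final String[] grouping;  // would be passed to job.setOutputGroupingColumns

    private UniqueModeColumns(String[] partition, String[] sort, String[] grouping) {
        this.partition = partition;
        this.sort = sort;
        this.grouping = grouping;
    }

    /** Returns the columns for "key", "all", or "value"; rejects anything else. */
    static UniqueModeColumns forMode(String mode) {
        switch (mode) {
            case "key":
                return new UniqueModeColumns(
                    new String[] { "key" },
                    new String[] { "key", "value" },
                    new String[] { "key" });
            case "all":
                return new UniqueModeColumns(
                    new String[] { "key" },
                    new String[] { "key", "value" },
                    new String[] { "key", "value" });
            case "value":
                return new UniqueModeColumns(
                    new String[] { "value" },
                    new String[] { "value" },
                    new String[] { "value" });
            default:
                throw new IllegalArgumentException(
                    "Unknown mode: " + mode + " (expected key, value, or all)");
        }
    }

    public static void main(String[] args) {
        UniqueModeColumns columns = forMode("value");
        System.out.println(Arrays.toString(columns.grouping)); // prints "[value]"
    }
}
```

With a lookup like this, main could build a single JobConf and pass columns.partition, columns.sort, and columns.grouping to the three setters, and a mistyped mode would fail with an error instead of silently doing nothing.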