Sort example


The Sort program uses MapReduce to sort input records by key in ascending order. This example walks you through setting up the tables and data, running the job, and verifying the output.

This approach routes all data through a single reducer for global sorting. It works well for small datasets. For large datasets, use TeraSort instead.

Prerequisites

Before you begin, ensure that you have:

Prepare tables and data

  1. Create the input and output tables.

    CREATE TABLE ss_in(key BIGINT, value BIGINT);
    CREATE TABLE ss_out(key BIGINT, value BIGINT);
  2. Add the JAR package as a MaxCompute resource. The JAR package mapreduce-examples.jar is located in the bin\data\resources directory of your MaxCompute client installation.

    The -f flag overwrites an existing resource with the same name. If this is the first time you are adding this JAR package, omit -f.
    add jar data\resources\mapreduce-examples.jar -f;
  3. Import data.txt from the bin directory of the MaxCompute client into the ss_in table using Tunnel.

    tunnel upload data.txt ss_in;

    The ss_in table now contains:

    2,1
    1,1
    3,1

Run the Sort job

On the MaxCompute client, run the following command:

jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar com.aliyun.odps.mapred.open.example.Sort ss_in ss_out;

Verify the result

After the job completes, query the ss_out table. The records are sorted in ascending order by key:

+------------+------------+
| key        | value      |
+------------+------------+
| 1          | 1          |
| 2          | 1          |
| 3          | 1          |
+------------+------------+
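If you export the rows of ss_out (for example, with Tunnel) and want to check the ordering programmatically, a simple pass over consecutive keys is enough. This is a minimal sketch: the class name SortedOutputCheck and the long[] row representation are hypothetical, and the rows are hard-coded from the query result above rather than read from MaxCompute.

```java
import java.util.List;

// Hypothetical helper: checks that (key, value) rows copied from ss_out are in ascending key order.
final class SortedOutputCheck {
    /** Returns true if each row's key (index 0) is >= the previous row's key. */
    static boolean isSortedByKey(List<long[]> rows) {
        for (int i = 1; i < rows.size(); i++) {
            if (rows.get(i)[0] < rows.get(i - 1)[0]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Rows hard-coded from the query result above: (key, value).
        List<long[]> rows = List.of(new long[] {1, 1}, new long[] {2, 1}, new long[] {3, 1});
        System.out.println(isSortedByKey(rows)); // prints true
    }
}
```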

How it works

The Sort program uses an identity mapper and an identity reducer. Neither one transforms the data; the sorting happens automatically when MapReduce shuffles key-value pairs between the map and reduce phases.

Map phase

IdentityMapper reads each input record and emits the first column as the map output key and the second column as the map output value:

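+---------------+----------------+------------------+
| Input record  | Map output key | Map output value |
+---------------+----------------+------------------+
| (2, 1)        | 2              | 1                |
| (1, 1)        | 1              | 1                |
| (3, 1)        | 3              | 1                |
+---------------+----------------+------------------+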

After all mappers finish, MapReduce sorts the key-value pairs by key before passing them to the reducer.
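To see where the ordering comes from, the map, shuffle, and identity-reduce steps can be simulated in plain Java. This is a local sketch under simplifying assumptions, not MaxCompute code: the class LocalSortSimulation is hypothetical, a TreeMap stands in for the framework's sort-and-group step, and values that share a key are kept in input order here, which the sketch does not claim the real framework guarantees.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of the Sort job's data flow; it does not use the MaxCompute SDK.
final class LocalSortSimulation {
    /** Identity map plus shuffle: group values under their key, keeping keys in ascending order. */
    static TreeMap<Long, List<Long>> mapAndShuffle(List<long[]> inputRecords) {
        TreeMap<Long, List<Long>> shuffled = new TreeMap<>();
        for (long[] record : inputRecords) {
            // Identity map: column 0 becomes the key, column 1 the value.
            shuffled.computeIfAbsent(record[0], k -> new ArrayList<>()).add(record[1]);
        }
        return shuffled;
    }

    /** Identity reduce: emit one (key, value) row per value, visiting keys in ascending order. */
    static List<long[]> reduce(TreeMap<Long, List<Long>> shuffled) {
        List<long[]> output = new ArrayList<>();
        for (Map.Entry<Long, List<Long>> entry : shuffled.entrySet()) {
            for (long value : entry.getValue()) {
                output.add(new long[] {entry.getKey(), value});
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Same records as the ss_in example: (2, 1), (1, 1), (3, 1).
        List<long[]> ssIn = List.of(new long[] {2, 1}, new long[] {1, 1}, new long[] {3, 1});
        for (long[] row : reduce(mapAndShuffle(ssIn))) {
            System.out.println(row[0] + "," + row[1]);
        }
        // prints 1,1 then 2,1 then 3,1
    }
}
```

Neither the map step nor the reduce step compares keys; the ascending order comes entirely from the sorted grouping in between, which mirrors the role the shuffle plays in the real job.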

Reduce phase

IdentityReducer writes each key and its associated values directly to the output table without modification. Because numReduceTasks is set to 1, all sorted data flows through a single reducer, producing a globally sorted result.
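Why a single reducer is needed can be shown with a small counterexample. The sketch below assumes a hash-style partitioner (Math.floorMod of Long.hashCode) purely for illustration; it is not MaxCompute's actual partitioner, and MultiReducerOrder is a hypothetical name. Each simulated reducer sorts its own keys, yet concatenating the outputs of two reducers is no longer globally sorted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustration only: assumes a simple hash-style partitioner, not MaxCompute's actual one.
final class MultiReducerOrder {
    /** Assigns each key to a reducer by hash, sorts each reducer's keys, then concatenates outputs. */
    static List<Long> concatenatedOutput(List<Long> keys, int numReducers) {
        List<List<Long>> buckets = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            buckets.add(new ArrayList<>());
        }
        for (long key : keys) {
            int reducer = Math.floorMod(Long.hashCode(key), numReducers);
            buckets.get(reducer).add(key);
        }
        List<Long> output = new ArrayList<>();
        for (List<Long> bucket : buckets) {
            Collections.sort(bucket); // each reducer's input is sorted only locally
            output.addAll(bucket);
        }
        return output;
    }

    public static void main(String[] args) {
        List<Long> keys = List.of(4L, 1L, 3L, 2L);
        System.out.println(concatenatedOutput(keys, 1)); // [1, 2, 3, 4]
        System.out.println(concatenatedOutput(keys, 2)); // [2, 4, 1, 3]
    }
}
```

With one reducer every key lands in the same bucket, so the output is globally sorted. With two, even keys and odd keys end up in separate buckets, and each bucket's output is sorted only within itself.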

Important

Setting numReduceTasks to 1 transfers all data to a single reducer. This is suitable only for small datasets. For large datasets, use TeraSort.
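TeraSort-style jobs (Hadoop's TeraSort, for example) avoid the single-reducer bottleneck by range partitioning: keys are divided into contiguous ranges using sampled split points, each range goes to its own reducer, and concatenating the reducer outputs in range order yields a globally sorted result. The following is a simplified sketch of that idea, not MaxCompute's TeraSort implementation: the split points are supplied directly instead of being sampled, and RangePartitionSort is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Simplified range-partitioning sketch; split points are given, not sampled.
final class RangePartitionSort {
    /**
     * Returns the reducer index for a key. Reducer i receives keys k with
     * splits[i-1] <= k < splits[i]; splits must be strictly ascending.
     */
    static int partition(long key, long[] splits) {
        int pos = Arrays.binarySearch(splits, key);
        // binarySearch returns -(insertionPoint) - 1 when the key is not a split point.
        return pos >= 0 ? pos + 1 : -(pos + 1);
    }

    /** Routes keys to reducers by range, sorts each reducer locally, and concatenates in range order. */
    static List<Long> sortWithRanges(List<Long> keys, long[] splits) {
        List<List<Long>> buckets = new ArrayList<>();
        for (int i = 0; i <= splits.length; i++) {
            buckets.add(new ArrayList<>());
        }
        for (long key : keys) {
            buckets.get(partition(key, splits)).add(key);
        }
        List<Long> output = new ArrayList<>();
        for (List<Long> bucket : buckets) {
            Collections.sort(bucket);
            output.addAll(bucket);
        }
        return output;
    }

    public static void main(String[] args) {
        List<Long> keys = List.of(7L, 2L, 5L, 9L, 1L, 4L);
        long[] splits = {3L, 6L}; // three reducers: k < 3, 3 <= k < 6, k >= 6
        System.out.println(sortWithRanges(keys, splits)); // [1, 2, 4, 5, 7, 9]
    }
}
```

Because every key in reducer i is smaller than every key in reducer i + 1, no single reducer has to see all the data. In practice the quality of the split points determines how evenly the work is spread.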

Sample code

For Project Object Model (POM) dependency configuration, see Precautions.

package com.aliyun.odps.mapred.open.example;
import java.io.IOException;
import java.util.Date;
import java.util.Iterator;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
/**
 * A trivial MapReduce program that does nothing except use the framework to
 * partition the input records and sort them by key.
 */
public class Sort {
    static int printUsage() {
        System.out.println("sort <input> <output>");
        return -1;
    }
    /**
     * Implements the identity function: maps a record's first two columns to
     * the map output key and value.
     */
    public static class IdentityMapper extends MapperBase {
        private Record key;
        private Record value;
        @Override
        public void setup(TaskContext context) throws IOException {
            key = context.createMapOutputKeyRecord();
            value = context.createMapOutputValueRecord();
        }

        @Override
        public void map(long recordNum, Record record, TaskContext context)
                throws IOException {
            key.set(new Object[] { (Long) record.get(0) });
            value.set(new Object[] { (Long) record.get(1) });
            context.write(key, value);
        }
    }

    public static class IdentityReducer extends ReducerBase {
        private Record result = null;

        @Override
        public void setup(TaskContext context) throws IOException {
            result = context.createOutputRecord();
        }

        /**
         * Writes all keys and values directly to output.
         */
        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context)
                throws IOException {
            result.set(0, key.get(0));

            while (values.hasNext()) {
                Record val = values.next();
                result.set(1, val.get(0));
                context.write(result);
            }
        }
    }

    /**
     * The main driver for the Sort program. Invoke this method to submit the
     * MapReduce job.
     *
     * @throws Exception
     *           If the job cannot be submitted or fails while running.
     */
    public static void main(String[] args) throws Exception {
        JobConf jobConf = new JobConf();
        jobConf.setMapperClass(IdentityMapper.class);
        jobConf.setReducerClass(IdentityReducer.class);
        // For global sorting, set the number of reducers to 1 so that all data goes to the same reducer.
        // This approach suits only small amounts of data. For large amounts of data, use another method, such as TeraSort.
        jobConf.setNumReduceTasks(1);
        jobConf.setMapOutputKeySchema(SchemaUtils.fromString("key:bigint"));
        jobConf.setMapOutputValueSchema(SchemaUtils.fromString("value:bigint"));
        InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), jobConf);
        OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), jobConf);
        Date startTime = new Date();
        System.out.println("Job started: " + startTime);
        JobClient.runJob(jobConf);
        Date endTime = new Date();
        System.out.println("Job ended: " + endTime);
        System.out.println("The job took " + (endTime.getTime() - startTime.getTime()) / 1000 + " seconds.");
    }
}