This topic describes how to use Eclipse to develop and run MapReduce programs.

Run the MapReduce sample program

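  1. Select the WordCount example in the ODPS project.
  2. Right-click WordCount.java and choose Run As > ODPS MapReduce|Graph.
  3. On the ODPS MapReduce|Graph Run Configuration page, select example_project and click Finish.
  4. After the run succeeds, a result message is displayed.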

Run a custom MapReduce program

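  1. Create a custom Mapper.
    1. Right-click the src directory and choose New > Mapper.
    2. On the New Mapper page, enter the name of the Mapper class in the Name field (UserMapper in this example) and click Finish.
    3. In the Package Explorer on the left, the file UserMapper.java is generated in the src directory. The file contains a template for a Mapper class, as shown below.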
      package odps;
      import java.io.IOException;
      import com.aliyun.odps.data.Record;
      import com.aliyun.odps.mapred.MapperBase;
      public class UserMapper extends MapperBase {
          @Override
          public void setup(TaskContext context) throws IOException {
          }
          @Override
          public void map(long recordNum, Record record, TaskContext context)
                  throws IOException {
          }
          @Override
          public void cleanup(TaskContext context) throws IOException {
          }
      }
      You can modify the template as needed. For example, change it to the following.
      package odps;
      import java.io.IOException;
      import com.aliyun.odps.counter.Counter;
      import com.aliyun.odps.data.Record;
      import com.aliyun.odps.mapred.MapperBase;
      public class UserMapper extends MapperBase {
          Record word;
          Record one;
          Counter gCnt;
          @Override
          public void setup(TaskContext context) throws IOException {
                word = context.createMapOutputKeyRecord();
                one = context.createMapOutputValueRecord();
                one.set(new Object[] { 1L });
                gCnt = context.getCounter("MyCounters", "global_counts");
          }
          @Override
          public void map(long recordNum, Record record, TaskContext context)
                  throws IOException {
                // Split every column on whitespace and emit (word, 1) for each token.
                for (int i = 0; i < record.getColumnCount(); i++) {
                    String[] words = record.get(i).toString().split("\\s+");
                    for (String w : words) {
                        word.set(new Object[] { w });
                        Counter cnt = context.getCounter("MyCounters", "map_outputs");
                        cnt.increment(1);
                        gCnt.increment(1);
                        context.write(word, one);
                    }
                }
          }
          @Override
          public void cleanup(TaskContext context) throws IOException {
          }
      }
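  2. Create a custom Reducer.
    1. Right-click the src directory and choose New > Reduce.
    2. On the New Reducer page, enter the name of the Reducer class in the Name field (UserReduce in this example) and click Finish.
    3. In the Package Explorer on the left, the file UserReduce.java is generated in the src directory. The file contains a template for a Reducer class, as shown below.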
      package odps;
      import java.io.IOException;
      import java.util.Iterator;
      import com.aliyun.odps.data.Record;
      import com.aliyun.odps.mapred.ReducerBase;
      
      public class UserReduce extends ReducerBase {
      
          @Override
          public void setup(TaskContext context) throws IOException {
          }
      
          @Override
          public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
      
              while (values.hasNext()) {
                  values.next();
                  // TODO process value
              }
          }
      
          @Override
          public void cleanup(TaskContext context) throws IOException {
          }
      
      }
      You can modify the template as needed. For example, change it to the following.
      package odps;
      import java.io.IOException;
      import java.util.Iterator;
      import com.aliyun.odps.counter.Counter;
      import com.aliyun.odps.data.Record;
      import com.aliyun.odps.mapred.ReducerBase;
      public class UserReduce extends ReducerBase {
          private Record result;
          Counter gCnt;
          @Override
          public void setup(TaskContext context) throws IOException {
                result = context.createOutputRecord();
                gCnt = context.getCounter("MyCounters", "global_counts");
          }
          @Override
          public void reduce(Record key, Iterator<Record> values, TaskContext context)
                  throws IOException {
                long count = 0;
                while (values.hasNext()) {
                  Record val = values.next();
                  count += (Long) val.get(0);
                }
                result.set(0, key.get(0));
                result.set(1, count);
                Counter cnt = context.getCounter("MyCounters", "reduce_outputs");
                cnt.increment(1);
                gCnt.increment(1);
                context.write(result);
          }
          @Override
          public void cleanup(TaskContext context) throws IOException {
          }
      }
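  3. Create the main function.
    1. Right-click the src directory and choose New > MapReduce Driver.
    2. On the New MapReducer Driver page, configure the following parameters and click Finish.
      • Name: the name of the MapReduce Driver. UserDriver in this example.
      • Mapper: the name of the Mapper class. UserMapper in this example.
      • Reducer: the name of the Reducer class. UserReduce in this example.
    3. In the Package Explorer on the left, the file UserDriver.java is generated in the src directory. The file contains a template for a MapReduce Driver class, as shown below.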
      package odps;
      
      import com.aliyun.odps.OdpsException;
      import com.aliyun.odps.data.TableInfo;
      import com.aliyun.odps.mapred.JobClient;
      import com.aliyun.odps.mapred.RunningJob;
      import com.aliyun.odps.mapred.conf.JobConf;
      import com.aliyun.odps.mapred.utils.InputUtils;
      import com.aliyun.odps.mapred.utils.OutputUtils;
      import com.aliyun.odps.mapred.utils.SchemaUtils;
      
      public class UserDriver {
      
          public static void main(String[] args) throws OdpsException {
              JobConf job = new JobConf();
      
              // TODO: specify map output types
              job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
              job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
      
              // TODO: specify input and output tables
              InputUtils.addTable(TableInfo.builder().tableName("wc_in1").build(), job);
              OutputUtils.addTable(TableInfo.builder().tableName("wc_out").build(), job);
      
              job.setMapperClass(UserMapper.class);
              job.setReducerClass(UserReduce.class);
      
              RunningJob rj = JobClient.runJob(job);
              rj.waitForCompletion();
          }
      
      }
      You can modify the template as needed. For example, change it to the following.
      package odps;
      import com.aliyun.odps.OdpsException;
      import com.aliyun.odps.data.TableInfo;
      import com.aliyun.odps.examples.mr.WordCount.SumCombiner;
      import com.aliyun.odps.examples.mr.WordCount.SumReducer;
      import com.aliyun.odps.examples.mr.WordCount.TokenizerMapper;
      import com.aliyun.odps.mapred.JobClient;
      import com.aliyun.odps.mapred.RunningJob;
      import com.aliyun.odps.mapred.conf.JobConf;
      import com.aliyun.odps.mapred.utils.InputUtils;
      import com.aliyun.odps.mapred.utils.OutputUtils;
      import com.aliyun.odps.mapred.utils.SchemaUtils;
      public class UserDriver {
          public static void main(String[] args) throws OdpsException {
              JobConf job = new JobConf();
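              // This variant reuses the Mapper, Combiner, and Reducer of the WordCount sample.
              // To run the classes created in the previous steps, set UserMapper.class and
              // UserReduce.class instead, as in the generated template.
              job.setMapperClass(TokenizerMapper.class);
              job.setCombinerClass(SumCombiner.class);
              job.setReducerClass(SumReducer.class);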
              job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
              job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
              InputUtils.addTable(
                  TableInfo.builder().tableName("wc_in1").cols(new String[] { "col2", "col3" }).build(), job);
              InputUtils.addTable(TableInfo.builder().tableName("wc_in2").partSpec("p1=2/p2=1").build(), job);
              OutputUtils.addTable(TableInfo.builder().tableName("wc_out").build(), job);
              RunningJob rj = JobClient.runJob(job);
              rj.waitForCompletion();
          }
      }
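  4. Right-click UserDriver.java and choose Run As > ODPS MapReduce|Graph.
  5. On the ODPS MapReduce|Graph Run Configuration page, select example_project and click Finish.
  6. After the run succeeds, a result message is displayed.
  7. View the result. The output is saved in the R_000000 file under warehouse > wc_out. Open this file to view the result.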
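
To make the data flow of the custom Mapper and Reducer above easier to follow, here is a minimal sketch that imitates it in plain Java without the ODPS SDK. The class LocalWordCountSketch and its methods are hypothetical names introduced only for illustration. The sketch splits each column on whitespace as UserMapper does and sums the per-word values as UserReduce does. One deliberate difference is that it skips the empty token that split("\\s+") produces for a column with leading whitespace, which the Mapper above would emit as a word.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local imitation of the word-count logic; it does not use the ODPS SDK.
class LocalWordCountSketch {

    // Map phase: split every column of one row on whitespace, like UserMapper.map.
    static List<String> mapRow(String[] row) {
        List<String> words = new ArrayList<>();
        for (String column : row) {
            for (String w : column.split("\\s+")) {
                // Assumption of this sketch: drop the empty token caused by leading whitespace.
                if (!w.isEmpty()) {
                    words.add(w);
                }
            }
        }
        return words;
    }

    // Shuffle and reduce phase: group identical words and sum a 1 for each occurrence,
    // like the loop over values in UserReduce.reduce.
    static Map<String, Long> reduce(List<String> words) {
        Map<String, Long> counts = new TreeMap<>();
        for (String w : words) {
            counts.merge(w, 1L, Long::sum);
        }
        return counts;
    }

    // Runs the map phase over all rows, then the reduce phase over everything emitted.
    static Map<String, Long> run(List<String[]> rows) {
        List<String> emitted = new ArrayList<>();
        for (String[] row : rows) {
            emitted.addAll(mapRow(row));
        }
        return reduce(emitted);
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
            new String[] { "hello odps", "hello" },
            new String[] { " mapreduce on odps" });
        run(rows).forEach((word, count) -> System.out.println(word + "," + count));
    }
}
```

For the two sample rows, the counts are hello=2, mapreduce=1, odps=2, and on=1. The printed layout is only a convenience of this sketch and is not meant to match the format of the R_000000 file.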

Package the custom MapReduce program

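After local debugging confirms that the output is correct, you can use the Eclipse Export feature to package the MapReduce program. After packaging, upload the JAR package to MaxCompute. For details on running MapReduce in the distributed MaxCompute environment, see Write MapReduce.
  1. Right-click the src directory and choose Export.
  2. Select JAR file as the type and click Next.
  3. Export only the package (com.aliyun.odps.mapred.open.example) under the src directory, set the JAR file name to mr-examples.jar, and click Next.
  4. Click Finish to complete the packaging.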
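
Before uploading, it can be useful to confirm that the exported JAR actually contains the compiled classes. The following is a minimal sketch that lists the .class entries of a JAR using only the standard java.util.jar API. The class name JarContentsSketch is hypothetical, and the default path mr-examples.jar is an assumption based on the file name used in the steps above; pass a different path as the first argument if the JAR was exported elsewhere.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical helper for inspecting an exported JAR; not part of the Eclipse plugin.
class JarContentsSketch {

    // Returns the names of all .class entries in the given JAR, in archive order.
    static List<String> listClasses(Path jarPath) throws IOException {
        List<String> classes = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath.toFile())) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (name.endsWith(".class")) {
                    classes.add(name);
                }
            }
        }
        return classes;
    }

    public static void main(String[] args) throws IOException {
        // Assumed default: the JAR name from the export step, in the working directory.
        Path jarPath = Paths.get(args.length > 0 ? args[0] : "mr-examples.jar");
        for (String name : listClasses(jarPath)) {
            System.out.println(name);
        }
    }
}
```

If the list is empty or the expected package directory is missing from the entry names, the export selection in step 3 is the first thing to recheck.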