
Counter Usage Example

Last updated: 2017-10-17 19:46:55

This example defines three Counters: MAP_TASKS, REDUCE_TASKS, and TOTAL_TASKS. You can obtain any custom Counter in the setup, map/reduce, and cleanup methods of Map/Reduce and operate on it.
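Before walking through the MapReduce job itself, here is a minimal, SDK-free sketch of the enum-keyed counter pattern the example relies on. The class name CounterSketch and its EnumMap registry are illustrative assumptions, not part of the MaxCompute SDK; they only mimic what context.getCounter(...).increment(...) and counters.findCounter(...).getValue() do.

```java
import java.util.EnumMap;

// Hypothetical stand-in for the framework's Counter registry:
// an enum names each counter, and increments accumulate per enum value.
public class CounterSketch {

    enum MyCounter {
        TOTAL_TASKS, MAP_TASKS, REDUCE_TASKS
    }

    private final EnumMap<MyCounter, Long> counters =
            new EnumMap<>(MyCounter.class);

    // Analogous to context.getCounter(c).increment(n)
    public void increment(MyCounter c, long n) {
        counters.merge(c, n, Long::sum);
    }

    // Analogous to counters.findCounter(c).getValue()
    public long getValue(MyCounter c) {
        return counters.getOrDefault(c, 0L);
    }

    public static void main(String[] args) {
        CounterSketch sketch = new CounterSketch();
        // One map task and one reduce task each bump their own counter
        // plus the shared TOTAL_TASKS counter, mirroring the setup methods
        // in the full example below.
        sketch.increment(MyCounter.MAP_TASKS, 1);
        sketch.increment(MyCounter.TOTAL_TASKS, 1);
        sketch.increment(MyCounter.REDUCE_TASKS, 1);
        sketch.increment(MyCounter.TOTAL_TASKS, 1);
        System.out.println("MAP_TASKS=" + sketch.getValue(MyCounter.MAP_TASKS));
        System.out.println("REDUCE_TASKS=" + sketch.getValue(MyCounter.REDUCE_TASKS));
        System.out.println("TOTAL_TASKS=" + sketch.getValue(MyCounter.TOTAL_TASKS));
    }
}
```

In the real SDK the framework merges counter values across all task instances; this sketch runs in a single process, so the merge step is implicit in the shared EnumMap.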

Test Preparation

  1. Prepare the JAR package of the test program. Assume it is named mapreduce-examples.jar and stored locally under data\resources.

  2. Prepare the UserDefinedCounters test tables and resource.

    • Create the test tables.

      create table wc_in (key string, value string);
      create table wc_out (key string, cnt bigint);

    • Add the test resource.

      add jar data\resources\mapreduce-examples.jar -f;

  3. Import the data using tunnel.

    tunnel upload data wc_in;

    The contents of the data file data imported into the wc_in table are as follows:

    hello,odps

Test Steps

Run UserDefinedCounters in odpscmd, as follows:

  jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
  com.aliyun.odps.mapred.open.example.UserDefinedCounters wc_in wc_out

Expected Results

After the job completes successfully, the Counters output is shown as follows:

  Counters: 3
          com.aliyun.odps.mapred.open.example.UserDefinedCounters$MyCounter
                  MAP_TASKS=1
                  REDUCE_TASKS=1
                  TOTAL_TASKS=2

The contents of the output table wc_out are as follows:

  +------------+------------+
  | key        | cnt        |
  +------------+------------+
  | hello      | 1          |
  | odps       | 1          |
  +------------+------------+

Code Sample

  package com.aliyun.odps.mapred.open.example;

  import java.io.IOException;
  import java.util.Iterator;

  import com.aliyun.odps.counter.Counter;
  import com.aliyun.odps.counter.Counters;
  import com.aliyun.odps.data.Record;
  import com.aliyun.odps.data.TableInfo;
  import com.aliyun.odps.mapred.JobClient;
  import com.aliyun.odps.mapred.MapperBase;
  import com.aliyun.odps.mapred.ReducerBase;
  import com.aliyun.odps.mapred.RunningJob;
  import com.aliyun.odps.mapred.conf.JobConf;
  import com.aliyun.odps.mapred.utils.InputUtils;
  import com.aliyun.odps.mapred.utils.OutputUtils;
  import com.aliyun.odps.mapred.utils.SchemaUtils;

  /**
   * User Defined Counters
   */
  public class UserDefinedCounters {

      enum MyCounter {
          TOTAL_TASKS, MAP_TASKS, REDUCE_TASKS
      }

      public static class TokenizerMapper extends MapperBase {
          private Record word;
          private Record one;

          @Override
          public void setup(TaskContext context) throws IOException {
              super.setup(context);
              // Each map task increments its own counter and the shared total.
              Counter map_tasks = context.getCounter(MyCounter.MAP_TASKS);
              Counter total_tasks = context.getCounter(MyCounter.TOTAL_TASKS);
              map_tasks.increment(1);
              total_tasks.increment(1);
              word = context.createMapOutputKeyRecord();
              one = context.createMapOutputValueRecord();
              one.set(new Object[] { 1L });
          }

          @Override
          public void map(long recordNum, Record record, TaskContext context)
                  throws IOException {
              // Emit each column value as a word with count 1.
              for (int i = 0; i < record.getColumnCount(); i++) {
                  word.set(new Object[] { record.get(i).toString() });
                  context.write(word, one);
              }
          }
      }

      public static class SumReducer extends ReducerBase {
          private Record result = null;

          @Override
          public void setup(TaskContext context) throws IOException {
              result = context.createOutputRecord();
              // Each reduce task increments its own counter and the shared total.
              Counter reduce_tasks = context.getCounter(MyCounter.REDUCE_TASKS);
              Counter total_tasks = context.getCounter(MyCounter.TOTAL_TASKS);
              reduce_tasks.increment(1);
              total_tasks.increment(1);
          }

          @Override
          public void reduce(Record key, Iterator<Record> values, TaskContext context)
                  throws IOException {
              long count = 0;
              while (values.hasNext()) {
                  Record val = values.next();
                  count += (Long) val.get(0);
              }
              result.set(0, key.get(0));
              result.set(1, count);
              context.write(result);
          }
      }

      public static void main(String[] args) throws Exception {
          if (args.length != 2) {
              System.err.println("Usage: UserDefinedCounters <in_table> <out_table>");
              System.exit(2);
          }
          JobConf job = new JobConf();
          job.setMapperClass(TokenizerMapper.class);
          job.setReducerClass(SumReducer.class);
          job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
          job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
          InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
          OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
          RunningJob rJob = JobClient.runJob(job);
          // Retrieve the custom counters from the finished job and print them.
          Counters counters = rJob.getCounters();
          long m = counters.findCounter(MyCounter.MAP_TASKS).getValue();
          long r = counters.findCounter(MyCounter.REDUCE_TASKS).getValue();
          long total = counters.findCounter(MyCounter.TOTAL_TASKS).getValue();
          System.out.println("MAP_TASKS=" + m);
          System.out.println("REDUCE_TASKS=" + r);
          System.out.println("TOTAL_TASKS=" + total);
          System.exit(0);
      }
  }