MaxCompute

Unique Example

Last updated: 2017-10-17 14:16:01

Test Preparation

  1. Prepare the JAR package of the test program. Assume it is named mapreduce-examples.jar and stored locally under data\resources.

  2. Prepare the test tables and resource for Unique (a programmatic alternative is sketched after this list).

    • Create the test tables.

      create table ss_in(key bigint, value bigint);
      create table ss_out(key bigint, value bigint);

    • Add the test resource.

      add jar data\resources\mapreduce-examples.jar -f;
  3. Use Tunnel to import the data.

    tunnel upload data ss_in;

    The content of the data file data imported into the ss_in table is as follows:

    1,1
    1,1
    2,2
    2,2
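
For completeness, the tables and the jar resource can also be created programmatically instead of through odpscmd. The following is a minimal sketch, assuming the odps-sdk-core Java client; the endpoint, project name, and credentials are placeholders you must replace with your own settings.

import java.io.FileInputStream;

import com.aliyun.odps.Column;
import com.aliyun.odps.JarResource;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.AliyunAccount;

public class PrepareUniqueTest {
  public static void main(String[] args) throws Exception {
    // Placeholder credentials, endpoint, and project; replace with your own.
    Odps odps = new Odps(new AliyunAccount("<accessId>", "<accessKey>"));
    odps.setEndpoint("http://service.odps.aliyun.com/api");
    odps.setDefaultProject("<your_project>");

    // Same schema as the CREATE TABLE statements above.
    TableSchema schema = new TableSchema();
    schema.addColumn(new Column("key", OdpsType.BIGINT));
    schema.addColumn(new Column("value", OdpsType.BIGINT));
    odps.tables().create("ss_in", schema);
    odps.tables().create("ss_out", schema);

    // Equivalent of "add jar data\resources\mapreduce-examples.jar -f;".
    JarResource jar = new JarResource();
    jar.setName("mapreduce-examples.jar");
    odps.resources().create(jar,
        new FileInputStream("data/resources/mapreduce-examples.jar"));
  }
}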

Test Steps

Run Unique in odpscmd as follows. The trailing argument key selects key-based deduplication; value and all (the default) are also accepted, as shown in the code example below:

  jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
  com.aliyun.odps.mapred.open.example.Unique ss_in ss_out key;

Expected Results

After the job completes successfully, the content of the output table ss_out is as follows:

  +------------+------------+
  | key        | value      |
  +------------+------------+
  | 1          | 1          |
  | 2          | 2          |
  +------------+------------+
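
This result can be reasoned about without running the job. In key mode, records are partitioned and grouped by the key column, so the reducer is called once for key 1 and once for key 2, and each call writes exactly one output record. The following plain-Java sketch (illustrative only, not MaxCompute code) mimics that logic on the test data:

import java.util.LinkedHashMap;
import java.util.Map;

public class KeyUniqueSketch {
  public static void main(String[] args) {
    long[][] rows = { { 1, 1 }, { 1, 1 }, { 2, 2 }, { 2, 2 } };
    // Group by key; like the reducer, keep the value of the last record seen.
    Map<Long, Long> unique = new LinkedHashMap<>();
    for (long[] row : rows) {
      unique.put(row[0], row[1]);
    }
    unique.forEach((k, v) -> System.out.println(k + "," + v)); // prints 1,1 and 2,2
  }
}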

Code Example

package com.aliyun.odps.mapred.open.example;

import java.io.IOException;
import java.util.Iterator;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

/**
 * Unique: deduplicate records by key, value, or both.
 */
public class Unique {

  public static class OutputSchemaMapper extends MapperBase {
    private Record key;
    private Record value;

    @Override
    public void setup(TaskContext context) throws IOException {
      key = context.createMapOutputKeyRecord();
      value = context.createMapOutputValueRecord();
    }

    @Override
    public void map(long recordNum, Record record, TaskContext context)
        throws IOException {
      long left = 0;
      long right = 0;
      if (record.getColumnCount() > 0) {
        left = (Long) record.get(0);
        if (record.getColumnCount() > 1) {
          right = (Long) record.get(1);
        }
        // Emit the full (key, value) pair as both map output key and value.
        // The partition/sort/grouping columns configured in main() decide
        // which columns actually drive the deduplication.
        key.set(new Object[] { (Long) left, (Long) right });
        value.set(new Object[] { (Long) left, (Long) right });
        context.write(key, value);
      }
    }
  }

  public static class OutputSchemaReducer extends ReducerBase {
    private Record result = null;

    @Override
    public void setup(TaskContext context) throws IOException {
      result = context.createOutputRecord();
    }

    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context)
        throws IOException {
      // One reduce call per group: write a single record, keeping the
      // value of the last record in the group.
      result.set(0, key.get(0));
      while (values.hasNext()) {
        Record value = values.next();
        result.set(1, value.get(1));
      }
      context.write(result);
    }
  }

  public static void main(String[] args) throws Exception {
    if (args.length > 3 || args.length < 2) {
      System.err.println("Usage: unique <in> <out> [key|value|all]");
      System.exit(2);
    }
    String ops = "all";
    if (args.length == 3) {
      ops = args[2];
    }
    // Key Unique: partition, sort, and group by the key column only.
    if (ops.equals("key")) {
      JobConf job = new JobConf();
      job.setMapperClass(OutputSchemaMapper.class);
      job.setReducerClass(OutputSchemaReducer.class);
      job.setMapOutputKeySchema(SchemaUtils.fromString("key:bigint,value:bigint"));
      job.setMapOutputValueSchema(SchemaUtils.fromString("key:bigint,value:bigint"));
      job.setPartitionColumns(new String[] { "key" });
      job.setOutputKeySortColumns(new String[] { "key", "value" });
      job.setOutputGroupingColumns(new String[] { "key" });
      job.set("tablename2", args[1]);
      job.setNumReduceTasks(1);
      job.setInt("table.counter", 0);
      InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
      OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
      JobClient.runJob(job);
    }
    // Key & Value Unique: group by both columns, removing fully duplicate rows.
    if (ops.equals("all")) {
      JobConf job = new JobConf();
      job.setMapperClass(OutputSchemaMapper.class);
      job.setReducerClass(OutputSchemaReducer.class);
      job.setMapOutputKeySchema(SchemaUtils.fromString("key:bigint,value:bigint"));
      job.setMapOutputValueSchema(SchemaUtils.fromString("key:bigint,value:bigint"));
      job.setPartitionColumns(new String[] { "key" });
      job.setOutputKeySortColumns(new String[] { "key", "value" });
      job.setOutputGroupingColumns(new String[] { "key", "value" });
      job.set("tablename2", args[1]);
      job.setNumReduceTasks(1);
      job.setInt("table.counter", 0);
      InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
      OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
      JobClient.runJob(job);
    }
    // Value Unique: partition, sort, and group by the value column only.
    if (ops.equals("value")) {
      JobConf job = new JobConf();
      job.setMapperClass(OutputSchemaMapper.class);
      job.setReducerClass(OutputSchemaReducer.class);
      job.setMapOutputKeySchema(SchemaUtils.fromString("key:bigint,value:bigint"));
      job.setMapOutputValueSchema(SchemaUtils.fromString("key:bigint,value:bigint"));
      job.setPartitionColumns(new String[] { "value" });
      job.setOutputKeySortColumns(new String[] { "value" });
      job.setOutputGroupingColumns(new String[] { "value" });
      job.set("tablename2", args[1]);
      job.setNumReduceTasks(1);
      job.setInt("table.counter", 0);
      InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
      OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
      JobClient.runJob(job);
    }
  }
}
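
Note that the three branches in main() are identical except for the partition, sort, and grouping columns passed to the JobConf. If you adapt this example, that duplication could be factored into a helper. The following is a hypothetical refactoring sketch; buildJob is not part of the original example:

// Hypothetical helper, not part of the original example.
static JobConf buildJob(String in, String out, String[] partition,
    String[] sort, String[] grouping) {
  JobConf job = new JobConf();
  job.setMapperClass(OutputSchemaMapper.class);
  job.setReducerClass(OutputSchemaReducer.class);
  job.setMapOutputKeySchema(SchemaUtils.fromString("key:bigint,value:bigint"));
  job.setMapOutputValueSchema(SchemaUtils.fromString("key:bigint,value:bigint"));
  job.setPartitionColumns(partition);
  job.setOutputKeySortColumns(sort);
  job.setOutputGroupingColumns(grouping);
  job.setNumReduceTasks(1);
  InputUtils.addTable(TableInfo.builder().tableName(in).build(), job);
  OutputUtils.addTable(TableInfo.builder().tableName(out).build(), job);
  return job;
}

// Key mode would then reduce to:
// JobClient.runJob(buildJob(args[0], args[1],
//     new String[] { "key" }, new String[] { "key", "value" },
//     new String[] { "key" }));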