全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网
MaxCompute

Pipeline示例

更新时间:2017-10-17 14:18:36

测试准备

  1. 准备好测试程序的 Jar包,假设名字为 mapreduce-examples.jar,本地存放路径为 data\resources。

  2. 准备好 Pipeline 的测试表和资源。

    • 创建测试表。

      1. create table wc_in (key string, value string);
      2. create table wc_out(key string, cnt bigint);
    • 添加测试资源。

      1. add jar data\resources\mapreduce-examples.jar -f;
  3. 使用 tunnel 导入数据。

    1. tunnel upload data wc_in;

    导入 wc_in 表的数据文件 data 的内容,如下所示:

    1. hello,odps

测试步骤

在 odpscmd 中执行 WordCountPipeline,如下所示:

  1. jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
  2. com.aliyun.odps.mapred.open.example.WordCountPipeline wc_in wc_out;

预期结果

作业成功结束后,输出表 wc_out 中的内容,如下所示:

  1. +------------+------------+
  2. | key        | cnt        |
  3. +------------+------------+
  4. | hello      | 1          |
  5. | odps       | 1          |
  6. +------------+------------+

代码示例

  1. package com.aliyun.odps.mapred.open.example;
  2. import java.io.IOException;
  3. import java.util.Iterator;
  4. import com.aliyun.odps.Column;
  5. import com.aliyun.odps.OdpsException;
  6. import com.aliyun.odps.OdpsType;
  7. import com.aliyun.odps.data.Record;
  8. import com.aliyun.odps.data.TableInfo;
  9. import com.aliyun.odps.mapred.Job;
  10. import com.aliyun.odps.mapred.MapperBase;
  11. import com.aliyun.odps.mapred.ReducerBase;
  12. import com.aliyun.odps.pipeline.Pipeline;
  13. public class WordCountPipelineTest {
  14. public static class TokenizerMapper extends MapperBase {
  15. Record word;
  16. Record one;
  17. @Override
  18. public void setup(TaskContext context) throws IOException {
  19. word = context.createMapOutputKeyRecord();
  20. one = context.createMapOutputValueRecord();
  21. one.setBigint(0, 1L);
  22. }
  23. @Override
  24. public void map(long recordNum, Record record, TaskContext context)
  25. throws IOException {
  26. for (int i = 0; i < record.getColumnCount(); i++) {
  27. String[] words = record.get(i).toString().split("\\s+");
  28. for (String w : words) {
  29. word.setString(0, w);
  30. context.write(word, one);
  31. }
  32. }
  33. }
  34. }
  35. public static class SumReducer extends ReducerBase {
  36. private Record value;
  37. @Override
  38. public void setup(TaskContext context) throws IOException {
  39. value = context.createOutputValueRecord();
  40. }
  41. @Override
  42. public void reduce(Record key, Iterator<Record> values, TaskContext context)
  43. throws IOException {
  44. long count = 0;
  45. while (values.hasNext()) {
  46. Record val = values.next();
  47. count += (Long) val.get(0);
  48. }
  49. value.set(0, count);
  50. context.write(key, value);
  51. }
  52. }
  53. public static class IdentityReducer extends ReducerBase {
  54. private Record result;
  55. @Override
  56. public void setup(TaskContext context) throws IOException {
  57. result = context.createOutputRecord();
  58. }
  59. @Override
  60. public void reduce(Record key, Iterator<Record> values, TaskContext context)
  61. throws IOException {
  62. while (values.hasNext()) {
  63. result.set(0, key.get(0));
  64. result.set(1, values.next().get(0));
  65. context.write(result);
  66. }
  67. }
  68. }
  69. public static void main(String[] args) throws OdpsException {
  70. if (args.length != 2) {
  71. System.err.println("Usage: WordCountPipeline <in_table> <out_table>");
  72. System.exit(2);
  73. }
  74. Job job = new Job();
  75. /***
  76. * 构造Pipeline的过程中,如果不指定Mapper的OutputKeySortColumns,PartitionColumns,OutputGroupingColumns,
  77. * 框架会默认使用其OutputKey作为此三者的默认配置
  78. ***/
  79. Pipeline pipeline = Pipeline.builder()
  80. .addMapper(TokenizerMapper.class)
  81. .setOutputKeySchema(
  82. new Column[] { new Column("word", OdpsType.STRING) })
  83. .setOutputValueSchema(
  84. new Column[] { new Column("count", OdpsType.BIGINT) })
  85. .setOutputKeySortColumns(new String[] { "word" })
  86. .setPartitionColumns(new String[] { "word" })
  87. .setOutputGroupingColumns(new String[] { "word" })
  88. .addReducer(SumReducer.class)
  89. .setOutputKeySchema(
  90. new Column[] { new Column("word", OdpsType.STRING) })
  91. .setOutputValueSchema(
  92. new Column[] { new Column("count", OdpsType.BIGINT)})
  93. .addReducer(IdentityReducer.class).createPipeline();
  94. job.setPipeline(pipeline);
  95. job.addInput(TableInfo.builder().tableName(args[0]).build());
  96. job.addOutput(TableInfo.builder().tableName(args[1]).build());
  97. job.submit();
  98. job.waitForCompletion();
  99. System.exit(job.isSuccessful() == true ? 0 : 1);
  100. }
  101. }
本文导读目录