全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网
MaxCompute

多任务示例

更新时间:2017-10-17 19:46:02

测试准备

  1. 准备好测试程序的 Jar 包，假设名字为 mapreduce-examples.jar，本地存放路径为 data\resources。

  2. 准备好 MultiJobs 测试表和资源。

    • 创建测试表。

      1. create table mr_empty (key string, value string);
      2. create table mr_multijobs_out (value bigint);
    • 添加测试资源。

      1. add table mr_multijobs_out as multijobs_res_table -f;
      2. add jar data\resources\mapreduce-examples.jar -f;

测试步骤

在 odpscmd 中执行 MultiJobs，如下所示（下面两行是同一条命令，以分号结束）：

  1. jar -resources mapreduce-examples.jar,multijobs_res_table -classpath data\resources\mapreduce-examples.jar
  2. com.aliyun.odps.mapred.open.example.MultiJobs mr_multijobs_out;

预期结果

作业成功结束后，输出表 mr_multijobs_out 中只剩一条记录：初始值 2 经过两次递减作业后变为 0。如下所示：

  1. +------------+
  2. | value |
  3. +------------+
  4. | 0 |
  5. +------------+

代码示例

  1. package com.aliyun.odps.mapred.open.example;
  2. import java.io.IOException;
  3. import java.util.Iterator;
  4. import com.aliyun.odps.data.Record;
  5. import com.aliyun.odps.data.TableInfo;
  6. import com.aliyun.odps.mapred.JobClient;
  7. import com.aliyun.odps.mapred.MapperBase;
  8. import com.aliyun.odps.mapred.RunningJob;
  9. import com.aliyun.odps.mapred.TaskContext;
  10. import com.aliyun.odps.mapred.conf.JobConf;
  11. import com.aliyun.odps.mapred.utils.InputUtils;
  12. import com.aliyun.odps.mapred.utils.OutputUtils;
  13. import com.aliyun.odps.mapred.utils.SchemaUtils;
  14. /**
  15. * MultiJobs
  16. *
  17. * Running multiple job
  18. *
  19. **/
  20. public class MultiJobs {
  21. public static class InitMapper extends MapperBase {
  22. @Override
  23. public void setup(TaskContext context) throws IOException {
  24. Record record = context.createOutputRecord();
  25. long v = context.getJobConf().getLong("multijobs.value", 2);
  26. record.set(0, v);
  27. context.write(record);
  28. }
  29. }
  30. public static class DecreaseMapper extends MapperBase {
  31. @Override
  32. public void cleanup(TaskContext context) throws IOException {
  33. //从JobConf中获取main函数中定义的变量值
  34. long expect = context.getJobConf().getLong("multijobs.expect.value", -1);
  35. long v = -1;
  36. int count = 0;
  37. Iterator<Record> iter = context.readResourceTable("multijobs_res_table");
  38. while (iter.hasNext()) {
  39. Record r = iter.next();
  40. v = (Long) r.get(0);
  41. if (expect != v) {
  42. throw new IOException("expect: " + expect + ", but: " + v);
  43. }
  44. count++;
  45. }
  46. if (count != 1) {
  47. throw new IOException("res_table should have 1 record, but: " + count);
  48. }
  49. Record record = context.createOutputRecord();
  50. v--;
  51. record.set(0, v);
  52. context.write(record);
  53. context.getCounter("multijobs", "value").setValue(v);
  54. }
  55. }
  56. public static void main(String[] args) throws Exception {
  57. if (args.length != 1) {
  58. System.err.println("Usage: TestMultiJobs <table>");
  59. System.exit(1);
  60. }
  61. String tbl = args[0];
  62. long iterCount = 2;
  63. System.err.println("Start to run init job.");
  64. JobConf initJob = new JobConf();
  65. initJob.setLong("multijobs.value", iterCount);
  66. initJob.setMapperClass(InitMapper.class);
  67. InputUtils.addTable(TableInfo.builder().tableName("mr_empty").build(), initJob);
  68. OutputUtils.addTable(TableInfo.builder().tableName(tbl).build(), initJob);
  69. initJob.setMapOutputKeySchema(SchemaUtils.fromString("key:string"));
  70. initJob.setMapOutputValueSchema(SchemaUtils.fromString("value:string"));
  71. initJob.setNumReduceTasks(0);
  72. JobClient.runJob(initJob);
  73. while (true) {
  74. System.err.println("Start to run iter job, count: " + iterCount);
  75. JobConf decJob = new JobConf();
  76. decJob.setLong("multijobs.expect.value", iterCount);
  77. decJob.setMapperClass(DecreaseMapper.class);
  78. InputUtils.addTable(TableInfo.builder().tableName("mr_empty").build(), decJob);
  79. OutputUtils.addTable(TableInfo.builder().tableName(tbl).build(), decJob);
  80. decJob.setNumReduceTasks(0);
  81. RunningJob rJob = JobClient.runJob(decJob);
  82. iterCount--;
  83. if (rJob.getCounters().findCounter("multijobs", "value").getValue() == 0) {
  84. break;
  85. }
  86. }
  87. if (iterCount != 0) {
  88. throw new IOException("Job failed.");
  89. }
  90. }
  91. }
本文导读目录