全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网
MaxCompute

分区表输入示例

更新时间:2017-10-17 14:18:07

本文将为您介绍两个把 Partition 作为输入输出的示例,仅供参考。

示例一:

  1. public static void main(String[] args) throws Exception {
  2. JobConf job = new JobConf();
  3. ...
  4. LinkedHashMap<String, String> input = new LinkedHashMap<String, String>();
  5. input.put("pt", "123456");
  6. InputUtils.addTable(TableInfo.builder().tableName("input_table").partSpec(input).build(), job);
  7. LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
  8. output.put("ds", "654321");
  9. OutputUtils.addTable(TableInfo.builder().tableName("output_table").partSpec(output).build(), job);
  10. JobClient.runJob(job);
  11. }

示例二:

  1. package com.aliyun.odps.mapred.open.example;
  2. ...
  3. public static void main(String[] args) throws Exception {
  4. if (args.length != 2) {
  5. System.err.println("Usage: WordCount <in_table> <out_table>");
  6. System.exit(2);
  7. }
  8. JobConf job = new JobConf();
  9. job.setMapperClass(TokenizerMapper.class);
  10. job.setCombinerClass(SumCombiner.class);
  11. job.setReducerClass(SumReducer.class);
  12. job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
  13. job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
  14. Account account = new AliyunAccount("my_access_id", "my_access_key");
  15. Odps odps = new Odps(account);
  16. odps.setEndpoint("odps_endpoint_url");
  17. odps.setDefaultProject("my_project");
  18. Table table = odps.tables().get(tblname);
  19. TableInfoBuilder builder = TableInfo.builder().tableName(tblname);
  20. for (Partition p : table.getPartitions()) {
  21. if (applicable(p)) {
  22. LinkedHashMap<String, String> partSpec = new LinkedHashMap<String, String>();
  23. for (String key : p.getPartitionSpec().keys()) {
  24. partSpec.put(key, p.getPartitionSpec().get(key));
  25. }
  26. InputUtils.addTable(builder.partSpec(partSpec).build(), conf);
  27. }
  28. }
  29. OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
  30. JobClient.runJob(job);
  31. }

注意

  • 这是一段使用 MaxCompute SDK 和 MapReduce SDK 组合实现 MapReduce 任务读取范围 Partitoin 的示例。

  • 此段代码不能够编译执行,仅给出了 main 函数的示例。

  • 示例中 applicable 函数是用户逻辑,用来决定该 Partition 是否符合作为该 MapReduce 作业的输入。

本文导读目录