全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网
MaxCompute

开源兼容MapReduce

更新时间:2017-10-17 11:32:38

MaxCompute(原 ODPS)有一套原生的 MapReduce 编程模型和接口,简单来说,这套接口的输入输出都是 MaxCompute 中的 Table,处理的数据是以 Record 为组织形式的,它可以很好地描述 Table 中的数据处理过程。但是与社区的 Hadoop 相比,编程接口差异较大。Hadoop 用户如果要将原来的 Hadoop MR 作业迁移到 MaxCompute 的 MR 中执行,需要重写 MR 的代码,使用 MaxCompute 的接口进行编译和调试,运行正常后再打成一个 Jar 包才能放到 MaxCompute 的平台来运行。这个过程十分繁琐,需要耗费很多的开发和测试人力。如果能够完全不改或者少量地修改原来的 Hadoop MapReduce 代码,便可在 MaxCompute 平台上运行,将会比较理想。

现在 MaxCompute 平台提供了一个 Hadoop MapReduce 到 MaxCompute MR 的适配工具,已经在一定程度上实现了 Hadoop MapReduce 作业的二进制级别的兼容,即您可以在不改代码的情况下通过指定一些配置,便可将原来在 Hadoop 上运行的 MapReduce Jar 包拿过来直接运行在 MaxCompute 上。您可通过 此处 下载开发插件。目前该插件处于测试阶段,暂时还不能支持您自定义 comparator 和自定义 key 类型。

下面将以 WordCount 程序为例,为您介绍 HadoopMR 插件的基本使用方式。

注意

下载 HadoopMR 插件

请单击 此处,下载插件,包名为 hadoop2openmr-1.0.jar。

注意

这个 Jar 包中已经包含 hadoop-2.7.2 版本的相关依赖,在作业的 Jar 包中请不要携带 Hadoop 的依赖,避免版本冲突。

准备 Jar 包

编译导出 WordCount 的 Jar 包:wordcount_test.jar,WordCount 程序的源码如下:

  1. package com.aliyun.odps.mapred.example.hadoop;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.Mapper;
  8. import org.apache.hadoop.mapreduce.Reducer;
  9. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  10. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  11. import java.io.IOException;
  12. import java.util.StringTokenizer;
  13. public class WordCount {
  14. public static class TokenizerMapper
  15. extends Mapper<Object, Text, Text, IntWritable>{
  16. private final static IntWritable one = new IntWritable(1);
  17. private Text word = new Text();
  18. public void map(Object key, Text value, Context context
  19. ) throws IOException, InterruptedException {
  20. StringTokenizer itr = new StringTokenizer(value.toString());
  21. while (itr.hasMoreTokens()) {
  22. word.set(itr.nextToken());
  23. context.write(word, one);
  24. }
  25. }
  26. }
  27. public static class IntSumReducer
  28. extends Reducer<Text,IntWritable,Text,IntWritable> {
  29. private IntWritable result = new IntWritable();
  30. public void reduce(Text key, Iterable<IntWritable> values,
  31. Context context
  32. ) throws IOException, InterruptedException {
  33. int sum = 0;
  34. for (IntWritable val : values) {
  35. sum += val.get();
  36. }
  37. result.set(sum);
  38. context.write(key, result);
  39. }
  40. }
  41. public static void main(String[] args) throws Exception {
  42. Configuration conf = new Configuration();
  43. Job job = Job.getInstance(conf, "word count");
  44. job.setJarByClass(WordCount.class);
  45. job.setMapperClass(TokenizerMapper.class);
  46. job.setCombinerClass(IntSumReducer.class);
  47. job.setReducerClass(IntSumReducer.class);
  48. job.setOutputKeyClass(Text.class);
  49. job.setOutputValueClass(IntWritable.class);
  50. FileInputFormat.addInputPath(job, new Path(args[0]));
  51. FileOutputFormat.setOutputPath(job, new Path(args[1]));
  52. System.exit(job.waitForCompletion(true) ? 0 : 1);
  53. }
  54. }

准备测试数据

  1. 创建输入表和输出表。

    1. create table if not exists wc_in(line string);
    2. create table if not exists wc_out(key string, cnt bigint);
  2. 通过 Tunnel 命令将数据导入输入表中。

    需要导入文本文件 data.txt 的数据内容如下:

    1. hello maxcompute
    2. hello mapreduce

    您可通过 MaxCompute 客户端Tunnel 命令将 data.txt 的数据导入 wc_in 中,如下所示:

    1. tunnel upload data.txt wc_in;

准备好表与 HDFS 文件路径的映射关系配置

配置文件命名为:wordcount-table-res.conf

  1. {
  2. "file:/foo": {
  3. "resolver": {
  4. "resolver": "com.aliyun.odps.mapred.hadoop2openmr.resolver.TextFileResolver",
  5. "properties": {
  6. "text.resolver.columns.combine.enable": "true",
  7. "text.resolver.seperator": "\t"
  8. }
  9. },
  10. "tableInfos": [
  11. {
  12. "tblName": "wc_in",
  13. "partSpec": {},
  14. "label": "__default__"
  15. }
  16. ],
  17. "matchMode": "exact"
  18. },
  19. "file:/bar": {
  20. "resolver": {
  21. "resolver": "com.aliyun.odps.mapred.hadoop2openmr.resolver.BinaryFileResolver",
  22. "properties": {
  23. "binary.resolver.input.key.class" : "org.apache.hadoop.io.Text",
  24. "binary.resolver.input.value.class" : "org.apache.hadoop.io.LongWritable"
  25. }
  26. },
  27. "tableInfos": [
  28. {
  29. "tblName": "wc_out",
  30. "partSpec": {},
  31. "label": "__default__"
  32. }
  33. ],
  34. "matchMode": "fuzzy"
  35. }
  36. }

配置项说明:

整个配置是一个 Json 文件,描述 HDFS 上的文件与 MaxCompute 上的表之间的映射关系,一般要配置输入和输出两部分,一个 HDFS 路径对应一个 resolver 配置,tableInfos 配置以及 matchMode 配置。

  • resolver:用于配置如何对待文件中的数据,目前有 com.aliyun.odps.mapred.hadoop2openmr.resolver.TextFileResolver 和 com.aliyun.odps.mapred.hadoop2openmr.resolver.BinaryFileResolver 两个内置的 resolver 可以选用。除了指定好 resolver 的名字,还需要为相应的 resolver 配置一些 properties 指导它正确的进行数据解析。

    • TextFileResolver:对于纯文本的数据,输入输出都会当成纯文本对待。当作为输入 resolver 配置时,需要配置的 properties 有:text.resolver.columns.combine.enable 和 text.resolver.seperator,当text.resolver.columns.combine.enable 配置为 true 时,会把输入表的所有列按找 text.resolver.seperator 指定的分隔符组合成一个字符串作为输入。否则,会把输入表的前两列分别作为 key,value。

    • BinaryFileResolver:可以处理二进制的数据,自动将数据转换为 MaxCompute 可以支持的数据类型,如:Bigint,Bool,Double 等。当作为输出 resolver 配置时,需要配置的 properties 有:binary.resolver.input.key.class 和 binary.resolver.input.value.class,分别代表中间结果的 key 和 value 类型。

  • tableInfos:您配置 HDFS 对应的 MaxCompute 的表,目前只支持配置表的名字 tblName,而 partSpec 和 label 请保持和示例一致。

  • matchMode:路径的匹配模式,可选项为 exact 和 fuzzy,分别代表精确匹配和模糊匹配,如果设置为 fuzzy,则可以通过正则来匹配 HDFS 的输入路径。

作业提交

使用 MaxCompute 命令行工具 odpscmd 提交作业。MaxCompute 命令行工具的安装和配置方法请参见 客户端用户手册。在 odpscmd 下运行如下命令:

  1. jar -DODPS_HADOOPMR_TABLE_RES_CONF=./wordcount-table-res.conf -classpath hadoop2openmr-1.0.jar,wordcount_test.jar com.aliyun.odps.mapred.example.hadoop.WordCount /foo /bar;

这里假设已经将 hadoop2openmr-1.0.jar、wordcount_test.jar 和 wordcount-table-res.conf 放置到 odpscmd 的当前目录,否则在指定配置和 -classpath 的路径时需要做相应的修改。

运行过程如下图所示:

open source job run

当作业运行完成后,便可查看结果表 wc_out 的内容,验证作业成功结束,结果符合预期。

open source result

本文导读目录