全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 更多
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 智能硬件
MaxCompute

开源兼容MapReduce

更新时间:2018-04-29 23:58:47

MaxCompute(原ODPS)有一套原生的MapReduce编程模型和接口,简单来说,这套接口的输入输出都是MaxCompute中的Table,处理的数据以Record为组织形式,它可以很好地描述Table中的数据处理过程。

但是与社区的Hadoop相比,编程接口差异较大。Hadoop用户如果要将原来的Hadoop MR作业迁移到MaxCompute的MR中执行,需要重写MR的代码,使用MaxCompute的接口进行编译和调试,运行正常后再打成一个Jar包才能放到MaxCompute的平台来运行。这个过程十分繁琐,需要耗费很多的开发和测试人力。如果能够完全不改或者少量地修改原来的Hadoop MapReduce代码,便可在MaxCompute平台上运行,将会比较理想。

现在MaxCompute平台提供了一个Hadoop MapReduce到MaxCompute MR的适配工具,已经在一定程度上实现了Hadoop MapReduce作业的二进制级别的兼容,即您可以在不改代码的情况下通过指定一些配置,便可将原来在Hadoop上运行的MapReduce Jar包拿过来直接运行在MaxCompute上。您可通过此处下载开发插件。目前该插件处于测试阶段,暂时还不能支持您自定义comparator和自定义key类型。

下文将以WordCount程序为例,为您介绍HadoopMR插件的基本使用方式。

注意

下载HadoopMR插件

请单击此处下载插件,包名为hadoop2openmr-1.0.jar。

注意

这个Jar包中已经包含hadoop-2.7.2版本的相关依赖,在作业的Jar包中请不要携带Hadoop的依赖,避免版本冲突。

准备Jar包

编译导出WordCount的Jar包(wordcount_test.jar),WordCount程序的源码如下所示:

  1. package com.aliyun.odps.mapred.example.hadoop;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.Mapper;
  8. import org.apache.hadoop.mapreduce.Reducer;
  9. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  10. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  11. import java.io.IOException;
  12. import java.util.StringTokenizer;
  13. public class WordCount {
  14. public static class TokenizerMapper
  15. extends Mapper<Object, Text, Text, IntWritable>{
  16. private final static IntWritable one = new IntWritable(1);
  17. private Text word = new Text();
  18. public void map(Object key, Text value, Context context
  19. ) throws IOException, InterruptedException {
  20. StringTokenizer itr = new StringTokenizer(value.toString());
  21. while (itr.hasMoreTokens()) {
  22. word.set(itr.nextToken());
  23. context.write(word, one);
  24. }
  25. }
  26. }
  27. public static class IntSumReducer
  28. extends Reducer<Text,IntWritable,Text,IntWritable> {
  29. private IntWritable result = new IntWritable();
  30. public void reduce(Text key, Iterable<IntWritable> values,
  31. Context context
  32. ) throws IOException, InterruptedException {
  33. int sum = 0;
  34. for (IntWritable val : values) {
  35. sum += val.get();
  36. }
  37. result.set(sum);
  38. context.write(key, result);
  39. }
  40. }
  41. public static void main(String[] args) throws Exception {
  42. Configuration conf = new Configuration();
  43. Job job = Job.getInstance(conf, "word count");
  44. job.setJarByClass(WordCount.class);
  45. job.setMapperClass(TokenizerMapper.class);
  46. job.setCombinerClass(IntSumReducer.class);
  47. job.setReducerClass(IntSumReducer.class);
  48. job.setOutputKeyClass(Text.class);
  49. job.setOutputValueClass(IntWritable.class);
  50. FileInputFormat.addInputPath(job, new Path(args[0]));
  51. FileOutputFormat.setOutputPath(job, new Path(args[1]));
  52. System.exit(job.waitForCompletion(true) ? 0 : 1);
  53. }
  54. }

准备测试数据

  1. 创建输入表和输出表。

    1. create table if not exists wc_in(line string);
    2. create table if not exists wc_out(key string, cnt bigint);
  2. 通过Tunnel命令将数据导入输入表中。

    需要导入文本文件data.txt的数据内容如下:

    1. hello maxcompute
    2. hello mapreduce

    您可通过MaxCompute客户端Tunnel命令将data.txt的数据导入wc_in中,如下所示:

    1. tunnel upload data.txt wc_in;

准备好表与HDFS文件路径的映射关系配置

配置文件命名为:wordcount-table-res.conf

  1. {
  2. "file:/foo": {
  3. "resolver": {
  4. "resolver": "com.aliyun.odps.mapred.hadoop2openmr.resolver.TextFileResolver",
  5. "properties": {
  6. "text.resolver.columns.combine.enable": "true",
  7. "text.resolver.seperator": "\t"
  8. }
  9. },
  10. "tableInfos": [
  11. {
  12. "tblName": "wc_in",
  13. "partSpec": {},
  14. "label": "__default__"
  15. }
  16. ],
  17. "matchMode": "exact"
  18. },
  19. "file:/bar": {
  20. "resolver": {
  21. "resolver": "com.aliyun.odps.mapred.hadoop2openmr.resolver.BinaryFileResolver",
  22. "properties": {
  23. "binary.resolver.input.key.class" : "org.apache.hadoop.io.Text",
  24. "binary.resolver.input.value.class" : "org.apache.hadoop.io.LongWritable"
  25. }
  26. },
  27. "tableInfos": [
  28. {
  29. "tblName": "wc_out",
  30. "partSpec": {},
  31. "label": "__default__"
  32. }
  33. ],
  34. "matchMode": "fuzzy"
  35. }
  36. }

配置项说明:

整个配置是一个JSON文件,描述HDFS上的文件与MaxCompute上的表之间的映射关系,一般要配置输入和输出两部分,一个HDFS路径对应一个resolver配置,tableInfos配置以及matchMode配置。

  • resolver:用于配置如何对待文件中的数据,目前有 com.aliyun.odps.mapred.hadoop2openmr.resolver.TextFileResolver和com.aliyun.odps.mapred.hadoop2openmr.resolver.BinaryFileResolver两个内置的resolver可以选用。除了指定好resolver的名字,还需要为相应的resolver配置一些properties指导它正确的进行数据解析。

    • TextFileResolver:对于纯文本的数据,输入输出都会当成纯文本对待。当作为输入resolver配置时,需要配置的properties有:text.resolver.columns.combine.enable和text.resolver.seperator,当text.resolver.columns.combine.enable配置为true时,会把输入表的所有列按照text.resolver.seperator指定的分隔符组合成一个字符串作为输入。否则,会把输入表的前两列分别作为key,value。

    • BinaryFileResolver:可以处理二进制的数据,自动将数据转换为MaxCompute可以支持的数据类型,如:Bigint,Bool,Double等。当作为输出resolver配置时,需要配置的properties有:binary.resolver.input.key.class和binary.resolver.input.value.class,分别代表中间结果的key和value类型。

  • tableInfos:您配置HDFS对应的MaxCompute表,目前只支持配置表的名字tblName,而partSpec和label请保持和示例一致。

  • matchMode:路径的匹配模式,可选项为exact和fuzzy,分别代表精确匹配和模糊匹配,如果设置为fuzzy,则可以通过正则来匹配HDFS的输入路径。

作业提交

使用MaxCompute命令行工具odpscmd提交作业。MaxCompute命令行工具的安装和配置方法请参见客户端用户手册。在odpscmd运行如下命令:

  1. jar -DODPS_HADOOPMR_TABLE_RES_CONF=./wordcount-table-res.conf -classpath hadoop2openmr-1.0.jar,wordcount_test.jar com.aliyun.odps.mapred.example.hadoop.WordCount /foo /bar;

注意

  • wordcount-table-res.conf是配置了/foo /bar的映射。

  • wordcount_test.jar包是您的Hadoop MapReduce的jar包。

  • com.aliyun.odps.mapred.example.hadoop.WordCount是您要运行的作业类名。

  • /foo /bar是指在HDFS上的路径,在JSON配置文件中映射成了wc_in和wc_out。

  • 配置好映射后,您需要通过datax或DataWorks数据集成功能,手动导入Hadoop HDFS的输入文件到wc_in进行MR计算,并手动导出结果数据wc_out到您的HDFS输出目录/bar。

  • 此处假设已经将hadoop2openmr-1.0.jar、wordcount_test.jar和wordcount-table-res.conf放置到odpscmd的当前目录,否则在指定配置和-classpath的路径时需要做相应的修改。

运行过程如下图所示:

open source job run

当作业运行完成后,便可查看结果表wc_out的内容,验证作业是否成功结束,结果是否符合预期。

open source result

本文导读目录