全部产品
MaxCompute

原生SDK介绍

更新时间:2017-06-07 13:26:11   分享:   

在本小节,我们仅会对较为常用的MapReduce核心接口做简短介绍。使用Maven的用户可以从Maven库中搜索”odps-sdk-mapred”获取不同版本的Java SDK,相关配置信息:

  1. <dependency>
  2. <groupId>com.aliyun.odps</groupId>
  3. <artifactId>odps-sdk-mapred</artifactId>
  4. <version>0.26.2-public</version>
  5. </dependency>
主要接口 描述
MapperBase 用户自定义的Map函数需要继承自此类。处理输入表的记录对 象,加工处理成键值对集合输出到Reduce阶段,或者不经过 Reduce阶段直接输出结果记录到结果表。不经过Reduce阶段而 直接输出计算结果的作业,也可称之为Map-Only作业。
ReducerBase 用户自定义的Reduce函数需要继承自此类。对与一个键(Key) 关联的一组数值集(Values)进行归约计算。
TaskContext 是MapperBase及ReducerBase多个成员函数的输入参数之一。 含有任务运行的上下文信息。
JobClient 用于提交和管理作业,提交方式包括阻塞(同步)方式及非阻塞 (异步)方式。
RunningJob 作业运行时对象,用于跟踪运行中的MapReduce作业实例。
JobConf 描述一个MapReduce任务的配置,通常在主程序(main函数)中 定义JobConf对象,然后通过JobClient提交作业给ODPS服务。

MapperBase

主要函数接口:

主要接口 描述
void cleanup(TaskContext context) 在Map阶段结束时,map方法之后调用。
void map(long key, Record record, TaskContext context) map方法,处理输入表的记录。
void setup(TaskContext context) 在Map阶段开始时,map方法之前调用。

ReducerBase

主要函数接口:

主要接口 描述
void cleanup( TaskContext context) 在Reduce阶段结束时,reduce方法之后调用。
void reduce(Record key, Iterator<Record > values, TaskContext context) reduce方法,处理输入表的记录。
void setup( TaskContext context) 在Reduce阶段开始时,reduce方法之前调用。

TaskContext

主要函数接口:

主要接口 描述
TableInfo[] getOutputTableInfo() 获取输出的表信息
Record createOutputRecord() 创建默认输出表的记录对象
Record createOutputRecord(String label) 创建给定label输出表的记录对象
Record createMapOutputKeyRecord() 创建Map输出Key的记录对象
Record createMapOutputValueRecord() 创建Map输出Value的记录对象
void write(Record record) 写记录到默认输出,用于Reduce端写出数据, 可以在Reduce端多次调用。
void write(Record record, String label) 写记录到给定标签输出,用于Reduce端写出数据。可以在 Reduce端多次调用。
void write(Record key, Record value) Map写记录到中间结果,可以在Map函数中多次调用。 可以在Map端多次调用。
BufferedInputStream readResourceFileAsStream(String resourceName) 读取文件类型资源
Iterator<Record > readResourceTable(String resourceName) 读取表类型资源
Counter getCounter(Enum<? > name) 获取给定名称的Counter对象
Counter getCounter(String group, String name) 获取给定组名和名称的Counter对象
void progress() 向MapReduce框架报告心跳信息。 如果用户方法处理时间 很长,且中间没有调用框架,可以调用这个方法避免task 超时,框架默认600秒超时。

备注:ODPS的TaskContext接口中提供了progress功能,但此功能是防止Worker长时间运行未结束,被框架误认为超时而被杀的情况出现。 这个接口更类似于向框架发送心跳信息,并不是用来汇报Worker进度。ODPS MapReduce默认Worker超时时间为10分钟(系统默认配置,不受用户控制), 如果超过10分钟,Worker仍然没有向框架发送心跳(调用progress接口),框架会强制停止该Worker,MapReduce任务失败退出。因此, 建议用户在Mapper/Reducer函数中,定期调用progress接口,防止框架认为Worker超时,误杀任务。

JobConf

主要函数接口:

主要接口 描述
void setResources(String resourceNames) 声明本作业使用的资源。只有声明的资源才能在运行 Mapper/Reducer时通过TaskContext对象读取。
void setMapOutputKeySchema(Column[] schema) 设置Mapper输出到Reducer的Key属性
void setMapOutputValueSchema(Column[] schema) 设置Mapper输出到Reducer的Value属性
void setOutputKeySortColumns(String[] cols) 设置Mapper输出到Reducer的Key排序列
void setOutputGroupingColumns(String[] cols) 设置Key分组列
void setMapperClass(Class<? extends Mapper > theClass) 设置作业的Mapper函数
void setPartitionColumns(String[] cols) 设置作业指定的分区列. 默认是Mapper输出Key的所有列
void setReducerClass(Class<? extends Reducer > theClass) 设置作业的Reducer
void setCombinerClass(Class<? extends Reducer > theClass) 设置作业的combiner。在Map端运行,作用类似于单个Map 对本地的相同Key值做Reduce
void setSplitSize(long size) 设置输入分片大小,单位 MB,默认256
void setNumReduceTasks(int n) 设置Reducer任务数,默认为Mapper任务数的1/4
void setMemoryForMapTask(int mem) 设置Mapper任务中单个Worker的内存大小,单位:MB, 默认值2048。
void setMemoryForReduceTask(int mem) 设置Reducer任务中单个Worker的内存大小,单位:MB, 默认值 2048。

备注:

  • 通常情况下,GroupingColumns包含在KeySortColumns中,KeySortColumns和PartitionColumns要包含在Key中。
  • 在Map端,Mapper输出的Record会根据设置的PartitionColumns计算哈希值,决定分配到哪个Reducer,会根据KeySortColumns对Record进行排序。
  • 在Reduce端,输入Records在按照KeySortColumns排序好后,会根据GroupingColumns指定的列对输入的Records进行分组,即会顺序遍历输入的Records,把GroupingColumns所指定列相同的Records作为一次reduce函数调用的输入。

JobClient

主要函数接口:

主要接口 描述
static RunningJob runJob(JobConf job) 阻塞(同步)方式提交MapReduce作业后立即返回
static RunningJob submitJob(JobConf job) 非阻塞(异步)方式提交MapReduce作业后立即返回

RunningJob

主要函数接口:

主要接口 描述
String getInstanceID() 获取作业运行实例ID,用于查看运行日志和作业管理。
boolean isComplete() 查询作业是否结束。
boolean isSuccessful() 查询作业实例是否运行成功。
void waitForCompletion() 等待直至作业实例结束。一般使用于异步方式提交的作业。
JobStatus getJobStatus() 查询作业实例运行状态。
void killJob() 结束此作业。
Counters getCounters() 获取Conter信息。

InputUtils

主要函数接口:

主要接口 描述
static void addTable(TableInfo table, JobConf conf) 添加表table到任务输入,可以被调用多次 ,新加入的表以append方式添加到输入队列中。
static void setTables(TableInfo [] tables, JobConf conf) 添加多张表到任务输入中。

OutputUtils

主要函数接口:

主要接口 描述
static void addTable(TableInfo table, JobConf conf) 添加表table到任务输出,可以被调用多次 ,新加入的表以append方式添加到输出队 列中。
static void setTables(TableInfo [] tables, JobConf conf) 添加多张表到任务输出中。

Pipeline

Pipeline是MR2的主体类。可以通过Pipeline.builder构建一个Pipeline。Pipeline的主要接口如下:

  1. public Builder addMapper(Class<? extends Mapper> mapper)
  2. public Builder addMapper(Class<? extends Mapper> mapper,
  3. Column[] keySchema, Column[] valueSchema, String[] sortCols,
  4. SortOrder[] order, String[] partCols,
  5. Class<? extends Partitioner> theClass, String[] groupCols)
  6. public Builder addReducer(Class<? extends Reducer> reducer)
  7. public Builder addReducer(Class<? extends Reducer> reducer,
  8. Column[] keySchema, Column[] valueSchema, String[] sortCols,
  9. SortOrder[] order, String[] partCols,
  10. Class<? extends Partitioner> theClass, String[] groupCols)
  11. public Builder setOutputKeySchema(Column[] keySchema)
  12. public Builder setOutputValueSchema(Column[] valueSchema)
  13. public Builder setOutputKeySortColumns(String[] sortCols)
  14. public Builder setOutputKeySortOrder(SortOrder[] order)
  15. public Builder setPartitionColumns(String[] partCols)
  16. public Builder setPartitionerClass(Class<? extends Partitioner> theClass)
  17. public Builder setOutputGroupingColumns(String[] cols)

使用示例:

  1. Job job = new Job();
  2. Pipeline pipeline = Pipeline.builder()
  3. .addMapper(TokenizerMapper.class)
  4. .setOutputKeySchema(
  5. new Column[] { new Column("word", OdpsType.STRING) })
  6. .setOutputValueSchema(
  7. new Column[] { new Column("count", OdpsType.BIGINT) })
  8. .addReducer(SumReducer.class)
  9. .setOutputKeySchema(
  10. new Column[] { new Column("count", OdpsType.BIGINT) })
  11. .setOutputValueSchema(
  12. new Column[] { new Column("word", OdpsType.STRING),
  13. new Column("count", OdpsType.BIGINT) })
  14. .addReducer(IdentityReducer.class).createPipeline();
  15. job.setPipeline(pipeline);
  16. job.addInput(...)
  17. job.addOutput(...)
  18. job.submit();

如上所示,用户可以在main函数中构建一个Map后连续接两个Reduce的MapReduce任务。如果用户比较熟悉MapReduce的基础功能,可以轻松的使用MR2

我们也建议用户在使用MR2功能之前,先了解MapReduce的基础用法。

当然,JobConf 仅能够配置Map后接单Reduce的MapReduce任务。

数据类型

MapReduce支持的数据类型有:bigint, string, double, boolean, datetime以及decimal类型。ODPS数据类型与Java类型的对应关系如下:

ODPS SQL Type Bigint String Double Boolean Datetime Decimal
Java Type Long String Double Boolean Date BigDecimal
本文导读目录
本文导读目录
以上内容是否对您有帮助?