全部产品
MaxCompute

图模型功能介绍

更新时间:2017-06-07 13:26:11   分享:   

运行作业

MaxCompute 客户端提供一个Jar命令用于运行 MaxCompute GRAPH作业,其使用方式与 MapReduce中的Jar命令 相同,这里仅作简要介绍:

  1. Usage: jar [<GENERIC_OPTIONS>] <MAIN_CLASS> [ARGS]
  2. -conf <configuration_file> Specify an application configuration file
  3. -classpath <local_file_list> classpaths used to run mainClass
  4. -D <name>=<value> Property value pair, which will be used to run mainClass
  5. -local Run job in local mode
  6. -resources <resource_name_list> file/table resources used in graph, seperate by comma

其中 < GENERIC_OPTIONS>包括(均为可选参数):

  • -conf <configuration file > :指定JobConf配置文件;
  • -classpath <local_file_list > : 本地执行时的classpath,主要用于指定main函数所在的jar包。大多数情况下,用户更习惯于将main函数与Graph作业编写在一个包中,例如:单源最短距离算法 ,因此,在执行示例程序时,-resources及-classpath的参数中都出现了用户的jar包,但二者意义不同,-resources引用的是Graph作业,运行于分布式环境中,而-classpath引用的是main函数,运行于本地,指定的jar包路径也是本地文件路径。包名之间使用系统默认的文件分割符作分割(通常情况下,windows系统是分号”;”,linux系统是冒号”:”);
  • -D <prop_name > = < prop_value > : 本地执行时,<mainClass > 的java属性,可以定义多个;
  • -local:以本地模式执行Graph作业,主要用于程序调试;
  • -resources <resource_name_list > : Graph作业运行时使用的资源声明。一般情况下,resource_name_list中需要指定Graph作业所在的资源名称。如果用户在Graph作业中读取了其他ODPS资源,那么,这些资源名称也需要被添加到resource_name_list中。资源之间使用逗号分隔,使用跨项目空间使用资源时,需要前面加上:PROJECT_NAME/resources/,示例:-resources otherproject/resources/resfile;

同时,用户也可以直接运行GRAPH作业的main函数直接将作业提交到 MaxCompute ,而不是通过 MaxCompute 客户端提交作业。以PageRank算法 为例:

  1. public static void main(String[] args) throws Exception {
  2. if (args.length < 2)
  3. printUsage();
  4. Account account = new AliyunAccount(accessId, accessKey);
  5. Odps odps = new Odps(account);
  6. odps.setEndpoint(endPoint);
  7. odps.setDefaultProject(project);
  8. SessionState ss = SessionState.get();
  9. ss.setOdps(odps);
  10. ss.setLocalRun(false);
  11. String resource = "mapreduce-examples.jar";
  12. GraphJob job = new GraphJob();
  13. // 将使用的jar及其他文件添加到class cache resource,对应于jar命令中 -libjars 中指定的资源
  14. job.addCacheResourcesToClassPath(resource);
  15. job.setGraphLoaderClass(PageRankVertexReader.class);
  16. job.setVertexClass(PageRankVertex.class);
  17. job.addInput(TableInfo.builder().tableName(args[0]).build());
  18. job.addOutput(TableInfo.builder().tableName(args[1]).build());
  19. // default max iteration is 30
  20. job.setMaxIteration(30);
  21. if (args.length >= 3)
  22. job.setMaxIteration(Integer.parseInt(args[2]));
  23. long startTime = System.currentTimeMillis();
  24. job.run();
  25. System.out.println("Job Finished in "
  26. + (System.currentTimeMillis() - startTime) / 1000.0
  27. + " seconds");
  28. }

输入输出

MaxCompute GRAPH作业的输入输出限制为表,不允许用户自定义输入输出格式。

定义作业输入,支持多路输入:

  1. GraphJob job = new GraphJob();
  2. job.addInput(TableInfo.builder().tableName(“tblname”).build()); //表作为输入
  3. job.addInput(TableInfo.builder().tableName(“tblname”).partSpec("pt1=a/pt2=b").build()); //分区作为输入
  4. //只读取输入表的 col2 和 col0 列,在 GraphLoader 的 load 方法中,record.get(0) 得到的是col2列,顺序一致
  5. job.addInput(TableInfo.builder().tableName(“tblname”).partSpec("pt1=a/pt2=b").build(), new String[]{"col2", "col0"});

备注:

  • 关于作业输入定义,更多的信息参见GraphJob的addInput相关方法说明,框架读取输入表的记录传给用户自定义的GraphLoader载入图数据;
  • 限制: 暂时不支持分区过滤条件。更多应用限制请参考 应用限制

定义作业输出,支持多路输出,通过label标识每路输出:

  1. GraphJob job = new GraphJob();
  2. //输出表为分区表时需要给到最末一级分区
  3. job.addOutput(TableInfo.builder().tableName("table_name").partSpec("pt1=a/pt2=b").build());
  4. // 下面的参数 true 表示覆盖tableinfo指定的分区,即INSERT OVERWRITE语义,false表示INSERT INTO语义
  5. job.addOutput(TableInfo.builder().tableName("table_name").partSpec("pt1=a/pt2=b").lable("output1").build(), true);

备注:

  • 关于作业输出定义,更多的信息参见GraphJob的addOutput 相关方法说明;
  • Graph作业在运行时可以通过WorkerContext的write方法写出记录到输出表,多路输出需要指定标识,如上面的 “output1”;
  • 更多应用限制请参考 应用限制

读取资源

GRAPH程序中添加资源

除了通过jar命令指定GRAPH读取的资源外,还可以通过GraphJob的下面两个方法指定:

  1. void addCacheResources(String resourceNames)
  2. void addCacheResourcesToClassPath(String resourceNames)

GRAPH程序中使用资源

在 GRAPH 程序中可以通过相应的上下文对象WorkerContext的下述方法读取资源:

  1. public byte[] readCacheFile(String resourceName) throws IOException;
  2. public Iterable<byte[]> readCacheArchive(String resourceName) throws IOException;
  3. public Iterable<byte[]> readCacheArchive(String resourceName, String relativePath)throws IOException;
  4. public Iterable<WritableRecord> readResourceTable(String resourceName);
  5. public BufferedInputStream readCacheFileAsStream(String resourceName) throws IOException;
  6. public Iterable<BufferedInputStream> readCacheArchiveAsStream(String resourceName) throws IOException;
  7. public Iterable<BufferedInputStream> readCacheArchiveAsStream(String resourceName, String relativePath) throws IOException;

备注:

  • 通常在WorkerComputer的setup方法里读取资源,然后保存在Worker Value中,之后通过getWorkerValue方法取得;
  • 建议用上面的流接口,边读边处理,内存耗费少;
  • 更多应用限制请参考 应用限制
本文导读目录
本文导读目录
以上内容是否对您有帮助?