全部产品
MaxCompute

图模型开发和调试

更新时间:2017-06-07 13:26:11   分享:   

MaxCompute 没有为用户提供 Graph 开发插件,但用户仍然可以基于Eclipse开发 MaxCompute Graph 程序,建议的开发流程是:

  • 编写 Graph 代码,使用本地调试进行基本的测试;
  • 进行集群调试,验证结果;

开发示例

本节以SSSP 算法为例讲述如何用Eclipse开发和调试Graph程序。下面是开发SSSP的步骤:

  1. 创建Java工程,例如:graph_examples;

  2. 将 MaxCompute 客户端lib目录下的jar包加到Eclipse工程的Build Path里。一个配置好的Eclipse工程如下图所示。

  3. 开发 MaxCompute Graph程序,实际开发过程中,常常会先拷贝一个例子(例如SSSP),然后再做修改。在本示例中,我们仅修改了package路径为:package com.aliyun.odps.graph.example。
  4. 编译打包,在Eclipse环境中,右键点击源代码目录(图中的src目录),Export -> Java -> JAR file 生成JAR包,选择目标jar包的保存路径,例如:D:\odps\clt\odps-graph-example-sssp.jar;
  5. 使用 MaxCompute 客户端运行SSSP,相关操作参考快速开始之运行Graph

Eclipse 配置截图:

注意:

本地调试

MaxCompute GRAPH支持本地调试模式,可以使用Eclipse进行断点调试。断点调试步骤如下:

  • 下载一个odps-graph-local的maven包。
  • 选择Eclipse工程,右键点击GRAPH作业主程序(包含main函数)文件,配置其运行参数(Run As -> Run Configurations…),如下图。
  • 在Arguments tab页中,设置Program arguments 参数为“1 sssp_in sssp_out”,作为主程序的输入参数;
  • 在Arguments tab页中,设置VM arguments参数为:-Dodps.runner.mode=local -Dodps.project.name=<project.name> -Dodps.end.point=<end.point>-Dodps.access.id=<access.id> -Dodps.access.key=<access.key>

  • 对于本地模式(即odps.end.point参数不指定),需要在warehouse创建sssp_in,sssp_out表,为输入表 sssp_in 添加数据,输入数据如下。关于warehouse的介绍请参考MapReduce本地运行 部分;
  1. 1,"2:2,3:1,4:4"
  2. 2,"1:2,3:2,4:1"
  3. 3,"1:1,2:2,5:1"
  4. 4,"1:4,2:1,5:1"
  5. 5,"3:1,4:1"
  • 点击Run按钮即可本地跑SSSP;

其中:参数设置可参考 MaxCompute 客户端中conf/odps_config.ini的设置,上述是几个常用参数,其他参数也说明如下:

  • odps.runner.mode:取值为local,本地调试功能必须指定;
  • odps.project.name:指定当前project,必须指定;
  • odps.end.point:指定当前odps服务的地址,可以不指定,如果不指定,只从warehouse读取表或资源的meta和数据,不存在则抛异常,如果指定,会先从warehouse读取,不存在时会远程连接odps读取;
  • odps.access.id:连接odps服务的id,只在指定odps.end.point时有效;
  • odps.access.key:连接odps服务的key,只在指定odps.end.point时有效;
  • odps.cache.resources:指定使用的资源列表,效果与jar命令的“-resources”相同;
  • odps.local.warehouse: 本地warehouse路径,不指定时默认为./warehouse;

在 Eclipse 中本地跑 SSSP的调试输出信息如下:

  1. Counters: 3
  2. com.aliyun.odps.graph.local.COUNTER
  3. TASK_INPUT_BYTE=211
  4. TASK_INPUT_RECORD=5
  5. TASK_OUTPUT_BYTE=161
  6. TASK_OUTPUT_RECORD=5
  7. graph task finish

注意:在上面的示例中,需要本地warehouse下有sssp_in及sssp_out表。sssp_in及sssp_out的详细信息请参考快速开始之运行Graph中的介绍。

本地作业临时目录

每运行一次本地调试,都会在 Eclipse 工程目录下新建一个临时目录,见下图:

一个本地运行的GRAPH作业临时目录包括以下几个目录和文件:

  • counters - 存放作业运行的一些计数信息;
  • inputs - 存放作业的输入数据,优先取自本地的 warehouse,如果本地没有,会通过 MaxCompute SDK 从服务端读取(如果设置了 odps.end.point),默认一个 input 只读10 条数据,可以通过 -Dodps.mapred.local.record.limit 参数进行修改,但是也不能超过1万条记录;
  • outputs - 存放作业的输出数据,如果本地warehouse中存在输出表,outputs里的结果数据在作业执行完后会覆盖本地warehouse中对应的表;
  • resources - 存放作业使用的资源,与输入类似,优先取自本地的warehouse,如果本地没有,会通过 MaxCompute SDK从服务端读取(如果设置了 odps.end.point);
  • job.xml - 作业配置
  • superstep - 存放每一轮迭代的消息持久化信息。

    注意:

    • 如果需要本地调试时输出详细日志,需要在 src 目录下放一个 log4j 的配置文件:log4j.properties_odps_graph_cluster_debug。

集群调试

在通过本地的调试之后,可以提交作业到集群进行测试,通常步骤:

  1. 配置 MaxCompute 客户端;
  2. 使用“add jar /path/work.jar -f;”命令更新jar包;
  3. 使用jar命令运行作业,查看运行日志和结果数据,如下所示;

注意:

性能调优

下面主要从 MaxCompute Graph 框架角度介绍常见性能优化的几个方面:

作业参数配置

对性能有所影响的 GraphJob 配置项包括:

  • setSplitSize(long) // 输入表切分大小,单位MB,大于0,默认64;
  • setNumWorkers(int) // 设置作业worker数量,范围:[1, 1000], 默认值-1, worker数由作业输入字节数和split size决定;
  • setWorkerCPU(int) // Map CPU资源,100为1cpu核,[50,800]之间,默认200;
  • setWorkerMemory(int) // Map 内存资源,单位MB,[256M,12G]之间,默认4096M;
  • setMaxIteration(int) // 设置最大迭代次数,默认 -1,小于或等于 0 时表示最大迭代次数不作为作业终止条件;
  • setJobPriority(int) // 设置作业优先级,范围:[0, 9],默认9,数值越大优先级越小。

通常情况下:

  1. 可以考虑使用setNumWorkers方法增加 worker 数目;
  2. 可以考虑使用setSplitSize方法减少切分大小,提高作业载入数据速度;
  3. 加大 worker 的 cpu 或内存;
  4. 设置最大迭代次数,有些应用如果结果精度要求不高,可以考虑减少迭代次数,尽快结束;

接口 setNumWorkers 与 setSplitSize 配合使用,可以提高数据的载入速度。假设 setNumWorkers 为 workerNum, setSplitSize 为 splitSize, 总输入字节数为 inputSize, 则输入被切分后的块数 splitNum = inputSize / splitSize,workerNum 和 splitNum 之间的关系:

  1. 若 splitNum == workerNum,每个 worker 负责载入一个 split;
  2. 若 splitNum > workerNum,每个 worker 负责载入一个或多个 split;
  3. 若 splitNum < workerNum, 每个 worker 负责载入零个或一个 split。

因此,应调节 workerNum 和 splitSize,在满足前两种情况时,数据载入比较快。迭代阶段只调节 workerNum 即可。 如果设置 runtime partitioning 为 false,则建议直接使用 setSplitSize 控制 worker 数量,或者保证满足前两种情况,在出现第三种情况时,部分 worker 上点数会为0. 可以在 jar 命令前使用set odps.graph.split.size=<m>; set odps.graph.worker.num=<n>; 与 setNumWorkers 和 setSplitSize 等效。另外一种常见的性能问题:数据倾斜,反应到 Counters 就是某些 worker 处理的点或边数量远远超过其他 worker。数据倾斜的原因通常是某些 key 对应的点、边,或者消息的数量远远超出其他 key,这些 key 被分到少量的 worker 处理,从而导致这些 worker 相对于其他运行时间长很多,解决方法:

  • 可以试试 Combiner,把这些 key 对应点的消息进行本地聚合,减少消息发生;
  • 改进业务逻辑。

运用Combiner

开发人员可定义 Combiner 来减少存储消息的内存和网络数据流量,缩短作业的执行时间。细节见 SDK中Combiner的介绍。

减少数据输入量

数据量大时,读取磁盘中的数据可能耗费一部分处理时间,因此,减少需要读取的数据字节数可以提高总体的吞吐量,从而提高作业性能。可供选择的方法有如下几种:

  • 减少输入数据量:对某些决策性质的应用,处理数据采样后子集所得到的结果只可能影响结果的精度,而并不会影响整体的准确性,因此可以考虑先对数据进行特定采样后再导入输入表中进行处理
  • 避免读取用不到的字段:MaxCompute Graph 框架的 TableInfo 类支持读取指定的列(以列名数组方式传入),而非整个表或表分区,这样也可以减少输入的数据量,提高作业性能

内置jar包

下面这些 jar 包会默认加载到运行 GRAPH 程序的 JVM 中,用户可以不必上传这些资源,也不必在命令行的 -libjars 带上这些 jar 包:

  • commons-codec-1.3.jar
  • commons-io-2.0.1.jar
  • commons-lang-2.5.jar
  • commons-logging-1.0.4.jar
  • commons-logging-api-1.0.4.jar
  • guava-14.0.jar
  • json.jar
  • log4j-1.2.15.jar
  • slf4j-api-1.4.3.jar
  • slf4j-log4j12-1.4.3.jar
  • xmlenc-0.52.jar

注意:

  • 在起 JVM 的CLASSPATH 里,上述内置 jar 包会放在用户 jar 包的前面,所以可能产生版本冲突,例如:用户的程序中使用了 commons-codec-1.5.jar 某个类的函数,但是这个函数不在 commons-codec-1.3.jar 中,这时只能看 1.3 版本里是否有满足你需求的实现,或者等待 MaxCompute 升级新版本。
本文导读目录
本文导读目录
以上内容是否对您有帮助?