MaxCompute不提供Graph开发插件,您可以基于Eclipse开发MaxCompute Graph程序。

开发流程如下:
  1. 编写Graph代码,使用本地调试进行基本的测试。
  2. 进行集群调试,验证结果。

开发示例

本节以SSSP算法为例,为您介绍如何使用Eclipse开发和调试Graph程序。

操作步骤:
  1. 创建Java工程,工程名为graph_examples
  2. 将MaxCompute客户端lib目录下的Jar包加到Eclipse工程的Java Build Path中。
  3. 开发MaxCompute Graph程序。

    实际开发过程中,通常会先复制一个例子(例如单源最短距离),然后进行修改。在本示例中,仅修改了Package的路径为package com.aliyun.odps.graph.example

  4. 编译打包。

    在Eclipse环境中,右键单击源代码目录(图中的src目录),Export > Java > JAR file生成Jar包,选择目标Jar包的保存路径,例如D:\\odps\\clt\\odps-graph-example-sssp.jar

  5. 使用MaxCompute客户端运行SSSP,详情请参见运行Graph

本地调试

MaxCompute Graph支持本地调试模式,您可以使用Eclipse进行断点调试。

操作步骤:
  1. 下载一个odps-graph-local的Maven包。
  2. 选择Eclipse工程,右键单击Graph作业主程序(包含main函数)文件,配置其运行参数(Run As > Run Configurations)。
  3. Arguments页签中,设置Program arguments参数为1 sssp_in sssp_out,作为主程序的输入参数。
  4. Arguments页签中,设置VM arguments参数如下。
    -Dodps.runner.mode=local
    -Dodps.project.name=<project.name>
    -Dodps.end.point=<end.point>
    -Dodps.access.id=<access.id> 
    -Dodps.access.key=<access.key>
    参数配置
  5. 对于本地模式(即不指定odps.end.point参数),需要在warehouse创建sssp_insssp_out表,为输入表sssp_in添加数据,输入数据如下所示。
    1,"2:2,3:1,4:4"
    2,"1:2,3:2,4:1"
    3,"1:1,2:2,5:1"
    4,"1:4,2:1,5:1"
    5,"3:1,4:1"

    关于warehouse的介绍请参见本地运行

  6. 单击Run,即可本地跑SSSP。
    说明 参数设置可参见MaxCompute客户端中conf/odps_config.ini的设置,上述为几个常用参数,其他参数说明如下:
    • odps.runner.mode:取值为local,本地调试功能必须指定。
    • odps.project.name:指定当前Project,必须指定。
    • odps.end.point:指定当前MaxCompute服务的地址,可以不指定。如果不指定,只从warehouse读取表或资源的meta和数据,不存在则抛异常。如果指定,会先从warehouse读取,不存在时会远程连接MaxCompute读取。
    • odps.access.id:连接MaxCompute服务的AccessKey ID,只在指定odps.end.point时有效。
    • odps.access.key:连接MaxCompute服务的AccessKey Secret,只在指定odps.end.point时有效。
    • odps.cache.resources:指定使用的资源列表,效果与Jar命令的-resources相同。
    • odps.local.warehouse:本地warehouse路径,不指定时默认为./warehouse
    在Eclipse中本地运行SSSP的调试输出信息,如下所示。
    Counters: 3
             com.aliyun.odps.graph.local.COUNTER
                     TASK_INPUT_BYTE=211
                     TASK_INPUT_RECORD=5
                     TASK_OUTPUT_BYTE=161
                     TASK_OUTPUT_RECORD=5
     graph task finish
    说明 上述示例中,需要本地warehouse下有sssp_insssp_out表。sssp_insssp_out的详细信息请参见编写Graph(可选)

本地作业临时目录

每运行一次本地调试,都会在Eclipse工程目录下新建一个临时目录。
本地运行的Graph作业,临时目录包括以下几个目录和文件:
  • counters:存放作业运行的一些计数信息。
  • inputs:存放作业的输入数据,优先取自本地的warehouse,如果本地没有,会通过MaxCompute SDK从服务端读取(如果设置了odps.end.point),默认一个input只读10条数据,可以通过-Dodps.mapred.local.record.limit参数进行修改,但是也不能超过1万条记录。
  • outputs:存放作业的输出数据,如果本地warehouse中存在输出表,outputs中的结果数据在作业执行完后会覆盖本地warehouse中对应的表。
  • resources:存放作业使用的资源,与输入类似,优先取自本地的warehouse,如果本地没有,会通过MaxCompute SDK从服务端读取(如果设置了odps.end.point)。
  • job.xml:作业配置。
  • superstep:存放每一轮迭代的持久化信息。
说明 如果需要本地调试时输出详细日志,需要在src目录下放一个log4j的配置文件log4j.properties_odps_graph_cluster_debug

集群调试

通过本地的调试后,您即可提交作业到集群进行测试。

操作步骤如下:
  1. 配置MaxCompute客户端。
  2. 使用add jar /path/work.jar -f;命令更新Jar包。
  3. 使用Jar命令运行作业,查看运行日志和结果数据。
说明 集群运行Graph的详细介绍请参见编写Graph(可选)

性能调优

对性能有所影响的Graph Job配置项如下:
  • setSplitSize(long):输入表切分大小,单位MB,大于0,默认64。
  • setNumWorkers(int):设置作业Worker数量,范围为[1,1000],默认值为1。Worker数由作业输入字节数和splitSize决定。
  • setWorkerCPU(int):Map CPU资源,100为1CPU核,[50,800]之间,默认值为200。
  • setWorkerMemory(int):Map内存资源,单位MB,范围为[256,12288],默认值为4096。
  • setMaxIteration(int):设置最大迭代次数,默认为-1,小于或等于0时表示最大迭代次数不作为作业终止条件。
  • setJobPriority(int):设置作业优先级,范围为[0,9],默认值为9,数值越大优先级越小。
通常情况下的调优建议:
  • 可以考虑使用setNumWorkers方法增加Worker数目。
  • 可以考虑使用setSplitSize方法减少切分大小,提高作业载入数据速度。
  • 加大Worker的CPU或内存。
  • 设置最大迭代次数,如果有些应用结果精度要求不高,可以考虑减少迭代次数,尽快结束。
接口setNumWorkerssetSplitSize配合使用,可以提高数据的载入速度。假设setNumWorkersworkerNumsetSplitSizesplitSize,总输入字节数为inputSize,则输入被切分后的块数splitNum=inputSize/splitSizeworkerNumsplitNum之间的关系如下:
  • 情况一:splitNum=workerNum,每个Worker负责载入1个Split。
  • 情况二:splitNum>workerNum,每个Worker负责载入1个或多个Split。
  • 情况三:splitNum<workerNum,每个Worker负责载入0个或1一个Split。

因此,应调节workerNumsplitSize,在满足前两种情况时,数据载入比较快。迭代阶段只需调节workerNum。如果设置runtime partitioning为False,则建议直接使用setSplitSize控制Worker数量,或者保证满足前两种情况。当出现第三种情况时,部分Worker上的切片会为0,可以在Jar命令前使用set odps.graph.split.size=<m>; set odps.graph.worker.num=<n>; setNumWorkers,和setSplitSize等效。

另外一种常见的性能问题为数据倾斜,反应到Counters就是某些Worker处理的点或边数量远超过其他Worker。数据倾斜的原因通常是某些Key对应的点、边,或者消息的数量远超过其他Key,这些Key被分到少量的Worker处理,导致这些Worker运行时间较长。解决方法:
  • 尝试Combiner,将这些Key对应点的消息进行本地聚合,减少消息发生。

    开发人员可定义Combiner以减少存储消息的内存和网络数据流量,缩短作业的执行时间。

  • 改进业务逻辑。
    数据量大时,读取磁盘中的数据可能耗费一部分处理时间。减少需要读取的数据字节数可以提高总体的吞吐量,提高作业性能。您可通过以下方法进行改进:
    • 减少输入数据量:对某些决策性质的应用,处理数据采样后子集所得到的结果只可能影响结果的精度,而并不会影响整体的准确性,因此可以考虑先对数据进行特定采样后再导入输入表中进行处理。
    • 避免读取用不到的字段:MaxCompute Graph框架的TableInfo类支持读取指定的列(以列名数组方式传入),而非整个表或表分区,这样也可以减少输入的数据量,提高作业性能。

内置Jar包

以下Jar包会默认加载到运行Graph程序的JVM中,您不必上传这些资源,也不必在命令行的-libjars带上这些Jar包。
  • commons-codec-1.3.jar
  • commons-io-2.0.1.jar
  • commons-lang-2.5.jar
  • commons-logging-1.0.4.jar
  • commons-logging-api-1.0.4.jar
  • guava-14.0.jar
  • json.jar
  • log4j-1.2.15.jar
  • slf4j-api-1.4.3.jar
  • slf4j-log4j12-1.4.3.jar
  • xmlenc-0.52.jar
说明 在JVM的Classpath中,上述内置Jar包会位于您的Jar包之前,可能产生版本冲突。例如您的程序中使用了commons-codec-1.5.jar某个类的函数,但是这个函数不在commons-codec-1.3.jar中,这时如果1.3版本无法实现您的需求,则只能等待MaxCompute升级新版本。