MaxCompute不提供Graph开发插件,您可以基于Eclipse开发MaxCompute Graph程序。
- 编写Graph代码,使用本地调试进行基本的测试。
- 进行集群调试,验证结果。
开发示例
本节以SSSP算法为例,为您介绍如何使用Eclipse开发和调试Graph程序。
- 创建Java工程,工程名为graph_examples。
- 将MaxCompute客户端lib目录下的Jar包加到Eclipse工程的Java Build Path中。
- 开发MaxCompute Graph程序。
实际开发过程中,通常会先复制一个例子(例如单源最短距离),然后进行修改。在本示例中,仅修改了Package的路径为package com.aliyun.odps.graph.example。
- 编译打包。
在Eclipse环境中,右键单击源代码目录(图中的src目录), 生成Jar包,选择目标Jar包的保存路径,例如D:\\odps\\clt\\odps-graph-example-sssp.jar。
- 使用MaxCompute客户端运行SSSP,详情请参见运行Graph。
本地调试
MaxCompute Graph支持本地调试模式,您可以使用Eclipse进行断点调试。
- 下载一个odps-graph-local的Maven包。
- 选择Eclipse工程,右键单击Graph作业主程序(包含
main
函数)文件,配置其运行参数( )。 - 在Arguments页签中,设置Program arguments参数为1 sssp_in sssp_out,作为主程序的输入参数。
- 在Arguments页签中,设置VM arguments参数如下。
-Dodps.runner.mode=local -Dodps.project.name=<project.name> -Dodps.end.point=<end.point> -Dodps.access.id=<access.id> -Dodps.access.key=<access.key>
- 对于本地模式(即不指定odps.end.point参数),需要在warehouse创建sssp_in、sssp_out表,为输入表sssp_in添加数据,输入数据如下所示。
1,"2:2,3:1,4:4" 2,"1:2,3:2,4:1" 3,"1:1,2:2,5:1" 4,"1:4,2:1,5:1" 5,"3:1,4:1"
关于warehouse的介绍请参见本地运行。
- 单击Run,即可本地跑SSSP。
说明 参数设置可参见MaxCompute客户端中conf/odps_config.ini的设置,上述为几个常用参数,其他参数说明如下:
- odps.runner.mode:取值为local,本地调试功能必须指定。
- odps.project.name:指定当前Project,必须指定。
- odps.end.point:指定当前MaxCompute服务的地址,可以不指定。如果不指定,只从warehouse读取表或资源的meta和数据,不存在则抛异常。如果指定,会先从warehouse读取,不存在时会远程连接MaxCompute读取。
- odps.access.id:连接MaxCompute服务的AccessKey ID,只在指定odps.end.point时有效。
- odps.access.key:连接MaxCompute服务的AccessKey Secret,只在指定odps.end.point时有效。
- odps.cache.resources:指定使用的资源列表,效果与Jar命令的
-resources
相同。 - odps.local.warehouse:本地warehouse路径,不指定时默认为./warehouse。
在Eclipse中本地运行SSSP的调试输出信息,如下所示。Counters: 3 com.aliyun.odps.graph.local.COUNTER TASK_INPUT_BYTE=211 TASK_INPUT_RECORD=5 TASK_OUTPUT_BYTE=161 TASK_OUTPUT_RECORD=5 graph task finish
本地作业临时目录
- counters:存放作业运行的一些计数信息。
- inputs:存放作业的输入数据,优先取自本地的warehouse,如果本地没有,会通过MaxCompute SDK从服务端读取(如果设置了odps.end.point),默认一个input只读10条数据,可以通过
-Dodps.mapred.local.record.limit
参数进行修改,但是也不能超过1万条记录。 - outputs:存放作业的输出数据,如果本地warehouse中存在输出表,outputs中的结果数据在作业执行完后会覆盖本地warehouse中对应的表。
- resources:存放作业使用的资源,与输入类似,优先取自本地的warehouse,如果本地没有,会通过MaxCompute SDK从服务端读取(如果设置了odps.end.point)。
- job.xml:作业配置。
- superstep:存放每一轮迭代的持久化信息。
集群调试
通过本地的调试后,您即可提交作业到集群进行测试。
- 配置MaxCompute客户端。
- 使用
add jar /path/work.jar -f;
命令更新Jar包。 - 使用Jar命令运行作业,查看运行日志和结果数据。
性能调优
setSplitSize(long)
:输入表切分大小,单位MB,大于0,默认64。setNumWorkers(int)
:设置作业Worker数量,范围为[1,1000],默认值为1。Worker数由作业输入字节数和splitSize
决定。setWorkerCPU(int)
:Map CPU资源,100为1CPU核,[50,800]之间,默认值为200。setWorkerMemory(int)
:Map内存资源,单位MB,范围为[256,12288],默认值为4096。setMaxIteration(int)
:设置最大迭代次数,默认为-1,小于或等于0时表示最大迭代次数不作为作业终止条件。setJobPriority(int)
:设置作业优先级,范围为[0,9],默认值为9,数值越大优先级越小。
- 可以考虑使用
setNumWorkers
方法增加Worker数目。 - 可以考虑使用
setSplitSize
方法减少切分大小,提高作业载入数据速度。 - 加大Worker的CPU或内存。
- 设置最大迭代次数,如果有些应用结果精度要求不高,可以考虑减少迭代次数,尽快结束。
setNumWorkers
与setSplitSize
配合使用,可以提高数据的载入速度。假设setNumWorkers
为workerNum
,setSplitSize
为splitSize
,总输入字节数为inputSize
,则输入被切分后的块数splitNum=inputSize/splitSize
。workerNum
和splitNum
之间的关系如下:
- 情况一:
splitNum
=workerNum
,每个Worker负责载入1个Split。 - 情况二:
splitNum
>workerNum
,每个Worker负责载入1个或多个Split。 - 情况三:
splitNum
<workerNum
,每个Worker负责载入0个或1一个Split。
因此,应调节workerNum
和splitSize
,在满足前两种情况时,数据载入比较快。迭代阶段只需调节workerNum
。如果设置runtime partitioning
为False,则建议直接使用setSplitSize
控制Worker数量,或者保证满足前两种情况。当出现第三种情况时,部分Worker上的切片会为0,可以在Jar命令前使用set odps.graph.split.size=<m>; set odps.graph.worker.num=<n>;
与setNumWorkers
,和setSplitSize
等效。
- 尝试Combiner,将这些Key对应点的消息进行本地聚合,减少消息发生。
开发人员可定义Combiner以减少存储消息的内存和网络数据流量,缩短作业的执行时间。
- 改进业务逻辑。
数据量大时,读取磁盘中的数据可能耗费一部分处理时间。减少需要读取的数据字节数可以提高总体的吞吐量,提高作业性能。您可通过以下方法进行改进:
- 减少输入数据量:对某些决策性质的应用,处理数据采样后子集所得到的结果只可能影响结果的精度,而并不会影响整体的准确性,因此可以考虑先对数据进行特定采样后再导入输入表中进行处理。
- 避免读取用不到的字段:MaxCompute Graph框架的TableInfo类支持读取指定的列(以列名数组方式传入),而非整个表或表分区,这样也可以减少输入的数据量,提高作业性能。
内置Jar包
-libjars
带上这些Jar包。
- commons-codec-1.3.jar
- commons-io-2.0.1.jar
- commons-lang-2.5.jar
- commons-logging-1.0.4.jar
- commons-logging-api-1.0.4.jar
- guava-14.0.jar
- json.jar
- log4j-1.2.15.jar
- slf4j-api-1.4.3.jar
- slf4j-log4j12-1.4.3.jar
- xmlenc-0.52.jar