全部产品
MaxCompute

概要

更新时间:2017-06-07 13:26:11   分享:   

MaxCompute Graph是一套面向迭代的图计算处理框架。图计算作业使用图进行建模,图由点(Vertex)和边(Edge)组成,点和边包含权值(Value),MaxCompute Graph支持下述图编辑操作:

  • 修改点或边的权值;
  • 增加/删除点;
  • 增加/删除边;

备注:

  • 编辑点和边时,点与边的关系需要用户维护。

通过迭代对图进行编辑、演化,最终求解出结果,典型应用:PageRank单源最短距离算法K-均值聚类算法 等等。用户可以使用 MaxCompute GRAPH 提供的接口Java SDK编写图计算程序。

Graph数据结构

MaxCompute GRAPH能够处理的图必须是是一个由点(Vertex)和边(Edge)组成的有向图。由于MaxCompute仅提供二维表的存储结构,因此需要用户自行将图数据分解为二维表格式存储在MaxCompute中,在进行图计算分析时,使用自定义的GraphLoader将二维表数据转换为MaxCompute Graph引擎中的点和边。至于如何将图数据分解为二维表格式,用户可以根据各自的业务场景做决定。在 示例程序 中,我们给出的示例分别使用不同的表格式来表达图的数据结构,仅供大家参考。

点的结构可以简单表示为 < ID, Value, Halted, Edges >,分别表示点标识符(ID),权值(Value),状态(Halted, 表示是否要停止迭代),出边集合(Edges,以该点为起始点的所有边列表)。边的结构可以简单表示为<DestVertexID, Value >,分别表示目标点(DestVertexID)和权值(Value)。

例如,上图由下面的点组成:

Vertex <ID, Value, Halted, Edges>
v0 <0, 0, false, [ <1, 5 >, <2, 10 > ] >
v1 <1, 5, false, [ <2, 3>, <3, 2>, <5, 9>]>
v2 <2, 8, false, [<1, 2>, <5, 1 >]>
v3 <3, Long.MAX_VALUE, false, [<0, 7>, <5, 6>]>
v5 <5, Long.MAX_VALUE, false, [<3, 4 > ]>

Graph 程序逻辑

1. 加载图:

图加载:框架调用用户自定义的GraphLoader将输入表的记录解析为点或边;分布式化:框架调用用户自定义的Partitioner对点进行分片(默认分片逻辑:点ID哈希值然后对Worker数取模),分配到相应的Worker;

例如,上图假设Worker数是2,那么v0, v2会被分配到Worker0,因为ID对2取模结果为0,而v1, v3, v5将被分配到Worker1,ID对2取模结果为1;

2. 迭代计算:

  • 一次迭代为一个”超步”(SuperStep),遍历所有非结束状态(Halted值为false)的点或者收到消息的点(处于结束状态的点收到信息会被自动唤醒),并调用其compute(ComputeContext context, Iterable messages)方法;
  • 在用户实现的compute(ComputeContext context, Iterable messages)方法中:
    • 处理上一个超步发给当前点的消息(Messages);
    • 根据需要对图进行编辑:1). 修改点/边的取值;2). 发送消息给某些点;3). 增加/删除点或边;
    • 通过Aggregator汇总信息到全局信息;
    • 设置当前点状态,结束或非结束状态;
    • 迭代进行过程中,框架会将消息以异步的方式发送到对应Worker并在下一个超步进行处理,用户无需关心;

3. 迭代终止(满足以下任意一条):

  • 所有点处于结束状态(Halted值为true)且没有新消息产生;
  • 达到最大迭代次数;
  • 某个Aggregator的terminate方法返回true;

伪代码描述如下:

  1. // 1. load
  2. for each record in input_table {
  3. GraphLoader.load();
  4. }
  5. // 2. setup
  6. WorkerComputer.setup();
  7. for each aggr in aggregators {
  8. aggr.createStartupValue();
  9. }
  10. for each v in vertices {
  11. v.setup();
  12. }
  13. // 3. superstep
  14. for (step = 0; step < max; step ++) {
  15. for each aggr in aggregators {
  16. aggr.createInitialValue();
  17. }
  18. for each v in vertices {
  19. v.compute();
  20. }
  21. }
  22. // 4. cleanup
  23. for each v in vertices {
  24. v.cleanup();
  25. }
  26. WorkerComputer.cleanup();
本文导读目录
本文导读目录
以上内容是否对您有帮助?