全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网
MaxCompute

三角形计数

更新时间:2017-10-22 14:27:56

三角形计数算法用于计算通过每个顶点的三角形个数。

算法实现的流程如下:

  1. 每个顶点将其 ID 发送给所有出边邻居。

  2. 存储入边和出边邻居并发送给出边邻居。

  3. 对每条边计算其终点的交集数量,并求和,结果输出到表。

  4. 将表中的输出结果求和并除以三,即得到三角形个数。

代码示例

三角形计数算法的代码,如下所示:

  import java.io.IOException;
  import java.util.HashSet;
  import java.util.Set;

  import com.aliyun.odps.data.TableInfo;
  import com.aliyun.odps.graph.ComputeContext;
  import com.aliyun.odps.graph.Edge;
  import com.aliyun.odps.graph.GraphJob;
  import com.aliyun.odps.graph.GraphLoader;
  import com.aliyun.odps.graph.MutationContext;
  import com.aliyun.odps.graph.Vertex;
  import com.aliyun.odps.graph.WorkerContext;
  import com.aliyun.odps.io.LongWritable;
  import com.aliyun.odps.io.NullWritable;
  import com.aliyun.odps.io.Tuple;
  import com.aliyun.odps.io.Writable;
  import com.aliyun.odps.io.WritableRecord;
  15. /**
  16. * Compute the number of triangles passing through each vertex.
  17. *
  18. * The algorithm can be computed in three supersteps:
  19. * I. Each vertex sends a message with its ID to all its outgoing
  20. * neighbors.
  21. * II. The incoming neighbors and outgoing neighbors are stored and
  22. * send to outgoing neighbors.
  23. * III. For each edge compute the intersection of the sets at destination
  24. * vertex and sum them, then output to table.
  25. *
  26. * The triangle count is the sum of output table and divide by three since
  27. * each triangle is counted three times.
  28. *
  29. **/
  30. public class TriangleCount {
  31. public static class TCVertex extends
  32. Vertex<LongWritable, Tuple, NullWritable, Tuple> {
  33. @Override
  34. public void setup(
  35. WorkerContext<LongWritable, Tuple, NullWritable, Tuple> context)
  36. throws IOException {
  37. // collect the outgoing neighbors
  38. Tuple t = new Tuple();
  39. if (this.hasEdges()) {
  40. for (Edge<LongWritable, NullWritable> edge : this.getEdges()) {
  41. t.append(edge.getDestVertexId());
  42. }
  43. }
  44. this.setValue(t);
  45. }
  46. @Override
  47. public void compute(
  48. ComputeContext<LongWritable, Tuple, NullWritable, Tuple> context,
  49. Iterable<Tuple> msgs) throws IOException {
  50. if (context.getSuperstep() == 0L) {
  51. // sends a message with its ID to all its outgoing neighbors
  52. Tuple t = new Tuple();
  53. t.append(getId());
  54. context.sendMessageToNeighbors(this, t);
  55. } else if (context.getSuperstep() == 1L) {
  56. // store the incoming neighbors
  57. for (Tuple msg : msgs) {
  58. for (Writable item : msg.getAll()) {
  59. if (!this.getValue().getAll().contains((LongWritable)item)) {
  60. this.getValue().append((LongWritable)item);
  61. }
  62. }
  63. }
  64. // send both incoming and outgoing neighbors to all outgoing neighbors
  65. context.sendMessageToNeighbors(this, getValue());
  66. } else if (context.getSuperstep() == 2L) {
  67. // count the sum of intersection at each edge
  68. long count = 0;
  69. for (Tuple msg : msgs) {
  70. for (Writable id : msg.getAll()) {
  71. if (getValue().getAll().contains(id)) {
  72. count ++;
  73. }
  74. }
  75. }
  76. // output to table
  77. context.write(getId(), new LongWritable(count));
  78. this.voteToHalt();
  79. }
  80. }
  81. }
  82. public static class TCVertexReader extends
  83. GraphLoader<LongWritable, Tuple, NullWritable, Tuple> {
  84. @Override
  85. public void load(
  86. LongWritable recordNum,
  87. WritableRecord record,
  88. MutationContext<LongWritable, Tuple, NullWritable, Tuple> context)
  89. throws IOException {
  90. TCVertex vertex = new TCVertex();
  91. vertex.setId((LongWritable) record.get(0));
  92. String[] edges = record.get(1).toString().split(",");
  93. for (int i = 0; i < edges.length; i++) {
  94. try {
  95. long destID = Long.parseLong(edges[i]);
  96. vertex.addEdge(new LongWritable(destID), NullWritable.get());
  97. } catch(NumberFormatException nfe) {
  98. System.err.println("Ignore " + nfe);
  99. }
  100. }
  101. context.addVertexRequest(vertex);
  102. }
  103. }
  104. public static void main(String[] args) throws IOException {
  105. if (args.length < 2) {
  106. System.out.println("Usage: <input> <output>");
  107. System.exit(-1);
  108. }
  109. GraphJob job = new GraphJob();
  110. job.setGraphLoaderClass(TCVertexReader.class);
  111. job.setVertexClass(TCVertex.class);
  112. job.addInput(TableInfo.builder().tableName(args[0]).build());
  113. job.addOutput(TableInfo.builder().tableName(args[1]).build());
  114. long startTime = System.currentTimeMillis();
  115. job.run();
  116. System.out.println("Job Finished in "
  117. + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  118. }
  119. }
本文导读目录