全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网
MaxCompute

连通分量

更新时间:2017-10-22 14:27:24

在无向图中,如果两个顶点之间存在路径,则称这两个顶点是连通的。如果无向图 G 中任意两个顶点都是连通的,则称 G 为连通图,否则称为非连通图。G 的极大连通子图称为 G 的连通分量。

本算法计算每个顶点所属的连通分量,最后输出每个顶点及其所在连通分量中的最小顶点 ID(该 ID 保存在顶点值中)。算法思路:将最小顶点 ID 沿着边传播到连通分量的所有顶点。

代码示例

连通分量的代码,如下所示:

  1. import java.io.IOException;
  2. import com.aliyun.odps.data.TableInfo;
  3. import com.aliyun.odps.graph.ComputeContext;
  4. import com.aliyun.odps.graph.GraphJob;
  5. import com.aliyun.odps.graph.GraphLoader;
  6. import com.aliyun.odps.graph.MutationContext;
  7. import com.aliyun.odps.graph.Vertex;
  8. import com.aliyun.odps.graph.WorkerContext;
  9. import com.aliyun.odps.graph.examples.SSSP.MinLongCombiner;
  10. import com.aliyun.odps.io.LongWritable;
  11. import com.aliyun.odps.io.NullWritable;
  12. import com.aliyun.odps.io.WritableRecord;
  13. /**
  14. * Compute the connected component membership of each vertex and output
  15. * each vertex which's value containing the smallest id in the connected
  16. * component containing that vertex.
  17. *
  18. * Algorithm: propagate the smallest vertex id along the edges to all
  19. * vertices of a connected component.
  20. *
  21. */
  22. public class ConnectedComponents {
  23. public static class CCVertex extends
  24. Vertex<LongWritable, LongWritable, NullWritable, LongWritable> {
  25. @Override
  26. public void compute(
  27. ComputeContext<LongWritable, LongWritable, NullWritable, LongWritable> context,
  28. Iterable<LongWritable> msgs) throws IOException {
  29. if (context.getSuperstep() == 0L) {
  30. this.setValue(getId());
  31. context.sendMessageToNeighbors(this, getValue());
  32. return;
  33. }
  34. long minID = Long.MAX_VALUE;
  35. for (LongWritable id : msgs) {
  36. if (id.get() < minID) {
  37. minID = id.get();
  38. }
  39. }
  40. if (minID < this.getValue().get()) {
  41. this.setValue(new LongWritable(minID));
  42. context.sendMessageToNeighbors(this, getValue());
  43. } else {
  44. this.voteToHalt();
  45. }
  46. }
  47. /**
  48. * Output Table Description:
  49. * +-----------------+----------------------------------------+
  50. * | Field | Type | Comment |
  51. * +-----------------+----------------------------------------+
  52. * | v | bigint | vertex id |
  53. * | minID | bigint | smallest id in the connected component |
  54. * +-----------------+----------------------------------------+
  55. */
  56. @Override
  57. public void cleanup(
  58. WorkerContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
  59. throws IOException {
  60. context.write(getId(), getValue());
  61. }
  62. }
  63. /**
  64. * Input Table Description:
  65. * +-----------------+----------------------------------------------------+
  66. * | Field | Type | Comment |
  67. * +-----------------+----------------------------------------------------+
  68. * | v | bigint | vertex id |
  69. * | es | string | comma separated target vertex id of outgoing edges |
  70. * +-----------------+----------------------------------------------------+
  71. *
  72. * Example:
  73. * For graph:
  74. * 1 ----- 2
  75. * | |
  76. * 3 ----- 4
  77. * Input table:
  78. * +-----------+
  79. * | v | es |
  80. * +-----------+
  81. * | 1 | 2,3 |
  82. * | 2 | 1,4 |
  83. * | 3 | 1,4 |
  84. * | 4 | 2,3 |
  85. * +-----------+
  86. */
  87. public static class CCVertexReader extends
  88. GraphLoader<LongWritable, LongWritable, NullWritable, LongWritable> {
  89. @Override
  90. public void load(
  91. LongWritable recordNum,
  92. WritableRecord record,
  93. MutationContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
  94. throws IOException {
  95. CCVertex vertex = new CCVertex();
  96. vertex.setId((LongWritable) record.get(0));
  97. String[] edges = record.get(1).toString().split(",");
  98. for (int i = 0; i < edges.length; i++) {
  99. long destID = Long.parseLong(edges[i]);
  100. vertex.addEdge(new LongWritable(destID), NullWritable.get());
  101. }
  102. context.addVertexRequest(vertex);
  103. }
  104. }
  105. public static void main(String[] args) throws IOException {
  106. if (args.length < 2) {
  107. System.out.println("Usage: <input> <output>");
  108. System.exit(-1);
  109. }
  110. GraphJob job = new GraphJob();
  111. job.setGraphLoaderClass(CCVertexReader.class);
  112. job.setVertexClass(CCVertex.class);
  113. job.setCombinerClass(MinLongCombiner.class);
  114. job.addInput(TableInfo.builder().tableName(args[0]).build());
  115. job.addOutput(TableInfo.builder().tableName(args[1]).build());
  116. long startTime = System.currentTimeMillis();
  117. job.run();
  118. System.out.println("Job Finished in "
  119. + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  120. }
  121. }
本文导读目录