全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网
MaxCompute

拓扑排序

更新时间:2017-10-22 14:27:24

对于有向边(u,v),定义所有满足 u<v 的顶点序列为拓扑序列,拓扑排序就是求一个有向图的拓扑序列的算法。

算法步骤如下:

  1. 从图中找到一个没有入边的顶点,并输出。

  2. 从图中删除该点,及其所有出边。

  3. 重复以上步骤,直到所有点都已输出。

代码示例

拓扑排序算法的代码,如下所示:

  1. import java.io.IOException;
  2. import org.apache.commons.logging.Log;
  3. import org.apache.commons.logging.LogFactory;
  4. import com.aliyun.odps.data.TableInfo;
  5. import com.aliyun.odps.graph.Aggregator;
  6. import com.aliyun.odps.graph.Combiner;
  7. import com.aliyun.odps.graph.ComputeContext;
  8. import com.aliyun.odps.graph.GraphJob;
  9. import com.aliyun.odps.graph.GraphLoader;
  10. import com.aliyun.odps.graph.MutationContext;
  11. import com.aliyun.odps.graph.Vertex;
  12. import com.aliyun.odps.graph.WorkerContext;
  13. import com.aliyun.odps.io.LongWritable;
  14. import com.aliyun.odps.io.NullWritable;
  15. import com.aliyun.odps.io.BooleanWritable;
  16. import com.aliyun.odps.io.WritableRecord;
  17. public class TopologySort {
  18. private final static Log LOG = LogFactory.getLog(TopologySort.class);
  19. public static class TopologySortVertex extends
  20. Vertex<LongWritable, LongWritable, NullWritable, LongWritable> {
  21. @Override
  22. public void compute(
  23. ComputeContext<LongWritable, LongWritable, NullWritable, LongWritable> context,
  24. Iterable<LongWritable> messages) throws IOException {
  25. // in superstep 0, each vertex sends message whose value is 1 to its
  26. // neighbors
  27. if (context.getSuperstep() == 0) {
  28. if (hasEdges()) {
  29. context.sendMessageToNeighbors(this, new LongWritable(1L));
  30. }
  31. } else if (context.getSuperstep() >= 1) {
  32. // compute each vertex's indegree
  33. long indegree = getValue().get();
  34. for (LongWritable msg : messages) {
  35. indegree += msg.get();
  36. }
  37. setValue(new LongWritable(indegree));
  38. if (indegree == 0) {
  39. voteToHalt();
  40. if (hasEdges()) {
  41. context.sendMessageToNeighbors(this, new LongWritable(-1L));
  42. }
  43. context.write(new LongWritable(context.getSuperstep()), getId());
  44. LOG.info("vertex: " + getId());
  45. }
  46. context.aggregate(new LongWritable(indegree));
  47. }
  48. }
  49. }
  50. public static class TopologySortVertexReader extends
  51. GraphLoader<LongWritable, LongWritable, NullWritable, LongWritable> {
  52. @Override
  53. public void load(
  54. LongWritable recordNum,
  55. WritableRecord record,
  56. MutationContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
  57. throws IOException {
  58. TopologySortVertex vertex = new TopologySortVertex();
  59. vertex.setId((LongWritable) record.get(0));
  60. vertex.setValue(new LongWritable(0));
  61. String[] edges = record.get(1).toString().split(",");
  62. for (int i = 0; i < edges.length; i++) {
  63. long edge = Long.parseLong(edges[i]);
  64. if (edge >= 0) {
  65. vertex.addEdge(new LongWritable(Long.parseLong(edges[i])),
  66. NullWritable.get());
  67. }
  68. }
  69. LOG.info(record.toString());
  70. context.addVertexRequest(vertex);
  71. }
  72. }
  73. public static class LongSumCombiner extends
  74. Combiner<LongWritable, LongWritable> {
  75. @Override
  76. public void combine(LongWritable vertexId, LongWritable combinedMessage,
  77. LongWritable messageToCombine) throws IOException {
  78. combinedMessage.set(combinedMessage.get() + messageToCombine.get());
  79. }
  80. }
  81. public static class TopologySortAggregator extends
  82. Aggregator<BooleanWritable> {
  83. @SuppressWarnings("rawtypes")
  84. @Override
  85. public BooleanWritable createInitialValue(WorkerContext context)
  86. throws IOException {
  87. return new BooleanWritable(true);
  88. }
  89. @Override
  90. public void aggregate(BooleanWritable value, Object item)
  91. throws IOException {
  92. boolean hasCycle = value.get();
  93. boolean inDegreeNotZero = ((LongWritable) item).get() == 0 ? false : true;
  94. value.set(hasCycle && inDegreeNotZero);
  95. }
  96. @Override
  97. public void merge(BooleanWritable value, BooleanWritable partial)
  98. throws IOException {
  99. value.set(value.get() && partial.get());
  100. }
  101. @SuppressWarnings("rawtypes")
  102. @Override
  103. public boolean terminate(WorkerContext context, BooleanWritable value)
  104. throws IOException {
  105. if (context.getSuperstep() == 0) {
  106. // since the initial aggregator value is true, and in superstep we don't
  107. // do aggregate
  108. return false;
  109. }
  110. return value.get();
  111. }
  112. }
  113. public static void main(String[] args) throws IOException {
  114. if (args.length != 2) {
  115. System.out.println("Usage : <inputTable> <outputTable>");
  116. System.exit(-1);
  117. }
  118. // 输入表形式为
  119. // 0 1,2
  120. // 1 3
  121. // 2 3
  122. // 3 -1
  123. // 第一列为vertexid,第二列为该点边的destination vertexid,若值为-1,表示该点无出边
  124. // 输出表形式为
  125. // 0 0
  126. // 1 1
  127. // 1 2
  128. // 2 3
  129. // 第一列为supstep值,隐含了拓扑顺序,第二列为vertexid
  130. // TopologySortAggregator用来判断图中是否有环
  131. // 若输入的图有环,则当图中active的点入度都不为0时,迭代结束
  132. // 用户可以通过输入表和输出表的记录数来判断一个有向图是否有环
  133. GraphJob job = new GraphJob();
  134. job.setGraphLoaderClass(TopologySortVertexReader.class);
  135. job.setVertexClass(TopologySortVertex.class);
  136. job.addInput(TableInfo.builder().tableName(args[0]).build());
  137. job.addOutput(TableInfo.builder().tableName(args[1]).build());
  138. job.setCombinerClass(LongSumCombiner.class);
  139. job.setAggregatorClass(TopologySortAggregator.class);
  140. long startTime = System.currentTimeMillis();
  141. job.run();
  142. System.out.println("Job Finished in "
  143. + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  144. }
  145. }
本文导读目录