MaxCompute

Strongly Connected Components

Last updated: 2017-10-22 14:27:23

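A directed graph is strongly connected if every vertex can be reached from every other vertex by following the graph's edges. A strongly connected component is a maximal strongly connected subgraph: no further vertex can be added without losing strong connectivity. This example is based on the parallel Coloring algorithm.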
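To make the definition concrete, the following Java sketch tests whether a small directed graph is strongly connected. It is not part of the MaxCompute example, and the class and method names are illustrative. It relies on a direct consequence of the definition: the graph is strongly connected exactly when every vertex is reachable from one fixed vertex both in the graph and in its transpose, because then any u can reach that vertex and that vertex can reach any v. It assumes vertices are numbered 0..n-1 and stored as out-neighbor lists.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class StrongConnectivityCheck {

    // Breadth-first search from `start`; returns how many vertices were reached.
    static int countReachable(List<List<Integer>> adj, int start) {
        boolean[] seen = new boolean[adj.size()];
        Deque<Integer> queue = new ArrayDeque<>();
        seen[start] = true;
        queue.add(start);
        int count = 1;
        while (!queue.isEmpty()) {
            int u = queue.poll();
            for (int v : adj.get(u)) {
                if (!seen[v]) {
                    seen[v] = true;
                    count++;
                    queue.add(v);
                }
            }
        }
        return count;
    }

    // Builds the transpose graph: every edge u -> v becomes v -> u.
    static List<List<Integer>> transpose(List<List<Integer>> adj) {
        List<List<Integer>> t = new ArrayList<>();
        for (int i = 0; i < adj.size(); i++) t.add(new ArrayList<>());
        for (int u = 0; u < adj.size(); u++) {
            for (int v : adj.get(u)) t.get(v).add(u);
        }
        return t;
    }

    // Strongly connected iff vertex 0 reaches everything in the graph and in its transpose.
    static boolean isStronglyConnected(List<List<Integer>> adj) {
        int n = adj.size();
        if (n == 0) return true;
        return countReachable(adj, 0) == n && countReachable(transpose(adj), 0) == n;
    }

    public static void main(String[] args) {
        // 0 -> 1 -> 2 -> 0 is a cycle, so every vertex reaches every other one.
        List<List<Integer>> cycle = List.of(List.of(1), List.of(2), List.of(0));
        // 0 -> 1 -> 2 has no path back to 0.
        List<List<Integer>> chain = List.of(List.of(1), List.of(2), List.of());
        System.out.println(isStronglyConnected(cycle)); // true
        System.out.println(isStronglyConnected(chain)); // false
    }
}
```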

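Each vertex stores two pieces of state (the sccID and inNeighbors fields of MyValue in the code example on this page):

  • colorID: stores the color of vertex v during the forward traversal. When the computation finishes, vertices with the same colorID belong to the same strongly connected component.

  • transposeNeighbors: stores the IDs of v's neighbors in the transpose of the input graph, that is, the vertices that have an edge pointing to v in the original graph.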
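Because vertices that end with the same colorID form one component, turning per-vertex results into a list of components is a simple grouping step. The sketch below assumes the results are available as (vertex ID, colorID) pairs, which is the shape of the records the example job writes in its cleanup method; the class name and sample values are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupByColor {

    // Each row is {vertexId, colorId}; vertices sharing a colorId form one component.
    static Map<Long, List<Long>> components(long[][] rows) {
        Map<Long, List<Long>> byColor = new TreeMap<>();
        for (long[] row : rows) {
            byColor.computeIfAbsent(row[1], k -> new ArrayList<>()).add(row[0]);
        }
        return byColor;
    }

    public static void main(String[] args) {
        long[][] rows = { {1, 1}, {2, 1}, {3, 1}, {4, 4}, {5, 5} };
        System.out.println(components(rows)); // {1=[1, 2, 3], 4=[4], 5=[5]}
    }
}
```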

The algorithm consists of the following four phases:

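  • Transpose graph formation: takes two supersteps. In the first, each vertex sends its ID to the neighbors at the other end of its outgoing edges; in the second, each vertex stores the IDs it receives as its transposeNeighbors.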

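  • Trimming: takes one superstep. Every vertex that has only incoming edges or only outgoing edges (or no edges at all) sets its colorID to its own ID and becomes inactive; messages sent to it afterwards are ignored. Such a vertex cannot lie on a cycle, so it forms a strongly connected component by itself.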

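  • Forward traversal: consists of two sub-phases, Start and Rest. In Start (one superstep), each vertex sets its colorID to its own ID and sends that ID to its outgoing neighbors. In Rest, which repeats over several supersteps, each vertex updates its colorID to the smallest colorID it has received, if that is smaller than its current one, and propagates the new value, until no colorID changes any more. After convergence, each vertex's colorID is the smallest ID among the vertices that can reach it. The master then sets the global phase to backward traversal (in the code example, the aggregator's terminate method does this).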

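  • Backward traversal: also consists of Start and Rest. In Start, every vertex whose ID equals its colorID sends its ID to its transpose-graph neighbors and becomes inactive; messages sent to it afterwards are ignored. In each Rest superstep, a vertex that receives a message matching its colorID propagates its colorID along the transpose graph and then becomes inactive. The vertices deactivated this way share their root's colorID and together form one strongly connected component. If any vertices are still active when this phase ends, the algorithm returns to the trimming phase.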
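The four phases can be traced on a single machine with the following Java sketch. It is an illustrative simulation under stated assumptions, not the distributed MaxCompute implementation: vertices are numbered 0..n-1, trimming counts only neighbors that are still active (so it can also trim vertices in later rounds, whereas the MaxCompute code checks the vertex's original edges), and the backward Rest supersteps are replaced by a queue-based walk that reaches the same set of vertices. The class name is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

public class ColoringSccSketch {

    // True if at least one vertex in `vs` is marked active.
    static boolean anyActive(int[] vs, boolean[] active) {
        for (int v : vs) if (active[v]) return true;
        return false;
    }

    // out[v] lists v's out-neighbors; returns the final colorID of every vertex.
    static long[] scc(int[][] out) {
        int n = out.length;

        // Phase 1: transpose graph formation. in[v] lists the vertices with an edge into v.
        int[] inDegree = new int[n];
        for (int[] nbrs : out) for (int v : nbrs) inDegree[v]++;
        int[][] in = new int[n][];
        for (int v = 0; v < n; v++) in[v] = new int[inDegree[v]];
        int[] filled = new int[n];
        for (int u = 0; u < n; u++) for (int v : out[u]) in[v][filled[v]++] = u;

        long[] color = new long[n];
        boolean[] active = new boolean[n];
        Arrays.fill(active, true);
        int remaining = n;

        while (remaining > 0) {
            // Phase 2: trimming, decided on a snapshot as in one superstep.
            boolean[] snap = active.clone();
            for (int v = 0; v < n; v++) {
                if (snap[v] && (!anyActive(out[v], snap) || !anyActive(in[v], snap))) {
                    color[v] = v; // a component of its own
                    active[v] = false;
                    remaining--;
                }
            }
            if (remaining == 0) break;

            // Phase 3: forward traversal. Start: colorID = own ID.
            // Rest: adopt the smallest colorID arriving on an incoming edge until nothing changes.
            for (int v = 0; v < n; v++) if (active[v]) color[v] = v;
            boolean changed = true;
            while (changed) {
                changed = false;
                long[] next = color.clone();
                for (int u = 0; u < n; u++) {
                    if (!active[u]) continue;
                    for (int v : out[u]) {
                        if (active[v] && color[u] < next[v]) {
                            next[v] = color[u];
                            changed = true;
                        }
                    }
                }
                color = next;
            }

            // Phase 4: backward traversal. Start: roots (colorID == own ID) become final.
            // Rest: walk the transpose graph, claiming active vertices with the root's color.
            Deque<Integer> frontier = new ArrayDeque<>();
            for (int v = 0; v < n; v++) {
                if (active[v] && color[v] == v) {
                    frontier.add(v);
                    active[v] = false;
                    remaining--;
                }
            }
            while (!frontier.isEmpty()) {
                int v = frontier.poll();
                for (int u : in[v]) {
                    if (active[u] && color[u] == color[v]) {
                        frontier.add(u);
                        active[u] = false;
                        remaining--;
                    }
                }
            }
            // Vertices that are still active go back to trimming in the next round.
        }
        return color;
    }

    public static void main(String[] args) {
        // 0->1->2->0 is one cycle, 2->3 links it to the cycle 3<->4, and 5 is isolated.
        int[][] out = { {1}, {2}, {0, 3}, {4}, {3}, {} };
        System.out.println(Arrays.toString(scc(out))); // [0, 0, 0, 3, 3, 5]
    }
}
```

Every component ends up labeled with its smallest vertex ID: a root's colorID is the minimum ID among the vertices that can reach it, and that set contains the root's whole component.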

Code example

The following code computes the strongly connected components:

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.graph.Aggregator;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.BooleanWritable;
import com.aliyun.odps.io.IntWritable;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.io.Tuple;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.io.WritableRecord;

/**
 * Definition from Wikipedia:
 * In the mathematical theory of directed graphs, a graph is said
 * to be strongly connected if every vertex is reachable from every
 * other vertex. The strongly connected components of an arbitrary
 * directed graph form a partition into subgraphs that are themselves
 * strongly connected.
 *
 * The algorithm has the following four phases.
 * 1. Transpose Graph Formation: Requires two supersteps. In the first
 * superstep, each vertex sends a message with its ID to all its outgoing
 * neighbors, which in the second superstep are stored in transposeNeighbors.
 *
 * 2. Trimming: Takes one superstep. Every vertex with only incoming or
 * only outgoing edges (or neither) sets its colorID to its own ID and
 * becomes inactive. Messages subsequently sent to the vertex are ignored.
 *
 * 3. Forward-Traversal: There are two sub-phases: Start and Rest. In the
 * Start phase, each vertex sets its colorID to its own ID and propagates
 * its ID to its outgoing neighbors. In the Rest phase, vertices update
 * their own colorIDs with the minimum colorID they have seen, and propagate
 * their colorIDs, if updated, until the colorIDs converge.
 * Set the phase to Backward-Traversal when the colorIDs converge.
 *
 * 4. Backward-Traversal: We again break the phase into Start and Rest.
 * In Start, every vertex whose ID equals its colorID propagates its ID to
 * the vertices in transposeNeighbors and sets itself inactive. Messages
 * subsequently sent to the vertex are ignored. In each of the Rest phase
 * supersteps, each vertex receiving a message that matches its colorID:
 * (1) propagates its colorID in the transpose graph; (2) sets itself inactive.
 * Messages subsequently sent to the vertex are ignored. Set the phase back
 * to Trimming if not all vertices are inactive.
 *
 * http://ilpubs.stanford.edu:8090/1077/3/p535-salihoglu.pdf
 */
public class StronglyConnectedComponents {
  public final static int STAGE_TRANSPOSE_1 = 0;
  public final static int STAGE_TRANSPOSE_2 = 1;
  public final static int STAGE_TRIMMING = 2;
  public final static int STAGE_FW_START = 3;
  public final static int STAGE_FW_REST = 4;
  public final static int STAGE_BW_START = 5;
  public final static int STAGE_BW_REST = 6;

  /**
   * The value is composed of component id, incoming neighbors,
   * active status and updated status. sccID plays the role of colorID
   * and inNeighbors the role of transposeNeighbors.
   */
  public static class MyValue implements Writable {
    LongWritable sccID;      // strongly connected component id (colorID)
    Tuple inNeighbors;       // transpose neighbors
    BooleanWritable active;  // vertex is active or not
    BooleanWritable updated; // sccID was updated in this superstep or not

    public MyValue() {
      this.sccID = new LongWritable(Long.MAX_VALUE);
      this.inNeighbors = new Tuple();
      this.active = new BooleanWritable(true);
      this.updated = new BooleanWritable(false);
    }

    public void setSccID(LongWritable sccID) {
      this.sccID = sccID;
    }

    public LongWritable getSccID() {
      return this.sccID;
    }

    public void setInNeighbors(Tuple inNeighbors) {
      this.inNeighbors = inNeighbors;
    }

    public Tuple getInNeighbors() {
      return this.inNeighbors;
    }

    public void addInNeighbor(LongWritable neighbor) {
      // Copy the ID, because message objects may be reused by the framework.
      this.inNeighbors.append(new LongWritable(neighbor.get()));
    }

    public boolean isActive() {
      return this.active.get();
    }

    public void setActive(boolean status) {
      this.active.set(status);
    }

    public boolean isUpdated() {
      return this.updated.get();
    }

    public void setUpdated(boolean update) {
      this.updated.set(update);
    }

    @Override
    public void write(DataOutput out) throws IOException {
      this.sccID.write(out);
      this.inNeighbors.write(out);
      this.active.write(out);
      this.updated.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      this.sccID.readFields(in);
      this.inNeighbors.readFields(in);
      this.active.readFields(in);
      this.updated.readFields(in);
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("sccID: " + sccID.get());
      sb.append(" inNeighbors: " + inNeighbors.toDelimitedString(','));
      sb.append(" active: " + active.get());
      sb.append(" updated: " + updated.get());
      return sb.toString();
    }
  }

  public static class SCCVertex extends
      Vertex<LongWritable, MyValue, NullWritable, LongWritable> {

    public SCCVertex() {
      this.setValue(new MyValue());
    }

    @Override
    public void compute(
        ComputeContext<LongWritable, MyValue, NullWritable, LongWritable> context,
        Iterable<LongWritable> msgs) throws IOException {
      // Messages sent to inactive vertices are ignored.
      if (!this.getValue().isActive()) {
        this.voteToHalt();
        return;
      }
      // The aggregator decides which stage all vertices run in this superstep.
      int stage = ((SCCAggrValue) context.getLastAggregatedValue(0)).getStage();
      switch (stage) {
        case STAGE_TRANSPOSE_1:
          context.sendMessageToNeighbors(this, this.getId());
          break;
        case STAGE_TRANSPOSE_2:
          for (LongWritable msg : msgs) {
            this.getValue().addInNeighbor(msg);
          }
          // No break: fall through so that a first trimming pass runs in the
          // same superstep, as soon as the transpose neighbors are known.
        case STAGE_TRIMMING:
          this.getValue().setSccID(getId());
          // A vertex without incoming or without outgoing edges cannot lie on
          // a cycle, so it is a component of its own.
          if (this.getValue().getInNeighbors().size() == 0 ||
              this.getNumEdges() == 0) {
            this.getValue().setActive(false);
          }
          break;
        case STAGE_FW_START:
          this.getValue().setSccID(getId());
          context.sendMessageToNeighbors(this, this.getValue().getSccID());
          break;
        case STAGE_FW_REST:
          // Adopt the smallest colorID received; propagate only if it changed.
          long minSccID = Long.MAX_VALUE;
          for (LongWritable msg : msgs) {
            if (msg.get() < minSccID) {
              minSccID = msg.get();
            }
          }
          if (minSccID < this.getValue().getSccID().get()) {
            this.getValue().setSccID(new LongWritable(minSccID));
            context.sendMessageToNeighbors(this, this.getValue().getSccID());
            this.getValue().setUpdated(true);
          } else {
            this.getValue().setUpdated(false);
          }
          break;
        case STAGE_BW_START:
          // Roots (ID == colorID) start the backward traversal and become final.
          if (this.getId().equals(this.getValue().getSccID())) {
            for (Writable neighbor : this.getValue().getInNeighbors().getAll()) {
              context.sendMessage((LongWritable) neighbor, this.getValue().getSccID());
            }
            this.getValue().setActive(false);
          }
          break;
        case STAGE_BW_REST:
          // Join the root's component only if a message carries our own colorID.
          this.getValue().setUpdated(false);
          for (LongWritable msg : msgs) {
            if (msg.equals(this.getValue().getSccID())) {
              for (Writable neighbor : this.getValue().getInNeighbors().getAll()) {
                context.sendMessage((LongWritable) neighbor, this.getValue().getSccID());
              }
              this.getValue().setActive(false);
              this.getValue().setUpdated(true);
              break;
            }
          }
          break;
      }
      context.aggregate(0, getValue());
    }

    @Override
    public void cleanup(
        WorkerContext<LongWritable, MyValue, NullWritable, LongWritable> context)
        throws IOException {
      // Output one (vertex ID, component ID) record per vertex.
      context.write(getId(), getValue().getSccID());
    }
  }

  /**
   * SCCAggrValue holds the global stage plus the graph-wide updated and
   * active flags. updated is true if at least one vertex changed its sccID
   * in this superstep; active is true if at least one vertex is still active.
   */
  public static class SCCAggrValue implements Writable {
    IntWritable stage = new IntWritable(STAGE_TRANSPOSE_1);
    BooleanWritable updated = new BooleanWritable(false);
    BooleanWritable active = new BooleanWritable(false);

    public void setStage(int stage) {
      this.stage.set(stage);
    }

    public int getStage() {
      return this.stage.get();
    }

    public void setUpdated(boolean updated) {
      this.updated.set(updated);
    }

    public boolean getUpdated() {
      return this.updated.get();
    }

    public void setActive(boolean active) {
      this.active.set(active);
    }

    public boolean getActive() {
      return this.active.get();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      this.stage.write(out);
      this.updated.write(out);
      this.active.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      this.stage.readFields(in);
      this.updated.readFields(in);
      this.active.readFields(in);
    }
  }

  /**
   * SCCAggregator schedules the global stage for every superstep.
   */
  public static class SCCAggregator extends Aggregator<SCCAggrValue> {

    @SuppressWarnings("rawtypes")
    @Override
    public SCCAggrValue createStartupValue(WorkerContext context) throws IOException {
      return new SCCAggrValue();
    }

    @SuppressWarnings("rawtypes")
    @Override
    public SCCAggrValue createInitialValue(WorkerContext context)
        throws IOException {
      return (SCCAggrValue) context.getLastAggregatedValue(0);
    }

    @Override
    public void aggregate(SCCAggrValue value, Object item) throws IOException {
      MyValue v = (MyValue) item;
      if ((value.getStage() == STAGE_FW_REST || value.getStage() == STAGE_BW_REST)
          && v.isUpdated()) {
        value.setUpdated(true);
      }
      // Only active vertices call aggregate(), so any call means the graph is still active.
      value.setActive(true);
    }

    @Override
    public void merge(SCCAggrValue value, SCCAggrValue partial)
        throws IOException {
      boolean updated = value.getUpdated() || partial.getUpdated();
      value.setUpdated(updated);
      boolean active = value.getActive() || partial.getActive();
      value.setActive(active);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public boolean terminate(WorkerContext context, SCCAggrValue value)
        throws IOException {
      // If all vertices are inactive, the job is over.
      if (!value.getActive()) {
        return true;
      }
      // State machine: choose the stage for the next superstep.
      switch (value.getStage()) {
        case STAGE_TRANSPOSE_1:
          value.setStage(STAGE_TRANSPOSE_2);
          break;
        case STAGE_TRANSPOSE_2:
          value.setStage(STAGE_TRIMMING);
          break;
        case STAGE_TRIMMING:
          value.setStage(STAGE_FW_START);
          break;
        case STAGE_FW_START:
          value.setStage(STAGE_FW_REST);
          break;
        case STAGE_FW_REST:
          if (value.getUpdated()) {
            value.setStage(STAGE_FW_REST);
          } else {
            value.setStage(STAGE_BW_START);
          }
          break;
        case STAGE_BW_START:
          value.setStage(STAGE_BW_REST);
          break;
        case STAGE_BW_REST:
          if (value.getUpdated()) {
            value.setStage(STAGE_BW_REST);
          } else {
            value.setStage(STAGE_TRIMMING);
          }
          break;
      }
      value.setActive(false);
      value.setUpdated(false);
      return false;
    }
  }

  public static class SCCVertexReader extends
      GraphLoader<LongWritable, MyValue, NullWritable, LongWritable> {

    @Override
    public void load(
        LongWritable recordNum,
        WritableRecord record,
        MutationContext<LongWritable, MyValue, NullWritable, LongWritable> context)
        throws IOException {
      // Column 0: vertex ID. Column 1: comma-separated IDs of the out-neighbors;
      // entries that cannot be parsed as a long are skipped.
      SCCVertex vertex = new SCCVertex();
      vertex.setId((LongWritable) record.get(0));
      String[] edges = record.get(1).toString().split(",");
      for (int i = 0; i < edges.length; i++) {
        try {
          long destID = Long.parseLong(edges[i]);
          vertex.addEdge(new LongWritable(destID), NullWritable.get());
        } catch (NumberFormatException nfe) {
          System.err.println("Ignore " + nfe);
        }
      }
      context.addVertexRequest(vertex);
    }
  }

  public static void main(String[] args) throws IOException {
    if (args.length < 2) {
      System.out.println("Usage: <input> <output>");
      System.exit(-1);
    }
    GraphJob job = new GraphJob();
    job.setGraphLoaderClass(SCCVertexReader.class);
    job.setVertexClass(SCCVertex.class);
    job.setAggregatorClass(SCCAggregator.class);
    job.addInput(TableInfo.builder().tableName(args[0]).build());
    job.addOutput(TableInfo.builder().tableName(args[1]).build());
    long startTime = System.currentTimeMillis();
    job.run();
    System.out.println("Job Finished in "
        + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  }
}
```
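When testing the job on small inputs, a sequential reference can help confirm the output table. The sketch below uses Kosaraju's algorithm, a different single-machine technique based on depth-first search, and labels each component with its smallest vertex ID. That matches the labels the coloring algorithm produces (a root's colorID is the minimum ID in its component), so the two results can be compared directly. It assumes the vertex IDs have been mapped to 0..n-1 in increasing order; the recursion makes it suitable only for small graphs, and the class name is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

public class KosarajuCheck {

    // First pass: push vertices in order of DFS completion on the original graph.
    static void finishOrder(int[][] out, int v, boolean[] seen, Deque<Integer> order) {
        seen[v] = true;
        for (int w : out[v]) if (!seen[w]) finishOrder(out, w, seen, order);
        order.push(v); // the last vertex to finish ends up on top
    }

    // Second pass: collect every unassigned vertex reachable from v in the transpose graph.
    static void collect(List<List<Integer>> in, int v, boolean[] assigned, List<Integer> comp) {
        assigned[v] = true;
        comp.add(v);
        for (int w : in.get(v)) if (!assigned[w]) collect(in, w, assigned, comp);
    }

    // Returns, for each vertex, the smallest vertex ID in its strongly connected component.
    static long[] scc(int[][] out) {
        int n = out.length;
        List<List<Integer>> in = new ArrayList<>();
        for (int v = 0; v < n; v++) in.add(new ArrayList<>());
        for (int u = 0; u < n; u++) for (int w : out[u]) in.get(w).add(u);

        Deque<Integer> order = new ArrayDeque<>();
        boolean[] seen = new boolean[n];
        for (int v = 0; v < n; v++) if (!seen[v]) finishOrder(out, v, seen, order);

        long[] label = new long[n];
        boolean[] assigned = new boolean[n];
        while (!order.isEmpty()) {
            int v = order.pop(); // decreasing finish time
            if (assigned[v]) continue;
            List<Integer> comp = new ArrayList<>();
            collect(in, v, assigned, comp);
            long min = Collections.min(comp);
            for (int u : comp) label[u] = min;
        }
        return label;
    }

    public static void main(String[] args) {
        // Same shape as a small test input: cycle 0-1-2, cycle 3-4, isolated 5.
        int[][] out = { {1}, {2}, {0, 3}, {4}, {3}, {} };
        System.out.println(Arrays.toString(scc(out))); // [0, 0, 0, 3, 3, 5]
    }
}
```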