The single-source shortest path (SSSP) problem is to find the shortest path from a specified source vertex to every other reachable vertex in a graph. Dijkstra's algorithm is a classic method for solving the SSSP problem in a directed graph with non-negative edge weights.
How it works
Dijkstra's algorithm, as implemented here, uses vertices to update shortest distance values. Each vertex maintains its current shortest distance from the source vertex. When this value changes, the vertex adds the weight of each outgoing edge to the new value and sends the sum as a message to the corresponding adjacent vertex. In the next iteration, the adjacent vertices update their current shortest distance values based on the received messages. The iteration ends when the current shortest distance values of all vertices no longer change.
- Initialization: The distance from the source vertex s to itself is 0 (d[s]=0), and the distance from s to any other vertex u is set to infinity (d[u]=∞).
- Iteration: If an edge exists from u to v, the shortest distance from s to v is updated as d[v]=min(d[v], d[u]+weight(u, v)). The process continues until the distances from s to all other vertices stabilize.
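The initialization and iteration steps above can be sketched in plain Java, without the MaxCompute Graph SDK. This is only an illustration under stated assumptions: the class name `SuperstepSketch`, the nested-map graph layout, and the three-vertex sample graph are hypothetical, and the loop simulates message passing in memory rather than reproducing how the framework schedules iterations.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the iteration loop; it does not use the MaxCompute Graph SDK.
final class SuperstepSketch {

  // graph maps each vertex ID to its outgoing edges (neighbor ID -> weight).
  // Assumes every vertex appears as a key and all weights are non-negative.
  static Map<Long, Long> run(Map<Long, Map<Long, Long>> graph, long source) {
    // Initialization: d[u] = infinity for every vertex.
    Map<Long, Long> dist = new HashMap<>();
    for (Long id : graph.keySet()) {
      dist.put(id, Long.MAX_VALUE);
    }
    // The source receives a candidate distance of 0, so d[s] becomes 0.
    Map<Long, Long> inbox = new HashMap<>();
    inbox.put(source, 0L);

    // One pass of this loop corresponds to one iteration.
    while (!inbox.isEmpty()) {
      Map<Long, Long> outbox = new HashMap<>();
      for (Map.Entry<Long, Long> msg : inbox.entrySet()) {
        long v = msg.getKey();
        long candidate = msg.getValue();
        if (candidate < dist.get(v)) {
          dist.put(v, candidate); // d[v] = min(d[v], d[u] + weight(u, v))
          for (Map.Entry<Long, Long> edge : graph.get(v).entrySet()) {
            // Keep only the smallest message per destination vertex.
            outbox.merge(edge.getKey(), candidate + edge.getValue(), Long::min);
          }
        }
      }
      inbox = outbox; // stop when no vertex changed, so no messages were sent
    }
    return dist;
  }

  // Hypothetical sample graph: 1->2 (4), 1->3 (1), 3->2 (2).
  static Map<Long, Map<Long, Long>> sampleGraph() {
    Map<Long, Map<Long, Long>> graph = new HashMap<>();
    for (long id = 1; id <= 3; id++) {
      graph.put(id, new HashMap<>());
    }
    graph.get(1L).put(2L, 4L);
    graph.get(1L).put(3L, 1L);
    graph.get(3L).put(2L, 2L);
    return graph;
  }

  public static void main(String[] args) {
    System.out.println(run(sampleGraph(), 1L));
  }
}
```

For the sample graph, vertex 2 first receives the distance 4 over the direct edge and is then improved to 3 by the message that arrives through vertex 3 in the following iteration.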
For a weighted directed graph G=(V,E), multiple paths may exist from a source vertex s to a sink vertex v. The path with the minimum sum of edge weights is the shortest path from s to v.
This algorithm is well-suited for implementation as a MaxCompute Graph program.
Use cases
MaxCompute supports both directed and undirected graphs. The SSSP results may differ between graph types because path availability depends on the source data and how the graph is constructed. MaxCompute Graph uses a directed graph as its fundamental data model and bases all computations on it.
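A small plain-Java sketch can show why the results may differ between the two graph types. It relaxes the same three edge records once as directed edges and once as undirected edges. The class name, the array-based edge layout, and the sample edges are assumptions made for illustration; none of this is MaxCompute Graph API.

```java
import java.util.Arrays;

// Illustrative sketch: the same edge records read as directed or undirected edges.
final class DirectedVsUndirectedSketch {

  // Each edge is {from, to, weight}; vertex IDs are 0..n-1.
  static long[] shortest(int n, long[][] edges, int source, boolean undirected) {
    long[] d = new long[n];
    Arrays.fill(d, Long.MAX_VALUE); // Long.MAX_VALUE stands for "unreachable"
    d[source] = 0;
    boolean changed = true;
    while (changed) { // repeat until no distance changes
      changed = false;
      for (long[] e : edges) {
        changed |= relax(d, (int) e[0], (int) e[1], e[2]);
        if (undirected) {
          // An undirected edge can also be traversed in the reverse direction.
          changed |= relax(d, (int) e[1], (int) e[0], e[2]);
        }
      }
    }
    return d;
  }

  // Applies d[v] = min(d[v], d[u] + weight(u, v)); returns true if d[v] improved.
  static boolean relax(long[] d, int u, int v, long w) {
    if (d[u] != Long.MAX_VALUE && d[u] + w < d[v]) {
      d[v] = d[u] + w;
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    long[][] edges = {{0, 1, 5}, {2, 0, 1}, {2, 1, 1}}; // hypothetical sample data
    System.out.println(Arrays.toString(shortest(3, edges, 0, false)));
    System.out.println(Arrays.toString(shortest(3, edges, 0, true)));
  }
}
```

With these sample edges, vertex 2 is unreachable from vertex 0 in the directed reading, while the undirected reading reaches it at distance 1 and shortens the distance to vertex 1 from 5 to 2.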
Code examples
The following sections provide code examples for different scenarios.
- Directed graph
  - Define the BaseLoadingVertexResolver class. This class is referenced in the main SSSP class.

        import com.aliyun.odps.graph.Edge;
        import com.aliyun.odps.graph.LoadingVertexResolver;
        import com.aliyun.odps.graph.Vertex;
        import com.aliyun.odps.graph.VertexChanges;
        import com.aliyun.odps.io.Writable;
        import com.aliyun.odps.io.WritableComparable;

        import java.io.IOException;
        import java.util.HashSet;
        import java.util.Iterator;
        import java.util.List;
        import java.util.Set;

        @SuppressWarnings("rawtypes")
        public class BaseLoadingVertexResolver<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
            extends LoadingVertexResolver<I, V, E, M> {
          @Override
          public Vertex<I, V, E, M> resolve(I vertexId, VertexChanges<I, V, E, M> vertexChanges)
              throws IOException {
            Vertex<I, V, E, M> vertex = addVertexIfDesired(vertexId, vertexChanges);
            if (vertex != null) {
              addEdges(vertex, vertexChanges);
            } else {
              System.err.println("Ignore all addEdgeRequests for vertex#" + vertexId);
            }
            return vertex;
          }

          protected Vertex<I, V, E, M> addVertexIfDesired(
              I vertexId,
              VertexChanges<I, V, E, M> vertexChanges) {
            Vertex<I, V, E, M> vertex = null;
            if (hasVertexAdditions(vertexChanges)) {
              vertex = vertexChanges.getAddedVertexList().get(0);
            }
            return vertex;
          }

          protected void addEdges(Vertex<I, V, E, M> vertex, VertexChanges<I, V, E, M> vertexChanges)
              throws IOException {
            Set<I> destVertexId = new HashSet<I>();
            if (vertex.hasEdges()) {
              List<Edge<I, E>> edgeList = vertex.getEdges();
              for (Iterator<Edge<I, E>> edges = edgeList.iterator(); edges.hasNext(); ) {
                Edge<I, E> edge = edges.next();
                if (destVertexId.contains(edge.getDestVertexId())) {
                  edges.remove();
                } else {
                  destVertexId.add(edge.getDestVertexId());
                }
              }
            }
            for (Vertex<I, V, E, M> vertex1 : vertexChanges.getAddedVertexList()) {
              if (vertex1.hasEdges()) {
                List<Edge<I, E>> edgeList = vertex1.getEdges();
                for (Edge<I, E> edge : edgeList) {
                  if (destVertexId.contains(edge.getDestVertexId())) continue;
                  destVertexId.add(edge.getDestVertexId());
                  vertex.addEdge(edge.getDestVertexId(), edge.getValue());
                }
              }
            }
          }

          protected boolean hasVertexAdditions(VertexChanges<I, V, E, M> changes) {
            return changes != null && changes.getAddedVertexList() != null
                && !changes.getAddedVertexList().isEmpty();
          }
        }

    Code explanation:
    - Line 15: Defines BaseLoadingVertexResolver. This class handles conflicts that occur when loading data for a directed graph.
    - Line 18: The resolve method contains the logic for handling a conflict. For example, if a vertex is added twice through two addVertexRequest operations, a loading conflict occurs. You must resolve this conflict before the computation can proceed. This implementation keeps the first added vertex and merges in the edges of the other added vertices, skipping any edge whose destination vertex ID is already present.
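The effect of this conflict handling on the edges can be sketched without the SDK. The following plain-Java example is a simplified illustration, not the resolver itself: the class name `EdgeMergeSketch`, the `{destinationId, weight}` array layout, and the sample data are hypothetical. It keeps the first edge seen for each destination vertex ID and drops later duplicates, which is the same rule that addEdges applies through its destination ID set.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch of "first edge per destination wins" merging.
final class EdgeMergeSketch {

  // An edge is {destinationId, weight}. Each inner list holds the edges carried by
  // one of the duplicate requests for the same vertex ID.
  static List<long[]> merge(List<List<long[]>> requests) {
    List<long[]> merged = new ArrayList<>();
    Set<Long> seenDestinations = new HashSet<>();
    for (List<long[]> edges : requests) {
      for (long[] edge : edges) {
        // Set.add returns false when the destination is already present.
        if (seenDestinations.add(edge[0])) {
          merged.add(edge);
        }
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    // Hypothetical data: two requests for the same vertex with overlapping edges.
    List<long[]> first = Arrays.asList(new long[] {2, 5}, new long[] {3, 1}, new long[] {2, 9});
    List<long[]> second = Arrays.asList(new long[] {3, 7}, new long[] {4, 2});
    for (long[] edge : merge(Arrays.asList(first, second))) {
      System.out.println("dest=" + edge[0] + " weight=" + edge[1]);
    }
  }
}
```

In the sample data, the second edge to vertex 2 (weight 9) and the second edge to vertex 3 (weight 7) are dropped, so three edges remain.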
  - Define the SSSP class.

        import java.io.IOException;

        import com.aliyun.odps.graph.Combiner;
        import com.aliyun.odps.graph.ComputeContext;
        import com.aliyun.odps.graph.Edge;
        import com.aliyun.odps.graph.GraphJob;
        import com.aliyun.odps.graph.GraphLoader;
        import com.aliyun.odps.graph.MutationContext;
        import com.aliyun.odps.graph.Vertex;
        import com.aliyun.odps.graph.WorkerContext;
        import com.aliyun.odps.io.WritableRecord;
        import com.aliyun.odps.io.LongWritable;
        import com.aliyun.odps.data.TableInfo;

        public class SSSP {

          public static final String START_VERTEX = "sssp.start.vertex.id";

          public static class SSSPVertex extends
              Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
            private static long startVertexId = -1;

            public SSSPVertex() {
              this.setValue(new LongWritable(Long.MAX_VALUE));
            }

            public boolean isStartVertex(
                ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) {
              if (startVertexId == -1) {
                String s = context.getConfiguration().get(START_VERTEX);
                startVertexId = Long.parseLong(s);
              }
              return getId().get() == startVertexId;
            }

            @Override
            public void compute(
                ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
                Iterable<LongWritable> messages) throws IOException {
              long minDist = isStartVertex(context) ? 0 : Long.MAX_VALUE;
              for (LongWritable msg : messages) {
                if (msg.get() < minDist) {
                  minDist = msg.get();
                }
              }
              if (minDist < this.getValue().get()) {
                this.setValue(new LongWritable(minDist));
                if (hasEdges()) {
                  for (Edge<LongWritable, LongWritable> e : this.getEdges()) {
                    context.sendMessage(e.getDestVertexId(), new LongWritable(minDist + e.getValue().get()));
                  }
                }
              } else {
                voteToHalt();
              }
            }

            @Override
            public void cleanup(
                WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
                throws IOException {
              context.write(getId(), getValue());
            }

            @Override
            public String toString() {
              return "Vertex(id=" + this.getId() + ",value=" + this.getValue() + ",#edges=" + this.getEdges() + ")";
            }
          }

          public static class SSSPGraphLoader extends
              GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
            @Override
            public void load(
                LongWritable recordNum,
                WritableRecord record,
                MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
                throws IOException {
              SSSPVertex vertex = new SSSPVertex();
              vertex.setId((LongWritable) record.get(0));
              String[] edges = record.get(1).toString().split(",");
              for (String edge : edges) {
                String[] ss = edge.split(":");
                vertex.addEdge(new LongWritable(Long.parseLong(ss[0])), new LongWritable(Long.parseLong(ss[1])));
              }
              context.addVertexRequest(vertex);
            }
          }

          public static class MinLongCombiner extends
              Combiner<LongWritable, LongWritable> {
            @Override
            public void combine(LongWritable vertexId, LongWritable combinedMessage,
                LongWritable messageToCombine) throws IOException {
              if (combinedMessage.get() > messageToCombine.get()) {
                combinedMessage.set(messageToCombine.get());
              }
            }
          }

          public static void main(String[] args) throws IOException {
            if (args.length < 3) {
              System.out.println("Usage: <startnode> <input> <output>");
              System.exit(-1);
            }
            GraphJob job = new GraphJob();
            job.setGraphLoaderClass(SSSPGraphLoader.class);
            job.setVertexClass(SSSPVertex.class);
            job.setCombinerClass(MinLongCombiner.class);
            job.setLoadingVertexResolver(BaseLoadingVertexResolver.class);
            job.set(START_VERTEX, args[0]);
            job.addInput(TableInfo.builder().tableName(args[1]).build());
            job.addOutput(TableInfo.builder().tableName(args[2]).build());
            long startTime = System.currentTimeMillis();
            job.run();
            System.out.println("Job Finished in "
                + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
          }
        }

    Code explanation:
    - Line 19: Defines SSSPVertex. In this class:
      - The vertex value represents the shortest distance from the source vertex startVertexId to this vertex.
      - The compute() method uses the iterative formula d[v]=min(d[v], d[u]+weight(u, v)) to calculate the shortest distance and update the current vertex's value.
      - The cleanup() method writes the shortest distance from the source vertex to the current vertex to the output table.
    - Line 54: If the value of the current vertex (its shortest distance from the source vertex) does not change, the vertex calls voteToHalt() to tell the framework that it is entering the halt state. The computation ends when all vertices are in the halt state.
    - Line 71: Defines a GraphLoader to load the graph data as a directed graph. It parses table records into graph vertices and edges and loads them into the framework. In this example, the addVertexRequest method loads vertex information into the graph's computation context.
    - Line 90: Defines MinLongCombiner. It combines the messages sent to the same vertex into a single message that carries the minimum value, which improves performance and reduces memory consumption.
    - Line 101: The main function defines the GraphJob. It sets the implementations for Vertex, GraphLoader, BaseLoadingVertexResolver, and Combiner, and configures the input and output tables.
    - Line 110: Sets the BaseLoadingVertexResolver class to handle conflicts.
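The record format that the directed graph loader expects can be read from the split(",") and split(":") calls in load(): the second column holds the outgoing edges as comma-separated destination:weight pairs. The parsing step can be sketched in plain Java as follows. The class name `AdjacencyParseSketch` and the sample column value are hypothetical and are not taken from an actual input table.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of parsing one edge column of the form "dest:weight,dest:weight".
final class AdjacencyParseSketch {

  // Returns one {destinationId, weight} pair per entry, in input order.
  static List<long[]> parse(String column) {
    List<long[]> edges = new ArrayList<>();
    for (String pair : column.split(",")) {
      String[] parts = pair.split(":");
      edges.add(new long[] {Long.parseLong(parts[0]), Long.parseLong(parts[1])});
    }
    return edges;
  }

  public static void main(String[] args) {
    // Hypothetical column value: edges to vertex 2 (weight 5) and vertex 3 (weight 1).
    for (long[] edge : parse("2:5,3:1")) {
      System.out.println("dest=" + edge[0] + " weight=" + edge[1]);
    }
  }
}
```

Like the loader in the example, this sketch does not validate its input: an entry without a colon or with a non-numeric part causes an exception.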
- Undirected graph

      import com.aliyun.odps.data.TableInfo;
      import com.aliyun.odps.graph.*;
      import com.aliyun.odps.io.DoubleWritable;
      import com.aliyun.odps.io.LongWritable;
      import com.aliyun.odps.io.WritableRecord;

      import java.io.IOException;
      import java.util.HashSet;
      import java.util.Set;

      public class SSSPBenchmark4 {

        public static final String START_VERTEX = "sssp.start.vertex.id";

        public static class SSSPVertex extends
            Vertex<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
          private static long startVertexId = -1;

          public SSSPVertex() {
            this.setValue(new DoubleWritable(Double.MAX_VALUE));
          }

          public boolean isStartVertex(
              ComputeContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context) {
            if (startVertexId == -1) {
              String s = context.getConfiguration().get(START_VERTEX);
              startVertexId = Long.parseLong(s);
            }
            return getId().get() == startVertexId;
          }

          @Override
          public void compute(
              ComputeContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context,
              Iterable<DoubleWritable> messages) throws IOException {
            double minDist = isStartVertex(context) ? 0 : Double.MAX_VALUE;
            for (DoubleWritable msg : messages) {
              if (msg.get() < minDist) {
                minDist = msg.get();
              }
            }
            if (minDist < this.getValue().get()) {
              this.setValue(new DoubleWritable(minDist));
              if (hasEdges()) {
                for (Edge<LongWritable, DoubleWritable> e : this.getEdges()) {
                  context.sendMessage(e.getDestVertexId(), new DoubleWritable(minDist + e.getValue().get()));
                }
              }
            } else {
              voteToHalt();
            }
          }

          @Override
          public void cleanup(
              WorkerContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context) throws IOException {
            context.write(getId(), getValue());
          }
        }

        public static class MinLongCombiner extends
            Combiner<LongWritable, DoubleWritable> {
          @Override
          public void combine(LongWritable vertexId, DoubleWritable combinedMessage,
              DoubleWritable messageToCombine) {
            if (combinedMessage.get() > messageToCombine.get()) {
              combinedMessage.set(messageToCombine.get());
            }
          }
        }

        public static class SSSPGraphLoader extends
            GraphLoader<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
          @Override
          public void load(
              LongWritable recordNum,
              WritableRecord record,
              MutationContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context)
              throws IOException {
            LongWritable sourceVertexID = (LongWritable) record.get(0);
            LongWritable destinationVertexID = (LongWritable) record.get(1);
            DoubleWritable edgeValue = (DoubleWritable) record.get(2);
            Edge<LongWritable, DoubleWritable> edge = new Edge<LongWritable, DoubleWritable>(destinationVertexID, edgeValue);
            context.addEdgeRequest(sourceVertexID, edge);
            Edge<LongWritable, DoubleWritable> edge2 =
                new Edge<LongWritable, DoubleWritable>(sourceVertexID, edgeValue);
            context.addEdgeRequest(destinationVertexID, edge2);
          }
        }

        public static class SSSPLoadingVertexResolver extends
            LoadingVertexResolver<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
          @Override
          public Vertex<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> resolve(
              LongWritable vertexId,
              VertexChanges<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> vertexChanges)
              throws IOException {
            SSSPVertex computeVertex = new SSSPVertex();
            computeVertex.setId(vertexId);
            Set<LongWritable> destinationVertexIDSet = new HashSet<>();
            if (hasEdgeAdditions(vertexChanges)) {
              for (Edge<LongWritable, DoubleWritable> edge : vertexChanges.getAddedEdgeList()) {
                if (!destinationVertexIDSet.contains(edge.getDestVertexId())) {
                  destinationVertexIDSet.add(edge.getDestVertexId());
                  computeVertex.addEdge(edge.getDestVertexId(), edge.getValue());
                }
              }
            }
            return computeVertex;
          }

          protected boolean hasEdgeAdditions(
              VertexChanges<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> changes) {
            return changes != null && changes.getAddedEdgeList() != null
                && !changes.getAddedEdgeList().isEmpty();
          }
        }

        public static void main(String[] args) throws IOException {
          // Three arguments are required: start vertex ID, input table, output table.
          if (args.length < 3) {
            System.out.println("Usage: <startnode> <input> <output>");
            System.exit(-1);
          }
          GraphJob job = new GraphJob();
          job.setGraphLoaderClass(SSSPGraphLoader.class);
          job.setLoadingVertexResolver(SSSPLoadingVertexResolver.class);
          job.setVertexClass(SSSPVertex.class);
          job.setCombinerClass(MinLongCombiner.class);
          job.set(START_VERTEX, args[0]);
          job.addInput(TableInfo.builder().tableName(args[1]).build());
          job.addOutput(TableInfo.builder().tableName(args[2]).build());
          long startTime = System.currentTimeMillis();
          job.run();
          System.out.println("Job Finished in "
              + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
        }
      }

  Code explanation:
  - Line 15: Defines SSSPVertex. In this class:
    - The vertex value represents the shortest distance between this vertex and the source vertex startVertexId.
    - The compute() method uses the iterative formula d[v]=min(d[v], d[u]+weight(u, v)) to calculate the shortest distance and update the current vertex's value.
    - The cleanup() method writes the shortest distance between the current vertex and the source vertex to the output table.
  - In compute(): If the value of the current vertex (its shortest distance from the source vertex) does not change, the vertex calls voteToHalt() to tell the framework that it is entering the halt state. The computation ends when all vertices are in the halt state.
  - Line 61: Defines MinLongCombiner. It combines the messages sent to the same vertex into a single message that carries the minimum value, which improves performance and reduces memory consumption.
  - Line 72: Defines a GraphLoader to load graph data as an undirected graph. It uses addEdgeRequest to load the edge between two vertices in both directions, so that the table data is loaded as an undirected graph.
    - Line 80: The first column represents the source vertex ID.
    - Line 81: The second column represents the destination vertex ID.
    - Line 82: The third column represents the edge weight.
    - Line 83: Creates an edge consisting of the destination vertex ID and the edge weight.
    - Line 84: Makes a request to add the edge to the source vertex.
    - Lines 85-87: Each Record represents a bidirectional edge. The logic from lines 83 and 84 is repeated for the reverse direction.
  - Defines SSSPLoadingVertexResolver. This class handles conflicts that occur when loading data for an undirected graph. For example, if the same edge is added twice through two addEdgeRequest operations, a loading conflict occurs. You must handle duplicate edges to ensure correct computation.
  - The main function defines the GraphJob. It sets the implementations for Vertex, GraphLoader, SSSPLoadingVertexResolver, and Combiner, and configures the input and output tables.
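The combined effect of the undirected loader and its resolver can be sketched in plain Java. This is an illustration under stated assumptions, not SDK code: the class `UndirectedLoadSketch`, its `Row` type, and the sample rows are hypothetical. Each row adds an edge in both directions, and a later edge to a destination that is already present is ignored, as in SSSPLoadingVertexResolver.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch: turn (source, destination, weight) rows into an undirected adjacency map.
final class UndirectedLoadSketch {

  // One input row; stands in for a table record with three columns.
  static final class Row {
    final long source;
    final long destination;
    final double weight;

    Row(long source, long destination, double weight) {
      this.source = source;
      this.destination = destination;
      this.weight = weight;
    }
  }

  // Returns vertex ID -> (neighbor ID -> weight).
  static Map<Long, Map<Long, Double>> load(List<Row> rows) {
    Map<Long, Map<Long, Double>> adjacency = new TreeMap<>();
    for (Row row : rows) {
      addEdge(adjacency, row.source, row.destination, row.weight); // forward direction
      addEdge(adjacency, row.destination, row.source, row.weight); // reverse direction
    }
    return adjacency;
  }

  // Keeps the first weight seen for a (from, to) pair, so duplicate edges are ignored.
  static void addEdge(Map<Long, Map<Long, Double>> adjacency, long from, long to, double weight) {
    adjacency.computeIfAbsent(from, id -> new TreeMap<>()).putIfAbsent(to, weight);
  }

  public static void main(String[] args) {
    // Hypothetical rows; the second row duplicates the edge between vertices 1 and 2.
    List<Row> rows = Arrays.asList(new Row(1, 2, 2.0), new Row(2, 1, 9.0), new Row(2, 3, 1.5));
    System.out.println(load(rows));
  }
}
```

In this sketch the first row always wins because the rows are processed in order. In a distributed job, the order in which duplicate edge requests reach the resolver may differ, so input data should not rely on it.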
Execution results
The following output is from running the directed graph code example. For more information, see Develop Graph programs.
vertex value
1 0
2 2
3 1
4 3
5 2
- vertex: The current vertex.
- value: The shortest distance from the source vertex (1) to the current vertex.
To create data for an undirected graph, store each edge as one record with three columns: the source vertex ID, the destination vertex ID, and the edge weight, in the order read by the loader in the preceding undirected graph code example.
Tutorial
For more information about implementing the preceding code examples, see Develop Graph programs.