Dijkstra算法是求解有向图中单源最短距离(Single Source Shortest Path,简称为SSSP)的经典算法。
算法基本原理,如下所示:
- 初始化:源点s到s自身的距离(d[s]=0),其他点u到s的距离为无穷(d[u]=∞)。
- 迭代:如果存在一条从u到v的边,则从s到v的最短距离更新为
d[v]=min(d[v], d[u]+weight(u, v))
,直到所有的点到s的距离不再发生变化时,迭代结束。说明 最短距离:对一个有权重的有向图G=(V,E)
,从一个源点s到汇点v有很多路径,其中边权和最小的路径,称从s到v的最短距离。
由算法基本原理可以看出,此算法非常适合于用MaxCompute Graph程序进行求解。每个点维护到源点的当前最短距离值,当这个值变化时,将新值加上边的权值,发送消息通知其邻接点。下一轮迭代时,邻接点根据收到的消息,更新其当前最短距离,当所有点当前最短距离不再变化时,迭代结束。
代码示例
单源最短距离的代码,如下所示。
import java.io.IOException;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.Combiner;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.data.TableInfo;
public class SSSP {
public static final String START_VERTEX = "sssp.start.vertex.id";
public static class SSSPVertex extends
Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
private static long startVertexId = -1;
public SSSPVertex() {
this.setValue(new LongWritable(Long.MAX_VALUE));
}
public boolean isStartVertex(
ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) {
if (startVertexId == -1) {
String s = context.getConfiguration().get(START_VERTEX);
startVertexId = Long.parseLong(s);
}
return getId().get() == startVertexId;
}
@Override
public void compute(
ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
Iterable<LongWritable> messages) throws IOException {
long minDist = isStartVertex(context) ? 0 : Integer.MAX_VALUE;
for (LongWritable msg : messages) {
if (msg.get() < minDist) {
minDist = msg.get();
}
}
if (minDist < this.getValue().get()) {
this.setValue(new LongWritable(minDist));
if (hasEdges()) {
for (Edge<LongWritable, LongWritable> e : this.getEdges()) {
context.sendMessage(e.getDestVertexId(), new LongWritable(minDist
+ e.getValue().get()));
}
}
} else {
voteToHalt();
}
}
@Override
public void cleanup(
WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
throws IOException {
context.write(getId(), getValue());
}
}
public static class MinLongCombiner extends
Combiner<LongWritable, LongWritable> {
@Override
public void combine(LongWritable vertexId, LongWritable combinedMessage,
LongWritable messageToCombine) throws IOException {
if (combinedMessage.get() > messageToCombine.get()) {
combinedMessage.set(messageToCombine.get());
}
}
}
public static class SSSPVertexReader extends
GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
@Override
public void load(
LongWritable recordNum,
WritableRecord record,
MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
throws IOException {
SSSPVertex vertex = new SSSPVertex();
vertex.setId((LongWritable) record.get(0));
String[] edges = record.get(1).toString().split(",");
for (int i = 0; i < edges.length; i++) {
String[] ss = edges[i].split(":");
vertex.addEdge(new LongWritable(Long.parseLong(ss[0])),
new LongWritable(Long.parseLong(ss[1])));
}
context.addVertexRequest(vertex);
}
}
public static void main(String[] args) throws IOException {
if (args.length < 2) {
System.out.println("Usage: <startnode> <input> <output>");
System.exit(-1);
}
GraphJob job = new GraphJob();
job.setGraphLoaderClass(SSSPVertexReader.class);
job.setVertexClass(SSSPVertex.class);
job.setCombinerClass(MinLongCombiner.class);
job.set(START_VERTEX, args[0]);
job.addInput(TableInfo.builder().tableName(args[1]).build());
job.addOutput(TableInfo.builder().tableName(args[2]).build());
long startTime = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in "
+ (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}
代码说明:
- 第19行:定义
SSSPVertex
,其中:- 点值表示该点到源点
startVertexId
的当前最短距离。 compute()
方法使用迭代公式d[v]=min(d[v], d[u]+weight(u, v))
更新点值。cleanup()
方法把点及其到源点的最短距离写到结果表中。
- 点值表示该点到源点
- 第58行:当点值没发生变化时,调用
voteToHalt()
使框架该点进入halt状态,当所有点都进入halt状态时,计算结束。 - 第70行:定义
MinLongCombiner
,对发送给同一个点的消息进行合并,优化性能,减少内存占用。 - 第83行:定义
SSSPVertexReader
类,加载图,将表中每一条记录解析为一个点,记录的第一列是点标识,第二列存储该点起始的所有的边集,内容如2:2、3:1、4:4。 - 第106行:主程序(
main
函数),定义GraphJob
,指定Vertex
、GraphLoader
、Combiner
等的实现,指定输入输出表。
在文档使用中是否遇到以下问题
更多建议
匿名提交