全部产品
MaxCompute

PageRank

更新时间:2017-06-07 13:26:11   分享:   

PageRank 算法是计算网页排名的经典算法:输入是一个有向图 G,其中顶点表示网页,如果存在网页 A 到网页 B 的链接,那么存在联接 A 到 B 的边,算法基本原理:

  • 初始化:点值表示 PageRank 的 rank 值(double 类型),初始时,所有点取值为 1/TotalNumVertices
  • 迭代公式:PageRank(i)=0.15/TotalNumVertices+0.85*sum,其中 sum 为所有指向 i 点的点(设为 j) PageRank(j)/out_degree(j) 的累加值

从算法基本原理可以看出,这个算法非常适合使用 OPDS Graph 程序进行求解: 每个点 j 维护其 PageRank 值,每一轮迭代都将PageRank(j)/out_degree(j) 发给其邻接点(向其投票),下一轮迭代时,每个点根据迭代公式重新计算 PageRank 取值。

源代码

  1. import java.io.IOException;
  2. import org.apache.log4j.Logger;
  3. import com.aliyun.odps.io.WritableRecord;
  4. import com.aliyun.odps.graph.ComputeContext;
  5. import com.aliyun.odps.graph.GraphJob;
  6. import com.aliyun.odps.graph.GraphLoader;
  7. import com.aliyun.odps.graph.MutationContext;
  8. import com.aliyun.odps.graph.Vertex;
  9. import com.aliyun.odps.graph.WorkerContext;
  10. import com.aliyun.odps.io.DoubleWritable;
  11. import com.aliyun.odps.io.LongWritable;
  12. import com.aliyun.odps.io.NullWritable;
  13. import com.aliyun.odps.data.TableInfo;
  14. import com.aliyun.odps.io.Text;
  15. import com.aliyun.odps.io.Writable;
  16. public class PageRank {
  17. private final static Logger LOG = Logger.getLogger(PageRank.class);
  18. public static class PageRankVertex extends
  19. Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> {
  20. @Override
  21. public void compute(
  22. ComputeContext<Text, DoubleWritable, NullWritable, DoubleWritable> context,
  23. Iterable<DoubleWritable> messages) throws IOException {
  24. if (context.getSuperstep() == 0) {
  25. setValue(new DoubleWritable(1.0 / context.getTotalNumVertices()));
  26. } else if (context.getSuperstep() >= 1) {
  27. double sum = 0;
  28. for (DoubleWritable msg : messages) {
  29. sum += msg.get();
  30. }
  31. DoubleWritable vertexValue = new DoubleWritable(
  32. (0.15f / context.getTotalNumVertices()) + 0.85f * sum);
  33. setValue(vertexValue);
  34. }
  35. if (hasEdges()) {
  36. context.sendMessageToNeighbors(this, new DoubleWritable(getValue()
  37. .get() / getEdges().size()));
  38. }
  39. }
  40. @Override
  41. public void cleanup(
  42. WorkerContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
  43. throws IOException {
  44. context.write(getId(), getValue());
  45. }
  46. }
  47. public static class PageRankVertexReader extends
  48. GraphLoader<Text, DoubleWritable, NullWritable, DoubleWritable> {
  49. @Override
  50. public void load(
  51. LongWritable recordNum,
  52. WritableRecord record,
  53. MutationContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
  54. throws IOException {
  55. PageRankVertex vertex = new PageRankVertex();
  56. vertex.setValue(new DoubleWritable(0));
  57. vertex.setId((Text) record.get(0));
  58. System.out.println(record.get(0));
  59. for (int i = 1; i < record.size(); i++) {
  60. Writable edge = record.get(i);
  61. System.out.println(edge.toString());
  62. if (!(edge.equals(NullWritable.get()))) {
  63. vertex.addEdge(new Text(edge.toString()), NullWritable.get());
  64. }
  65. }
  66. LOG.info("vertex edgs size: "
  67. + (vertex.hasEdges() ? vertex.getEdges().size() : 0));
  68. context.addVertexRequest(vertex);
  69. }
  70. }
  71. private static void printUsage() {
  72. System.out.println("Usage: <in> <out> [Max iterations (default 30)]");
  73. System.exit(-1);
  74. }
  75. public static void main(String[] args) throws IOException {
  76. if (args.length < 2)
  77. printUsage();
  78. GraphJob job = new GraphJob();
  79. job.setGraphLoaderClass(PageRankVertexReader.class);
  80. job.setVertexClass(PageRankVertex.class);
  81. job.addInput(TableInfo.builder().tableName(args[0]).build());
  82. job.addOutput(TableInfo.builder().tableName(args[1]).build());
  83. // default max iteration is 30
  84. job.setMaxIteration(30);
  85. if (args.length >= 3)
  86. job.setMaxIteration(Integer.parseInt(args[2]));
  87. long startTime = System.currentTimeMillis();
  88. job.run();
  89. System.out.println("Job Finished in "
  90. + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  91. }
  92. }

代码说明

PageRank 源代码包括以下几部分:

  • 55行:定义 PageRankVertexReader 类,加载图,将表中每一条记录解析为一个点,记录的第一列是起点,其他列为终点;
  • 23行:定义 PageRankVertex ,其中:
    • 点值表示该点(网页)的当前 PageRank 取值;
    • compute()方法使用迭代公式:PageRank(i)=0.15/TotalNumVertices+0.85*sum 更新点值;
    • cleanup() 方法把点及其 PageRank 取值写到结果表中;
  • 88行:主程序(main函数),定义 GraphJob,指定 Vertex/GraphLoader等的实现,以及最大迭代次数(默认 30),并指定输入输出表。
本文导读目录
本文导读目录
以上内容是否对您有帮助?