全部产品
MaxCompute

连通分量

更新时间:2017-06-07 13:26:11   分享:   

两个顶点之间存在路径,称两个顶点为连通的。如果无向图G中任意两个顶点都是连通的,则称G为连通图,否则称为非连通图, 其顶点个数极大的连通子图称为连通分量。本算法计算每个点的连通分量成员,最后输出顶点值中包含最小顶点id的连通分量。 将最小顶点id沿着边传播到连通分量的所有顶点。

源代码

import java.io.IOException;

import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.graph.examples.SSSP.MinLongCombiner;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.io.WritableRecord;

/**
 * Compute the connected component membership of each vertex and output
 * each vertex which's value containing the smallest id in the connected
 * component containing that vertex.
 *
 * Algorithm: propagate the smallest vertex id along the edges to all
 * vertices of a connected component.
 *
 */
public class ConnectedComponents {

  public static class CCVertex extends
    Vertex<LongWritable, LongWritable, NullWritable, LongWritable> {

    @Override
    public void compute(
        ComputeContext<LongWritable, LongWritable, NullWritable, LongWritable> context,
        Iterable<LongWritable> msgs) throws IOException {

      if (context.getSuperstep() == 0L) {
        this.setValue(getId());
        context.sendMessageToNeighbors(this, getValue());
        return;
      }

      long minID = Long.MAX_VALUE;
      for (LongWritable id : msgs) {
        if (id.get() < minID) {
          minID = id.get();
        }
      }

      if (minID < this.getValue().get()) {
        this.setValue(new LongWritable(minID));
        context.sendMessageToNeighbors(this, getValue());
      } else {
        this.voteToHalt();
      }
    }

    /**
     * Output Table Description:
     * +-----------------+----------------------------------------+
     * | Field | Type    | Comment                                |
     * +-----------------+----------------------------------------+
     * | v     | bigint  | vertex id                              |
     * | minID | bigint  | smallest id in the connected component |
     * +-----------------+----------------------------------------+
     */
    @Override
    public void cleanup(
        WorkerContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
        throws IOException {
      context.write(getId(), getValue());
    }
  }


  /**
   * Input Table Description:
   * +-----------------+----------------------------------------------------+
   * | Field | Type    | Comment                                            |
   * +-----------------+----------------------------------------------------+
   * | v     | bigint  | vertex id                                          |
   * | es    | string  | comma separated target vertex id of outgoing edges |
   * +-----------------+----------------------------------------------------+
   *
   * Example:
   * For graph:
   *       1 ----- 2
   *       |       |
   *       3 ----- 4
   * Input table:
   * +-----------+
   * | v  | es   |
   * +-----------+
   * | 1  | 2,3  |
   * | 2  | 1,4  |
   * | 3  | 1,4  |
   * | 4  | 2,3  |
   * +-----------+
   */
  public static class CCVertexReader extends
    GraphLoader<LongWritable, LongWritable, NullWritable, LongWritable> {

    @Override
    public void load(
        LongWritable recordNum,
        WritableRecord record,
        MutationContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
    throws IOException {
      CCVertex vertex = new CCVertex();

      vertex.setId((LongWritable) record.get(0));

      String[] edges = record.get(1).toString().split(",");
      for (int i = 0; i < edges.length; i++) {
        long destID = Long.parseLong(edges[i]);
        vertex.addEdge(new LongWritable(destID), NullWritable.get());
      }

      context.addVertexRequest(vertex);
    }

  }

  public static void main(String[] args) throws IOException {
    if (args.length < 2) {
      System.out.println("Usage: <input> <output>");
      System.exit(-1);
    }

    GraphJob job = new GraphJob();
    job.setGraphLoaderClass(CCVertexReader.class);
    job.setVertexClass(CCVertex.class);
    job.setCombinerClass(MinLongCombiner.class);

    job.addInput(TableInfo.builder().tableName(args[0]).build());
    job.addOutput(TableInfo.builder().tableName(args[1]).build());
    long startTime = System.currentTimeMillis();
    job.run();
    System.out.println("Job Finished in "
        + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  }
}
本文导读目录
本文导读目录
以上内容是否对您有帮助?