全部产品
MaxCompute

输入边表示例

更新时间:2017-06-07 13:26:11   分享:   

import java.io.IOException;

import com.aliyun.odps.conf.Configuration;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.VertexResolver;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.VertexChanges;
import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.WritableComparable;
import com.aliyun.odps.io.WritableRecord;

/**
 * 本示例是用于展示,对于不同类型的数据类型,如何编写图作业程序载入数据。主要展示GraphLoader和
 * VertexResolver的配合完成图的构建。
 *
 * ODPS Graph的作业输入都为ODPS的Table,假设作业输入有两张表,一张存储点的信息,一张存储边的信息。
 * 存储点信息的表的格式,如:
 * +------------------------+
 * | VertexID | VertexValue |
 * +------------------------+
 * |       id0|            9|
 * +------------------------+
 * |       id1|            7|
 * +------------------------+
 * |       id2|            8|
 * +------------------------+
 *
 * 存储边信息的表的格式,如
 * +-----------------------------------+
 * | VertexID | DestVertexID| EdgeValue|
 * +-----------------------------------+
 * |       id0|          id1|         1|
 * +-----------------------------------+
 * |       id0|          id2|         2|
 * +-----------------------------------+
 * |       id2|          id1|         3|
 * +-----------------------------------+
 *
 * 结合两张表的数据,表示id0有两条出边,分别指向id1和id2;id2有一条出边,指向id1;id1没有出边。
 *
 * 对于此种类型的数据,在GraphLoader::load(LongWritable, Record, MutationContext)
 * ,可以使用 MutationContext#addVertexRequest(Vertex)向图中请求添加点,使用
 * link MutationContext#addEdgeRequest(WritableComparable, Edge)向图中请求添加边,然后,在
 * link VertexResolver#resolve(WritableComparable, Vertex, VertexChanges, boolean)
 * 中,将load 方法中添加的点和边,合并到一个Vertex对象中,作为返回值,添加到最后参与计算的图中。
 *
 **/
public class VertexInputFormat {

  private final static String EDGE_TABLE = "edge.table";

  /**
   * 将Record解释为Vertex和Edge,每个Record根据其来源,表示一个Vertex或者一条Edge。
   * <p>
   * 类似于com.aliyun.odps.mapreduce.Mapper#map
   * ,输入Record,生成键值对,此处的键是Vertex的ID,
   * 值是Vertex或Edge,通过上下文Context写出,这些键值对会在LoadingVertexResolver出根据Vertex的ID汇总。
   *
   * 注意:此处添加的点或边只是根据Record内容发出的请求,并不是最后参与计算的点或边,只有在随后的VertexResolver
   * 中添加的点或边才参与计算。
   **/
  public static class VertexInputLoader extends
      GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {

    private boolean isEdgeData;

    /**
     * 配置VertexInputLoader。
     *
     * @param conf
     *          作业的配置参数,在main中使用GraphJob配置的,或者在console中set的
     * @param workerId
     *          当前工作的worker的序号,从0开始,可以用于构造唯一的vertex id
     * @param inputTableInfo
     *          当前worker载入的输入表信息,可以用于确定当前输入是哪种类型的数据,即Record的格式
     **/
    @Override
    public void setup(Configuration conf, int workerId, TableInfo inputTableInfo) {
      isEdgeData = conf.get(EDGE_TABLE).equals(inputTableInfo.getTableName());
    }

    /**
     * 根据Record中的内容,解析为对应的边,并请求添加到图中。
     *
     * @param recordNum
     *          记录序列号,从1开始,每个worker上单独计数
     * @param record
     *          输入表中的记录,三列,分别表示初点、终点、边的权重
     * @param context
     *          上下文,请求将解释后的边添加到图中
     **/
    @Override
    public void load(
        LongWritable recordNum,
        WritableRecord record,
        MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
        throws IOException {
      if (isEdgeData) {
        /**
         * 数据来源于存储边信息的表。
         *
         * 1、第一列表示初始点的ID
         **/
        LongWritable sourceVertexID = (LongWritable) record.get(0);

        /**
         * 2、第二列表示终点的ID
         **/
        LongWritable destinationVertexID = (LongWritable) record.get(1);

        /**
         * 3、地三列表示边的权重
         **/
        LongWritable edgeValue = (LongWritable) record.get(2);

        /**
         * 4、创建边,由终点ID和边的权重组成
         **/
        Edge<LongWritable, LongWritable> edge = new Edge<LongWritable, LongWritable>(
            destinationVertexID, edgeValue);

        /**
         * 5、请求给初始点添加边
         **/
        context.addEdgeRequest(sourceVertexID, edge);

        /**
         * 6、如果每条Record表示双向边,重复4与5 Edge<LongWritable, LongWritable> edge2 = new
         * Edge<LongWritable, LongWritable>( sourceVertexID, edgeValue);
         * context.addEdgeRequest(destinationVertexID, edge2);
         **/
      } else {
        /**
         * 数据来源于存储点信息的表。
         *
         * 1、第一列表示点的ID
         **/
        LongWritable vertexID = (LongWritable) record.get(0);

        /**
         * 2、第二列表示点的值
         **/
        LongWritable vertexValue = (LongWritable) record.get(1);

        /**
         * 3、创建点,由点的ID和点的值组成
         **/
        MyVertex vertex = new MyVertex();

        /**
         * 4、初始化点
         **/
        vertex.setId(vertexID);
        vertex.setValue(vertexValue);

        /**
         * 5、请求添加点
         **/
        context.addVertexRequest(vertex);
      }

    }

  }

  /**
   * 汇总GraphLoader::load(LongWritable, Record, MutationContext)生成的键值对,类似于
   * com.aliyun.odps.mapreduce.Reducer中的reduce。对于唯一的Vertex ID,所有关于这个ID上
   * 添加\删除、点\边的行为都会放在VertexChanges中。
   *
   * 注意:此处并不只针对load方法中添加的有冲突的点或边才调用(冲突是指添加多个相同Vertex对象,添加重复边等),
   * 所有在load方法中请求生成的ID都会在此处被调用。
   **/
  public static class LoadingResolver extends
      VertexResolver<LongWritable, LongWritable, LongWritable, LongWritable> {

    /**
     * 处理关于一个ID的添加或删除、点或边的请求。
     *
     * VertexChanges有四个接口,分别与MutationContext的四个接口对应:
     * VertexChanges::getAddedVertexList()与
     * MutationContext::addVertexRequest(Vertex)对应,
     * 在load方法中,请求添加的ID相同的Vertex对象,会被汇总在返回的List中
     * VertexChanges::getAddedEdgeList()与
     * MutationContext::addEdgeRequest(WritableComparable, Edge)
     * 对应,请求添加的初始点ID相同的Edge对象,会被汇总在返回的List中
     * VertexChanges::getRemovedVertexCount()与
     * MutationContext::removeVertexRequest(WritableComparable)
     * 对应,请求删除的ID相同的Vertex,汇总的请求删除的次数作为返回值
     * VertexChanges#getRemovedEdgeList()与
     * MutationContext#removeEdgeRequest(WritableComparable, WritableComparable)
     * 对应,请求删除的初始点ID相同的Edge对象,会被汇总在返回的List中
     *
     * 用户通过处理关于这个ID的变化,通过返回值声明此ID是否参与计算,如果返回的Vertex不为null,
     * 则此ID会参与随后的计算,如果返回null,则不会参与计算。
     *
     * @param vertexId
     *          请求添加的点的ID,或请求添加的边的初点ID
     * @param vertex
     *          已存在的Vertex对象,数据载入阶段,始终为null
     * @param vertexChanges
     *          此ID上的请求添加\删除、点\边的集合
     * @param hasMessages
     *          此ID是否有输入消息,数据载入阶段,始终为false
     **/
    @Override
    public Vertex<LongWritable, LongWritable, LongWritable, LongWritable> resolve(
        LongWritable vertexId,
        Vertex<LongWritable, LongWritable, LongWritable, LongWritable> vertex,
        VertexChanges<LongWritable, LongWritable, LongWritable, LongWritable> vertexChanges,
        boolean hasMessages) throws IOException {
      /**
       * 1、获取Vertex对象,作为参与计算的点。
       **/
      MyVertex computeVertex = null;
      if (vertexChanges.getAddedVertexList() == null
          || vertexChanges.getAddedVertexList().isEmpty()) {
        computeVertex = new MyVertex();
        computeVertex.setId(vertexId);
      } else {
        /**
         * 此处假设存储点信息的表中,每个Record表示唯一的点。
         **/
        computeVertex = (MyVertex) vertexChanges.getAddedVertexList().get(0);
      }

      /**
       * 2、将请求给此点添加的边,添加到Vertex对象中,如果数据有重复的可能,根据算法需要决定是否去重。
       **/
      if (vertexChanges.getAddedEdgeList() != null) {
        for (Edge<LongWritable, LongWritable> edge : vertexChanges
            .getAddedEdgeList()) {
          computeVertex.addEdge(edge.getDestVertexId(), edge.getValue());
        }
      }

      /**
       * 3、将Vertex对象返回,添加到最终的图中参与计算。
       **/
      return computeVertex;
    }

  }

  /**
   * 确定参与计算的Vertex的行为。
   *
   **/
  public static class MyVertex extends
      Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {

    /**
     * 将vertex的边,按照输入表的格式再写到结果表。输入表与输出表的格式和数据都相同。
     *
     * @param context
     *          运行时上下文
     * @param messages
     *          输入消息
     **/
    @Override
    public void compute(
        ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
        Iterable<LongWritable> messages) throws IOException {
      /**
       * 将点的ID和值,写到存储点的结果表
       **/
      context.write("vertex", getId(), getValue());

      /**
       * 将点的边,写到存储边的结果表
       **/
      if (hasEdges()) {
        for (Edge<LongWritable, LongWritable> edge : getEdges()) {
          context.write("edge", getId(), edge.getDestVertexId(),
              edge.getValue());
        }
      }

      /**
       * 只迭代一轮
       **/
      voteToHalt();
    }

  }

  /**
   * @param args
   * @throws IOException
   */
  public static void main(String[] args) throws IOException {

    if (args.length < 4) {
      throw new IOException(
          "Usage: VertexInputFormat <vertex input> <edge input> <vertex output> <edge output>");
    }

    /**
     * GraphJob用于对Graph作业进行配置
     */
    GraphJob job = new GraphJob();

    /**
     * 1、指定输入的图数据,并指定存储边数据所在的表。
     */
    job.addInput(TableInfo.builder().tableName(args[0]).build());
    job.addInput(TableInfo.builder().tableName(args[1]).build());
    job.set(EDGE_TABLE, args[1]);

    /**
     * 2、指定载入数据的方式,将Record解释为Edge,此处类似于map,生成的 key为vertex的ID,value为edge。
     */
    job.setGraphLoaderClass(VertexInputLoader.class);

    /**
     * 3、指定载入数据阶段,生成参与计算的vertex。此处类似于reduce,将map 生成的edge合并成一个vertex。
     */
    job.setLoadingVertexResolverClass(LoadingResolver.class);

    /**
     * 4、指定参与计算的vertex的行为。每轮迭代执行vertex.compute方法。
     */
    job.setVertexClass(MyVertex.class);

    /**
     * 5、指定图作业的输出表,将计算生成的结果写到结果表中。
     */
    job.addOutput(TableInfo.builder().tableName(args[2]).label("vertex").build());
    job.addOutput(TableInfo.builder().tableName(args[3]).label("edge").build());

    /**
     * 6、提交作业执行。
     */
    job.run();
  }

}
本文导读目录
本文导读目录
以上内容是否对您有帮助?