本文为您提供输入点表的代码。
输入点表的代码,如下所示。
import java.io.IOException;
import com.aliyun.odps.conf.Configuration;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.VertexResolver;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.VertexChanges;
import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.WritableComparable;
import com.aliyun.odps.io.WritableRecord;
/**
* 本示例是用于展示,对于不同类型的数据类型,如何编写图作业程序载入数据。主要展示GraphLoader和VertexResolver配合完成图的构建。
* ODPS Graph的作业输入都为ODPS的Table,假设作业输入有两张表,一张存储点的信息,一张存储边的信息。
* 存储点信息的表的格式,如下。
* +------------------------+
* | VertexID | VertexValue |
* +------------------------+
* | id0| 9|
* +------------------------+
* | id1| 7|
* +------------------------+
* | id2| 8|
* +------------------------+
*
* 存储边信息的表的格式如下。
* +-----------------------------------+
* | VertexID | DestVertexID| EdgeValue|
* +-----------------------------------+
* | id0| id1| 1|
* +-----------------------------------+
* | id0| id2| 2|
* +-----------------------------------+
* | id2| id1| 3|
* +-----------------------------------+
*
* 结合两张表的数据,表示id0有两条出边,分别指向id1和id2;id2有一条出边,指向id1;id1没有出边。
*
* 对于此种类型的数据,在GraphLoader::load(LongWritable, Record, MutationContext)
* ,可以使用MutationContext#addVertexRequest(Vertex)向图中请求添加点,使用
* link MutationContext#addEdgeRequest(WritableComparable, Edge)向图中请求添加边,然后,在
* link VertexResolver#resolve(WritableComparable, Vertex, VertexChanges, boolean)
* 中,将load 方法中添加的点和边合并到一个Vertex对象中,作为返回值,添加到最后参与计算的图中。
*
**/
public class VertexInputFormat {
private final static String EDGE_TABLE = "edge.table";
/**
* 将Record解释为Vertex和Edge,每个Record根据其来源,表示一个Vertex或者一条Edge。
*
* 类似于com.aliyun.odps.mapreduce.Mapper#map,输入Record,生成键值对,此处的键是Vertex的ID,
* 值是Vertex或Edge,通过上下文Context写出,这些键值对会在LoadingVertexResolver出根据Vertex的ID汇总。
*
* 注意:此处添加的点或边只是根据Record内容发出的请求,并不是最后参与计算的点或边,只有在随后的VertexResolver
* 中添加的点或边才参与计算。
**/
public static class VertexInputLoader extends
GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
private boolean isEdgeData;
/**
* 配置VertexInputLoader。
*
* @param conf
* 作业的配置参数,是在main中使用GraphJob配置的,或者是在console中set的。
* @param workerId
* 当前工作的worker的序号,从0开始,可以用于构造唯一的vertex id。
* @param inputTableInfo
* 当前worker载入的输入表信息,可以用于确定当前输入是哪种类型的数据,即Record的格式。
**/
@Override
public void setup(Configuration conf, int workerId, TableInfo inputTableInfo) {
isEdgeData = conf.get(EDGE_TABLE).equals(inputTableInfo.getTableName());
}
/**
* 根据Record中的内容,解析为对应的边,并请求添加到图中。
*
* @param recordNum
* 记录序列号,从1开始,每个worker上单独计数。
* @param record
* 输入表中的记录,三列,分别表示初点、终点、边的权重。
* @param context
* 上下文,请求将解释后的边添加到图中。
**/
@Override
public void load(
LongWritable recordNum,
WritableRecord record,
MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
throws IOException {
if (isEdgeData) {
/**
* 数据来源于存储边信息的表。
*
* 1、第一列表示初始点的ID。
**/
LongWritable sourceVertexID = (LongWritable) record.get(0);
/**
* 2、第二列表示终点的ID。
**/
LongWritable destinationVertexID = (LongWritable) record.get(1);
/**
* 3、第三列表示边的权重。
**/
LongWritable edgeValue = (LongWritable) record.get(2);
/**
* 4、创建边,由终点ID和边的权重组成。
**/
Edge<LongWritable, LongWritable> edge = new Edge<LongWritable, LongWritable>(
destinationVertexID, edgeValue);
/**
* 5、请求给初始点添加边。
**/
context.addEdgeRequest(sourceVertexID, edge);
/**
* 6、如果每条Record表示双向边,重复4与5 。
* Edge<LongWritable, LongWritable> edge2 = new
* Edge<LongWritable, LongWritable>( sourceVertexID, edgeValue);
* context.addEdgeRequest(destinationVertexID, edge2);
**/
} else {
/**
* 数据来源于存储点信息的表。
*
* 1、第一列表示点的ID。
**/
LongWritable vertexID = (LongWritable) record.get(0);
/**
* 2、第二列表示点的值。
**/
LongWritable vertexValue = (LongWritable) record.get(1);
/**
* 3、创建点,由点的ID和点的值组成。
**/
MyVertex vertex = new MyVertex();
/**
* 4、初始化点。
**/
vertex.setId(vertexID);
vertex.setValue(vertexValue);
/**
* 5、请求添加点。
**/
context.addVertexRequest(vertex);
}
}
}
/**
* 汇总GraphLoader::load(LongWritable, Record, MutationContext)生成的键值对,类似于
* com.aliyun.odps.mapreduce.Reducer中的reduce。对于唯一的Vertex ID,所有关于这个ID上
* 添加\删除、点\边的行为都会放在VertexChanges中。
*
* 注意:此处并不只针对load方法中添加的有冲突的点或边才调用(冲突是指添加多个相同Vertex对象,添加重复边等),
* 所有在load方法中请求生成的ID都会在此处被调用。
**/
public static class LoadingResolver extends
VertexResolver<LongWritable, LongWritable, LongWritable, LongWritable> {
/**
* 处理关于一个ID的添加或删除、点或边的请求。
*
* VertexChanges有四个接口,分别与MutationContext的四个接口对应:
* VertexChanges::getAddedVertexList()与
* MutationContext::addVertexRequest(Vertex)对应,
* 在load方法中,请求添加的ID相同的Vertex对象,会被汇总在返回的List中。
* VertexChanges::getAddedEdgeList()与
* MutationContext::addEdgeRequest(WritableComparable, Edge)
* 对应,请求添加的初始点ID相同的Edge对象,会被汇总在返回的List中。
* VertexChanges::getRemovedVertexCount()与
* MutationContext::removeVertexRequest(WritableComparable)
* 对应,请求删除的ID相同的Vertex,汇总的请求删除的次数作为返回值。
* VertexChanges#getRemovedEdgeList()与
* MutationContext#removeEdgeRequest(WritableComparable, WritableComparable)
* 对应,请求删除的初始点ID相同的Edge对象,会被汇总在返回的List中。
*
* 用户通过处理关于这个ID的变化,通过返回值声明此ID是否参与计算,如果返回的Vertex不为null,
* 则此ID会参与随后的计算,如果返回null,则不会参与计算。
*
* @param vertexId
* 请求添加的点的ID,或请求添加的边的初点ID。
* @param vertex
* 已存在的Vertex对象,数据载入阶段,始终为null。
* @param vertexChanges
* 此ID上的请求添加\删除、点\边的集合。
* @param hasMessages
* 此ID是否有输入消息,数据载入阶段,始终为false。
**/
@Override
public Vertex<LongWritable, LongWritable, LongWritable, LongWritable> resolve(
LongWritable vertexId,
Vertex<LongWritable, LongWritable, LongWritable, LongWritable> vertex,
VertexChanges<LongWritable, LongWritable, LongWritable, LongWritable> vertexChanges,
boolean hasMessages) throws IOException {
/**
* 1、获取Vertex对象,作为参与计算的点。
**/
MyVertex computeVertex = null;
if (vertexChanges.getAddedVertexList() == null
|| vertexChanges.getAddedVertexList().isEmpty()) {
computeVertex = new MyVertex();
computeVertex.setId(vertexId);
} else {
/**
* 此处假设存储点信息的表中,每个Record表示唯一的点。
**/
computeVertex = (MyVertex) vertexChanges.getAddedVertexList().get(0);
}
/**
* 2、将请求给此点添加的边,添加到Vertex对象中,如果数据有重复的可能,根据算法需要决定是否去重。
**/
if (vertexChanges.getAddedEdgeList() != null) {
for (Edge<LongWritable, LongWritable> edge : vertexChanges
.getAddedEdgeList()) {
computeVertex.addEdge(edge.getDestVertexId(), edge.getValue());
}
}
/**
* 3、将Vertex对象返回,添加到最终的图中参与计算。
**/
return computeVertex;
}
}
/**
* 确定参与计算的Vertex的行为。
*
**/
public static class MyVertex extends
Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
/**
* 将vertex的边,按照输入表的格式再写到结果表。输入表与输出表的格式和数据都相同。
*
* @param context
* 运行时上下文。
* @param messages
* 输入消息。
**/
@Override
public void compute(
ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
Iterable<LongWritable> messages) throws IOException {
/**
* 将点的ID和值,写到存储点的结果表。
**/
context.write("vertex", getId(), getValue());
/**
* 将点的边,写到存储边的结果表。
**/
if (hasEdges()) {
for (Edge<LongWritable, LongWritable> edge : getEdges()) {
context.write("edge", getId(), edge.getDestVertexId(),
edge.getValue());
}
}
/**
* 只迭代一轮。
**/
voteToHalt();
}
}
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
if (args.length < 4) {
throw new IOException(
"Usage: VertexInputFormat <vertex input> <edge input> <vertex output> <edge output>");
}
/**
* GraphJob用于对Graph作业进行配置。
*/
GraphJob job = new GraphJob();
/**
* 1、指定输入的图数据,并指定存储边数据所在的表。
*/
job.addInput(TableInfo.builder().tableName(args[0]).build());
job.addInput(TableInfo.builder().tableName(args[1]).build());
job.set(EDGE_TABLE, args[1]);
/**
* 2、指定载入数据的方式,将Record解释为Edge,此处类似于map,生成的key为vertex的ID,value为edge。
*/
job.setGraphLoaderClass(VertexInputLoader.class);
/**
* 3、指定载入数据阶段,生成参与计算的vertex。此处类似于reduce,将map生成的edge合并成一个vertex。
*/
job.setLoadingVertexResolverClass(LoadingResolver.class);
/**
* 4、指定参与计算的vertex的行为。每轮迭代执行vertex.compute方法。
*/
job.setVertexClass(MyVertex.class);
/**
* 5、指定图作业的输出表,将计算生成的结果写到结果表中。
*/
job.addOutput(TableInfo.builder().tableName(args[2]).label("vertex").build());
job.addOutput(TableInfo.builder().tableName(args[3]).label("edge").build());
/**
* 6、提交作业执行。
*/
job.run();
}
}