全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网
MaxCompute

输入边表

更新时间:2017-10-22 14:27:56

输入边表的代码,如下所示:

  1. import java.io.IOException;
  2. import com.aliyun.odps.conf.Configuration;
  3. import com.aliyun.odps.data.TableInfo;
  4. import com.aliyun.odps.graph.ComputeContext;
  5. import com.aliyun.odps.graph.GraphJob;
  6. import com.aliyun.odps.graph.GraphLoader;
  7. import com.aliyun.odps.graph.Vertex;
  8. import com.aliyun.odps.graph.VertexResolver;
  9. import com.aliyun.odps.graph.MutationContext;
  10. import com.aliyun.odps.graph.VertexChanges;
  11. import com.aliyun.odps.graph.Edge;
  12. import com.aliyun.odps.io.LongWritable;
  13. import com.aliyun.odps.io.WritableComparable;
  14. import com.aliyun.odps.io.WritableRecord;
  15. /**
  16. * 本示例是用于展示,对于不同类型的数据类型,如何编写图作业程序载入数据。主要展示GraphLoader和
  17. * VertexResolver的配合完成图的构建。
  18. *
  19. * ODPS Graph的作业输入都为ODPS的Table,假设作业输入有两张表,一张存储点的信息,一张存储边的信息。
  20. * 存储点信息的表的格式,如:
  21. * +------------------------+
  22. * | VertexID | VertexValue |
  23. * +------------------------+
  24. * | id0| 9|
  25. * +------------------------+
  26. * | id1| 7|
  27. * +------------------------+
  28. * | id2| 8|
  29. * +------------------------+
  30. *
  31. * 存储边信息的表的格式,如
  32. * +-----------------------------------+
  33. * | VertexID | DestVertexID| EdgeValue|
  34. * +-----------------------------------+
  35. * | id0| id1| 1|
  36. * +-----------------------------------+
  37. * | id0| id2| 2|
  38. * +-----------------------------------+
  39. * | id2| id1| 3|
  40. * +-----------------------------------+
  41. *
  42. * 结合两张表的数据,表示id0有两条出边,分别指向id1和id2;id2有一条出边,指向id1;id1没有出边。
  43. *
  44. * 对于此种类型的数据,在GraphLoader::load(LongWritable, Record, MutationContext)
  45. * ,可以使用 MutationContext#addVertexRequest(Vertex)向图中请求添加点,使用
  46. * link MutationContext#addEdgeRequest(WritableComparable, Edge)向图中请求添加边,然后,在
  47. * link VertexResolver#resolve(WritableComparable, Vertex, VertexChanges, boolean)
  48. * 中,将load 方法中添加的点和边,合并到一个Vertex对象中,作为返回值,添加到最后参与计算的图中。
  49. *
  50. **/
  51. public class VertexInputFormat {
  52. private final static String EDGE_TABLE = "edge.table";
  53. /**
  54. * 将Record解释为Vertex和Edge,每个Record根据其来源,表示一个Vertex或者一条Edge。
  55. * <p>
  56. * 类似于com.aliyun.odps.mapreduce.Mapper#map
  57. * ,输入Record,生成键值对,此处的键是Vertex的ID,
  58. * 值是Vertex或Edge,通过上下文Context写出,这些键值对会在LoadingVertexResolver出根据Vertex的ID汇总。
  59. *
  60. * 注意:此处添加的点或边只是根据Record内容发出的请求,并不是最后参与计算的点或边,只有在随后的VertexResolver
  61. * 中添加的点或边才参与计算。
  62. **/
  63. public static class VertexInputLoader extends
  64. GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
  65. private boolean isEdgeData;
  66. /**
  67. * 配置VertexInputLoader。
  68. *
  69. * @param conf
  70. * 作业的配置参数,在main中使用GraphJob配置的,或者在console中set的
  71. * @param workerId
  72. * 当前工作的worker的序号,从0开始,可以用于构造唯一的vertex id
  73. * @param inputTableInfo
  74. * 当前worker载入的输入表信息,可以用于确定当前输入是哪种类型的数据,即Record的格式
  75. **/
  76. @Override
  77. public void setup(Configuration conf, int workerId, TableInfo inputTableInfo) {
  78. isEdgeData = conf.get(EDGE_TABLE).equals(inputTableInfo.getTableName());
  79. }
  80. /**
  81. * 根据Record中的内容,解析为对应的边,并请求添加到图中。
  82. *
  83. * @param recordNum
  84. * 记录序列号,从1开始,每个worker上单独计数
  85. * @param record
  86. * 输入表中的记录,三列,分别表示初点、终点、边的权重
  87. * @param context
  88. * 上下文,请求将解释后的边添加到图中
  89. **/
  90. @Override
  91. public void load(
  92. LongWritable recordNum,
  93. WritableRecord record,
  94. MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
  95. throws IOException {
  96. if (isEdgeData) {
  97. /**
  98. * 数据来源于存储边信息的表。
  99. *
  100. * 1、第一列表示初始点的ID
  101. **/
  102. LongWritable sourceVertexID = (LongWritable) record.get(0);
  103. /**
  104. * 2、第二列表示终点的ID
  105. **/
  106. LongWritable destinationVertexID = (LongWritable) record.get(1);
  107. /**
  108. * 3、地三列表示边的权重
  109. **/
  110. LongWritable edgeValue = (LongWritable) record.get(2);
  111. /**
  112. * 4、创建边,由终点ID和边的权重组成
  113. **/
  114. Edge<LongWritable, LongWritable> edge = new Edge<LongWritable, LongWritable>(
  115. destinationVertexID, edgeValue);
  116. /**
  117. * 5、请求给初始点添加边
  118. **/
  119. context.addEdgeRequest(sourceVertexID, edge);
  120. /**
  121. * 6、如果每条Record表示双向边,重复4与5 Edge<LongWritable, LongWritable> edge2 = new
  122. * Edge<LongWritable, LongWritable>( sourceVertexID, edgeValue);
  123. * context.addEdgeRequest(destinationVertexID, edge2);
  124. **/
  125. } else {
  126. /**
  127. * 数据来源于存储点信息的表。
  128. *
  129. * 1、第一列表示点的ID
  130. **/
  131. LongWritable vertexID = (LongWritable) record.get(0);
  132. /**
  133. * 2、第二列表示点的值
  134. **/
  135. LongWritable vertexValue = (LongWritable) record.get(1);
  136. /**
  137. * 3、创建点,由点的ID和点的值组成
  138. **/
  139. MyVertex vertex = new MyVertex();
  140. /**
  141. * 4、初始化点
  142. **/
  143. vertex.setId(vertexID);
  144. vertex.setValue(vertexValue);
  145. /**
  146. * 5、请求添加点
  147. **/
  148. context.addVertexRequest(vertex);
  149. }
  150. }
  151. }
  152. /**
  153. * 汇总GraphLoader::load(LongWritable, Record, MutationContext)生成的键值对,类似于
  154. * com.aliyun.odps.mapreduce.Reducer中的reduce。对于唯一的Vertex ID,所有关于这个ID上
  155. * 添加\删除、点\边的行为都会放在VertexChanges中。
  156. *
  157. * 注意:此处并不只针对load方法中添加的有冲突的点或边才调用(冲突是指添加多个相同Vertex对象,添加重复边等),
  158. * 所有在load方法中请求生成的ID都会在此处被调用。
  159. **/
  160. public static class LoadingResolver extends
  161. VertexResolver<LongWritable, LongWritable, LongWritable, LongWritable> {
  162. /**
  163. * 处理关于一个ID的添加或删除、点或边的请求。
  164. *
  165. * VertexChanges有四个接口,分别与MutationContext的四个接口对应:
  166. * VertexChanges::getAddedVertexList()与
  167. * MutationContext::addVertexRequest(Vertex)对应,
  168. * 在load方法中,请求添加的ID相同的Vertex对象,会被汇总在返回的List中
  169. * VertexChanges::getAddedEdgeList()与
  170. * MutationContext::addEdgeRequest(WritableComparable, Edge)
  171. * 对应,请求添加的初始点ID相同的Edge对象,会被汇总在返回的List中
  172. * VertexChanges::getRemovedVertexCount()与
  173. * MutationContext::removeVertexRequest(WritableComparable)
  174. * 对应,请求删除的ID相同的Vertex,汇总的请求删除的次数作为返回值
  175. * VertexChanges#getRemovedEdgeList()与
  176. * MutationContext#removeEdgeRequest(WritableComparable, WritableComparable)
  177. * 对应,请求删除的初始点ID相同的Edge对象,会被汇总在返回的List中
  178. *
  179. * 用户通过处理关于这个ID的变化,通过返回值声明此ID是否参与计算,如果返回的Vertex不为null,
  180. * 则此ID会参与随后的计算,如果返回null,则不会参与计算。
  181. *
  182. * @param vertexId
  183. * 请求添加的点的ID,或请求添加的边的初点ID
  184. * @param vertex
  185. * 已存在的Vertex对象,数据载入阶段,始终为null
  186. * @param vertexChanges
  187. * 此ID上的请求添加\删除、点\边的集合
  188. * @param hasMessages
  189. * 此ID是否有输入消息,数据载入阶段,始终为false
  190. **/
  191. @Override
  192. public Vertex<LongWritable, LongWritable, LongWritable, LongWritable> resolve(
  193. LongWritable vertexId,
  194. Vertex<LongWritable, LongWritable, LongWritable, LongWritable> vertex,
  195. VertexChanges<LongWritable, LongWritable, LongWritable, LongWritable> vertexChanges,
  196. boolean hasMessages) throws IOException {
  197. /**
  198. * 1、获取Vertex对象,作为参与计算的点。
  199. **/
  200. MyVertex computeVertex = null;
  201. if (vertexChanges.getAddedVertexList() == null
  202. || vertexChanges.getAddedVertexList().isEmpty()) {
  203. computeVertex = new MyVertex();
  204. computeVertex.setId(vertexId);
  205. } else {
  206. /**
  207. * 此处假设存储点信息的表中,每个Record表示唯一的点。
  208. **/
  209. computeVertex = (MyVertex) vertexChanges.getAddedVertexList().get(0);
  210. }
  211. /**
  212. * 2、将请求给此点添加的边,添加到Vertex对象中,如果数据有重复的可能,根据算法需要决定是否去重。
  213. **/
  214. if (vertexChanges.getAddedEdgeList() != null) {
  215. for (Edge<LongWritable, LongWritable> edge : vertexChanges
  216. .getAddedEdgeList()) {
  217. computeVertex.addEdge(edge.getDestVertexId(), edge.getValue());
  218. }
  219. }
  220. /**
  221. * 3、将Vertex对象返回,添加到最终的图中参与计算。
  222. **/
  223. return computeVertex;
  224. }
  225. }
  226. /**
  227. * 确定参与计算的Vertex的行为。
  228. *
  229. **/
  230. public static class MyVertex extends
  231. Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
  232. /**
  233. * 将vertex的边,按照输入表的格式再写到结果表。输入表与输出表的格式和数据都相同。
  234. *
  235. * @param context
  236. * 运行时上下文
  237. * @param messages
  238. * 输入消息
  239. **/
  240. @Override
  241. public void compute(
  242. ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
  243. Iterable<LongWritable> messages) throws IOException {
  244. /**
  245. * 将点的ID和值,写到存储点的结果表
  246. **/
  247. context.write("vertex", getId(), getValue());
  248. /**
  249. * 将点的边,写到存储边的结果表
  250. **/
  251. if (hasEdges()) {
  252. for (Edge<LongWritable, LongWritable> edge : getEdges()) {
  253. context.write("edge", getId(), edge.getDestVertexId(),
  254. edge.getValue());
  255. }
  256. }
  257. /**
  258. * 只迭代一轮
  259. **/
  260. voteToHalt();
  261. }
  262. }
  263. /**
  264. * @param args
  265. * @throws IOException
  266. */
  267. public static void main(String[] args) throws IOException {
  268. if (args.length < 4) {
  269. throw new IOException(
  270. "Usage: VertexInputFormat <vertex input> <edge input> <vertex output> <edge output>");
  271. }
  272. /**
  273. * GraphJob用于对Graph作业进行配置
  274. */
  275. GraphJob job = new GraphJob();
  276. /**
  277. * 1、指定输入的图数据,并指定存储边数据所在的表。
  278. */
  279. job.addInput(TableInfo.builder().tableName(args[0]).build());
  280. job.addInput(TableInfo.builder().tableName(args[1]).build());
  281. job.set(EDGE_TABLE, args[1]);
  282. /**
  283. * 2、指定载入数据的方式,将Record解释为Edge,此处类似于map,生成的 key为vertex的ID,value为edge。
  284. */
  285. job.setGraphLoaderClass(VertexInputLoader.class);
  286. /**
  287. * 3、指定载入数据阶段,生成参与计算的vertex。此处类似于reduce,将map 生成的edge合并成一个vertex。
  288. */
  289. job.setLoadingVertexResolverClass(LoadingResolver.class);
  290. /**
  291. * 4、指定参与计算的vertex的行为。每轮迭代执行vertex.compute方法。
  292. */
  293. job.setVertexClass(MyVertex.class);
  294. /**
  295. * 5、指定图作业的输出表,将计算生成的结果写到结果表中。
  296. */
  297. job.addOutput(TableInfo.builder().tableName(args[2]).label("vertex").build());
  298. job.addOutput(TableInfo.builder().tableName(args[3]).label("edge").build());
  299. /**
  300. * 6、提交作业执行。
  301. */
  302. job.run();
  303. }
  304. }
本文导读目录