全部产品

使用spark graphframes分析图

更新时间:2018-12-29 11:41:19

quick-start

项目依赖

GraphDB图计算依赖spark-2.3.1,请提前下载spark-2.3.1并安装spark

下载 spark-hgraph-connector

  1. git clone git@github.com:chentiantai/spark-hgraph-connector.git

源码编译

  1. $ cd spark-hgraph-connector
  2. $ mvn -DskipTests package

配置spark

  • 删除spark过期的guava.jar包

    1. rm -rf jars/guava-14.0.1.jar
  • 将hbase相关版本的包放入到spark 安装目录jars中,下列jar在${spark-hgraph-connector}/lib下

    1. com.aliyun.hbase_alihbase-annotations-1.1.3.jar
    2. com.aliyun.hbase_alihbase-client-1.1.3.jar
    3. com.aliyun.hbase_alihbase-common-1.1.3.jar
    4. com.aliyun.hbase_alihbase-protocol-1.1.3.jar
  • 重新启动spark集群

执行图算法

  1. sh run.sh
  2. Usage: run.sh <spark-master> <hbase-zk> <graphName> <algorithm> <algorithm-args>
  3. algorithm:
  4. PageRank
  5. PersonalizedPageRank <vertexId>
  6. ConnectedComponents <output> //output stored to hdfs filesystem
  7. StronglyConnectedComponents <output> //output stored to hdfs filesystem
  8. LabelPropagation
  9. ShortestPaths <vertexId...> //vertexId... 指定单源序列,查询所有其他顶点到此顶点距离
  10. TriangleCount
  11. SvdPlusPlus
  12. $ ./run.sh $spark-master $hbase-zk-url testGraph ConnectedComponents hdfs://hdfs-cluster/testGraph/cc

编写你自己的图算法

接下来将展示如何使用Apache Spark GraphFrames分析GraphDB。

为了加载存储在GraphDB for GraphFrames中的数据,我们需要将GraphDB中的顶点和边缘数据导入Spark DataFrames。Hortonworks提供了一个Spark-on-HBase连接器来做到这一点。Spark-on-HBase Connector允许通过实现SHCDataType特征来创建自定义serde(序列化器/反序列化器)类型。GraphDB的serde参考此处

为了演示如何将GraphDB与GraphFrames一起使用,我们首先使用GraphDB创建GraphFrames用户指南中使用的相同图形示例。

  1. Vertex a = graph.addVertex(T.id, "a", "name", "Alice", "age", 34);
  2. Vertex b = graph.addVertex(T.id, "b", "name", "Bob", "age", 36);
  3. Vertex c = graph.addVertex(T.id, "c", "name", "Charlie", "age", 30);
  4. Vertex d = graph.addVertex(T.id, "d", "name", "David", "age", 29);
  5. Vertex e = graph.addVertex(T.id, "e", "name", "Esther", "age", 32);
  6. Vertex f = graph.addVertex(T.id, "f", "name", "Fanny", "age", 36);
  7. Vertex g = graph.addVertex(T.id, "g", "name", "Gabby", "age", 60);
  8. a.addEdge("friend", b);
  9. b.addEdge("follow", c);
  10. c.addEdge("follow", b);
  11. f.addEdge("follow", c);
  12. e.addEdge("follow", f);
  13. e.addEdge("friend", d);
  14. d.addEdge("friend", a);
  15. a.addEdge("friend", e);

图已存储在GraphDB中,我们需要指定schema, Spark-on-HBase Connector用于检索顶点和边数据。

  1. def vertexCatalog = s"""{
  2. |"table":{"namespace":"testGraph", "name":"vertices",
  3. | "tableCoder":"org.apache.spark.sql.execution.datasources.hbase.types.HGraphDB", "version":"2.0"},
  4. |"rowkey":"key",
  5. |"columns":{
  6. |"id":{"cf":"rowkey", "col":"key", "type":"string"},
  7. |"name":{"cf":"f", "col":"name", "type":"string"},
  8. |"age":{"cf":"f", "col":"age", "type":"int"}
  9. |}
  10. |}""".stripMargin
  11. def edgeCatalog = s"""{
  12. |"table":{"namespace":"testGraph", "name":"edges",
  13. | "tableCoder":"org.apache.spark.sql.execution.datasources.hbase.types.HGraphDB", "version":"2.0"},
  14. |"rowkey":"key",
  15. |"columns":{
  16. |"id":{"cf":"rowkey", "col":"key", "type":"string"},
  17. |"relationship":{"cf":"f", "col":"~l", "type":"string"},
  18. |"src":{"cf":"f", "col":"~f", "type":"string"},
  19. |"dst":{"cf":"f", "col":"~t", "type":"string"}
  20. |}
  21. |}""".stripMargin

关于此schema的一些注意事项:

  • HGraphDB serde tableCoder如上所述。
  • 所有GraphDB列都存储在名为的列族中f。
  • 顶点和边标签存储在列名为~l列中。
  • 源列和目标列分别存储在列名为~f和~t列中。
  • 所有顶点和边属性都存储在列中,列名只是属性的名称。

现在我们有了一个schema,我们可以为顶点和边创建Spark DataFrames,然后将它们传递给GraphFrame 工厂。

  1. def withCatalog(cat: String): DataFrame = {
  2. sqlContext
  3. .read
  4. .options(Map(HBaseTableCatalog.tableCatalog->cat, HBaseRelation.HBASE_CONFIGURATION -> s"""{"hbase.zookeeper.quorum":"$zkUrl"}"""))
  5. .format("org.apache.spark.sql.execution.datasources.hbase")
  6. .load()
  7. }
  8. val verticesDf = withCatalog(vertexCatalog(graphName))
  9. val edgesDf = withCatalog(edgeCatalog(graphName))
  10. val g = GraphFrame(verticesDf, edgesDf)

有了这些GraphFrame,我们现在可以完全访问Spark GraphFrame API。例如,以下是GraphFrames快速入门中的一些图形操作。

  1. //查询:获取每个顶点的入度。
  2. g.inDegrees.show()
  3. //查询:计算图表中“跟随”连接的数量。
  4. val numFollows = g.edges.filter("relationship = 'follow'").count()
  5. //运行PageRank算法,并显示结果。
  6. val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
  7. results.vertices.select("id", "pagerank").show()
  8. results.edges.select("src", "dst", "weight").show()

通过该例子您可以使用graphframes实现您自己的的图算法,也可以直接使用我们封装的一些通用算法工具