当前位置:K88软件开发文章中心编程语言SQLSpark → 文章内容

Spark GraphX图操作符

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-19 4:50:59

操作经常用来初始化的图形,用作特定计算或者用来处理项目不需要的属性。例如,给定一个图,这个图的顶点特征包含出度,我们为PageRank初始化它。// Given a graph where the vertex property is the out degreeval inputGraph: Graph[Int, String] = graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))// Construct a graph where each edge contains the weight// and each vertex is the initial PageRankval outputGraph: Graph[Double, Double] = inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)结构性操作当前的GraphX仅仅支持一组简单的常用结构性操作。下面是基本的结构性操作列表。class Graph[VD, ED] { def reverse: Graph[VD, ED] def subgraph(epred: EdgeTriplet[VD,ED] => Boolean, vpred: (VertexId, VD) => Boolean): Graph[VD, ED] def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED] def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]}reverse操作返回一个新的图,这个图的边的方向都是反转的。例如,这个操作可以用来计算反转的PageRank。因为反转操作没有修改顶点或者边的属性或者改变边的数量,所以我们可以在不移动或者复制数据的情况下有效地实现它。subgraph:Graph[VD,ED])操作利用顶点和边的谓词(predicates),返回的图仅仅包含满足顶点谓词的顶点、满足边谓词的边以及满足顶点谓词的连接顶点(connect vertices)。subgraph操作可以用于很多场景,如获取感兴趣的顶点和边组成的图或者获取清除断开链接后的图。下面的例子删除了断开的链接。// Create an RDD for the verticesval users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")), (4L, ("peter", "student"))))// Create an RDD for edgesval relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"), Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))// Define a default user in case there are relationship with missing userval defaultUser = ("John Doe", "Missing")// Build the initial Graphval graph = Graph(users, relationships, defaultUser)// Notice that there is a user 0 (for which we have no information) connected to users// 4 (peter) and 5 (franklin).graph.triplets.map( triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1 ).collect.foreach(println(_))// Remove missing vertices as well as the edges to connected to themval validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")// The valid subgraph will disconnect users 4 and 5 by removing user 0validGraph.vertices.collect.foreach(println(_))validGraph.triplets.map( triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1 ).collect.foreach(println(_))注意,上面的例子中,仅仅提供了顶点谓词。如果没有提供顶点或者边的谓词,subgraph操作默认为true。mask操作构造一个子图,这个子图包含输入图中包含的顶点和边。这个操作可以和subgraph操作相结合,基于另外一个相关图的特征去约束一个图。例如,我们可能利用缺失顶点的图运行连通体(?连通组件connected components),然后返回有效的子图。/ Run Connected Componentsval ccGraph = graph.connectedComponents() // No longer contains missing field// Remove missing vertices as well as the edges to connected to themval validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")// Restrict the answer to the valid subgraphval validCCGraph = ccGraph.mask(validGraph)groupEdges:Graph[VD,ED])操作合并多重图中的并行边(如顶点对之间重复的边)。在大量的应用程序中,并行的边可以合并(它们的权重合并)为一条边从而降低图的大小。连接操作在许多情况下,有必要将外部数据加入到图中。例如,我们可能有额外的用户属性需要合并到已有的图中或者我们可能想从一个图中取出顶点特征加入到另外一个图中。这些任务可以用join操作完成。下面列出的是主要的join操作。class Graph[VD, ED] { def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD) : Graph[VD, ED] def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2) : Graph[VD2, ED]}joinVertices((VertexId,VD,U)?VD)(ClassTag[U]):Graph[VD,ED])操作将输入RDD和顶点相结合,返回一个新的带有顶点特征的图。这些特征是通过在连接顶点的结果上使用用户定义的map函数获得的。在RDD中没有匹配值的顶点保留其原始值。注意,对于给定的顶点,如果RDD中有超过1个的匹配值,则仅仅使用其中的一个。建议用下面的方法保证输入RDD的唯一性。下面的方法也会预索引返回的值用以加快后续的join操作。val nonUniqueCosts: RDD[(VertexID, Double)]val uniqueCosts: VertexRDD[Double] = graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b)val joinedGraph = graph.joinVertices(uniqueCosts)( (id, oldCost, extraCost) => oldCost + extraCost)除了将用户自定义的map函数用到所有顶点和改变顶点属性类型以外,更一般的outerJoinVertices((VertexId,VD,Option[U])?VD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED])与joinVertices类似。因为并不是所有顶点在RDD中拥有匹配的值,map函数需要一个option类型。val outDegrees: VertexRDD[Int] = graph.outDegreesval degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) => outDegOpt m

上一页  [1] [2] [3] [4]  下一页


Spark GraphX图操作符