当前位置:K88软件开发文章中心编程语言SQLSpark → 文章内容

Spark GraphX图操作符

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-19 4:50:59

由 ligaihe 创建,路飞 最后一次修改 2016-02-24 Spark GraphX图操作符正如RDDs有基本的操作map, filter和reduceByKey一样,属性图也有基本的集合操作,这些操作采用用户自定义的函数并产生包含转换特征和结构的新图。定义在Graph中的核心操作是经过优化的实现。表示为核心操作的组合的便捷操作定义在GraphOps中。然而,因为有Scala的隐式转换,定义在GraphOps中的操作可以作为Graph的成员自动使用。例如,我们可以通过下面的方式计算每个顶点(定义在GraphOps中)的入度。val graph: Graph[(String, String), String]// Use the implicit GraphOps.inDegrees operatorval inDegrees: VertexRDD[Int] = graph.inDegrees区分核心图操作和GraphOps的原因是为了在将来支持不同的图表示。每个图表示都必须提供核心操作的实现并重用很多定义在GraphOps中的有用操作。操作一览一下是定义在Graph和GraphOps中(为了简单起见,表现为图的成员)的功能的快速浏览。注意,某些函数签名已经简化(如默认参数和类型的限制已删除),一些更高级的功能已经被删除,所以请参阅API文档了解官方的操作列表。/** Summary of the functionality in the property graph */class Graph[VD, ED] { // Information about the Graph =================================================================== val numEdges: Long val numVertices: Long val inDegrees: VertexRDD[Int] val outDegrees: VertexRDD[Int] val degrees: VertexRDD[Int] // Views of the graph as collections ============================================================= val vertices: VertexRDD[VD] val edges: EdgeRDD[ED] val triplets: RDD[EdgeTriplet[VD, ED]] // Functions for caching graphs ================================================================== def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] def cache(): Graph[VD, ED] def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] // Change the partitioning heuristic ============================================================ def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] // Transform vertex and edge attributes ========================================================== def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED] def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2] def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]) : Graph[VD, ED2] // Modify the graph structure ==================================================================== def reverse: Graph[VD, ED] def subgraph( epred: EdgeTriplet[VD,ED] => Boolean = (x => true), vpred: (VertexID, VD) => Boolean = ((v, d) => true)) : Graph[VD, ED] def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED] def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] // Join RDDs with the graph ====================================================================== def joinVertices[U](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD): Graph[VD, ED] def outerJoinVertices[U, VD2](other: RDD[(VertexID, U)]) (mapFunc: (VertexID, VD, Option[U]) => VD2) : Graph[VD2, ED] // Aggregate information about adjacent triplets ================================================= def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]] def aggregateMessages[Msg: ClassTag]( sendMsg: EdgeContext[VD, ED, Msg] => Unit, mergeMsg: (Msg, Msg) => Msg, tripletFields: TripletFields = TripletFields.All) : VertexRDD[A] // Iterative graph-parallel computation ========================================================== def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)( vprog: (VertexID, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)], mergeMsg: (A, A) => A) : Graph[VD, ED] // Basic graph algorithms ======================================================================== def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double] def connectedComponents(): Graph[VertexID, ED] def triangleCount(): Graph[Int, ED] def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED]}属性操作如RDD的map操作一样,属性图包含下面的操作:class Graph[VD, ED] { def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED] def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2] def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]}每个操作都产生一个新的图,这个新的图包含通过用户自定义的map操作修改后的顶点或边的属性。注意,每种情况下图结构都不受影响。这些操作的一个重要特征是它允许所得图形重用原有图形的结构索引(indices)。下面的两行代码在逻辑上是等价的,但是第一个不保存结构索引,所以不会从GraphX系统优化中受益。val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }val newGraph = Graph(newVertices, graph.edges)另一种方法是用mapVertices(ClassTag[VD2]):Graph[VD2,ED])保存索引。val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))这些

[1] [2] [3] [4]  下一页


Spark GraphX图操作符