当前位置:K88软件开发文章中心编程语言SQLSpark → 文章内容

Spark GraphX图操作符

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-19 4:50:59

atch { case Some(outDeg) => outDeg case None => 0 // No outDegree means zero outDegree }}你可能已经注意到了,在上面的例子中用到了curry函数的多参数列表。虽然我们可以将f(a)(b)写成f(a,b),但是f(a,b)意味着b的类型推断将不会依赖于a。因此,用户需要为定义的函数提供类型标注。val joinedGraph = graph.joinVertices(uniqueCosts, (id: VertexID, oldCost: Double, extraCost: Double) => oldCost + extraCost)相邻聚合(Neighborhood Aggregation)图分析任务的一个关键步骤是汇总每个顶点附近的信息。例如我们可能想知道每个用户的追随者的数量或者每个用户的追随者的平均年龄。许多迭代图算法(如PageRank,最短路径和连通体)多次聚合相邻顶点的属性。为了提高性能,主要的聚合操作从graph.mapReduceTriplets改为了新的graph.AggregateMessages。虽然API的改变相对较小,但是我们仍然提供了过渡的指南。聚合消息(aggregateMessages)GraphX中的核心聚合操作是aggregateMessages(ClassTag[A]):VertexRDD[A])。这个操作将用户定义的sendMsg函数应用到图的每个边三元组(edge triplet),然后应用mergeMsg函数在其目的顶点聚合这些消息。class Graph[VD, ED] { def aggregateMessages[Msg: ClassTag]( sendMsg: EdgeContext[VD, ED, Msg] => Unit, mergeMsg: (Msg, Msg) => Msg, tripletFields: TripletFields = TripletFields.All) : VertexRDD[Msg]}用户自定义的sendMsg函数是一个EdgeContext类型。它暴露源和目的属性以及边缘属性以及发送消息给源和目的属性的函数(sendToSrc和sendToDst)。可将sendMsg函数看做map-reduce过程中的map函数。用户自定义的mergeMsg函数指定两个消息到相同的顶点并保存为一个消息。可以将mergeMsg函数看做map-reduce过程中的reduce函数。aggregateMessages(ClassTag[A]):VertexRDD[A])操作返回一个包含聚合消息(目的地为每个顶点)的VertexRDD[Msg]。没有接收到消息的顶点不包含在返回的VertexRDD中。另外,aggregateMessages(ClassTag[A]):VertexRDD[A])有一个可选的tripletFields参数,它指出在EdgeContext中,哪些数据被访问(如源顶点特征而不是目的顶点特征)。tripletsFields可能的选项定义在TripletFields中。tripletFields参数用来通知GraphX仅仅只需要EdgeContext的一部分允许GraphX选择一个优化的连接策略。例如,如果我们想计算每个用户的追随者的平均年龄,我们仅仅只需要源字段。所以我们用TripletFields.Src表示我们仅仅只需要源字段。在下面的例子中,我们用aggregateMessages操作计算每个用户更年长的追随者的年龄。// Import random graph generation libraryimport org.apache.spark.graphx.util.GraphGenerators// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.val graph: Graph[Double, Int] = GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )// Compute the number of older followers and their total ageval olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)]( triplet => { // Map Function if (triplet.srcAttr > triplet.dstAttr) { // Send message to destination vertex containing counter and age triplet.sendToDst(1, triplet.srcAttr) } }, // Add counter and age (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function)// Divide total age by number of older followers to get average age of older followersval avgAgeOfOlderFollowers: VertexRDD[Double] = olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )// Display the resultsavgAgeOfOlderFollowers.collect.foreach(println(_))当消息(以及消息的总数)是常量大小(列表和连接替换为浮点数和添加)时,aggregateMessages操作的效果最好。Map Reduce三元组过渡指南在之前版本的GraphX中,利用[mapReduceTriplets]操作完成相邻聚合。class Graph[VD, ED] { def mapReduceTriplets[Msg]( map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)], reduce: (Msg, Msg) => Msg) : VertexRDD[Msg]}mapReduceTriplets操作在每个三元组上应用用户定义的map函数,然后保存用用户定义的reduce函数聚合的消息。然而,我们发现用户返回的迭代器是昂贵的,它抑制了我们添加额外优化(例如本地顶点的重新编号)的能力。aggregateMessages(ClassTag[A]):VertexRDD[A])暴露三元组字段和函数显示的发送消息到源和目的顶点。并且,我们删除了字节码检测转而需要用户指明三元组的哪些字段实际需要。下面的代码用到了mapReduceTripletsval graph: Graph[Int, Float] = ...def msgFun(triplet: Triplet[Int, Float]): Iterator[(Int, String)] = { Iterator((triplet.dstId, "Hi"))}def reduceFun(a: Int, b: Int): Int = a + bval result = graph.mapReduceTriplets[String](msgFun, reduceFun)下面的代码用到了aggregateMessagesval graph: Graph[Int, Float] = ...def msgFun(triplet: EdgeContext[Int, Float, String]) { triplet.sendToDst("Hi")}def reduceFun(a: Int, b: Int): Int = a + bval result = graph.aggregateMessages[String](msgFun, reduceFun)计算度信息最一般的聚合任务就是计算顶点的度,即每个顶点相邻边的数量。在有向图中,经常需要知道顶点的入度、出度以及总共的度。GraphOps类包含一个操作集合用来计算每个顶点的度。例如,下面的例子计算最大的入度、出度和总度。// Define a reduce operation to compute the highest degree vertexdef max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = { if (a._2 > b._2) a else b}// Compute the max degreesval maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)Collecting

上一页  [1] [2] [3] [4]  下一页


Spark GraphX图操作符