SparkSpark专题大数据

spark graphx 图计算

2018-01-03  本文已影响0人  夜尽天明时

写在前面

什么是一个图

属性图

GRAPHX

graphx是一个图计算引擎,而不是一个图数据库,它可以处理像倒排索引,推荐系统,最短路径,群体检测等等

SPARK GRAPHX

def aggregateMessages[A: ClassTag](
         sendMsg: EdgeContext[VD, ED, A] => Unit,
         mergeMsg: (A, A) => A,
         tripletFields: TripletFields = TripletFields.All): VertexRDD[A] = {
  aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None)
}

EdgeContext


EdgeContext

主要考虑


sendmsg
这两个方法
这两个方法一个吧triplets中数据发送到源节点

一个是把triplets中的数据发送到目的节点
这样就可以在源或者目的节点进行聚合操作了
看个例子:

graph.aggregateMessages[Int](_.sendToSrc(1), _ + _).foreach(println)

这个例子就是求出图的出度
sendToSrc(1)会针对每一个triplets向源节点发送1
如图


三元组

会向2节点发送一个1
_ + _ :表示针对每个节点做相加的聚合
比如下图5节点有4个triplets,采用sendToSrc方法后,它的聚合就是1+1 = 2
也就是它的出度



结果是
(4,1)
(3,1)
(5,3)
(2,1)
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
     (graph: Graph[VD, ED],
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)
     (vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
(1)graph:
输入的图
(2) initialMsg:
初始化消息,在第一次迭代的时候,这个初始消息会被用来初始化图中的每个节点,在pregel进行调用时,会首先在图上使用mapVertices来根据initialMsg的值更新每个节点的值,至于如何更新,则由vprog参数而定,vprog函数就接收了initialMsg消息做为参数来更新对应节点的值
(3) maxIterations:
最大迭代的次数
(4) activeDirection: 
活跃方向,首先理解活跃消息与活跃顶点,活跃节点是指在某一轮迭代中pregel以sendMsg和mergeMsg为参数来调用graph的aggregateMessage方法后收到消息的节点,活跃消息就是这轮迭代中所有被成功收到的消息。这样一来,有的边的src节点是活跃节点,有的dst节点是活跃节点,而有的边两端节点都是活跃节点。如果activeDirection参数指定为“EdgeDirection.Out”,则在下一轮迭代时,只有接收消息的出边(src—>dst)才会执行sendMsg函数,也就是说,sendMsg回调函数会过滤掉”dst—>src”的edgeTriplet上下文参数
EdgeDirection.Out —sendMsg gets called if srcId received a message during the previous iteration, meaning this edge is considered an “out-edge” of srcId.
EdgeDirection.In—sendMsg gets called if dstId received a message during the previous iteration, meaning this edge is considered an “in-edge” of dstId.
EdgeDirection.Either—sendMsg gets called if either srcId or dstId received a message during the previous iteration.
EdgeDirection.Both —sendMsg gets called if both srcId and dstId received mes- sages during the previous iteration.
(5) vprog:
节点变换函数,在初始时,在每轮迭代后,pregel会根据上一轮使用的msg和这里的vprod函数在图上调用joinVertices方法变化每个收到消息的节点,注意这个函数除初始时外,都是仅在接收到消息的节点上运行,这一点可以从源码中看到,源码中用的是joinVertices(message)(vprog),因此,没有收到消息的节点在join之后就滤掉了
(6) sendMsg: 
消息发送函数,该函数的运行参数是一个代表边的上下文,pregel在调用aggregateMessages时,会将EdgeContext转换成EdgeTriplet对象(ctx.toEdgeTriplet)来使用,用户需要通过Iterator[(VertexId,A)]指定发送哪些消息,发给那些节点,发送的内容是什么,因为在一条边上可以发送多个消息,有sendToDst和sendToSrc,所以这里是个Iterator,每一个元素是一个tuple,其中的vertexId表示要接收此消息的节点的id,它只能是该边上的srcId或dstId,而A就是要发送的内容,因此如果是需要由src发送一条消息A给dst,则有:Iterator((dstId,A)),如果什么消息也不发送,则可以返回一个空的Iterator:Iterator.empty
(7) mergeMsg: 
邻居节点收到多条消息时的合并逻辑,注意它区别于vprog函数,mergeMsg仅能合并消息内容,但合并后并不会更新到节点中去,而vprog函数可以根据收到的消息(就是mergeMsg产生的结果)更新节点属性

GraphX 在顶点 RDD 和边 RDD 的分区中以数组形式存储顶点数据和边数据,目的是为了不损失元素访问性能。同时,GraphX 在分区里建立了众多索引结构,高效地实现快速访问顶点数据或边数据。在迭代过程中,图的结构不会发生变化,因而顶点 RDD、边 RDD 以及重复顶点视图中的索引结构全部可以重用,当由一个图生成另一个图时,只须更新顶点 RDD 和边 RDD 的数据存储数组,因此,索引结构的重用保持了GraphX 高性能,也是相对于原生 RDD 实现图模型性能能够大幅提高的主要原因。

-分区方式简介


分区方式

算法

val sourceId: VertexId = 5L
    val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
    val sssp = initialGraph.pregel(
      Double.PositiveInfinity,
      activeDirection = EdgeDirection.Out
    )(
      (vertexId, vertexValue, msg) =>
        math.min(vertexValue, msg),//vprog,作用是处理到达顶点的参数,取较小的那个作为顶点的值
      triplet => { //sendMsg,计算权重,如果邻居节点的属性加上边上的距离小于该节点的属性,说明从源节点比从邻居节点到该顶点的距离更小,更新值
          if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
            Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
          } else {
            Iterator.empty
          }
      },
      (a, b) => math.min(a, b) //mergeMsg,合并到达顶点的所有信息
    )
    println(sssp.vertices.collect.mkString("\n"))

以上代码是求节点ID为5的所有可到达节点的最短路径
算法详解:首先initialGraph就先遍历所有的节点吧我们设置的目标节点设置的属性值设置成0.0其他的所有节点设置成正无穷,pregel中的Double.PositiveInfinity是初始化参数,在pregel执行的过程中的第一次迭代时,会初始化所有的节点属性值,会根据下边的vprog = (vertexId, vertexValue, msg) => math.min(vertexValue, msg),//(vprog,作用是处理到达顶点的参数,取较小的那个作为顶点的值)去处理所有的节点,所以,初始化后除了5节点的属性值为0.0外,其他的都是正无穷。activeDirection = EdgeDirection.Out限定所有的有效方向是出边,triplet限定了只有在每次迭代中满足triplet.srcAttr + triplet.attr < triplet.dstAttr条件的才会更新当前节点值,最后(a, b) => math.min(a, b)方法合并了迭代到当前所有接受到消息的顶点的属性值,也就是说找到源顶点到可达顶点中的路径最小的那个可达顶点。不断的迭代下去,最后扫描完整个图,最终得出到所有可达顶点最短路径。

val friends = Pregel(
      graph.mapVertices((vid,value) => if(vid ==2) 2 else -1),//初始化信息,源节点为2,其他节点为-1
      -1,
      2,
      EdgeDirection.Either
    )(
      vprog = (vid,attr,msg) =>math.max(attr, msg),//顶点操作,到来的属性和原属性比较,较大的作为该节点的属性
      edge => {
        if (edge.srcAttr <= 0) {
          if (edge.dstAttr <= 0) {
            Iterator.empty//都小于0,说明从源节点还没有传递到这里
          }else {
            Iterator((edge.srcId,edge.dstAttr - 1))//目的节点大于0,将目的节点属性减一赋值给源节点
          }
        }else {
          if(edge.dstAttr <= 0) {
            Iterator((edge.dstId,edge.srcAttr -1))//源节点大于0,将源节点属性减一赋值给目的节点
          }else {
            Iterator.empty//都大于0,说明在二跳节点以内,不操作
          }
        }
      },
      (a,b) => math.max(a, b)//当有多个属性传递到一个节点,取大的,因为大的离源节点更近
    ).subgraph(vpred =(vid,v) =>v >= 0)
    friends.vertices.collect.foreach(println(_))

算法详解:首先,把目标节点的属性值置为2,初始化其他的所有的节点的属性值为-1,第一次迭代消息(-1)初始化就是根据vprog = (vid, attr, msg) => math.max(attr, msg)再过滤一遍节点,在剩下的迭代过程中,edge中的条件限定只扫描:
(1)如果源小于0,目标也小于0,则不发消息
(2)如果源小于0,目标大于0,则目标值-1赋给源节点
(3)如果源大于0,目标值也大于0,则不发消息
(4)如果源大于0,目标值小于0,则把源-1赋给目标节点
也就是说只会在有正负差距的的节点之间才会有消息传递


初始化图
条件遍历

算法

graph.pageRank(0.001,0.15)
      .vertices   //列出所有点
      .sortBy(_._2, false) //根据pagerank降序排序
      .take(20)  //取出前20个
      .foreach(println)

很简单,解释下参数:0.001是个容忍度,是在对下边公式进行迭代过程中退出迭代的条件,0.15也是默认的初始跳转概率,也就是公式中的resetProb


公式
       graph.personalizedPageRank(34175, 0.001) //某人是34175
      .vertices
      .filter(_._1 != 34175)
      .reduce((a,b) => if (a._2 > b._2) a else b)  //找出那个34175感兴趣的人
graph.triangleCount()
      .vertices
      .sortBy(_._2, false)
      .take(20)
      .foreach(println)

找出拥有三角形环关系的最多的顶点

ShortestPaths.run(diseaseSymptom,Array(19328L))
      .vertices
      .filter(!_._2.isEmpty)
      .foreach(println)

其中19328L是自定义的起始点

(266,Map(19328 -> 15))
(282,Map(19328 -> 12))
(770,Map(19328 -> 9))
(1730,Map(19328 -> 11))
(2170,Map(19328 -> 6))
(1530,Map(19328 -> 13))
(1346,Map(19328 -> 14))
(378,Map(19328 -> 3))
(1378,Map(19328 -> 11))
(970,Map(19328 -> 10))
...

结果如上,(266,Map(19328 -> 15))表示19328到266的最短路径为15

val g = Graph(sc.makeRDD((1L to 7L).map((_,""))),
      sc.makeRDD(Array(Edge(2L,5L,""), Edge(5L,3L,""), Edge(3L,2L,""),
        Edge(4L,5L,""), Edge(6L,7L,""))))
    g.connectedComponents
      .vertices
      .map(_.swap)
      .groupByKey()
      .map(_._2)
      .foreach(println)

输出结果:

CompactBuffer(6, 7)
CompactBuffer(4, 2, 3, 5)
CompactBuffer(1)
g.stronglyConnectedComponents(3)
      .vertices.map(_.swap)
      .groupByKey()
      .map(_._2)
      .foreach(println)

其中3是最大迭代次数,在上边图中,迭代三次刚好,也可以设置的大一点,不过结果都是一样的

def dijkstra[VD](g:Graph[VD,Double], origin:VertexId) = {
    //初始化起始节点的属性值
    var g2 = g.mapVertices(
      (vid,vd) => (false, if (vid == origin) 0 else Double.MaxValue))
    for (i <- 1L to g.vertices.count-1) {
      val currentVertexId =
        g2.vertices.filter(!_._2._1)
          .fold((0L,(false,Double.MaxValue)))((a,b) =>
            if (a._2._2 < b._2._2) a else b)
          ._1
      val newDistances = g2.aggregateMessages[Double](
        ctx => if (ctx.srcId == currentVertexId)
          ctx.sendToDst(ctx.srcAttr._2 + ctx.attr),
        (a,b) => math.min(a,b))
      g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) =>
        (vd._1 || vid == currentVertexId,
          math.min(vd._2, newSum.getOrElse(Double.MaxValue))))
    }
    g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
      (vd, dist.getOrElse((false,Double.MaxValue))._2))
  }

待续-------

上一篇 下一篇

猜你喜欢

热点阅读