Spark 应用spark

Spark图计算API简介

2017-11-10  本文已影响83人  过雨神

一、属性操作符:

graph中提供了对vertex,edge和triplet的map操作,类似于RDD中的map操作:

def mapVertices[VD2](map:(VertexId, VD)=> VD2): Graph[VD2, ED]

def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]

def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]

使用这些方法不会改变图的结构,所以这些操作符可以利用原有的图的structural indicies。所以不要用graph.vertices.map的方法来实现同样的操作。

mapEdges: transform each edge attribute in the graph using the map function.

实例:注意在mapEdges中使用的函数里,输入参数x是一个Edge对象,返回对象则是Edge的属性对象。在例子中,属性对象的类型并没有改变,(都是String)但属性的值有所变化。也可以变成其它的类型的对象。

val sheyouGraph = graph.mapEdges(x => {if("roommate".equals(x.attr)) "sheyou" else x.attr})

mapVertices: transform each vertex attribute in the graph using the map function

跟mapEdges类似,mapVerticies中传入的对象也是Vertex的实例化对象,返回值也是顶点的属性对象:

val oneAttrGraph = graph.mapVertices((id, attr) => {attr._1+ " is:"+attr._2})

mapTriplets: Transforms each edge attribute using the map function, passing it the adjacent(临近的) vertex attributes as well.

也就是在mapTriplets中,与mapEdges不同的地方仅仅在于可以使用的作为map条件的东西多了邻近的顶点的属性,最终改变的东西仍然是edge的属性。如果转换中不需要根据顶点的属性,就直接用mapEdges就行了。

什么是Triplet:

Triplet的全称是EdgeTriplet,继承自Edge,所代表的entity是:An edge along with the vertex attributes of its neighboring vertices. 一个EdgeTriplet中包含srcId, dstId, attr(继承自Edge)和srcAttr和dstAttr五个属性。

graph.mapTriplets(triplet => {.....})

二、Structural Operators:

1. subgraph:

方法的定义:

def subgraph(

    epred: EdgeTriplet[VD, ED] => Boolean = (x => true),

    vpred: (VertexId, VD) => Boolean = ((v, d) => true)

): Graph[VD, ED]

返回的对象是一个图,图中包含着的顶点和边分别要满足vpred和epred两个函数。(要注意,顶点和边是完全不同的概念,如果一个边被砍掉了,这个边关联的两个顶点并不会受影响)

要注意,在图里,如果一个顶点没了,其对应的边也就没了,但边没了之后,点不会受影响。

所以,subgraph一般用于:restrict the graph to the vertices and edges of interest或者eliminate broken links.

2. joinVertices/outerJoinVerticies:

有时候需要从外部的RDD中跟Graph做数据的连接操作。例如:外部的user属性想要跟现有的graph做一个合并,或者想把图的顶点的属性从一个图迁移到另一个图中。这些可以用join来完成。

def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD): Graph[VD, ED]

def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, Option[U]) => VD2)(implicit eq: VD =:= VD2 = null): Graph[VD2, ED]

joinVertices: 将顶点跟输入的RDD[(VertexId, U)]做关联,返回一个新的图。新的图的属性的类型跟原图是一样的,但值可以改变;在mapFunc中,可以使用原来的图的顶点属性和输入的RDD的顶点属性U来计算新的顶点属性。输入的RDD中每个vertex最多只能有一个vertex。如果原图在input table中没有对应的entry,则原来的属性不做改变。

def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]

从函数的定义可以看出,该操作不会改变vertex的属性的类型,但值是可以改变的。比如first name需要加上last name。

事实上,joinVerticies方法的实现中就使用了outerJoinVerticies方法:

def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD) : Graph[VD, ED] = {

    val uf = (id: VertexId, data: VD, o: Option[U]) => {

        o match {

            case Some(u) => mapFunc(id, data, u) case None => data

        }

    }

    graph.outerJoinVertices(table)(uf)

}

outerJoinVertices与joinVertices很类似,但在map方法中可以修改vertex的属性类型。由于并非所有的图的顶点都一定能跟传入的RDD匹配上,所以定义mapFunc的时候使用了option选项。对于joinVerticies方法,如果某个顶点没有跟传入的RDD匹配上,就直接用原有的值。因为joinVerticies并不改变顶点的数据类型(有没有忘了option跟Some、None之间的爱恨情仇?使用Option的时候一定离不开match,要注意match的语法)。

val outDegGraph = graph.outDegrees

val degGraph = graph.outerJoinVertices(outDegGraph){    

    (id, oldAttr, outDeg) => {    

        outDeg match{

            case Some(outDeg) => outDegcase None => 0

        }

    }

}

3. aggregateMessages(原来的名字叫做mapReduceTriplets):

如果需要将顶点跟其邻居的信息集成起来,可以使用aggregateMessages方法。比如,想要知道有多少人follow了一个用户,或者follow用户的平均年龄。

函数的定义:

def aggregateMessages[A: ClassTag](

    sendMsg: EdgeContext[VD, ED, A] => Unit,

    mergeMessage: (A, A) => A,

    tripletFields: TripletFields = TripletFields.ALL

): VertexRDD[A]

跟mapReduceTriplets的定义很类似:

def mapReduceTriplets[Msg](

    map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],

    reduce: (Msg, Msg) => Msg

):VertexRDD[Msg]

VertexRDD继承了RDD[(VertexId, VD)],本身自带的泛型VD。

API:aggregate values from the neighboring edges and vertices of each vertex.

方法中的sendMsg是在图中的每个边都会被调用的,用于将message发送给相邻顶点。

mergeMsg用于将sendMsg中发送给同一个顶点的message做组合。

tripletFields: 那些fields可以用于EdgeContext中,可用的值包括TripletFields.None, TripletFields.EdgeOnly, TripletFields.Src, TripletFields.Dst, TripletFields.ALL。默认为ALL,也就是所有的信息都要用,如果只需要用部分数据,可以单独选择部分属性发送,可以提升计算效率。

其中,Src和Dst分别会将source和destination field进行传递,而且都会添加edge fields:

如果TripletFields中传入的资源少了,也就是在sendMsg中需要使用到的信息并没有包含在TripletFields中,可能会报空指针异常。

使用实例:计算graph的出度或入度:

val inDeg: RDD[(VertexId, Int)] = graph.aggregateMessages[Int](edgeContext => edgeContext.sendToDst(1), _+_)

由于这里我们并没有用到edgeContext中的任何属性,所以其实也可以在参数中添加TripletFields.None,从而提高一点执行效率:

graph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _+_ , TripletFields.None)

TIPS: 什么是EdgeContext:EdgeContext中会将source和destination属性以及Edge属性都暴露出来,包含sendToSrc和sendToDst来将信息发送给source和destination属性。

4. reverse: return a new graph with all edge directions reversed.

调用方法:

val reverseGraph = graph.reverse

5. mask

mask用于创建一个子图,子图中包含在输入的图中也包含的顶点和边。该方法通常跟subgraph方法一起使用,来根据另一个关联的图来过滤当前的图中展示的数据。

6. groupEdges:

groupEdges用于将多重图中的相同的顶点之间的边做合并(除了属性其实没其他可以合并的)。

def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

7. 一组collect:

graph中有多个collect算法,包括collectEdges, collectNeighbours, collectNeighbourIds等

collectEdges: return an RDD that contains for each vertex its local edges

8. Pregel API:

图本身就是内在的递归的数据结构,因为一个顶点的属性可能依赖于其neighbor,而neighbor的属性又依赖于他们的neighbour。所以很多重要的图算法都会迭代计算每个顶点的属性,直到达到一个稳定状态。

GraphX中的Pregel操作符是一个批量同步并行(bulk-synchronous parallel message abstraction)的messaging abstraction,用于图的拓扑结构(topology of the graph)。The Pregel operator executes in a series of super steps in whichvertices receive the sum of their inbound messagesfrom the previous super step,compute a new valuefor the vertex property, and thensend messages to neighboring verticesin the next super step. Message是作为edge triplet的一个函数并行计算的,message的计算可以使用source和dest顶点的属性。没有收到message的顶点在super step中被跳过。迭代会在么有剩余的信息之后停止,并返回最终的图。

pregel的定义:

def pregel[A]

    (initialMsg: A,//在第一次迭代中每个顶点获取的起始

    msgmaxIter: Int = Int.MaxValue,//迭代计算的次数

    activeDir: EdgeDirection = EdgeDirection.Out

)(

    vprog: (VertexId, VD, A) => VD,//顶点的计算函数,在每个顶点运行,根据顶点的ID,属性和获取的inbound message来计算顶点的新属性值。顶一次迭代的时候,inbound message为initialMsg,且每个顶点都会执行一遍该函数。以后只有上次迭代中接收到信息的顶点会执行。

    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],//应用于顶点的出边(out edges)用于接收顶点发出的信息

    mergeMsg: (A, A) => A//合并信息的算法

)

算法实现的大致过程:

var g = mapVertices((vid, vdata) => vprog(vid, vdata, initMsg)).cache //第一步是根据initMsg在每个顶点执行一次vprog算法,从而每个顶点的属性都会迭代一次。

var messages = g.mapReduceTriplets(sendMsg, mergeMsg)

var messagesCount = messages.count

var i = 0

while(activeMessages > 0 && i < maxIterations){

    g = g.joinVertices(messages)(vprog).cache

    val oldMessages = messages

    messages = g.mapReduceTriplets(

        sendMsg,

        mergeMsg,

        Some((oldMessages, activeDirection))

    ).cache()

    activeMessages = messages.count

    i += 1

}

g

pregel算法的一个实例:将图跟一些一些初始的score做关联,然后将顶点分数根据出度大小向外发散,并自己保留一份:

//将图中顶点添加上该顶点的出度属性

val graphWithDegree = graph.outerJoinVertices(graph.outDegrees){

    case (vid, name, deg) => (name, deg match {

        case Some(deg) => deg+0.0

        case None => 1.25}

    )

}//将图与初始分数做关联

val graphWithScoreAndDegree = graphWithDegree.outerJoinVertices(scoreRDD){

    case (vid, (name, deg), score) => (name,deg, score.getOrElse(0.0))

}

graphWithScoreAndDegree.vertices.foreach(x => println("++++++++++++id:"+x._1+"; deg: "+x._2._2+"; score:"+x._2._3))//将图与初始分数做关联

val graphWithScoreAndDegree = graphWithDegree.outerJoinVertices(scoreRDD){

    case (vid, (name, deg), score) => (name,deg, score.getOrElse(0.0))

}

graphWithScoreAndDegree.vertices.foreach(x => println("++++++++++++id:"+x._1+"; deg: "+x._2._2+"; score:"+x._2._3))

算法的第一步:将0.0(也就是传入的初始值initMsg)跟各个顶点的值相加(还是原来的值),然后除以顶点的出度。这一步很重要,不能忽略。 并且在设计的时候也要考虑结果会不会被这一步所影响。

9. 计算图的度、入度和出度:

graph.degrees

graph.outDegrees

graph.inDegrees

返回的对象是VertexRDD[Int]

注意的是返回的RDD对象中,度为0的顶点并不包含在内。

10. filter方法:先计算一些用于过滤的值(preprocess),然后在使用predicate进行过滤。

def filter[VD2: ClassTag, ED2: ClassTag](

    preprocess: Graph[VD, ED] => Graph[VD2, ED2],

    epred: (EdgeTriplet[VD2, ED2]) => Boolean = (x: EdgeTriplet[VD2, ED2]) => true,

    vpred: (VertexId, VD2) => Boolean = (v: VertexId, d: VD2) => true

):Graph[VD, ED] = {

    graph.mask(

        preprocess(graph).subgraph(epred, vpred)

    )

}

preprocess:a function to compute new vertex and edge data before filtering

要注意最后返回的图跟传入的图的顶点和边的属性类型是一样的。

该方法可以用于在不改变顶点和边的属性值(要注意的是,在preprocess中,使用graph的时候可能会有类似于修改graph操作的api调用,但在调用的过程中,graph本身的值不会发生改变。比如在下边的例子的中,graph做了一个跟其degree关联的操作,但graph本身的值没有任何变化)的情况下对图进行基于某些属性的过滤。这些属性的值可以是计算得来的。例如,删除图中没有出度的顶点:

graph.filter(

    graph => {

        val degrees: VertexRDD[Int] = graph.outDegrees

        graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}

    },

    vpred = (vid: VertexId, deg:Int) => deg > 0

)

11. groupEdges:合并两个顶点中的多条边称为一条边。要获取正确的结果,graph必须调用partitionBy来做partition。这是因为该操作假定需要一起合并的边都分布在同一个partition上。所以在调用groupEdges之前必须调用partitionBy。

def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED],也就是对边的属性做合并操作。

partitionBy: repartitions the edges in the graph according to partitionStrategy.

def partitionBy(partitionStrtegy: PartitionStrategy): Graph[VD, ED]

12. convertToCanonicalEdges:将双向变转化为单向边。

具体的算法是:将所有边都转化成srcId小于dstId的边,然后合并多余的边。

二、Graph Builders:

http://spark.apache.org/docs/latest/graphx-programming-guide.html#graph_builders

GraphX提供了一组使用vertex和edge的集合来构建一个图的方法。这些Graph Builder默认不会对边做repartition,边一般留在其原来的默认的partition中,例如其原来的HDFS的block。

1. GraphLoader.edgeListFile:

用于从一组edge(每个edge中包括简单的source id和destination id)中来构建一个graph,自动创建其中涉及的顶点,顶点和边的属性都设置为1。

def edgeListFile(

    sc: SparkContext,

    path: String,

    canonicalOrientation: Boolean = false,

    minEdgePartitions: Int = 1,

    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,

    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY

): Graph[Int, Int]

canonicalOrientation参数可以强制让边按照srcId

2. Graph.apply:使用顶点和边的RDD对象来创建一个图。重复的顶点被任意抛弃,edgeRDD中有而verticiesRDD中没有的顶点会被赋予一个默认的属性值。

def apply[VD, ED](

    vertices: RDD[(VertexId, VD)],

    edges: RDD[Edge[ED]],

    defaultVertexAttr: VD = null,

    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,

    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY

): Graph[VD, ED]

3. Graph.fromEdges: 从单独的一个边的RDD中构建一个图。自动创建边中使用的顶点,并赋予默认值。

def fromEdges[VD, ED](

    edges: RDD[Edge[ED]],

    defaultValue: VD,

    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,

    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY

): Graph[VD, ED]

4. Graph.fromEdgeTuples: 使用一个edge tuple的RDD创建图。边的值默认为1,顶点自动创建并赋予默认值。该方法也支持对边的deduplication(也就是去重)。如果发现多个相同的边,就将他们合并,属性值计算他们的和。或者将重复的边当做多条边。

def fromEdgeTuples[VD](

    rawEdges: RDD[(VertexId, VertexId)],

    defaultValue: VD,

    uniqueEdges: Option[PartitionStrategy] = None,

    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,

    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY

): Graph[VD, Int]

上一篇下一篇

猜你喜欢

热点阅读