Flink-Gelly:Iterative Graph Proc
Gelly利用Flink的高效迭代算子来支持海量数据的迭代式图处理。目前,Flink Gelly提供了“Vertex-Centric”,“Scatter-Gather”以及“Gather-Sum-Apply”等计算模型的实现。下面将展示这些计算模型的思想和使用场景。
Vertex-Centric Iterations
“Vertex-Centric”迭代模型也就是我们经常听到的“Pregel”,是一种从Vertex角度出发的图计算方式。其中,同步地迭代计算的步骤称之为“superstep”。在每个“superstep”中,每个顶点都执行一个用户自定义的函数,且顶点之间通过消息进行通信,当一个顶点知道图中其他任意顶点的唯一ID时,该顶点就可以向其发送一条消息。
该计算模型如下图所示。虚线框对应了一系列并行的计算单元(即用户自定义的计算函数)。在每个“superstep”中,所有的活跃的顶点并行地执行同一个用户自定义的计算函数。所有的“superstep”之间同步地被执行(step by step),因此可以保证一个“superstep”发送的消息会在下一个“superstep”开始的时候被接收到。
在Gelly中,用户只需要定义顶点的计算函数(Compute Function)就可以使用“Vertex-Centric”迭代模型。把这个计算函数和最大迭代次数传给Gelly的runVertexCentricIteration方法,该方法会在输入的图上执行“Vertex-Centric”的迭代计算,然后返回一个顶点值被修改的新图。另外,可以选择定义一个可选的消息组合器MessageCombiner以降低通信成本。
下面可以看一个用vertex-centric模式实现计算单源点最短路径的例子。最开始,每一个顶点都带有一个表示距离的属性,除了源点外该属性值为0外,其他的顶点该属性值均为无穷大。在第一步中,源点沿着边向邻居顶点传播它的距离,在接下来的superstep中每个顶点检查其接收的信息,然后从中选择一个最小距离,如果该距离值小于当前顶点上的距离属性值,则将该属性值进行修改,并将该值传递给邻居顶点,否则什么都不做。该算法在所有顶点值都不变或者达到指定迭代次数时收敛。在该算法中,Message Combiner可以用来发送到目标节点的消息数量。
代码示例如下:
// read the input graph
Graph<Long, Double, Double> graph = ...
// define the maximum number of iterations
int maxIterations = 10;
// Execute the vertex-centric iteration
Graph<Long, Double, Double> result = graph.runVertexCentricIteration(
new SSSPComputeFunction(), new SSSPCombiner(), maxIterations);
// Extract the vertices as the result
DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
// - - - UDFs - - - //
public static final class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double> {
public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages) {
double minDistance = (vertex.getId().equals(srcId)) ? 0d : Double.POSITIVE_INFINITY;
for (Double msg : messages) {
minDistance = Math.min(minDistance, msg);
}
if (minDistance < vertex.getValue()) {
setNewVertexValue(minDistance);
for (Edge<Long, Double> e: getEdges()) {
sendMessageTo(e.getTarget(), minDistance + e.getValue());
}
}
}
// message combiner
public static final class SSSPCombiner extends MessageCombiner<Long, Double> {
public void combineMessages(MessageIterator<Double> messages) {
double minMessage = Double.POSITIVE_INFINITY;
for (Double msg: messages) {
minMessage = Math.min(minMessage, msg);
}
sendCombinedMessage(minMessage);
}
}
Configuring a Vertex-Centric Iteration
可以使用VertexCentricConfiguration对Vertex-Centric Iteration进行配置,目前可以指定如下参数:
- Name:Vertex-Centric Iteration的名称,显示在日志和消息中,可以通过使用
setName()
方法来指定。 - Parallelism:Iteration的并发度,可以使用setParallelism()方法来设定。
- Solution set in unmanaged memor:定义解集合是否保存在托管内存内(Flink 内部以序列化的方式保存对象),默认情况下,解集合运行在托管内存中。该属性可以使用方法
setSolutionSetUnmanagedMemory()
来设置。 - Aggregators:迭代的聚合器可以使用方法
registerAggregator()
来注册,聚合器可以在每个superstep中全局地聚合一次所有的变量,并使其对接下来的superstep可用。注册的Aggregators可以在用户自定义的计算函数ComputeFunction内部访问。 - Broadcast Variables:可以使用
addBroadcastSet()
将数据集作为Broadcast Variables传入到ComputeFunction。
示例如下:
Graph<Long, Double, Double> graph = ...
// configure the iteration
VertexCentricConfiguration parameters = new VertexCentricConfiguration();
// set the iteration name
parameters.setName("Gelly Iteration");
// set the parallelism
parameters.setParallelism(16);
// register an aggregator
parameters.registerAggregator("sumAggregator", new LongSumAggregator());
// run the vertex-centric iteration, also passing the configuration parameters
Graph<Long, Long, Double> result =
graph.runVertexCentricIteration(
new Compute(), null, maxIterations, parameters);
// user-defined function
public static final class Compute extends ComputeFunction {
LongSumAggregator aggregator = new LongSumAggregator();
public void preSuperstep() {
// retrieve the Aggregator
aggregator = getIterationAggregator("sumAggregator");
}
public void compute(Vertex<Long, Long> vertex, MessageIterator inMessages) {
//do some computation
Long partialValue = ...
// aggregate the partial value
aggregator.aggregate(partialValue);
// update the vertex value
setNewVertexValue(...);
}
}
Scatter-Gather Iterations
scatter-gather模型,也被称之为“signal/collect”模型,是另一种从Vertex角度出发的图计算方式。该计算以同步迭代的方式进行,每个迭代的计算都称之为一个superstep。在每个superstep中,每个顶点向其他顶点传播信息,并根据接收到的信息修改当前顶点的值,其中传播信息的过程称之为scatter,也称之为signal,接收信息并修改顶点值的过程称之为gather,也叫collect。在Flink Gelly中使用scatter-gather模型,用户只需定义每个superstep中顶点的以下两种操作:
- Scatter:产生需要传递给其他顶点的信息。
- Gather:根据接收其他顶点的信息,更新当前顶点的值。
Gelly提供了使用scatter-gather的方法,使用者只需要对应实现scatter和gather的方法即可。其中ScatterFunction允许一个顶点向其他顶点发送消息。在同一个superstep中,发送出去的消息会立即被对应的顶点接收到。另外一个方法是GatherFunction,该方法定义了一个顶点在接收消息之后如何更新当前顶点的值。这两个方法和最大迭代次数会作为参数传递给Gelly的runScatterGatherIteration。该方法会在输入的图上执行scatter-gather迭代,并返回一个顶点值被修改了的新图。
当然,我们可以使用一些信息来扩展scatter-gather迭代,比如总节点数,出度和入度等,另外还可以指定每个顶点的邻接节点的类型,包括(入/出/所有)类型的邻接节点。默认情况下,一个节点只接收(入-邻接)节点的消息,并向(出-邻接)节点发送消息。
下面展示了使用scatter-gather迭代解决单源点最短路径问题的大致过程,一次superstep的详细内容。这里顶点1作为源点,在每个superstep中,每个节点会向(出-)邻接节点发送一条候选距离的消息,该消息值为当前顶点的值加上与(出)邻接节点相连的边的权重值。然后每个顶点在基于接收的候选距离消息的基础上更新当前顶点的值,即若接收到的消息中的最小值小于当前顶点的值,则将改顶点的值改成这个最小值,否则什么都不做。在接下来的迭代中,如果一个节点经过一次superstep当前顶点的值没有发生修改,则不向邻接节点发送消息。当所有顶点的值不会再变或者达到指定的迭代次数时,该过程收敛。
代码示例如下:
// read the input graph
Graph<Long, Double, Double> graph = ...
// define the maximum number of iterations
int maxIterations = 10;
// Execute the scatter-gather iteration
Graph<Long, Double, Double> result = graph.runScatterGatherIteration(
new MinDistanceMessenger(), new VertexDistanceUpdater(), maxIterations);
// Extract the vertices as the result
DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
// - - - UDFs - - - //
// scatter: messaging
public static final class MinDistanceMessenger extends ScatterFunction<Long, Double, Double, Double> {
public void sendMessages(Vertex<Long, Double> vertex) {
for (Edge<Long, Double> edge : getEdges()) {
sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
}
}
}
// gather: vertex update
public static final class VertexDistanceUpdater extends GatherFunction<Long, Double, Double> {
public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) {
Double minDistance = Double.MAX_VALUE;
for (double msg : inMessages) {
if (msg < minDistance) {
minDistance = msg;
}
}
if (vertex.getValue() > minDistance) {
setNewVertexValue(minDistance);
}
}
}
Configuring a Scatter-Gather Iteration
我们可以使用ScatterGatherConfiguration对象来配置一个Scatter-Gather迭代过程。目前可以指定如下参数:
- Name:Vertex-Centric Iteration的名称,显示在日志和消息中,可以通过使用
setName()
方法来指定。 - Parallelism:Iteration的并发度,可以使用setParallelism()方法来设定。
- Solution set in unmanaged memor:定义解集合是否保存在托管内存内(Flink 内部以序列化的方式保存对象),默认情况下,解集合运行在托管内存中。该属性可以使用方法
setSolutionSetUnmanagedMemory()
来设置。 - Aggregators:迭代的聚合器可以使用方法
registerAggregator()
来注册,聚合器可以在每个superstep中全局地聚合一次所有的变量,并使其对接下来的superstep可用。注册的Aggregators可以在用户自定义的计算函数ComputeFunction内部访问。 - Broadcast Variables:可以使用
addBroadcastSet()
将数据集作为Broadcast Variables传入到ComputeFunction。 - Number of Vertices:在每个迭代过程中访问图中所有的顶点数。可以通过方法
setOptNumVertices()
来设定。该值可以通过使用方法getNumberOfVertices()
来取值,如果该项未设置,该方法默认返回-1。 - Degrees:在迭代过程中访问节点的出度/入度。该属性值可以通过方法
setOptDegrees()
来设置,并通过getInDegree()
和getOutDegree()
方法来获取对应的入度和出度。如果未设置,这两个方法默认返回-1. - Messaging Direction:默认情况下,消息会发送给每个顶点的出-邻接节点,并根据入-邻接节点的消息修改当前节点的值。该配置允许用户根据自己的意愿设置消息的传播方向,可选方向有:
EdgeDirection.IN, EdgeDirection.OUT, EdgeDirection.ALL
,通过setDirection()
方法设置方向后,消息就会沿着指定的方向EdgeDirection.IN, EdgeDirection.OUT, EdgeDirection.ALL
进行传递。
部分设置参数的示例如下:
Graph<Long, Double, Double> graph = ...
// configure the iteration
ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
// set the iteration name
parameters.setName("Gelly Iteration");
// set the parallelism
parameters.setParallelism(16);
// register an aggregator
parameters.registerAggregator("sumAggregator", new LongSumAggregator());
// run the scatter-gather iteration, also passing the configuration parameters
Graph<Long, Double, Double> result =
graph.runScatterGatherIteration(
new Messenger(), new VertexUpdater(), maxIterations, parameters);
// user-defined functions
public static final class Messenger extends ScatterFunction {...}
public static final class VertexUpdater extends GatherFunction {
LongSumAggregator aggregator = new LongSumAggregator();
public void preSuperstep() {
// retrieve the Aggregator
aggregator = getIterationAggregator("sumAggregator");
}
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator inMessages) {
//do some computation
Long partialValue = ...
// aggregate the partial value
aggregator.aggregate(partialValue);
// update the vertex value
setNewVertexValue(...);
}
}
//////////////////////////////////////////////////////////////////////////////
//The following example illustrates the usage of the degree as well as the number of vertices options.
//////////////////////////////////////////////////////////////////////////////
Graph<Long, Double, Double> graph = ...
// configure the iteration
ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
// set the number of vertices option to true
parameters.setOptNumVertices(true);
// set the degree option to true
parameters.setOptDegrees(true);
// run the scatter-gather iteration, also passing the configuration parameters
Graph<Long, Double, Double> result =
graph.runScatterGatherIteration(
new Messenger(), new VertexUpdater(), maxIterations, parameters);
// user-defined functions
public static final class Messenger extends ScatterFunction {
...
// retrieve the vertex out-degree
outDegree = getOutDegree();
...
}
public static final class VertexUpdater extends GatherFunction {
...
// get the number of vertices
long numVertices = getNumberOfVertices();
...
}
//////////////////////////////////////////////////////////////////////////////
//The following example illustrates the usage of the edge direction option. Vertices update their values to contain a list of all their in-neighbors.
Graph<Long, HashSet<Long>, Double> graph = ...
//////////////////////////////////////////////////////////////////////////////
// configure the iteration
ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
// set the messaging direction
parameters.setDirection(EdgeDirection.IN);
// run the scatter-gather iteration, also passing the configuration parameters
DataSet<Vertex<Long, HashSet<Long>>> result =
graph.runScatterGatherIteration(
new Messenger(), new VertexUpdater(), maxIterations, parameters)
.getVertices();
// user-defined functions
public static final class Messenger extends GatherFunction {...}
public static final class VertexUpdater extends ScatterFunction {...}
Gather-Sum-Apply Iterations
与Scatter-Gather模型相似,Gather-Sum-Apply模型(以下简称GSA)的计算也以同步迭代的方式进行,每个迭代的计算都称之为一个superstep。GSA的每个superstep由以下三个阶段组成:
- Gather:在一个顶点的每条边和顶点的邻接节点上并行调用的一个用户定义的函数,生成中间值。
- Sum:将Gather阶段生成的中间值按照用户定义的方式聚合生成一个单独的新值。
-
Apply:使用一个定义的函数根据当前值和Sum阶段聚合生成的值对每个节点的值进行更新。
同样的,这里也以单源点最短路径为例。如下图所示,假设顶点1为源节点。在Gather阶段,我们通过将当前顶点的值与邻接的边上的权重求和,为每当前顶点的每个邻接顶点计算一个候选距离值;然后在Sum阶段,根据顶点的ID对候选的距离值进行group,并为每个顶点选择一个最小的距离值;最后再Apply阶段,将Sum阶段为每个顶点选择出的最小距离值与该顶点当前的值进行比较,如果Sum阶段选择出的最小值小于当前顶点的值,则将顶点的当前值替换成最小值。
image.png
与Scatter-Gather模型相似,当一次迭代中,一个顶点的值未被修改,那么在下一个迭代中,该顶点不再计算候选距离。当没有节点的值变化时,该算法收敛。
在Gelly中,为了使用GSA,我们需要调用runGatherSumApplyIteration方法,并提供三个用户定义的方法:GatherFunction, SumFunction 和 ApplyFunction。迭代同步、分组、值更新和收敛交由系统处理。示例代码如下:
// read the input graph
Graph<Long, Double, Double> graph = ...
// define the maximum number of iterations
int maxIterations = 10;
// Execute the GSA iteration
Graph<Long, Double, Double> result = graph.runGatherSumApplyIteration(
new CalculateDistances(), new ChooseMinDistance(), new UpdateDistance(), maxIterations);
// Extract the vertices as the result
DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
// - - - UDFs - - - //
// Gather
private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
public Double gather(Neighbor<Double, Double> neighbor) {
return neighbor.getNeighborValue() + neighbor.getEdgeValue();
}
}
// Sum
private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
public Double sum(Double newValue, Double currentValue) {
return Math.min(newValue, currentValue);
}
}
// Apply
private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {
public void apply(Double newDistance, Double oldDistance) {
if (newDistance < oldDistance) {
setResult(newDistance);
}
}
}
Configuring a Gather-Sum-Apply Iteration
我们可以使用ScatterGatherConfiguration对象来配置一个GSA迭代过程。目前可以指定如下参数:
- Name:Vertex-Centric Iteration的名称,显示在日志和消息中,可以通过使用
setName()
方法来指定。 - Parallelism:Iteration的并发度,可以使用setParallelism()方法来设定。
- Solution set in unmanaged memor:定义解集合是否保存在托管内存内(Flink 内部以序列化的方式保存对象),默认情况下,解集合运行在托管内存中。该属性可以使用方法
setSolutionSetUnmanagedMemory()
来设置。 - Aggregators:迭代的聚合器可以使用方法
registerAggregator()
来注册,聚合器可以在每个superstep中全局地聚合一次所有的变量,并使其对接下来的superstep可用。注册的Aggregators可以在用户自定义的计算函数ComputeFunction内部访问。 - Broadcast Variables:可以使用
addBroadcastSet()
将数据集作为Broadcast Variables传入到ComputeFunction。 - Number of Vertices:在每个迭代过程中访问图中所有的顶点数。可以通过方法
setOptNumVertices()
来设定。该值可以通过使用方法getNumberOfVertices()
来取值,如果该项未设置,该方法默认返回-1。 - Neighbor Direction:与Scatter/Gather中的Message Direction类似,默认情况下只向外传播,可以通过方法
setDirection()
进行修改,可选方向有:EdgeDirection.IN, EdgeDirection.OUT, EdgeDirection.ALL
。示例代码如下:
Graph<Long, Double, Double> graph = ...
// configure the iteration
GSAConfiguration parameters = new GSAConfiguration();
// set the number of vertices option to true
parameters.setOptNumVertices(true);
// run the gather-sum-apply iteration, also passing the configuration parameters
Graph<Long, Long, Long> result = graph.runGatherSumApplyIteration(
new Gather(), new Sum(), new Apply(),
maxIterations, parameters);
// user-defined functions
public static final class Gather {
...
// get the number of vertices
long numVertices = getNumberOfVertices();
...
}
public static final class Sum {
...
// get the number of vertices
long numVertices = getNumberOfVertices();
...
}
public static final class Apply {
...
// get the number of vertices
long numVertices = getNumberOfVertices();
...
}
//////////////////////////////////////////////////////////////////////
//The following example illustrates the usage of the edge direction option.
//////////////////////////////////////////////////////////////////////
Graph<Long, HashSet<Long>, Double> graph = ...
// configure the iteration
GSAConfiguration parameters = new GSAConfiguration();
// set the messaging direction
parameters.setDirection(EdgeDirection.IN);
// run the gather-sum-apply iteration, also passing the configuration parameters
DataSet<Vertex<Long, HashSet<Long>>> result =
graph.runGatherSumApplyIteration(
new Gather(), new Sum(), new Apply(), maxIterations, parameters)
.getVertices();
Iteration Abstractions Comparison
尽管Gelly中的三个迭代模型抽象起来看着非常的相似,但是理解它们之间的差异可以帮助我们提高程序的性能和可维护性。在这三种模型中,vertex-centric模型是最通用的模型,支持对每个顶点进行任意的计算和消息传递。scatter-gather模型将生成消息的逻辑与更新顶点值的逻辑解耦,因此scatter-gather模型相比较而言更易于迭代和维护,另外这两个模块的解耦还能对性能产生积极的影响。scatter-gather模型不需要并发地对消息接收的数据和发送的数据进行处理,因此通常有着较低的内存要求。然而,这种特性也限制了表达性,使一些计算模式表现得不那么直观。当然,如果一个算法需要一个顶点并发地访问它的接收的信息数据和发出的信息数据,那么用scatter-gather这种表达方式可能会有问题,例如强连通组件分析。
GSA模型与scatter-gather也非常的相似,事实上,任何一个可以用GSA模型解决的计算问题都可以使用scatter-gather模型来解决。其中Apply阶段仅仅用来更新当前顶点的值。两种实现的主要区别在于GSA的Gather阶段在边上进行并行计算,而scatter-gather的消息传递阶段在顶点上进行并行计算。另外一个区别就是在实现机制上,scatter-gather在内部实现中用到了一个coGroup操作,而GSA使用的时reduce操作。因此,如果组合邻居值(消息)的函数需要计算整个值组,则应该使用scatter-gather。如果更新方法是互相关联和交互的,那么GSA的有望提供更有效的实现,因为它可以使用组合器。
另外需要注意的是,GSA严格的工作在顶点的邻接顶点上,而 在vertex-centric和scatter-gather模型,一个顶点可以通过顶点ID向任何一个顶点发送消息,不管该顶点与当前顶点是否邻接。
三种迭代模型的主要不同如下表所示:
image.png
注:最近项目中图计算的部分需求需要用到Flink Gelly,因此看了以下官网的文档,时间仓促,不足之处还请指正!!!