深入了解Flink Gelly图计算模型之Gather-Sum-
Flink提供了三种通用的基于迭代的图计算模型的实现(Flink-Gelly:Iterative Graph Processing),分别是:Vertex-Centric, Scatter-Gather和Gather-Sum-Apply,在之前的文章深入了解Flink Gelly图计算之—Vertex-Centric模型和深入了解Flink Gelly图计算模型之Vertex-Centric中我们详细的介绍了Vertex-Centric模型和Scatter-Gather模型的实现原理和使用方法,接下来我们将详细的介绍一下Gather-Sum-Apply模型的原理和实现过程,并以简单的示例来展示Gather-Sum-Apply模型的使用方法。
Gather-Sum-Apply模型概述
Gather-Sum-Apply模型的计算过程也是基于同步迭代,每个迭代的轮次称之为Superstep。每个Superstep由以下三个阶段组成:
Gather: 在每个轮次的迭代中,每个顶点并行的在其邻接顶点点和边上执行udf,并产生一个中间结果。
Sum: 每个顶点将其在Gather阶段产生的一系列中间结果应用udf进行聚合,生成一个单独的值。
Apply: 每个顶点根据Sum阶段产生的新结果只和当前顶点的属性值应用udf对当前顶点进行更新操作。
我们同样以单源点最短路径为例,简单介绍GSA每个阶段的具体执行逻辑。假设我们存在如下一个有向图,我们需要计算顶点1到其余三个顶点2,3,4的最短路径。首先我们需要对每个顶点的值进行初始化,其中顶点1的值(值记在顶点上的方框里)初始化为0,其余顶点的值默认初始化为正无穷。接下来以下图为例,详细介绍三个阶段的主要功能,在第一轮迭代中:
(1)Gather:顶点1没有邻居节点(考虑是有向图,没有边指向顶点1),因此不用计算中间距离值。顶点2的邻居为1,中间值结果为:;顶点3的邻居为1,中间值结果为:;顶点4的邻居为1,2,3,中间值结果为:。
(2)Sum:上个阶段执行完成后,顶点2的中间值结果为:;顶点3的中间值结果为:;顶点4的中间值结果为:。该阶段执行聚合操作,挑选出最小的中间值为新值,因此,聚合的结果为:。
(3)Apply:根据Sum聚合的结果,将每个顶点的当前值按照取最小进行更新,则该轮Apply之后,每个顶点的值为。
第一轮迭代完成后,在第二轮迭代中,顶点4的Gather阶段中间值为变成,Sum阶段的聚合结果为3,最终的Apply阶段后,每个顶点的值为。然后在下一轮迭代中,没有任何顶点回在Apply阶段更新当前值,因此整个GSA模型迭代结束,最终结果为。
Gather-Sum-Apply模型使用
上面我们大致介绍了GSA模型三个阶段的基本执行过程。GSA模型在实际使用的时候也非常简单,只需要用户根据实际需求实现三个阶段的UDF即可,即GatherFunction
,SumFunction
和ApplyFunction
。下面我们给出了一段示例代码,简单展示了#Gather-Sum-Apply模型的使用方法。
package com.quan.graph;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.graph.gsa.SumFunction;
import java.util.LinkedList;
import java.util.List;
public class GSA_SSSP {
// Set 1 as the source.
public static Integer srcId = 1;
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
List<Edge<Integer, Integer>> edgesList = new LinkedList<>();
edgesList.add(new Edge<Integer, Integer>(1, 2, 12));
edgesList.add(new Edge<Integer, Integer>(1, 6, 3));
edgesList.add(new Edge<Integer, Integer>(1, 7, 14));
edgesList.add(new Edge<Integer, Integer>(2, 6, 7));
edgesList.add(new Edge<Integer, Integer>(2, 3, 10));
edgesList.add(new Edge<Integer, Integer>(3, 4, 3));
edgesList.add(new Edge<Integer, Integer>(3, 5, 5));
edgesList.add(new Edge<Integer, Integer>(3, 6, 4));
edgesList.add(new Edge<Integer, Integer>(4, 5, 4));
edgesList.add(new Edge<Integer, Integer>(5, 6, 2));
edgesList.add(new Edge<Integer, Integer>(5, 7, 8));
edgesList.add(new Edge<Integer, Integer>(6, 7, 9));
DataSet<Edge<Integer, Integer>> edges = env.fromCollection(edgesList);
// Read the input data and create a graph.
Graph<Integer, Integer, Integer> graph = Graph.fromDataSet(edges, new InitVertices(), env);
// Convert the graph to undirected.
Graph<Integer, Integer, Integer> undirected_graph = graph.getUndirected();
// Define the maximum number of iterations.
int maxIterations = 10;
// Execute the vertex-centric iteration.
Graph<Integer, Integer, Integer> result = undirected_graph
.runGatherSumApplyIteration(
new CalculateDistances(),
new ChooseMinDistance(),
new UpdateDistance(),
maxIterations);
// Extract the vertices as the result.
DataSet<Vertex<Integer, Integer>> singleSourceShortestPaths = result.getVertices();
// Print the result.
singleSourceShortestPaths.print();
}
// - - - UDFs - - - //
// Gather: Calculate candidate distances from neighbors and edges of each vertex.
private static final class CalculateDistances extends GatherFunction<Integer, Integer, Integer> {
public Integer gather(Neighbor<Integer, Integer> neighbor) {
return (neighbor.getNeighborValue() < Integer.MAX_VALUE) ?
neighbor.getNeighborValue() + neighbor.getEdgeValue() : Integer.MAX_VALUE;
}
}
// Sum: Choose the min distance from candidate distances which calculate by GatherFunction
private static final class ChooseMinDistance extends SumFunction<Integer, Integer, Integer> {
public Integer sum(Integer newValue, Integer currentValue) {
return Math.min(newValue, currentValue);
}
}
// Apply: Update distance for each node when newDistance smaller than oldDistance
private static final class UpdateDistance extends ApplyFunction<Integer, Integer, Integer> {
public void apply(Integer newDistance, Integer oldDistance) {
if (newDistance < oldDistance) {
setResult(newDistance);
}
}
}
@SuppressWarnings("serial")
private static final class InitVertices implements MapFunction<Integer, Integer> {
// Init all vertex with the max value expected the srd vertex.
// Init the src vertex with 0.
public Integer map(Integer id) {
return id.equals(srcId) ? 0 : Integer.MAX_VALUE;
}
}
}
我们使用的数据集也是前面两篇文章中用到的数据即,即下图所示的图。
示例图
上图在GSA模型上的执行结果如下图所示
执行结果
Flink Gelly三种图计算模型的对比
Flink官网给出了三种图计算模型的对比,通过对比我们可以得出以下结论:
1. Vertex-Centric模型和Scatter-Gather模型中每个顶点可以与其他任意顶点进行通信,而Gather-Sum-Apply模型仅能与邻居顶点进行通信。
2. Scatter-Gather模型和Gather-Sum-Apply模型根据节点的状态确定是否与其他顶点进行消息通信。
3. Vertex-Centric模型可以自定义任意的更新逻辑和更新方法,Scatter-Gather模型的更新逻辑依赖于接收到的消息,而Gather-Sum-Apply模型的更新逻辑完全依赖于邻居顶点和邻接边的值。