A Deep Dive into Flink Gelly Graph Processing: the Scatter-Gather Model
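Flink provides implementations of three general-purpose, iteration-based graph processing models (Flink Gelly: Iterative Graph Processing): Vertex-Centric, Scatter-Gather, and Gather-Sum-Apply. In the previous article, "A Deep Dive into Flink Gelly Graph Processing: the Vertex-Centric Model", we covered how the Vertex-Centric model works and how to use it. This article explains the principles and the execution of the Scatter-Gather model and demonstrates its use with a simple example.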
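The Scatter-Gather model, also known as the "signal/collect" model, shares its core idea with the Vertex-Centric model: the computation is expressed from the perspective of a single vertex in the graph. It proceeds in synchronous iterations, and each iteration is called a superstep. In every superstep, vertices produce messages for other vertices and update their own values based on the messages they receive. To use the Scatter-Gather model in Flink Gelly, the user only has to define what a vertex does in each superstep: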
Scatter: the vertex produces the messages it will send to other vertices.
Gather: the vertex uses the messages it has received to update its value.
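To make the two phases concrete, here is a minimal plain-Java sketch of synchronous scatter-gather supersteps that does not use Flink at all; the class ScatterGatherSketch and its method are hypothetical names for illustration. Each vertex holds an integer label, the scatter phase sends the label along out-edges, and the gather phase keeps the smallest value received (minimum-label propagation). Following Gelly's behaviour, every vertex is active in the first superstep, afterwards only vertices whose value changed send messages, and the loop ends when nothing changes or maxIterations is reached.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Conceptual model of synchronous scatter-gather supersteps in plain Java (not the Gelly API).
final class ScatterGatherSketch {

    /** Propagates the minimum label along directed edges {src, dst} and returns the final labels. */
    static Map<Integer, Integer> minLabel(Map<Integer, Integer> initial, int[][] edges, int maxIterations) {
        Map<Integer, Integer> values = new TreeMap<>(initial);
        // In the first superstep every vertex is active.
        Set<Integer> active = new HashSet<>(values.keySet());
        for (int step = 0; step < maxIterations && !active.isEmpty(); step++) {
            // Scatter: every active vertex sends its current value to its out-neighbours.
            Map<Integer, List<Integer>> inbox = new HashMap<>();
            for (int[] e : edges) {
                if (active.contains(e[0])) {
                    inbox.computeIfAbsent(e[1], k -> new ArrayList<>()).add(values.get(e[0]));
                }
            }
            // Gather: a vertex that received messages keeps the smallest value it has seen.
            Set<Integer> changed = new HashSet<>();
            for (Map.Entry<Integer, List<Integer>> entry : inbox.entrySet()) {
                int candidate = entry.getValue().stream().min(Integer::compare).get();
                if (candidate < values.get(entry.getKey())) {
                    values.put(entry.getKey(), candidate);
                    changed.add(entry.getKey());
                }
            }
            // Only vertices updated in this superstep send messages in the next one.
            active = changed;
        }
        return values;
    }

    public static void main(String[] args) {
        // Two components, {1, 2, 3} and {4, 5}; edges are listed in both directions.
        int[][] edges = {{1, 2}, {2, 1}, {2, 3}, {3, 2}, {4, 5}, {5, 4}};
        Map<Integer, Integer> init = new TreeMap<>();
        for (int v = 1; v <= 5; v++) {
            init.put(v, v); // each vertex starts with its own ID as label
        }
        System.out.println(minLabel(init, edges, 10)); // {1=1, 2=1, 3=1, 4=4, 5=4}
    }
}
```

With a limit of a single superstep, vertex 3 would still hold label 2, because the value 1 needs two hops to reach it; this is why maxIterations bounds how far information can travel.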
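Flink Gelly provides the Scatter-Gather implementation; the user only needs to implement two functions, one for the scatter phase and one for the gather phase. The first is a ScatterFunction, which lets a vertex send messages to other vertices; messages are received in the same superstep in which they are sent. The second is a GatherFunction, which defines how a vertex updates its value based on the messages it received. The following example uses the two functions to compute single-source shortest paths (SSSP) from vertex 1 on a small weighted graph: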
package com.quan.graph;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.GatherFunction;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.ScatterFunction;
import java.util.LinkedList;
import java.util.List;
public class SG_SSSP {
    // Vertex 1 is the source.
    public static Integer srcId = 1;
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Weighted edges: (source, target, weight).
        List<Edge<Integer, Integer>> edgesList = new LinkedList<>();
        edgesList.add(new Edge<Integer, Integer>(1, 2, 12));
        edgesList.add(new Edge<Integer, Integer>(1, 6, 3));
        edgesList.add(new Edge<Integer, Integer>(1, 7, 14));
        edgesList.add(new Edge<Integer, Integer>(2, 6, 7));
        edgesList.add(new Edge<Integer, Integer>(2, 3, 10));
        edgesList.add(new Edge<Integer, Integer>(3, 4, 3));
        edgesList.add(new Edge<Integer, Integer>(3, 5, 5));
        edgesList.add(new Edge<Integer, Integer>(3, 6, 4));
        edgesList.add(new Edge<Integer, Integer>(4, 5, 4));
        edgesList.add(new Edge<Integer, Integer>(5, 6, 2));
        edgesList.add(new Edge<Integer, Integer>(5, 7, 8));
        edgesList.add(new Edge<Integer, Integer>(6, 7, 9));
        DataSet<Edge<Integer, Integer>> edges = env.fromCollection(edgesList);
        // Create the graph from the edges; InitVertices sets each vertex's initial value from its ID.
        Graph<Integer, Integer, Integer> graph = Graph.fromDataSet(edges, new InitVertices(), env);
        // Convert the graph to undirected (adds the reverse of every edge).
        Graph<Integer, Integer, Integer> undirected_graph = graph.getUndirected();
        // Define the maximum number of iterations.
        int maxIterations = 10;
        // Execute the scatter-gather iteration.
        Graph<Integer, Integer, Integer> result = undirected_graph.runScatterGatherIteration(
                new DistanceMessenger(), new MinMessageGather(), maxIterations);
        // Extract the vertices as the result.
        DataSet<Vertex<Integer, Integer>> singleSourceShortestPaths = result.getVertices();
        // Print the result; print() also triggers execution of the job.
        singleSourceShortestPaths.print();
    }
    // scatter: messaging
    public static final class DistanceMessenger extends ScatterFunction<Integer, Integer, Integer, Integer> {
        @Override
        public void sendMessages(Vertex<Integer, Integer> vertex) throws Exception {
            // Only vertices that have already been reached send messages (this also avoids Integer overflow).
            if (vertex.getValue() < Integer.MAX_VALUE) {
                for (Edge<Integer, Integer> e : getEdges()) {
                    sendMessageTo(e.getTarget(), vertex.getValue() + e.getValue());
                }
            }
        }
    }
    // gather: vertex update
    public static final class MinMessageGather extends GatherFunction<Integer, Integer, Integer> {
        @Override
        public void updateVertex(Vertex<Integer, Integer> vertex, MessageIterator<Integer> inMessages) throws Exception {
            Integer minDistance = Integer.MAX_VALUE;
            for (Integer msg : inMessages) {
                minDistance = Math.min(msg, minDistance);
            }
            // Update the vertex only if a shorter distance was found.
            if (minDistance < vertex.getValue()) {
                setNewVertexValue(minDistance);
            }
        }
    }
    private static final class InitVertices implements MapFunction<Integer, Integer> {
        // Initialize every vertex except the source with the maximum value.
        // Initialize the source vertex with 0.
        @Override
        public Integer map(Integer id) {
            return id.equals(srcId) ? 0 : Integer.MAX_VALUE;
        }
    }
}
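The Gelly job may print its vertices in any order, so it helps to know which distances to expect. The following plain-Java cross-check has no Flink dependency (SsspCheck is a hypothetical class name). It applies the same scatter and gather rules as DistanceMessenger and MinMessageGather to the same twelve edges, sending along both directions of every edge as getUndirected() would. It is a sketch of the model's behaviour, not Gelly's execution engine.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Plain-Java cross-check of SG_SSSP (no Flink); SsspCheck is a hypothetical name.
final class SsspCheck {
    // The same weighted edges as SG_SSSP: {source, target, weight}.
    static final int[][] EDGES = {
        {1, 2, 12}, {1, 6, 3}, {1, 7, 14}, {2, 6, 7}, {2, 3, 10}, {3, 4, 3},
        {3, 5, 5}, {3, 6, 4}, {4, 5, 4}, {5, 6, 2}, {5, 7, 8}, {6, 7, 9}};

    static Map<Integer, Integer> shortestPaths(int srcId, int maxIterations) {
        // Like InitVertices: 0 for the source, Integer.MAX_VALUE for every other vertex.
        Map<Integer, Integer> dist = new TreeMap<>();
        for (int[] e : EDGES) {
            dist.put(e[0], e[0] == srcId ? 0 : Integer.MAX_VALUE);
            dist.put(e[1], e[1] == srcId ? 0 : Integer.MAX_VALUE);
        }
        Set<Integer> active = new HashSet<>(dist.keySet());
        for (int step = 0; step < maxIterations && !active.isEmpty(); step++) {
            // Scatter (DistanceMessenger): reached, active vertices send distance + weight
            // along both directions of every edge, as on the undirected graph.
            // Only the smallest message per target is kept, which is all the gather step uses.
            Map<Integer, Integer> minMessage = new HashMap<>();
            for (int[] e : EDGES) {
                for (int k = 0; k < 2; k++) {
                    int from = e[k];
                    int to = e[1 - k];
                    int d = dist.get(from);
                    if (active.contains(from) && d < Integer.MAX_VALUE) {
                        minMessage.merge(to, d + e[2], Integer::min);
                    }
                }
            }
            // Gather (MinMessageGather): update only when a strictly shorter distance arrives.
            Set<Integer> changed = new HashSet<>();
            for (Map.Entry<Integer, Integer> m : minMessage.entrySet()) {
                if (m.getValue() < dist.get(m.getKey())) {
                    dist.put(m.getKey(), m.getValue());
                    changed.add(m.getKey());
                }
            }
            active = changed;
        }
        return dist;
    }

    public static void main(String[] args) {
        System.out.println(shortestPaths(1, 10));
    }
}
```

Running main prints {1=0, 2=10, 3=7, 4=9, 5=5, 6=3, 7=12}; for example, vertex 4 is reached via 1 → 6 → 5 → 4 at cost 3 + 2 + 4 = 9. All distances are final after three supersteps, well within maxIterations = 10, so the Gelly program should report the same values.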
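A scatter-gather iteration can be configured with a ScatterGatherConfiguration object, which is passed to runScatterGatherIteration() as an additional argument. The available options are:
Name: the name of the scatter-gather iteration, which appears in logs and messages. It is set with the setName() method.
Parallelism: the parallelism of the iteration, set with the setParallelism() method.
Solution set in unmanaged memory: the setSolutionSetUnmanagedMemory() method specifies whether the solution set is kept in managed memory (Flink's internal, serialized representation) or as a plain object map. By default, the solution set is kept in managed memory.
Aggregators: iteration aggregators are registered with the registerAggregator() method; registered aggregators can be accessed inside the user-defined ScatterFunction and GatherFunction.
Broadcast Variables: DataSets can be added to the user functions as broadcast variables, for example with the addBroadcastSetForUpdateFunction() method.
Number of Vertices: to make the total number of vertices available inside the iteration, enable this option with setOptNumVertices(). The number can then be read in both the scatter and the gather function with getNumberOfVertices(). If the option is not set, this method returns -1.
Degrees: gives access to the in- and out-degree of a vertex within an iteration. Enable it with setOptDegrees(); the degrees can then be read per vertex in both the scatter and the gather function with getInDegree() and getOutDegree(). If the option is not set, these methods return -1.
Messaging Direction: graphs in Flink Gelly are directed, so by default a vertex sends messages only along its outgoing edges and updates its value from messages arriving over its incoming edges. The Messaging Direction option changes this default; it accepts EdgeDirection.IN, EdgeDirection.OUT, and EdgeDirection.ALL and is set with the setDirection() method.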
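The messaging direction decides which neighbours a vertex's messages reach. The plain-Java sketch below illustrates that rule on a directed edge list; the Direction enum only mirrors the constant names of Gelly's EdgeDirection and is a hypothetical stand-in, not the Flink type. It also shows why SG_SSSP calls getUndirected(): the edge (3, 6, 4) is stored as 3 → 6, so with the default OUT direction on the original directed graph, vertex 6 could never send its distance to vertex 3.

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual illustration of messaging direction; Direction is a hypothetical stand-in
// that only mirrors the names of Gelly's EdgeDirection constants.
final class MessagingDirectionSketch {
    enum Direction { OUT, IN, ALL }

    /** Returns the vertices that receive a message sent by {@code vertex}. */
    static List<Integer> messageTargets(int vertex, int[][] edges, Direction dir) {
        List<Integer> targets = new ArrayList<>();
        for (int[] e : edges) {
            // OUT (the default) and ALL: follow edges that start at the vertex.
            if (dir != Direction.IN && e[0] == vertex) {
                targets.add(e[1]);
            }
            // IN and ALL: follow edges that end at the vertex, against their direction.
            if (dir != Direction.OUT && e[1] == vertex) {
                targets.add(e[0]);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        // Directed edges of SG_SSSP that touch vertex 6 (weights omitted).
        int[][] edges = {{1, 6}, {2, 6}, {3, 6}, {5, 6}, {6, 7}};
        for (Direction d : Direction.values()) {
            System.out.println(d + " -> " + messageTargets(6, edges, d));
        }
    }
}
```

main prints OUT -> [7], IN -> [1, 2, 3, 5] and ALL -> [1, 2, 3, 5, 7]: only ALL (or an undirected graph) lets vertex 6 reach vertex 3.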
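The following snippet sets the iteration name and parallelism and registers an aggregator; graph, maxIterations, Messenger, and VertexUpdater stand for the user's own graph, iteration limit, and user functions and are not defined here.
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
// configure the iteration
ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
// set the iteration name
parameters.setName("Gelly Iteration");
// set the parallelism (16 is only an illustrative value)
parameters.setParallelism(16);
// register an aggregator
parameters.registerAggregator("sumAggregator", new LongSumAggregator());
// run the scatter-gather iteration, also passing the configuration parameters
Graph<Long, Double, Double> result = graph.runScatterGatherIteration(
        new Messenger(), new VertexUpdater(), maxIterations, parameters);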
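An iteration aggregator combines the values that vertices contribute during one superstep into a single global value, which the user functions can read in the following superstep (inside Gelly, via getIterationAggregator() to contribute and getPreviousIterationAggregate() to read). The plain-Java sketch below models only that timing, with a long sum in the role of LongSumAggregator; the class AggregatorSketch is hypothetical and no Flink classes are involved.

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual timing of an iteration aggregator (not Flink code): values aggregated in
// superstep i are only visible, as the "previous" aggregate, in superstep i + 1.
final class AggregatorSketch {
    /** Returns, per superstep, the previous superstep's global sum as seen by the vertices. */
    static List<Long> run(long[][] partialsPerSuperstep) {
        List<Long> seen = new ArrayList<>();
        Long previous = null; // no previous aggregate exists in the first superstep (null here)
        for (long[] partials : partialsPerSuperstep) {
            seen.add(previous);
            long current = 0L; // the aggregate starts from zero in every superstep
            for (long p : partials) {
                current += p; // like aggregator.aggregate(p) inside a user function
            }
            previous = current; // combined globally at the end of the superstep
        }
        return seen;
    }

    public static void main(String[] args) {
        // Three supersteps; each array holds the partial values contributed by the vertices.
        long[][] partials = {{1, 2, 3}, {4, 5}, {6}};
        System.out.println(run(partials)); // [null, 6, 9]
    }
}
```

The sum of the last superstep (6) is never read inside the loop, which mirrors the one-superstep delay a user function sees.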
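The next snippet enables the number-of-vertices and degree options; graph, maxIterations, Messenger, and VertexUpdater are user-defined and not shown:
ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
// set the number of vertices option to true
parameters.setOptNumVertices(true);
// set the degree option to true
parameters.setOptDegrees(true);
// run the scatter-gather iteration, also passing the configuration parameters
Graph<Long, Double, Double> result = graph.runScatterGatherIteration(
        new Messenger(), new VertexUpdater(), maxIterations, parameters);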
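Both options matter for PageRank-style algorithms, where the scatter step divides a vertex's rank by its out-degree and the gather step needs the total number of vertices N: new rank = (1 - d) / N + d * (sum of received messages). The plain-Java sketch below runs such supersteps on a small hypothetical graph; outDegree and n play the roles of getOutDegree() and getNumberOfVertices(), the damping factor d = 0.85 is an assumed value, and this is not Gelly's own PageRank implementation.

```java
import java.util.Arrays;

// PageRank-style supersteps in plain Java (not Gelly's PageRank): the scatter step needs
// each vertex's out-degree, the gather step needs the total number of vertices.
final class DegreeOptionsSketch {
    static final double DAMPING = 0.85; // assumed damping factor

    /** Runs the given number of supersteps over directed edges {src, dst} on vertices 0..n-1. */
    static double[] pageRank(int n, int[][] edges, int supersteps) {
        // Plays the role of getOutDegree(): number of out-edges per vertex.
        int[] outDegree = new int[n];
        for (int[] e : edges) {
            outDegree[e[0]]++;
        }
        double[] rank = new double[n];
        Arrays.fill(rank, 1.0 / n);
        for (int s = 0; s < supersteps; s++) {
            // Scatter: each vertex sends rank / outDegree along every out-edge.
            double[] received = new double[n];
            for (int[] e : edges) {
                received[e[1]] += rank[e[0]] / outDegree[e[0]];
            }
            // Gather: n plays the role of getNumberOfVertices(). Every vertex in the example
            // has an in-edge, so every vertex receives messages in every superstep.
            for (int v = 0; v < n; v++) {
                rank[v] = (1 - DAMPING) / n + DAMPING * received[v];
            }
        }
        return rank;
    }

    public static void main(String[] args) {
        // Hypothetical 3-vertex graph: 0 -> 1, 0 -> 2, 1 -> 2, 2 -> 0.
        int[][] edges = {{0, 1}, {0, 2}, {1, 2}, {2, 0}};
        System.out.println(Arrays.toString(pageRank(3, edges, 20)));
    }
}
```

Because no vertex in this graph is dangling, the ranks keep summing to 1 after every superstep. After one superstep from the uniform start, vertex 2, which collects messages from both 0 and 1, has rank 0.05 + 0.85 * 0.5 = 0.475.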