深入了解Flink Gelly图计算模型之Gather-Sum-

2022-01-07  本文已影响0人  老羊_肖恩

  Flink提供了三种通用的基于迭代的图计算模型的实现(Flink-Gelly:Iterative Graph Processing),分别是:Vertex-Centric, Scatter-Gather和Gather-Sum-Apply,在之前的文章深入了解Flink Gelly图计算之—Vertex-Centric模型深入了解Flink Gelly图计算模型之Vertex-Centric中我们详细的介绍了Vertex-Centric模型和Scatter-Gather模型的实现原理和使用方法,接下来我们将详细的介绍一下Gather-Sum-Apply模型的原理和实现过程,并以简单的示例来展示Gather-Sum-Apply模型的使用方法。

Gather-Sum-Apply模型概述

  Gather-Sum-Apply模型的计算过程也是基于同步迭代,每个迭代的轮次称之为Superstep。每个Superstep由以下三个阶段组成:
Gather: 在每个轮次的迭代中,每个顶点并行的在其邻接顶点点和边上执行udf,并产生一个中间结果。
Sum: 每个顶点将其在Gather阶段产生的一系列中间结果应用udf进行聚合,生成一个单独的值。
Apply: 每个顶点根据Sum阶段产生的新结果只和当前顶点的属性值应用udf对当前顶点进行更新操作。
  我们同样以单源点最短路径为例,简单介绍GSA每个阶段的具体执行逻辑。假设我们存在如下一个有向图,我们需要计算顶点1到其余三个顶点2,3,4的最短路径。首先我们需要对每个顶点的值进行初始化,其中顶点1的值(值记在顶点上的方框里)初始化为0,其余顶点的值默认初始化为正无穷。接下来以下图为例,详细介绍三个阶段的主要功能,在第一轮迭代中:
  (1)Gather:顶点1没有邻居节点(考虑是有向图,没有边指向顶点1),因此不用计算中间距离值。顶点2的邻居为1,中间值结果为:\small 2-2;顶点3的邻居为1,中间值结果为:\small 3-3;顶点4的邻居为1,2,3,中间值结果为:\small 4-4, 4-∞, 4-∞
  (2)Sum:上个阶段执行完成后,顶点2的中间值结果为:\small 2-2;顶点3的中间值结果为:\small 3-3;顶点4的中间值结果为:\small 4-4, 4-∞, 4-∞。该阶段执行聚合操作,挑选出最小的中间值为新值,因此,聚合的结果为:\small 2-2,3-3,4-4
  (3)Apply:根据Sum聚合的结果,将每个顶点的当前值按照取最小进行更新,则该轮Apply之后,每个顶点的值为\small 1:0, 2:2, 3:3,4:4
第一轮迭代完成后,在第二轮迭代中,顶点4的Gather阶段中间值为变成\small 4-4, 4-3, 4-5,Sum阶段的聚合结果为3,最终的Apply阶段后,每个顶点的值为\small 1:0, 2:2, 3:3,4:3。然后在下一轮迭代中,没有任何顶点回在Apply阶段更新当前值,因此整个GSA模型迭代结束,最终结果为\small 1:0, 2:2, 3:3,4:3

Gather-Sum-Apply

Gather-Sum-Apply模型使用

  上面我们大致介绍了GSA模型三个阶段的基本执行过程。GSA模型在实际使用的时候也非常简单,只需要用户根据实际需求实现三个阶段的UDF即可,即GatherFunctionSumFunctionApplyFunction。下面我们给出了一段示例代码,简单展示了#Gather-Sum-Apply模型的使用方法。

package com.quan.graph;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.graph.gsa.SumFunction;

import java.util.LinkedList;
import java.util.List;

public class GSA_SSSP {

    // Set 1 as the source.
    public static Integer srcId = 1;

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<Edge<Integer, Integer>> edgesList = new LinkedList<>();
        edgesList.add(new Edge<Integer, Integer>(1, 2, 12));
        edgesList.add(new Edge<Integer, Integer>(1, 6, 3));
        edgesList.add(new Edge<Integer, Integer>(1, 7, 14));
        edgesList.add(new Edge<Integer, Integer>(2, 6, 7));
        edgesList.add(new Edge<Integer, Integer>(2, 3, 10));
        edgesList.add(new Edge<Integer, Integer>(3, 4, 3));
        edgesList.add(new Edge<Integer, Integer>(3, 5, 5));
        edgesList.add(new Edge<Integer, Integer>(3, 6, 4));
        edgesList.add(new Edge<Integer, Integer>(4, 5, 4));
        edgesList.add(new Edge<Integer, Integer>(5, 6, 2));
        edgesList.add(new Edge<Integer, Integer>(5, 7, 8));
        edgesList.add(new Edge<Integer, Integer>(6, 7, 9));

        DataSet<Edge<Integer, Integer>> edges = env.fromCollection(edgesList);

        // Read the input data and create a graph.
        Graph<Integer, Integer, Integer> graph = Graph.fromDataSet(edges, new InitVertices(), env);

        // Convert the graph to undirected.
        Graph<Integer, Integer, Integer> undirected_graph = graph.getUndirected();

        // Define the maximum number of iterations.
        int maxIterations = 10;

        // Execute the vertex-centric iteration.
        Graph<Integer, Integer, Integer> result = undirected_graph
                .runGatherSumApplyIteration(
                        new CalculateDistances(),
                        new ChooseMinDistance(),
                        new UpdateDistance(),
                        maxIterations);

        // Extract the vertices as the result.
        DataSet<Vertex<Integer, Integer>> singleSourceShortestPaths = result.getVertices();

        // Print the result.
        singleSourceShortestPaths.print();
    }

    // - - -  UDFs - - - //
    // Gather: Calculate candidate distances from neighbors and edges of each vertex.
    private static final class CalculateDistances extends GatherFunction<Integer, Integer, Integer> {

        public Integer gather(Neighbor<Integer, Integer> neighbor) {
            return (neighbor.getNeighborValue() < Integer.MAX_VALUE) ?
                    neighbor.getNeighborValue() + neighbor.getEdgeValue() : Integer.MAX_VALUE;
        }
    }

    // Sum: Choose the min distance from candidate distances which calculate by GatherFunction
    private static final class ChooseMinDistance extends SumFunction<Integer, Integer, Integer> {

        public Integer sum(Integer newValue, Integer currentValue) {
            return Math.min(newValue, currentValue);
        }
    }

    // Apply: Update distance for each node when newDistance smaller than oldDistance
    private static final class UpdateDistance extends ApplyFunction<Integer, Integer, Integer> {

        public void apply(Integer newDistance, Integer oldDistance) {
            if (newDistance < oldDistance) {
                setResult(newDistance);
            }
        }
    }

    @SuppressWarnings("serial")
    private static final class InitVertices implements MapFunction<Integer, Integer> {
        // Init all vertex with the max value expected the srd vertex.
        // Init the src vertex with 0.
        public Integer map(Integer id) {
            return id.equals(srcId) ? 0 : Integer.MAX_VALUE;
        }
    }
}

我们使用的数据集也是前面两篇文章中用到的数据即,即下图所示的图。


示例图

上图在GSA模型上的执行结果如下图所示


执行结果

Flink Gelly三种图计算模型的对比

  Flink官网给出了三种图计算模型的对比,通过对比我们可以得出以下结论:
1. Vertex-Centric模型和Scatter-Gather模型中每个顶点可以与其他任意顶点进行通信,而Gather-Sum-Apply模型仅能与邻居顶点进行通信。
2. Scatter-Gather模型和Gather-Sum-Apply模型根据节点的状态确定是否与其他顶点进行消息通信。
3. Vertex-Centric模型可以自定义任意的更新逻辑和更新方法,Scatter-Gather模型的更新逻辑依赖于接收到的消息,而Gather-Sum-Apply模型的更新逻辑完全依赖于邻居顶点和邻接边的值。

Flink Gelly图计算模型对比
  1. Flink-Gelly:Iterative Graph Processing
  2. 深入了解Flink Gelly图计算模型之Scatter-Gather
  3. 深入了解Flink Gelly图计算模型之Vertex-Centric
上一篇下一篇

猜你喜欢

热点阅读