2. Pregel 计算模型

2018-12-06  本文已影响0人  GongMeng

1. Pregel开发时, 图计算领域面对的问题

谷歌一气之下, 在Valiant’s Bulk Synchronous Parallel Model的技术上做出了Pregel. 从算法理论到工业实现用了20年, 和cherry lock有一拼.

Leslie G. Valiant, A Bridging Model for Parallel
Computation. Comm. ACM 33(8), 1990, 103–111

2. 抽象模型概述

2.1 Supserstep

Pregel计算由一系列的被称之为superstep的步骤组成, 它的输入是一个有向图, 所有的vertex必须是唯一
superstep的第N步, 会收取N-1步的信息, 计算, 更新状态, 把信息按照边向后发送, 等待第N+1步读取.

2.2 计算模型的状态机


superstep何时终止, 取决于所有节点的投票vote to halt的结果.
当所有的点都不再收取信息时, 就会终止. 寻找图中最大的点

在论文中介绍了一个非常简单的例子如上, 我们要想办法在一个图中找到value最大的vertex.

可以从图中看到, 在每个supersetp

  1. 每个点比保存一个max(local_value, message_value)
  2. 如果没有变化, 则vote to halt
  3. 如果有变化则把max value沿着边发送
  4. 重复以上步骤, 直到所有verte vote to halt(变灰)

这样我们就可以找到一个图里的最大值, 而且可以看到这个过程可以是一串的map reduce过程, 在Map中所有点接收信息并比较, 在Reduce中产生vetex与vetex之间的信息转移.

3 Pregel程序的执行过程

一下过程基于google自己的底层框架, 换到spark+hbase上一样说的通

  1. 首先启动一个master点, 可以直接理解为spark里的driver, 用于管理整个程序的进度. 所有的woker需要把自己注册到这个master上.
  2. master负责把图划分为多个partition, 并指定每个worker需要负责的部分. worker需要对自己维护的这一部分graph执行用户写的computer()操作, 并维护这一部分图的状态. 每个worker都知道其它的任何一个worker维护的是哪部分graph, 这个问master就好了.
    1.master开始分发用户的输入, 如果worker观察到用户的input与自己维护的graph有关, 则立即对这部分graph进行修改和操作, 否则就把数据送到它该去的地方. 分发过程结束后, 这个分布式的图已经和用户的input融合, 所有的vertex处于active状态
  3. 开始执行superstep, 回到vote to halt的状态到master那里去
  4. master观察到所有的vertex都halt了, 运行结束

4. Pregel下的图算法

4.1 PageRank

class PageRankVertex
  : public Vertex<double, void, double> {
public:
   // 假设一共有v个节点, 任何一个点初始的权重都是 1/v
  virtual void Compute(MessageIterator* msgs) {
    if (superstep() >= 1) {
      double sum = 0;
      for (; !msgs->Done(); msgs->Next())
          sum += msgs->Value();
       // 节点的权重调整如下, 引用它的网站越多, 它的权重越高, 引用它的网站约重要, 它的权重越高
      *MutableValue() = 0.15 / NumVertices() + 0.85 * sum;
    }
    // 我们只进行30层运算
    if (superstep() < 30) {
      // 把这个vertex, 也就是网站的权重向所有它引用的网站进行广播
      const int64 n = GetOutEdgeIterator().size();
      SendMessageToAllNeighbors(GetValue() / n);
    } else {
      VoteToHalt();
    }
  }
};

4.2 Shortest Path

寻找source vertex和图中任何一个其它点之间的最短路径
一开始source vertex的距离是0, 其它任何一个点都是INF(无限), 然后source vertex开始广播
之后每个superstep, 每个接收从邻居那里来的message, 如果邻居记录的值加上边长比本地值小, 则修改记录. 否则就vote for halt.
重复以上步骤直到所有的点都不动了.

class ShortestPathVertex
:public Vertex<int, int, int> {
  void Compute(MessageIterator* msgs) {
    int mindist = IsSource(vertex_id()) ? 0 : INF;
    for (; !msgs->Done(); msgs->Next())
      mindist = min(mindist, msgs->Value());
    if (mindist < GetValue()) {
      *MutableValue() = mindist;
      OutEdgeIterator iter = GetOutEdgeIterator();
      for (; !iter.Done(); iter.Next())
        SendMessageTo(iter.Target(),
      mindist + iter.GetValue());
    }
    VoteToHalt();
  }
};

class MinIntCombiner : public Combiner<int> {
  virtual void Combine(MessageIterator* msgs) {
    int mindist = INF;
    for (; !msgs->Done(); msgs->Next())
      mindist = min(mindist, msgs->Value());
    Output("combined_source", mindist);
  }
};

5. 待解决问题

上一篇 下一篇

猜你喜欢

热点阅读