libprocess原理&Actor模型

2020-11-30  本文已影响0人  celusing

一.Actor模型

1.并发通信方式

一般来说,并发线程中通信方式有两种:

2.Actor介绍

Actor是通用的并发编程模型,并非某个语言或框架特有。典型的是erlang,从语言层面上支持Actor模型。

3.Actor做什么

当一个actor接收到消息后,它能做如下三件事中的一件:

二.Libprocess

每个Actor(Process)占用一个线程。不同Actor,可能处于同一个进程内,也可以处于不同进程。调用方式:

1.Libprocess中概念介绍

Libprocess是基于Actor模型实现的库,提供基于actor style的并行编程框架。通过使用libprocess,异步并行变成变得相对简单。

libprocess的基础是:event visitor和event list,其继承关系可表述为:


深度截图_选择区域_20201130162329.png

process的基类是event_visitor,其内部有个属性:event_list,process的工作实际是visit这个list中的events并执行。
但是process本身不会主动去visitor各个events,实际上各个Actor仍然运行在实体Thread上。在libprocess库中,全局静态变量类process_manager(声明在:process.h中,在process.cpp中定义)将负责启动线程并运行Actor.

运行流程总结如下:

//如果process::initialize()从未被调用过,则创建至少8个线程或者等于CPU数目的线程
long cpus = std::max(8L, sysconf(_SC_NPROCESSORS_ONLY))
for (int i = 0; i < cpus, i++) {
  pthread_t thread; //for now , not saving handles on our threads
  if (phtread_create(&thread, null, schedule, null) != 0) {
  LOG(FATAL) << "Failed to initialize, pthread_create";
}
}

声明process之后,实体线程运行的schedule函数取出运行队列中process并运行。

//调度器不断查看process_manager队列中是否有process,如果有,恢复并启动process.
void* schedule(void* arg)
{
  do{
      ProcessBase* process = process_manager->dequeue();
      if (process == NULL) {

      }
      process_manager->resume(process);
  }
}

所以process声明之后,还需要写入相应线程的运行队列中,才能够开始运行。
将process写入运行队列需要通过spawn(process)函数。

UPID ProcessManager::spawn(ProcessBase* process, bool manager)
{
  CHECK(process != NULL);
  synchronized (process) {
    //processes已经记录过该process
    if (processes.count(process->pid.id) > 0) {
      return UPID();
    } else {
      //如果是新的Actor,记录下来
      processes[process->pid.id] = process;
    }
  }

  if (manager) {
    dispatch(gc, &GarbageCollector::manage<ProcessBase>, process);
  }

  UPID pid = process->self();
   //Actor入队
   enqueue(process);

  return pid;
}

至此,Actor创建成功,并放入process_manager的队列中,process_manager中有一个线程,不断激活各个Actor。Actor启动成功。

1.libprocess中多个process的单机并行通信:dispatcehr/delay

同一个进程内的不同Actor之间的并行通信,不需要获取返回值。

  1. dispatcher()
process::PID<SimpleProcess> pid = process::spwn(simpleProcess);
process::dispatcher(pid, &SimplerProcesss::doSomething, "test");

dispatcher()方法将一个event插入目标process的event队列中。event由目标process的成员函数和相应的变量组成,dispatcher成功后,对应process的event队列将有一个执行相关函数的event.

  1. delay()
delay(Seconds(5), self(), &Self::batch)

delay()是延迟的dispatcher(),dispatch和delay方法均可以通过self()函数对本线程写入新event。

2.libporcess中的异步并发:future/promise/defer

同一个进程内的不同Actor之间的并行通信,需要获取返回值。

  1. future
    用于异步回调的对象,一般由promise产生,可以在不同process中拷贝/引用,future中定义了各个回调函数接口:
const Future<T>& onDiscard(DiscardCallback&& callback) const;
const Future<T>& onRready(ReadyCallback&& callback) const;
const Future<T>& onFailed(FailedCallback&& callback) const;
const Future<T>& onDiscarded(DiscardedCallback&& callback) const;
const Future<T>& onAny(AnyCallback&& callback) const;

当一个future对象状态发生变化时,相应的回调函数会被调用,从而达到异步并发的效果。

  1. promise
    future的入口,一般只在一个process中set(赋值)。promise不应该再多个process之间拷贝/引用。promise中只包含了future成员变量。
    一般通过promise.future的方式产生一个future。futrue可以在不同process中引用/拷贝,但只能在一个进程中通过promise设置。因为promise一般只存在一个process中。
//简单实例
class Simple : public process::Process<SimpleProcess>
{
public:
  Future<Nothing> doSomething(const string msg) {
    cout << "Wrapping message" << msg << endl;
    return Nothing();
  }

  Future<int> calc(int lhs, int rhs) {
    return Promise<int>(lhs+rhs).future();
  }
private:
  Promise<bool> shouldQuit;  
}

int runProcess()
{
  SimpleProcess simpleProcess;
  process::PID<SimpleProcess> pid = process::spawn(simpleProcess);

  process::dispatcher(pid, &SimpleProcess::doSomething, "test");
  
  Future<int> sum = process::dispatch(pid, &SimpleProcess::calc, 99, 101);
  sum.then([](int n) {
    cout << "99 + 101 = " << n << endl;
    return Nothing();
  })

  sum.await();
  process::terminate(SimpleProcess);
  process::wait(simpleProcess);
}
  1. defer
    如果使用如下语句:
dispatch(master_, &Master::indicateInverseOffer, request).onAny(dispatch(self(), &Self::updateRequestQueue, lambda::_1))

这一命令向master process dispach一个event,调用master类中的indicateInverseOffer函数。该函数返回一个future。我们希望future状态变化时,往本线程dispatcher一个event。
但是,onAny中的dispatch函数是由master process赋值的future调用,从而是由master process所属线程发起的dispatcher调用。不合目的。

dispatch(master_, &Master::indicateInverseOffer, request).onAny(defer(self(), &Self::updateRequestQueue, lambad::_1))

简单来说,defer()返回一个called object, 这个called object是在相应线程被初始化的。
那么当indicateInverseOffer()所返回的future状态发生变化时,onAny调用的是defer()函数的返回值,这个返回值是再代码所属的process产生的,这保证了程序的意图。

3.Libprocess分布式系统编程:基于protobuf消息的方法

不同进程间的不同Actor之间的并行通信。
libprocess通过提供send()和install()函数进行分布式通信。虽然libprocess分布式通信不依赖于protobuf, 但是为了简化消息结构序列化问题,libprocess提供protobufProcess类,这一定制的process支持基于protobuf的消息通信。

  1. install()
    Install方法将handler注册到process类所属的handler map中,一旦相应message到来。即启用handler map对应的handler(函数),以下定义作为install方法的实现路径之一:
template <typename M>
void install(void (T::*method)(const process::UPID))
{
  google::protobuf::Message* m = new M();
  T* t = static_cast<T*>(this);
  protobufHandlers[m->GetTypeName()] = lambda::bind(handler0, t, method,   
  lambda::_1, lambda::_2);
  delete m;
}

install 接受protobuf message中的field作为独立的输入,如:
install<SlaveRegisteredMessage>
(&SimpleMasterProcess::SlaveRegistered,&SlaveRegisteredMessage::slave_id)

当SlaveRegisteredMessage信息到来之后,process将把slaveRegistered()函数作为一个event写入队列,并将message中的slave_id field作为输入量。
特别注意的是,protobufProcess中,protobuf消息中的repeat fileds将会被自动转换成vector.

template<typename T>
std::vector<T> convert(const google::protobuf::RepeatedPtrFiled<T>& items)
{
  vector<T> result;
  for(int i = 0; i < items.size(); i++) {
    result.push_back(items.Get(i));
  }
  
  return result;
}
  1. UPID与通信
    非分布式通信,即同一进程内,不同Actor区分通过PID。
    在基于libprocess的分布式系统中,每个process均有一个唯一的UPID,process之间分布式通信通过UPID识别地址,UPID的主要信息如下:
std::string id;
//一般是ip:port
network::Addresss address;

每个UPID实现了libprocess的地址空间中唯一地址,其中id默认是从1开始,spawn一个新的process,则增加到(2),如此类推。

  1. send()
    使用send()换气远程process的回调函数:通过UPID,向远程process发送一个protobuf消息。远程process接收到消息后,将取出消息对应的handler,将它放入任务队列。
    同时注意,send()函数总会发送起始process的UPID,所以protobufProcess的install函数所有的handler的第一个输入永远是:
//源UPID
const process::UPID&

这样,可以知道每个消息来源的UPID.

上一篇下一篇

猜你喜欢

热点阅读