Muduo库设计(3)——EventLoop::runInLoo

2019-11-28  本文已影响0人  Magic11

RunInLoop函数允许其他线程往当前线程添加回调函数,并能够保证所添加的回调函数都在当前线程中执行

typedef std::function<void()> Functor;
void runInLoop(const Functor& cb);

由于添加的回调函数并不能立即被执行,为了让runInLoop函数尽快返回,EventLoop对象需要持有一个函数数组,用于临时存储这些被添加的回调函数

typedef std::function<void()> Functor;
std::vector<Functor> pendingFunctors_;

由于pendingFunctors_可以被其他线程访问,所以需要加锁保护
runInLoop的执行逻辑如下:
首先判断是否在当前线程,若在当前线程,直接执行该回调,否则将该回调加入pendingFunctors_数组,代码逻辑如下:

void EventLoop::runInLoop(const Functor& cb)
{
  if (isInLoopThread())
  {
    cb();
  }
  else
  {
    queueInLoop(cb);
  }
}
void EventLoop::queueInLoop(const Functor& cb)
{
  {
    std::lock_guard<std::mutex> lock(mutex_);
    pendingFunctors_.push_back(cb);
  }

  if (!isInLoopThread() || callingPendingFunctors_)
  {
    wakeup();
  }
}

注意:当将回调函数加入回调队列之后,需要调用一下weakup函数;
由于IO线程平时会阻塞在事件循环EventLoop::loop()对poll()的调用中,weakup的作用是将当前线程尽快从阻塞中唤醒。
Wakeup的逻辑如下:
首先EventLoop需要创建一个fd,并创建一个负责该fd的Channel,然后添加该channel的读事件回调,并一直检测该Channel的读事件,代码逻辑如下:

EventLoop::EventLoop() 
    : wakeupFd_(createEventfd()),
    wakeupChannel_(new Channel(this, wakeupFd_))
{
  wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
  // 一直检查该fd是否可读
  wakeupChannel_->enableReading();
}
void EventLoop::wakeup()
{
  uint64_t one = 1;
  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
  if (n != sizeof one)
  {
    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
  }
}
void EventLoop::handleRead()
{
  uint64_t one = 1;
  ssize_t n = ::read(wakeupFd_, &one, sizeof one);
  if (n != sizeof one)
  {
    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
  }
}

每次EventLoop执行完事件的抓取(poll)及事件的分发(handleEvent)之后,就会依次调用回调函数队列里的函数,代码逻辑如下:

void EventLoop::loop()
{
  while (!quit_)
  {
    activeChannels_.clear();
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    for (auto it = activeChannels_.begin(); it != activeChannels_.end(); ++it)
    {
      (*it)->handleEvent();
    }
    doPendingFunctors();    //执行回调队列里的函数
  }
}

执行回调函数的逻辑如下:

void EventLoop::doPendingFunctors()
{
  std::vector<Functor> functors;
  callingPendingFunctors_ = true;

  {
    std::lock_guard<std::mutex> lock(mutex_);
    functors.swap(pendingFunctors_);
  }

  for (size_t i = 0; i < functors.size(); ++i)
  {
    functors[i]();
  }
  callingPendingFunctors_ = false;
}

此处,为了避免在加锁的状态下对函数队列的访问时间过长,采取了一个优化,即先将函数队列swap到一个临时的vector里面,然后再依次遍历并执行临时vector里的回调函数。
这样既减少了临界区的长度(意味着不会阻塞其他线程调用queueInLoop),也避免了死锁(因为在回调函数中还可能调用queueInLoop,在已加锁的状态下还要去申请锁)。

上一篇下一篇

猜你喜欢

热点阅读