Muduo库设计(2)——Reator的关键结构

2019-11-27  本文已影响0人  Magic11

一、IO multiplexing简介

首先介绍一下pollfd结构体及poll接口:

struct pollfd {
 int fd;        //想查询的文件描述符
 short events;  //要求查询的事件掩码(我们感兴趣的事件)
 short revents; //返回的事件掩码(实际发生的事件)
};

//events_ |= POLLIN | POLLPRI;  表示监控是否可读。
//events_ |= POLLOUT;           表示监控是否可写。
//events_ |= POLLIN|POLLOUT;    表示监控是否可读或可写。

//revents_ & (POLLERR | POLLNVAL)           为true  表示出错
//revents_ & (POLLIN | POLLPRI | POLLRDHUP) 为true  表示可读
//revents_ & POLLOUT                        为true  表示可写
int poll(struct pollfd *ufds, unsigned int nfds, int timeout);

//ufds 指向 struct pollfd 数组 
//nfds 指定 pollfd 数组元素的个数,也就是要监测几个 pollfd
//timeout是等待的毫秒数,timeout为负数表示无线等待,timeout为0表示调用后立即返回。

//执行结果:为0表示超时前没有任何事件发生;
//          -1表示失败;
//          成功则返回结构体中revents不为0的文件描述符个数。

二、Reator的关键结构

Reator的核心是事件分发机制,即将IO多路复用(IO multiplexing)拿到的IO事件分发给各个文件描述符(fd)的事件处理函数。

这里主要有三个关键的类Poller、EventLoop、Channel:
Poller类负责IO多路复用
EventLoop负责IO事件分发
Channel类负责管理一个文件描述符(fd),并存储该fd所关注的每个事件对应的回调函数

1、Poller类——只负责多路复用

Poller包含一个vector和一个map
其中vector用来存放pollfd, map用来存放 <fd, channel> 键值对

其中vector用于每次调用poll接口的参数

int numEvents = ::poll(pollfds_.data(), pollfds_.size(), timeoutMs);

vector和map更新的逻辑如下:
1) 当有一个新的channel进来时
a) 创建新的pollfd,从channel中取出fd,要查询的事件events,赋值给pollfd
b) 将新的pollfd插入vector的尾部,并将pollfd在vector中的位置赋值给 channel的index成员
c) 将键值对<fd, channel>插入map
2)已存在的channel
a) 取出channel的index成员变量的值
b) 根据index从vector中取出pollfd
c) 从channel中取出要查询的事件events,赋值给pollfd
具体的代码逻辑如下:

void Poller::updateChannel(Channel* channel)
{
  assertInLoopThread();                                 //必须要在当前线程中执行
  if (channel->index() < 0) {                           //channel的index默认为 -1
    //新的channel对象
    struct pollfd pfd;
    pfd.fd = channel->fd();
    pfd.events = static_cast<short>(channel->events()); //这里channel的events是一个整形   int events_;
    pfd.revents = 0;                                    //新插入的channel  revents初始化为0
    pollfds_.push_back(pfd);                            //将fd插入vector
    
    int idx = static_cast<int>(pollfds_.size()) - 1;  
    channel->set_index(idx);                            //channel的index设置为fd在
    channels_[pfd.fd] = channel;                        //将 <fd, channel> 插入map
  } else {
    //更新已存在的channel对象
    int idx = channel->index();                         //已存在的channel对象,其index值记录fd在vector中的位置
    struct pollfd& pfd = pollfds_[idx];                 //从vector中取出index位置的pfd
    pfd.events = static_cast<short>(channel->events()); //更新要查询的事件
    pfd.revents = 0;                                    //revents初始化为0
    if (channel->isNoneEvent()) {
      // ignore this pollfd
      pfd.fd = -1;
  }
}

可以看出updateChannel函数必须要在当前线程中执行,当前线程也即创建EventLoop对象的线程。
这里还没看出 map 的作用,map其实是用于 每次调用poll之后,可以通过fd快速查询到channel对象,从而返还给EventLoop用于事件的分发,具体代码如下:

void Poller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const
{
  //遍历vector
  for (auto pfd = pollfds_.begin(); pfd != pollfds_.end() && numEvents > 0; ++pfd)
  { 
    if (pfd->revents > 0)                    //有要发生的事件
    {
      --numEvents;
      auto ch = channels_.find(pfd->fd);     //从map中取出channel
      Channel* channel = ch->second;
      channel->set_revents(pfd->revents);    //更新的channel的revents成员
      activeChannels->push_back(channel);    //插入到activeChannels
    }
  }
}
2、EventLoop类——负责事件的轮询和分发

每个EventLoop类在创建时都会持有一个Poller对象

EventLoop::EventLoop()
  : looping_(false),
    quit_(false),
    threadId_(std::this_thread::get_id()),
    poller_(new Poller(this)),
{
}

EventLoop类负责事件的轮询和分发,具体代码如下:

void EventLoop::loop()
{
  assertInLoopThread();
  looping_ = true;
  quit_ = false;

  while (!quit_)
  {
    activeChannels_.clear();
    pollReturnTime_ = poller_->poll(kPollTimeMs/*10000*/, &activeChannels_);
    for (auto it = activeChannels_.begin(); it != activeChannels_.end(); ++it)
    {
      (*it)->handleEvent();
    }
  }

  LOG_TRACE << "EventLoop " << this << " stop looping";
  looping_ = false;
}

每次轮询结束后得到的是一个channel的数组,通过遍历该数组,依次调用每个channel的handleEvent函数。

3、Channel类——负责管理一个文件描述符(fd)及相应的事件回调函数

Channel类在创建的时候需要传入一个fd:

Channel::Channel(EventLoop* loop, int fd)
  : loop_(loop),
    fd_(fd),
    events_(0),
    revents_(0),
    index_(-1)
{
}

这里可以看到Channel类另外三个关键的成员:events、revents、index, 它们的作用在介绍Poller类时已经说过了.
1)Channel如何更新事件(events):

void enableReading()  { events_ |= kReadEvent;   update();   }  //关注是否可读
void enableWriting()  { events_ |= kWriteEvent;  update();   }  //关注是否可写
void disableWriting() { events_ &= ~kWriteEvent; update();   }  //不再关注是否可写
void disableAll()     { events_  = kNoneEvent;   update();   }  //不关注任何事件
void Channel::update()
{
  loop_->updateChannel(this);
}
  1. Channel如何处理事件的回调
    首先Channel的调用者需要先注册回调:
typedef std::function<void()> EventCallback;

void setReadCallback(const EventCallback& cb)
{ readCallback_ = cb; }
void setWriteCallback(const EventCallback& cb)
{ writeCallback_ = cb; }
void setErrorCallback(const EventCallback& cb)
{ errorCallback_ = cb; }

其次EventLoop在每次事件分发时,对于有事件发生的Channel都会调用channel.handleEvent()方法

void EventLoop::loop()
{
  while (!quit_)
  {
    activeChannels_.clear();
    poller_->poll(kPollTimeMs, &activeChannels_);
    for (auto it = activeChannels_.begin(); it != activeChannels_.end(); ++it)
    {
      (*it)->handleEvent();
    }
  }
}
void Channel::handleEvent()
{
  if (revents_ & POLLNVAL) {
    LOG_WARN << "Channel::handle_event() POLLNVAL";
  }

  if (revents_ & (POLLERR | POLLNVAL)) {
    if (errorCallback_) errorCallback_();
  }
  if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)) {
    if (readCallback_) readCallback_();
  }
  if (revents_ & POLLOUT) {
    if (writeCallback_) writeCallback_();
  }
}

可以看出Channel事件的更新update() 和 事件的回调handleEvent()都是在当前线程中执行的。
下面看一个具体Channel的用法示例:

void timeout()
{
  printf("Timeout!\n");
  g_loop->quit();
}

int main()
{
  muduo::EventLoop loop;

  int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
  
  muduo::Channel channel(&loop, timerfd);
  channel.setReadCallback(timeout);
  channel.enableReading();

  struct itimerspec howlong;
  bzero(&howlong, sizeof howlong);
  howlong.it_value.tv_sec = 5;
  ::timerfd_settime(timerfd, 0, &howlong, NULL);

  loop.loop();

  ::close(timerfd);
}
上一篇 下一篇

猜你喜欢

热点阅读