从逻辑上扩展Epoll的事件列表
环境:X86_64 CentOS 7.0
对于Epoll自身支持的事件类型有如下几种,详细可以man 2 epoll_ctl
- EPOLLIN
- EPOLLOUT
- EPOLLERR
- EPOLLET
- EPOLLONESHUT
- EPOLLHUP 没用过
- EPOLLRDHUP 没用过
- EPOLLPRI 没用过
- EPOLLWAKEUP 没用过
- EPOLLEXCLUSIVE 没用过
以上是Epoll支持的事件类型,鶸只用过前几个,没有用过的也不敢多言。这些事件对于一般的场景使用应当已足以。
但是对于需要扩充事件类型时,比如:
- 需要优雅退出的时候,如何让epoll_wait知道现在程序需要退出了。
- 有数据包需要发送的时候,如何通知epoll_wait 让其尽快唤醒网络线程,去处理网络包。
- ... ... 等等
上述两条是鶸用到的场景,是学习了Tars的网络层处理之后,在自己的项目中使用到的。(当然实现方式很多,这里只是其中一种粗浅的方式)
众所周知,epoll维护了一个红黑树,来快速查找到有数据到来的fd对应的epoll_event结构体,从而copy给用户态,供用户详细地处理。epoll_event的结构如下:
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data ; /* User data variable */
}
typedef union epoll_data {
void * ptr ;
int fd ;
uint32_t u32;
uint64_t u64;
} epoll_data_t ;
从这里可以看出来,epoll自己的Event类型,最终会赋值给events变量。给用户留的只有epoll_data。但epoll_data是一个union,只能使用其中一个字段。我们大多数时间使用fd,用来知道具体是哪一个fd产生了事件。但这不能够用,比如我们还要记录一些这个fd上的上下文信息, 举个例子,假设我们使用ET模式,上一次收包可能只收到一部分,这时候就需要暂时将这部分数据暂存起来,并且记录一下已经收到的长度等等信息。
这时候就可以有多种做法了,可以使用map来将fd作为key,通过每次索引,找到fd对应的上下文结构体,也是没有问题的。
也可以使用epoll_data的ptr字段,将详细的fd信息、上下文信息保存在一个动态申请的结构体中,将指针赋值给ptr就可以。只需要维护好这个指针的生存期就可以了。
鶸就使用了类似于类似于第二种的做法。只不过是自己生成了一个连接ID,保存在了epoll_data_t 的低32位中。
关键就在这里,epoll_data_t是一个union,union的特点是,其大小是最长的一个元素的大小。也就是说,不管你给epoll_data_t里面存了什么,哪怕是只写了一个bit,那它也占8个字节。
我们就借助这个特点,将epoll_data_t的高32位也利用起来。
enum EVENT{
EV_NET = 0,
EV_LISTEN,
EV_CLOSE,
EV_SEND,
EV_TERMINATED
};
template<typename T, typename DataQueue>
void NetThread<T,DataQueue>::start_run() noexcept {
while ( !terminated.load(std::memory_order_acquire) ) {
int evs = epoll_instance.start_epoll_wait(-1) ;
if ( evs <= 0 ) {
continue ;
}
for( size_t i = 0 ; i < evs ; i++ ) {
const epoll_ev& ev = epoll_instance.get_event(i) ;
uint32_t ev_type = ev.data.u64 >> 32 ;
switch (ev_type) {
case EV_LISTEN:
if ( ev.events & EPOLLIN ) {
LOG_INFO("recv EV_LISTEN event\n");
processListenfd() ;
}
break ;
case EV_NET:
processNet(ev) ;
break ;
// process close
case EV_CLOSE:
// maybe will not occur
processClose(ev) ;
break ;
case EV_TERMINATED:
// terminated all net thread
break ;
case EV_SEND:
// send packages ~
processSend() ;
break ;
}
}
}
}
这样,我们就借助了epoll_data_t的高32位,从逻辑上扩展了Epoll的事件列表。
乍一看有些二,写的是啥玩意儿。稍作解释一下:
前面也说了,鶸用epoll_data_t的低32位记录了一个客户端的连接ID。
默认情况下,EV_NET,EV_CLOSE,EV_TERMINATED,EV_SEND是都不会触发的。这些只有在fd需要这些事件的时候,通过epoll_ctl,注册EPOLLOUT事件的同时,将EV_NET,EV_CLOSE,EV_TERMINATED,EV_SEND写入到epoll_data_t的高32位。
举个例子,当需要通知网络线程发送数据的时候,就可以如此处理(这里的sendfd是一个单独的fd,只用socket创建出来,让其可以加入到epoll中即可。)
#define H64(x) ((long)(x)<<32)
// for handler use set response data
virtual bool add_send_data(DataPkg& datapkg) override {
if ( curr_send_queue_size.load(std::memory_order_acquire) > max_send_size ) {
LOG_ERROR("send queue is full ...") ;
return false ;
}
// add to send queue
{
std::lock_guard<std::mutex> l{lock} ;
if ( send_queue.size() > max_send_size) {
LOG_ERROR("send queue is full ...") ;
return false ;
}
send_queue.emplace_back(datapkg) ;
curr_send_queue_size++;
}
// add EPOLLOUT event of this fd
epoll_instance.mod(sendfd , H64(EV_SEND), EPOLLOUT|EPOLLET) ;
return true ;
}
给sendfd注册EPOLLOUT事件:
void ctrl(int fd , uint64_t data , uint32_t events , int op) noexcept {
epoll_ev ev ;
ev.data.u64 = data ;
ev.events = events ;
::epoll_ctl(epollfd , op , fd , &ev) ;
}
void add_fd(int fd ,uint64_t data,uint32_t ev , bool nonblock) noexcept {
if ( nonblock) {
set_nonblocking(fd) ;
}
ctrl(fd , data , ev, EPOLL_CTL_ADD) ;
}
void mod(int fd , uint64_t data , uint32_t ev) noexcept {
ctrl(fd , data ,ev, EPOLL_CTL_MOD) ;
}
这样,epoll_wait 醒过来之后,就可以通过EV_SEND事件来处理发送队列的数据。而sendfd在这里只是扮演了一个给epoll来监听的媒介。真正用到的是epoll_data_t的高32位。
萧然
2018-11-26 00:35