epoll Source Code Analysis
epoll_create
epoll's first system call, epoll_create, produces a file descriptor, creates an eventpoll structure, and stores it in the file's private data.
The epoll handle itself occupies an fd, so close() must be called on it once epoll is no longer needed; otherwise fds may eventually be exhausted.
Note: the size argument only tells the kernel roughly how many events this epoll instance is expected to handle; it is not the maximum number of events it can handle. In recent Linux kernel versions the size argument has no effect at all.
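As a quick user-space illustration (a minimal sketch, not part of the kernel code analyzed below), an epoll instance is created and released like this:
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <unistd.h>

int main(void)
{
    /* size is ignored on modern kernels but must be greater than zero */
    int epfd = epoll_create(1);
    if (epfd == -1) {
        perror("epoll_create");
        exit(EXIT_FAILURE);
    }

    /* ... epoll_ctl()/epoll_wait() calls would go here ... */

    /* the epoll instance occupies an fd, so close it when done */
    close(epfd);
    return 0;
}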
/*
* This structure is stored inside the "private_data" member of the file
* structure and represents the main data structure for the eventpoll
* interface.
*/
struct eventpoll {
/* Protect the access to this structure */
spinlock_t lock;
/*
* This mutex is used to ensure that files are not removed
* while epoll is using them. This is held during the event
* collection loop, the file cleanup path, the epoll file exit
* code and the ctl operations.
*/
struct mutex mtx;
/* Wait queue used by sys_epoll_wait() */
//When epoll_wait is called, ready events are taken from the ready list.
//If no events are ready, the calling process blocks on this wq queue.
//When an event occurs on a monitored file, ep_poll_callback hangs its epi on the ready list
//and wakes up the process blocked in epoll_wait
wait_queue_head_t wq;
/* Wait queue used by file->poll() */
//epoll itself implements poll, so an epoll fd can in turn be monitored by poll/select/epoll. A process
//that polls this epoll fd is added to its poll_wait queue so it can be woken up later
wait_queue_head_t poll_wait;
/* List of ready file descriptors */
//Monitored files on which events have already occurred
struct list_head rdllist;
/* RB tree root used to store monitored fd structs */
//Root of the red-black tree; each monitored file is wrapped in an epitem that is inserted into this tree
struct rb_root rbr;
/*
* This is a single linked list that chains all the "struct epitem" that
* happened while transferring ready events to userspace w/out
* holding ->lock.
*/
//While ready events are being transferred to user space, epis of files on which new events occur are temporarily chained on ovflist
struct epitem *ovflist;
/* wakeup_source used when ep_scan_ready_list is running */
struct wakeup_source *ws;
/* The user that created the eventpoll descriptor */
struct user_struct *user;
struct file *file;
/* used to optimize loop detection check */
int visited;
struct list_head visited_list_link;
};
The size argument of epoll_create is ignored, but must be greater than zero.
In the initial epoll_create() implementation, the size argument
informed the kernel of the number of file descriptors that the caller
expected to add to the epoll instance. The kernel used this
information as a hint for the amount of space to initially
allocate in internal data structures describing events. (If
necessary, the kernel would allocate more space if the caller's
usage exceeded the hint given in size.) Nowadays, this hint is
no longer required (the kernel dynamically sizes the required
data structures without needing the hint), but size must still be
greater than zero, in order to ensure backward compatibility when
new epoll applications are run on older kernels.
As the code below shows, the size argument is not used, but it must be greater than zero; this preserves backward compatibility, since earlier kernel versions did need it.
linux/fs/eventpoll.c:
SYSCALL_DEFINE1(epoll_create, int, size)
if (size <= 0)
return -EINVAL;
return sys_epoll_create1(0);
/* File callbacks that implement the eventpoll file behaviour */
static const struct file_operations eventpoll_fops = {
#ifdef CONFIG_PROC_FS
.show_fdinfo = ep_show_fdinfo,
#endif
.release = ep_eventpoll_release,
.poll = ep_eventpoll_poll,
.llseek = noop_llseek,
};
SYSCALL_DEFINE1(epoll_create1, int, flags)
//Allocate the eventpoll structure and initialize its fields
error = ep_alloc(&ep);
user = get_current_user();
ep = kzalloc(sizeof(*ep), GFP_KERNEL);
spin_lock_init(&ep->lock);
mutex_init(&ep->mtx);
init_waitqueue_head(&ep->wq);
init_waitqueue_head(&ep->poll_wait);
INIT_LIST_HEAD(&ep->rdllist);
ep->rbr = RB_ROOT;
ep->ovflist = EP_UNACTIVE_PTR;
ep->user = user;
*pep = ep;
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC));
//Store ep in the file's private_data field
file->private_data = priv;
ep->file = file;
fd_install(fd, file);
return fd;
epoll_ctl
epoll's second system call, epoll_ctl, adds, modifies, or removes events of interest on an epoll instance. It returns 0 on success; otherwise it returns -1, and errno must be checked to determine the error type.
Any event returned by epoll_wait must have been added to the epoll instance via epoll_ctl.
The first argument is the descriptor returned by epoll_create().
The second argument is the operation, expressed by one of three macros:
EPOLL_CTL_ADD: register a new fd with epfd;
EPOLL_CTL_MOD: modify the monitored events of an already-registered fd;
EPOLL_CTL_DEL: remove an fd from epfd;
The third argument is the fd to be monitored.
The fourth argument tells the kernel which events to monitor.
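A minimal user-space usage sketch (listen_fd here is a hypothetical, already-created listening socket; only EPOLL_CTL_ADD is shown):
#include <stdio.h>
#include <sys/epoll.h>

/* Register listen_fd with the epoll instance epfd for readable events. */
static int add_to_epoll(int epfd, int listen_fd)
{
    struct epoll_event ev;
    ev.events = EPOLLIN;      /* events of interest */
    ev.data.fd = listen_fd;   /* handed back unchanged by epoll_wait */
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev) == -1) {
        perror("epoll_ctl: EPOLL_CTL_ADD");
        return -1;
    }
    return 0;
}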
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd, struct epoll_event __user *, event)
{
//For anything other than a delete, i.e. an add or modify, the event must be copied from user space into the kernel
if (ep_op_has_event(op) &&
copy_from_user(&epds, event, sizeof(struct epoll_event)))
goto error_return;
//Get the epoll file
f = fdget(epfd);
//Get the target file
tf = fdget(fd);
//The target file must support the poll operation
if (!tf.file->f_op->poll)
goto error_tgt_fput;
//epoll is not allowed to monitor itself, and epfd must actually refer to an epoll file
error = -EINVAL;
if (f.file == tf.file || !is_file_epoll(f.file))
goto error_tgt_fput;
//Look up the target file in the red-black tree to see whether it is already registered
epi = ep_find(ep, tf.file, fd);
error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
//epi is NULL, meaning the target file is not registered yet, so insert it into the red-black tree
if (!epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_insert(ep, &epds, tf.file, fd, full_check);
} else
//If it already exists, return an error
error = -EEXIST;
if (full_check)
clear_tfile_check_list();
break;
}
ep_insert creates a new epi (representing the target file) and inserts it into the red-black tree. It also initializes a wait entry and adds it to the target file's wait queue; when an event occurs, ep_poll_callback hangs the epi of the file with the event onto the ready list.
Note that the wait entry is initialized with init_waitqueue_func_entry: callers of this function usually do not intend to wake a process directly; instead, when the event occurs, the registered func is called to do something else.
static inline void
init_waitqueue_func_entry(wait_queue_t *q, wait_queue_func_t func)
{
q->flags = 0;
q->private = NULL;
q->func = func;
}
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd, int full_check)
{
//A user that creates epoll instances may monitor at most max_user_watches fds
//max_user_watches can be tuned via /proc/sys/fs/epoll/max_user_watches
user_watches = atomic_long_read(&ep->user->epoll_watches);
if (unlikely(user_watches >= max_user_watches))
return -ENOSPC;
//Allocate the epitem structure that holds information about the monitored fd
epi = kmem_cache_alloc(epi_cache, GFP_KERNEL);
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
/* Wrapper struct used by poll queueing */
struct ep_pqueue {
poll_table pt;
struct epitem *epi;
};
struct ep_pqueue epq;
epq.epi = epi;
//When the target file's poll function is called, poll_wait invokes ep_ptable_queue_proc
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
pt->_qproc = qproc;
pt->_key = ~0UL; /* all events enabled */
//Get the target file's currently pending events; as a side effect, our wait entry is hooked into the target file's wait queue
revents = ep_item_poll(epi, &epq.pt);
pt->_key = epi->event.events;
//Call the monitored fd's poll function.
//For a TCP socket this is tcp_poll (explained below)
return epi->ffd.file->f_op->poll(epi->ffd.file, pt) & epi->event.events;
//Insert epi into the red-black tree
ep_rbtree_insert(ep, epi);
//If the target file already has a pending event and its epi is not yet on the ready list, add it to the ready list
if ((revents & event->events) && !ep_is_linked(&epi->rdllink))
{
list_add_tail(&epi->rdllink, &ep->rdllist);
/* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
//Increment the watch count
atomic_long_inc(&ep->user->epoll_watches);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return 0;
}
unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
//sock_poll_wait calls ep_ptable_queue_proc, which adds the wait entry to sk's wait queue head
sock_poll_wait(file, sk_sleep(sk), wait);
//Return the currently pending events (for a listening socket, check whether the accept queue is empty):
//0 if it is empty, (POLLIN | POLLRDNORM) otherwise
if (sk->sk_state == TCP_LISTEN)
return inet_csk_listen_poll(sk);
}
static inline void sock_poll_wait(struct file *filp,wait_queue_head_t *wait_address, poll_table *p)
{
if (!poll_does_not_wait(p) && wait_address) {
poll_wait(filp, wait_address, p);
//This is where ep_ptable_queue_proc gets called
if (p && p->_qproc && wait_address)
p->_qproc(filp, wait_address, p);
/* We need to be sure we are in sync with the
* socket flags modification.
*
* This memory barrier is paired in the wq_has_sleeper.
*/
smp_mb();
}
}
/* Wait structure used by the poll hooks */
//Glue between the target file's wait queue and the epitem
struct eppoll_entry {
/* List header used to link this structure to the "struct epitem" */
//Linked into epi->pwqlist
struct list_head llink;
/* The "base" pointer is set to the container "struct epitem" */
//Points to the owning epitem
struct epitem *base;
/*
* Wait queue item that will be linked to the target file wait
* queue head.
*/
//Wait queue entry that gets added to whead
wait_queue_t wait;
/* The wait queue head that linked the "wait" wait queue item */
//Pointer to the target file's wait queue head
wait_queue_head_t *whead;
};
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt)
{
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;
if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
//When an event occurs on the target file, ep_poll_callback is called. It wakes up the process blocked in
//epoll_wait; what events actually occurred is determined later, by calling the target file's poll again from that process.
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
q->flags = 0;
q->private = NULL;
q->func = func;
pwq->whead = whead;
pwq->base = epi;
//Add pwq->wait to the target file's wait queue whead, so that ep_poll_callback is called when an event occurs
add_wait_queue(whead, &pwq->wait);
unsigned long flags;
wait->flags &= ~WQ_FLAG_EXCLUSIVE;
spin_lock_irqsave(&q->lock, flags);
__add_wait_queue(q, wait);
spin_unlock_irqrestore(&q->lock, flags);
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
} else {
/* We have to signal that an error occurred */
epi->nwait = -1;
}
}
When an event occurs on a monitored file, ep_poll_callback is called: it adds the epi whose file has the event to the ready list and wakes up any process blocked in epoll_wait, notifying the user-space program to fetch and handle the event.
/*
* This is the callback that is passed to the wait queue wakeup
* mechanism. It is called by the stored file descriptors when they
* have events to report.
*/
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
unsigned long flags;
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;
spin_lock_irqsave(&ep->lock, flags);
/*
* If the event mask does not contain any poll(2) event, we consider the
* descriptor to be disabled. This condition is likely the effect of the
* EPOLLONESHOT bit that disables the descriptor when an event is received,
* until the next EPOLL_CTL_MOD will be issued.
*/
//If the user set EPOLLONESHOT, all further events are ignored until the next EPOLL_CTL_MOD
if (!(epi->event.events & ~EP_PRIVATE_BITS))
goto out_unlock;
/*
* Check the events coming with the callback. At this stage, not
* every device reports the events in the "key" parameter of the
* callback. We need to be able to handle both cases here, hence the
* test for "key" != NULL before the event match test.
*/
//If none of the events the user is interested in occurred, return immediately
if (key && !((unsigned long) key & epi->event.events))
goto out_unlock;
/*
* If we are transferring events to userspace, we can hold no locks
* (because we're accessing user memory, and because of linux f_op->poll()
* semantics). All the events that happen during that period of time are
* chained in ep->ovflist and requeued later on.
*/
//If events are currently being transferred to user space, chain the new event onto the ovflist
//and jump out; it is not added to rdllist
if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
if (epi->next == EP_UNACTIVE_PTR) {
epi->next = ep->ovflist;
ep->ovflist = epi;
if (epi->ws) {
/*
* Activate ep->ws since epi->ws may get
* deactivated at any time.
*/
__pm_stay_awake(ep->ws);
}
}
goto out_unlock;
}
//Put epi on the ready list
/* If this file is already in the ready list we exit soon */
if (!ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake_rcu(epi);
}
/*
* Wake up ( if active ) both the eventpoll wait list and the ->poll()
* wait list.
*/
//If any process is blocked in epoll_wait on this ep, wake it up.
//Because epoll_wait adds its wait entry with the exclusive flag and at the head of the wait queue,
//only one process is woken (similar to how accept is handled with multiple processes)
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
out_unlock:
spin_unlock_irqrestore(&ep->lock, flags);
}
epoll_wait
epoll's third system call, used to retrieve the fds on which events have occurred.
a. epfd is the descriptor returned by epoll_create.
b. events is a pre-allocated array of struct epoll_event; epoll copies the events that occurred into this array (events must not be a NULL pointer: the kernel only copies data into it and does not allocate user-space memory on our behalf, which keeps this efficient).
c. maxevents is the maximum number of events that this call may return; it usually equals the size of the pre-allocated events array.
d. timeout is the maximum time to wait, in milliseconds, when no event has been detected. If timeout is 0, epoll_wait returns immediately even when rdllist is empty, without waiting.
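A minimal user-space wait loop, for orientation (epfd is assumed to be a valid epoll descriptor and handle_event a hypothetical handler; error handling is abbreviated):
#include <stdint.h>
#include <stdio.h>
#include <sys/epoll.h>

#define MAX_EVENTS 64

static void event_loop(int epfd, void (*handle_event)(int fd, uint32_t events))
{
    struct epoll_event events[MAX_EVENTS];

    for (;;) {
        /* block for up to 1000 ms; 0 would return immediately, -1 would block until an event arrives */
        int n = epoll_wait(epfd, events, MAX_EVENTS, 1000);
        if (n == -1) {
            perror("epoll_wait");
            break;
        }
        for (int i = 0; i < n; i++)
            handle_event(events[i].data.fd, events[i].events);
    }
}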
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events, int, maxevents, int, timeout)
/* The maximum number of event must be greater than zero */
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
return -EINVAL;
/* Get the "struct file *" for the eventpoll file */
f = fdget(epfd);
if (!f.file)
return -EBADF;
error = -EINVAL;
//Must be an epoll file
if (!is_file_epoll(f.file))
goto error_fput;
//Get the eventpoll structure
ep = f.file->private_data;
/* Time to fish for events ... */
//Collect ready events and copy them to user space
error = ep_poll(ep, events, maxevents, timeout);
ep_poll transfers the ready fds on the ready list to user space; if no event is ready, it blocks or returns according to timeout.
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout)
if (timeout > 0) {
struct timespec end_time = ep_set_mstimeout(timeout);
slack = select_estimate_accuracy(&end_time);
to = &expires;
*to = timespec_to_ktime(end_time);
} else if (timeout == 0) {
//If timeout is 0, jump to check_events to see whether any event is currently pending; if not, return immediately.
//If events are pending, they are copied to user space.
/*
* Avoid the unnecessary trip to the wait queue loop, if the
* caller specified a non blocking operation.
*/
timed_out = 1;
spin_lock_irqsave(&ep->lock, flags);
goto check_events;
}
//If no event is ready, block until one occurs
if (!ep_events_available(ep)) {
/*
* We don't have any available event to return to the caller.
* We need to sleep here, and we will be wake up by
* ep_poll_callback() when events will become available.
*/
//Initialize a wait entry for the current process
wait_queue_t wait;
init_waitqueue_entry(&wait, current);
q->flags = 0;
q->private = p;
q->func = default_wake_function;
//Add the wait entry at the head of the ep->wq wait queue, whereas the accept path adds its entry at the tail.
//In early Linux versions the kernel woke up every process blocked in epoll_wait, so epoll had a
//"thundering herd" problem similar to accept's. Newer kernels wake only the first process on the wait queue,
//so the problem is partially solved: it only remains an issue in LT mode and does not occur in ET mode.
__add_wait_queue_exclusive(&ep->wq, &wait);
wait->flags |= WQ_FLAG_EXCLUSIVE;
__add_wait_queue(q, wait);
for (;;) {
/*
* We don't want to sleep if the ep_poll_callback() sends us
* a wakeup in between. That's why we set the task state
* to TASK_INTERRUPTIBLE before doing the checks.
*/
set_current_state(TASK_INTERRUPTIBLE);
if (ep_events_available(ep) || timed_out)
break;
if (signal_pending(current)) {
res = -EINTR;
break;
}
spin_unlock_irqrestore(&ep->lock, flags);
if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
timed_out = 1;
spin_lock_irqsave(&ep->lock, flags);
}
__remove_wait_queue(&ep->wq, &wait);
set_current_state(TASK_RUNNING);
}
check_events:
/* Is it worth to try to dig for events ? */
eavail = ep_events_available(ep);
!list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
spin_unlock_irqrestore(&ep->lock, flags);
/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/
//If the ready list is not empty, events have occurred; call ep_send_events to copy them to user space
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;
return res;
ep_send_events transfers the ready events on the ready list to user space.
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;
esed.maxevents = maxevents;
esed.events = events;
return ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
}
static int ep_scan_ready_list(struct eventpoll *ep, int (*sproc)(struct eventpoll *,
struct list_head *, void *), void *priv, int depth, bool ep_locked)
{
LIST_HEAD(txlist);
spin_lock_irqsave(&ep->lock, flags);
//Splice everything from rdllist onto the local txlist
list_splice_init(&ep->rdllist, &txlist);
//Set ovflist to NULL (instead of EP_UNACTIVE_PTR) so that ep_poll_callback temporarily parks newly arriving events on ovflist
ep->ovflist = NULL;
spin_unlock_irqrestore(&ep->lock, flags);
/*
* Now call the callback function.
*/
//Call ep_send_events_proc to transfer the pending events to the user program
error = (*sproc)(ep, &txlist, priv);
spin_lock_irqsave(&ep->lock, flags);
/*
* During the time we spent inside the "sproc" callback, some
* other events might have been queued by the poll callback.
* We re-insert them inside the main ready-list here.
*/
//While ep_send_events_proc was copying events to user space, epis whose files had new events
//were temporarily parked on ep->ovflist by ep_poll_callback.
//Here they are taken off ovflist and put back onto the rdllist
for (nepi = ep->ovflist; (epi = nepi) != NULL;
nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
/*
* We need to check if the item is already in the list.
* During the "sproc" callback execution time, items are
* queued into ->ovflist but the "txlist" might already
* contain them, and the list_splice() below takes care of them.
*/
if (!ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
}
}
/*
* We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
* releasing the lock, events will be queued in the normal way inside
* ep->rdllist.
*/
ep->ovflist = EP_UNACTIVE_PTR;
/*
* Quickly re-inject items left on "txlist".
*/
list_splice(&txlist, &ep->rdllist);
__pm_relax(ep->ws);
if (!list_empty(&ep->rdllist)) {
/*
* Wake up (if active) both the eventpoll wait list and
* the ->poll() wait list (delayed after we release the lock).
*/
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
if (!ep_locked)
mutex_unlock(&ep->mtx);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return error;
}
ep_send_events_proc copies the fds with pending events to user space and applies the configured trigger mode: in LT mode, for example, the epi is always added back to the ready list, so the next epoll_wait polls again to check whether unhandled events remain. A user-space illustration of the LT/ET difference is sketched after the function.
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv)
{
struct ep_send_events_data *esed = priv;
//Transfer at most esed->maxevents fd events
for (eventcnt = 0, uevent = esed->events;
!list_empty(head) && eventcnt < esed->maxevents;) {
epi = list_first_entry(head, struct epitem, rdllink);
//Remove epi from the ready list
list_del_init(&epi->rdllink);
//Initialize an empty pt, used only as an argument
init_poll_funcptr(&pt, NULL);
//Call the target file's poll function to get the pending events
revents = ep_item_poll(epi, &pt);
if (revents) {
//Copy the event to user space.
//If that fails, put epi back on the ready list so it can be transferred next time.
if (__put_user(revents, &uevent->events) ||
__put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head);
ep_pm_stay_awake(epi);
return eventcnt ? eventcnt : -EFAULT;
}
eventcnt++;
uevent++;
//If EPOLLONESHOT is set in epi->event.events, mask it down to EP_PRIVATE_BITS, meaning the event is delivered only once:
//new events will not be reported to the user program again (ep_poll_callback checks for this)
if (epi->event.events & EPOLLONESHOT)
epi->event.events &= EP_PRIVATE_BITS;
//In LT mode the epi is unconditionally added back to the ready list, so the next epoll_wait will poll again
//to check whether unhandled events remain. This matches level-triggered semantics: as long as you do not handle the event, you keep being notified.
else if (!(epi->event.events & EPOLLET)) {
/*
* If this file has been added with Level
* Trigger mode, we need to insert back inside
* the ready list, so that the next call to
* epoll_wait() will check again the events
* availability. At this point, no one can insert
* into ep->rdllist besides us. The epoll_ctl()
* callers are locked out by
* ep_scan_ready_list() holding "mtx" and the
* poll callback will queue them in ep->ovflist.
*/
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
}
}
}
}
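To make the LT re-add behaviour concrete from the user's point of view, here is a hedged sketch (epfd and fd are assumed to be valid descriptors; the helper name is hypothetical). In level-triggered mode epoll_wait keeps reporting EPOLLIN while unread data remains; in edge-triggered mode it reports only the state change, so the application must read until EAGAIN before waiting again.
#include <sys/epoll.h>

/* Switch a monitored fd between level-triggered and edge-triggered mode. */
static int set_trigger_mode(int epfd, int fd, int edge_triggered)
{
    struct epoll_event ev;
    ev.data.fd = fd;
    /* LT (the default): the kernel re-adds the epitem to the ready list after
     * every epoll_wait, so EPOLLIN keeps being reported while data is unread.
     * ET (EPOLLET): the event is reported once per state change, so the caller
     * must drain the fd until read() returns EAGAIN. */
    ev.events = EPOLLIN | (edge_triggered ? EPOLLET : 0);
    return epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}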