理解epoll(基于linux2.6.12.1)


epoll在现在的软件中占据了很大的分量,nginx,libuv等单线程事件循环的软件都使用了epoll。之前分析过select,今天分析一下epoll。

简介

  • epoll与select
  • epoll_create
  • epoll_ctl
  • epoll_wait
  • ET、LT模式
1
#include <sys/epoll.h>

epoll与select

  • Epoll 没有最大并发连接的限制,上限是最大可以打开文件的数目
  • 效率提升,epoll对于句柄事件的选择不是遍历的,是事件响应的,就是句柄上事件来就马上选择出来,不需要遍历整个句柄链表,因此效率非常高,内核将句柄用红黑树保存的,IO效率不随FD数目增加而线性下降。
  • 内存拷贝, select让内核把 FD 消息通知给用户空间的时候使用了内存拷贝的方式,开销较大,但是Epoll 在这点上使用了共享内存的方式,这个内存拷贝也省略了。

相比于select,epoll最大的好处在于它不会随着监听fd数目的增长而降低效率。因为在内核中的select实现中,它是采用轮询来处理的,轮询的fd数目越多,自然耗时越多。

并且,在linux/posix_types.h头文件有这样的声明:

#define __FD_SETSIZE 1024

表示select最多同时监听1024个fd,当然,可以通过修改头文件再重编译内核来扩大这个数目,但这似乎并不治本。

epoll_create

1
int epoll_create(int size);

创建一个epoll的句柄,

  • size用来告诉内核这个监听的数目一共有多大。

这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

epoll_ctl

1
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型

  • EPOLL_CTL_ADD 注册新的fd到epfd中;
  • EPOLL_CTL_MOD 修改已经注册的fd的监听事件;
  • EPOLL_CTL_DEL 从epfd中删除一个fd;

fd 是要监听的fd

event 是要监听什么样的事件

1
2
3
4
5
6
7
8
9
10
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

events可以是以下几个宏的集合:

  • EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
  • EPOLLOUT:表示对应的文件描述符可以写;
  • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
  • EPOLLERR:表示对应的文件描述符发生错误;
  • EPOLLHUP:表示对应的文件描述符被挂断;
  • EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
  • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

epoll_wait

1
2
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);

等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

ET、LT两种工作模式

  • EPOLLLT:完全靠Linux-kernel-epoll驱动,应用程序只需要处理从epoll_wait返回的fds, 这些fds我们认为它们处于就绪状态。此时epoll可以认为是更快速的poll。
  • EPOLLET:此模式下,系统仅仅通知应用程序哪些fds变成了就绪状态,一旦fd变成就绪状态,epoll将不再关注这个fd的任何状态信息(从epoll队列移除), 直到应用程序通过读写操作(非阻塞)触发EAGAIN状态,epoll认为这个fd又变为空闲状态,那么epoll又重新关注这个fd的状态变化(重新加入epoll队列)。 随着epoll_wait的返回,队列中的fds是在减少的,所以在大并发的系统中,EPOLLET更有优势,但是对程序员的要求也更高。

举例

假设现在对方发送了2k的数据,而我们先读取了1k,然后这时调用了epoll_wait,如果是边沿触发ET,那么这个fd变成就绪状态就会从epoll 队列移除,则epoll_wait 会一直阻塞,忽略尚未读取的1k数据; 而如果是水平触发LT,那么epoll_wait 还会检测到可读事件而返回,我们可以继续读取剩下的1k 数据。

总结: LT模式可能触发的次数更多, 一旦触发的次数多, 也就意味着效率会下降; 但这样也不能就说LT模式就比ET模式效率更低, 因为ET的使用对编程人员提出了更高更精细的要求,一旦使用者编程水平不够, 那ET模式还不如LT模式。

ET模式仅当状态发生变化的时候才获得通知,这里所谓的状态的变化并不包括缓冲区中还有未处理的数据,也就是说,如果要采用ET模式,需要一直read/write直到出错为止,很多人反映为什么采用ET模式只接收了一部分数据就再也得不到通知了,大多因为这样;而LT模式是只要有数据没有处理就会一直通知下去的.

epoll IO多路复用模型实现机制

设想一下如下场景:有100万个客户端同时与一个服务器进程保持着TCP连接。而每一时刻,通常只有几百上千个TCP连接是活跃的(事实上大部分场景都是这种情况)。如何实现这样的高并发?

在select/poll时代,服务器进程每次都把这100万个连接告诉操作系统(从用户态复制句柄数据结构到内核态),让操作系统内核去查询这些套接字上是否有事件发生,轮询完后,再将句柄数据复制到用户态,让服务器应用程序轮询处理已发生的网络事件,这一过程资源消耗较大,因此,select/poll一般只能处理几千的并发连接。

epoll的设计和实现与select完全不同。epoll通过在Linux内核中申请一个简易的文件系统,把原先的select/poll调用分成了3个部分:

  • 调用epoll_create()建立一个epoll对象(在epoll文件系统中为这个句柄对象分配资源)
  • 调用epoll_ctl向epoll对象中添加这100万个连接的套接字
  • 调用epoll_wait收集发生的事件的连接

只需要在进程启动时建立一个epoll对象,然后在需要的时候向这个epoll对象中添加或者删除连接。同时,epoll_wait的效率也非常高,因为调用epoll_wait时,并没有一股脑的向操作系统复制这100万个连接的句柄数据,内核也不需要去遍历全部的连接。

Linux内核具体的epoll机制实现思路。

当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/*
* This structure is stored inside the "private_data" member of the file
* structure and rapresent the main data sructure for the eventpoll
* interface.
*/
struct eventpoll {
/* Protect the this structure access */
spinlock_t lock;
/*
* This mutex is used to ensure that files are not removed
* while epoll is using them. This is held during the event
* collection loop, the file cleanup path, the epoll file exit
* code and the ctl operations.
*/
struct mutex mtx;
/* Wait queue used by sys_epoll_wait() */
wait_queue_head_t wq;
/* Wait queue used by file->poll() */
wait_queue_head_t poll_wait;
/* List of ready file descriptors */
/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
struct list_head rdllist;
/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
/* RB tree root used to store monitored fd structs */
struct rb_root rbr;
/*
* This is a single linked list that chains all the "struct epitem" that
* happened while transfering ready events to userspace w/out
* holding ->lock.
*/
struct epitem *ovflist;
/* The user that created the eventpoll descriptor */
struct user_struct *user;
};

每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件。这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度)。

而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当相应的事件发生时会调用这个回调方法。这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中。

在epoll中,对于每一个事件,都会建立一个epitem结构体,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/*
* Each file descriptor added to the eventpoll interface will
* have an entry of this type linked to the "rbr" RB tree.
*/
struct epitem {
/* RB tree node used to link this structure to the eventpoll RB tree */
//红黑树节点
struct rb_node rbn;
/* List header used to link this structure to the eventpoll ready list */
//双向链表节点
struct list_head rdllink;
/*
* Works together "struct eventpoll"->ovflist in keeping the
* single linked chain of items.
*/
struct epitem *next;
/* The file descriptor information this item refers to */
//事件句柄信息
struct epoll_filefd ffd;
/* Number of active wait queue attached to poll operations */
int nwait;
/* List containing poll wait queues */
struct list_head pwqlist;
/* The "container" of this item */
//指向其所属的eventpoll对象
struct
![Uploading EPOLL_663944.jpg . . .]
eventpoll *ep;
/* List header used to link this item to the "struct file" items list */
struct list_head fllink;
/* The structure that describe the interested events and the source fd */
//期待发生的事件类型
struct epoll_event event;
};

当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可。如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户。

img

通过红黑树和双链表数据结构,并结合回调机制,造就了epoll的高效。

下面我们按照epoll三部曲的顺序进行分析。

epoll_create

1
2
3
4
5
6
7
8
9
10
11
12
asmlinkage long sys_epoll_create(int size)
{
int error, fd;
struct inode *inode;
struct file *file;

error = ep_getfd(&fd, &inode, &file);
error = ep_file_init(file);

return fd;

}

我们发现create函数似乎很简单。

1、操作系统中,进程和文件系统是通过fd=>file=>node联系起来的。ep_getfd就是在建立这个联系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static int ep_getfd(int *efd, struct inode **einode, struct file **efile)
{

// 获取一个file结构体
file = get_empty_filp();
// epoll在底层本身对应一个文件系统,从这个文件系统中获取一个inode
inode = ep_eventpoll_inode();
// 获取一个文件描述符
fd = get_unused_fd();

sprintf(name, "[%lu]", inode->i_ino);
this.name = name;
this.len = strlen(name);
this.hash = inode->i_ino;
// 申请一个entry
dentry = d_alloc(eventpoll_mnt->mnt_sb->s_root, &this);
dentry->d_op = &eventpollfs_dentry_operations;
file->f_dentry = dentry;

// 建立file和inode的联系
d_add(dentry, inode);
// 建立fd=>file的关联
fd_install(fd, file);

*efd = fd;
*einode = inode;
*efile = file;
return 0;
}

形成一个这种的结构。

2、通过ep_file_init建立file和epoll的关联。

1
2
3
4
5
6
7
8
9
10
11
static int ep_file_init(struct file *file)
{
struct eventpoll *ep;

ep = kmalloc(sizeof(struct eventpoll), GFP_KERNEL)
memset(ep, 0, sizeof(*ep));
// 一系列初始化
file->private_data = ep;

return 0;
}

epoll_create函数主要是建立一个数据结构。并返回一个文件描述符供后面使用。

epoll_ctl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
asmlinkage long
sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event)
{
int error;
struct file *file, *tfile;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;

error = -EFAULT;
// 不是删除操作则复制用户数据到内核
if (
EP_OP_HASH_EVENT(op) &&
copy_from_user(&epds, event, sizeof(struct epoll_event))
)
goto eexit_1;

// 根据一种的图,拿到epoll对应的file结构体
file = fget(epfd);

// 拿到操作的文件的file结构体
tfile = fget(fd);
// 通过file拿到epoll_event结构体,见上面的图
ep = file->private_data;
// 看这个文件描述符是否已经存在,epoll用红黑树维护这个数据
epi = ep_find(ep, tfile, fd);

switch (op) {
// 新增
case EPOLL_CTL_ADD:
// 还没有则新增,有则报错
if (!epi) {
epds.events |= POLLERR | POLLHUP;
// 插入红黑树
error = ep_insert(ep, &epds, tfile, fd);
} else
error = -EEXIST;
break;
// 删除
case EPOLL_CTL_DEL:
// 存在则删除,否则报错
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
// 修改
case EPOLL_CTL_MOD:
// 存在则修改,否则报错
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
} else
error = -ENOENT;
break;
}
}

epoll_ctl函数看起来也没有很复杂,就是根据用户传进来的信息去操作红黑树。对于红黑树的增删改查,查和删除就不分析了。就是去操作红黑树。增和改是类似的逻辑,所以我们只分析增操作就可以了。在此之前,我们先了解一些epoll中其他的数据结构。

当我们新增一个需要监听的文件描述符的时候,系统会申请一个epitem去表示。epitem是保存了文件描述符、事件等信息的结构体。然后把epitem插入到eventpoll结构体维护的红黑树中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
int error, revents, pwake = 0;
unsigned long flags;
struct epitem *epi;
struct ep_pqueue epq;

// 申请一个epitem
epi = EPI_MEM_ALLOC()
// 省略一系列初始化工作
// 记录所属的epoll
epi->ep = ep;
// 在epitem中保存文件描述符fd和file
EP_SET_FFD(&epi->ffd, tfile, fd);
// 监听的事件
epi->event = *event;
epi->nwait = 0;

epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
revents = tfile->f_op->poll(tfile, &epq.pt);

// 把epitem插入红黑树
ep_rbtree_insert(ep, epi);

// 如果监听的事件在新增的时候就已经触发,则直接插入到epoll就绪队列
if ((revents & event->events) && !EP_IS_LINKED(&epi->rdllink)) {
// 把epitem插入就绪队列rdllist
list_add_tail(&epi->rdllink, &ep->rdllist);
// 有事件触发,唤醒阻塞在epoll_wait的进程队列
if (waitqueue_active(&ep->wq))
wake_up(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
}

新增操作的大致流程是:

  1. 申请了一个新的epitem表示待观察的实体。他保存了文件描述符、感兴趣的事件等信息。
  2. 插入红黑树
  3. 判断新增的节点中对应的文件描述符和事件是否已经触发了,是则加入到就绪队列(由eventpoll->rdllist维护的一个队列)

下面具体看一下如何判断感兴趣的事件在对应的文件描述符中是否已经触发。相关代码在ep_insert中。下面单独拎出来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/*
struct ep_pqueue {
// 函数指针
poll_table pt;
// epitem
struct epitem *epi;
};
*/
struct ep_pqueue epq;
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
revents = tfile->f_op->poll(tfile, &epq.pt);

static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
pt->qproc = qproc;
}

上面的代码是定义了一个struct ep_pqueue 结构体,然后设置他的一个字段为ep_ptable_queue_proc。然后执行tfile->f_op->poll。poll函数由各个文件系统或者网络协议实现。我们以管道为例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static unsigned int
pipe_poll(struct file *filp, poll_table *wait)
{
unsigned int mask;
// 监听的文件描述符对应的inode
struct inode *inode = filp->f_dentry->d_inode;
struct pipe_inode_info *info = inode->i_pipe;
int nrbufs;
/*
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && wait_address)
p->qproc(filp, wait_address, p);
}
*/
poll_wait(filp, PIPE_WAIT(*inode), wait);

// 判断哪些事件触发了
nrbufs = info->nrbufs;
mask = 0;
if (filp->f_mode & FMODE_READ) {
mask = (nrbufs > 0) ? POLLIN | POLLRDNORM : 0;
if (!PIPE_WRITERS(*inode) && filp->f_version != PIPE_WCOUNTER(*inode))
mask |= POLLHUP;
}

if (filp->f_mode & FMODE_WRITE) {
mask |= (nrbufs < PIPE_BUFFERS) ? POLLOUT | POLLWRNORM : 0;
if (!PIPE_READERS(*inode))
mask |= POLLERR;
}

return mask;
}

我们看到具体的poll函数里会首先执行poll_wait函数。这个函数只是简单执行struct ep_pqueue epq结构体中的函数,即刚才设置的ep_ptable_queue_proc。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//  监听的文件描述符对应的file结构体,whead是等待监听的文件描述符对应的inode可用的队列
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct epitem *epi = EP_ITEM_FROM_EPQUEUE(pt);
struct eppoll_entry *pwq;

if (epi->nwait >= 0 && (pwq = PWQ_MEM_ALLOC())) {
pwq->wait->flags = 0;
pwq->wait->task = NULL;
// 设置回调
pwq->wait->func = ep_poll_callback;
pwq->whead = whead;
pwq->base = epi;
// 插入等待监听的文件描述符的inode可用的队列,回调函数是ep_poll_callback
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
} else {
/* We have to signal that an error occurred */
epi->nwait = -1;
}
}

主要的逻辑是把当前进程插入监听的文件的等待队列中,等待唤醒。

epoll_wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
asmlinkage long sys_epoll_wait(int epfd, struct epoll_event __user *events,
int maxevents, int timeout)
{
int error;
struct file *file;
struct eventpoll *ep;
// 通过epoll的fd拿到对应的file结构体
file = fget(epfd);
// 通过file结构体拿到eventpoll结构体
ep = file->private_data;
error = ep_poll(ep, events, maxevents, timeout);
return error;
}

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res, eavail;
unsigned long flags;
long jtimeout;
wait_queue_t wait;

// 计算超时时间
jtimeout = timeout == -1 || timeout > (MAX_SCHEDULE_TIMEOUT - 1000) / HZ ?
MAX_SCHEDULE_TIMEOUT: (timeout * HZ + 999) / 1000;

retry:

res = 0;
// 就绪队列为空
if (list_empty(&ep->rdllist)) {
// 加入阻塞队列
init_waitqueue_entry(&wait, current);
add_wait_queue(&ep->wq, &wait);

for (;;) {
// 挂起
set_current_state(TASK_INTERRUPTIBLE);
// 超时或者有就绪事件了,则跳出返回
if (!list_empty(&ep->rdllist) || !jtimeout)
break;
// 被信号唤醒返回EINTR
if (signal_pending(current)) {
res = -EINTR;
break;
}

// 设置定时器,然后进程挂起,等待超时唤醒(超时或者信号唤醒)
jtimeout = schedule_timeout(jtimeout);
}
// 移出阻塞队列
remove_wait_queue(&ep->wq, &wait);
// 设置就绪
set_current_state(TASK_RUNNING);
}

// 是否有事件就绪,唤醒的原因有几个,被唤醒不代表就有就绪事件
eavail = !list_empty(&ep->rdllist);

write_unlock_irqrestore(&ep->lock, flags);
// 处理就绪事件返回
if (!res && eavail &&
!(res = ep_events_transfer(ep, events, maxevents)) && jtimeout)
goto retry;

return res;
}

总的来说epoll_wait的逻辑主要是处理就绪队列的节点。

  1. 如果就绪队列为空,则根据timeout做下一步处理,可能定时阻塞。
  2. 如果就绪队列非空则处理就绪队列,返回给用户。处理就绪队列的函数是ep_events_transfer。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static int ep_events_transfer(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
int eventcnt = 0;
struct list_head txlist;

INIT_LIST_HEAD(&txlist);

if (ep_collect_ready_items(ep, &txlist, maxevents) > 0) {
eventcnt = ep_send_events(ep, &txlist, events);
ep_reinject_items(ep, &txlist);
}

return eventcnt;
}

主要是三个函数,我们一个个看。

1、ep_collect_ready_items收集就绪事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static int ep_collect_ready_items(struct eventpoll *ep, struct list_head *txlist, int maxevents)
{
int nepi;
unsigned long flags;
// 就绪事件的队列
struct list_head *lsthead = &ep->rdllist, *lnk;
struct epitem *epi;

for (nepi = 0, lnk = lsthead->next; lnk != lsthead && nepi < maxevents;) {
// 通过结构体字段的地址拿到结构体首地址
epi = list_entry(lnk, struct epitem, rdllink);

lnk = lnk->next;

/* If this file is already in the ready list we exit soon */
if (!EP_IS_LINKED(&epi->txlink)) {

epi->revents = epi->event.events;
// 插入txlist队列,然后处理完再返回给用户
list_add(&epi->txlink, txlist);
nepi++;
// 从就绪队列中删除
EP_LIST_DEL(&epi->rdllink);
}
}

return nepi;
}

2、ep_send_events判断哪些事件触发了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static int ep_send_events(struct eventpoll *ep, struct list_head *txlist,
struct epoll_event __user *events)
{
int eventcnt = 0;
unsigned int revents;
struct list_head *lnk;
struct epitem *epi;
// 遍历就绪队列,记录触发的事件
list_for_each(lnk, txlist) {
epi = list_entry(lnk, struct epitem, txlink);
// 判断哪些事件触发了
revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);

epi->revents = revents & epi->event.events;
// 复制到用户空间
if (epi->revents) {
if (__put_user(epi->revents,
&events[eventcnt].events) ||
__put_user(epi->event.data,
&events[eventcnt].data))
return -EFAULT;
// 只监听一次,触发完设置成对任何事件都不感兴趣
if (epi->event.events & EPOLLONESHOT)
epi->event.events &= EP_PRIVATE_BITS;
eventcnt++;
}
}
return eventcnt;
}

3、ep_reinject_items重新插入就绪队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static void ep_reinject_items(struct eventpoll *ep, struct list_head *txlist)
{
int ricnt = 0, pwake = 0;
unsigned long flags;
struct epitem *epi;

while (!list_empty(txlist)) {
epi = list_entry(txlist->next, struct epitem, txlink);
EP_LIST_DEL(&epi->txlink);
// 水平触发模式则一直通知,即重新加入就绪队列
if (EP_RB_LINKED(&epi->rbn) && !(epi->event.events & EPOLLET) &&
(epi->revents & epi->event.events) && !EP_IS_LINKED(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ricnt++;
}
}

}

我们发现,并有没有在epoll_wait的时候去收集就绪事件,那么就绪队列是谁处理的呢?我们回顾一下插入红黑树的时候,做了一个事情,就是在文件对应的inode上注册一个回调。当文件满足条件的时候,就会唤醒因为epoll_wait而阻塞的进程。epoll_wait会收集事件返回给用户。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
unsigned long flags;
struct epitem *epi = EP_ITEM_FROM_WAIT(wait);
struct eventpoll *ep = epi->ep;
// 插入就绪队列
list_add_tail(&epi->rdllink, &ep->rdllist);
// 唤醒因epoll_wait而阻塞的进程
if (waitqueue_active(&ep->wq))
wake_up(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
return 1;
}

epoll的实现涉及的内容比较多,先分析一下大致的原理。有机会再深入分析。