linux epoll 水平触发LT和边缘出发ET


参考:

epoll、accept触发模式及阻塞方式的选择

ET or LT

select(),poll()模型都是水平触发模式,信号驱动IO是边缘触发模式,epoll()模型即支持水平触发,也支持边缘触发,默认是水平触发
从表象看epoll的性能最好,但是在连接数少,并且连接都十分活跃的情况下,select和poll的性能可能比epoll好,毕竟epoll的通知机制需要很多回调函数来完成。

epoll工作在两种触发模式下:
Level_triggered(水平触发): 这是epoll默认的触发方式,既支持阻塞模式,也支持非阻塞模式,当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait()时,它还会通知你在上次没读写完的文件描述符上继续读写

Edge_triggered(边缘触发): 这种模式下,epoll只支持非阻塞模式,当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你。Nginx默认采用ET模式来使用epoll。

二者的差异在于level-trigger模式下只要某个socket处于readable/writable状态,无论什么时候进行epoll_wait都会返回该socket;而edge-trigger模式下只要监测描述符上有数据,epoll_wait就会返回该socket。
所以,在epoll的ET模式下,正确的读写方式为:

读:只要可读,就一直读,直到返回0,或者 errno = EAGAIN
写:只要可写,就一直写,直到数据发送完,或者 errno = EAGAIN

这里的错误码是指:
在一个非阻塞的socket上调用read/write函数, 返回的errno为EAGAIN或者EWOULDBLOCK
这个错误表示资源暂时不够,read时,读缓冲区没有数据,或者write时,写缓冲区满了。遇到这种情况,如果是阻塞socket,read/write就要阻塞掉。而如果是非阻塞socket,read/write立即返回-1, 同时errno设置为EAGAIN。
简单代码示例如下:
读:

1
2
3
4
5
6
7
n = 0;
while ((nread = read(fd, buf + n, BUFSIZ - 1)) > 0) {
n += nread;
}
if (nread == -1 && errno != EAGAIN) {
perror("read error");
}

写:

1
2
3
4
5
6
7
8
9
10
11
12
13
int nwrite,
data_size = strlen(buf);
n = data_size;
while (n > 0) {
nwrite = write(fd, buf + data_size - n, n);
if (nwrite < n) {
if (nwrite == -1 && errno != EAGAIN) {
perror("write error");
}
break;
}
n -= nwrite;
}

从以上我们就看出来了为何 epoll在ET模式下使用的是非阻塞fd ,由于边缘触发的模式,每次epoll_wait返回就绪的fd,必须读完读取缓冲区里的所有数据(直至接收数据返回EAGAIN),必须套上while循环此时若使用阻塞的fd,当读取完缓冲区里的数据后,接受数据过程会阻塞,从而无法接受新的连接。

1
**//////////////////////割割割割///////////////////////////////割割割割////////////////////////**

上面介绍完了epoll的两种触发模式以及两种阻塞方式的使用,下面分析一下网络编程中accept函数使用fd时要如何选择触发方式以及阻塞方式。

listenfd阻塞还是非阻塞?

如果TCP连接被客户端夭折,即在服务器调用accept之前,客户端主动发送RST终止连接,导致刚刚建立的连接从就绪队列中移出,如果套接口被设置成阻塞模式,服务器就会一直阻塞在accept调用上,直到其他某个客户建立一个新的连接为止。但是在此期间,服务器单纯地阻塞在accept调用上,就绪队列中的其他描述符都得不到处理。
解决办法是把监听套接口listenfd设置为非阻塞,当客户在服务器调用accept之前中止某个连接时,accept调用可以立即返回-1,这时源自Berkeley的实现会在内核中处理该事件,并不会将该事件通知给epool,而其他实现把errno设置为ECONNABORTED或者EPROTO错误,我们应该忽略这两个错误。
ET还是LT?
ET:如果多个连接同时到达,服务器的TCP就绪队列瞬间积累多个就绪连接,由于是边缘触发模式,epoll只会通知一次,accept只处理一个连接,导致TCP就绪队列中剩下的连接都得不到处理。
解决办法是用while循环包住accept调用,处理完TCP就绪队列中的所有连接后再退出循环。如何知道是否处理完就绪队列中的所有连接呢?accept返回-1并且errno设置为EAGAIN就表示所有连接都处理完。

LT:在nigix的实现中,accept函数调用使用水平触发的fd,就是出于对丢失连接的考虑(边缘触发时,accept只会执行一次接收一个连接,内核不会再去通知有连接就绪),所以使用水平触发的fd就不存在丢失连接的问题。但是如果系统中有大量你不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率。
所以服务器应该使用非阻塞的accept,并且在使用ET模式代码如下:

1
2
3
4
5
6
while ((conn_sock = accept(listenfd, (struct sockaddr * ) & remote, (size_t * ) & addrlen)) > 0) {
handle_client(conn_sock);
}
if (conn_sock == -1) {
if (errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR) perror("accept");
}

一道腾讯后台开发的面试题:
使用Linuxepoll模型,水平触发模式;当socket可写时,会不停的触发socket可写的事件,如何处理?
解答:正如我们上面说的,LT模式下不需要读写的文件描述符仍会不停地返回就绪,这样就会影响我们监测需要关心的文件描述符的效率。
所以这题的解决方法就是:平时不要把该描述符放进eventpoll结构体中,当需要写该fd的时候,调用epoll_ctl把fd加入eventpoll里监听,可写的时候就往里写,写完再次调用epoll_ctl把fd移出eventpoll,这种方法在发送很少数据的时候仍要执行两次epoll_ctl操作,有一定的操作代价
改进一下就是:平时不要把该描述符放进eventpoll结构体中,需要写的时候调用write或者send写数据,如果返回值是EAGAIN(写缓冲区满了),那么这时候才执行第一种方法的步骤。
归纳如下:
1.对于监听的sockfd要设置成非阻塞类型,触发模式最好使用水平触发模式,边缘触发模式会导致高并发情况下,有的客户端会连接不上。如果非要使用边缘触发,网上有的方案是用while来循环accept()。
2.对于读写的connfd,水平触发模式下,阻塞和非阻塞效果都一样,不过为了防止特殊情况,还是建议设置非阻塞。
3.对于读写的connfd,边缘触发模式下,必须使用非阻塞IO,并要一次性全部读写完数据。、
下面看一个在边沿触发模式下使用epoll的http服务器代码,必要的讲解都在注释里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#include <sys/socket.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/epoll.h>
#include <sys/sendfile.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <fcntl.h>
#include <errno.h>

#define MAX_EVENTS 10
#define PORT 8080

//设置socket连接为非阻塞模式
void setnonblocking(int sockfd)
{
int opts;

opts = fcntl(sockfd, F_GETFL);
if(opts < 0)
{
perror("fcntl(F_GETFL)\n");
exit(1);
}
opts = (opts | O_NONBLOCK);
if(fcntl(sockfd, F_SETFL, opts) < 0)
{
perror("fcntl(F_SETFL)\n");
exit(1);
}
}

int main()
{
struct epoll_event ev, events[MAX_EVENTS];
int addrlen, listenfd, conn_sock, nfds, epfd, fd, i, nread, n;
struct sockaddr_in local, remote;
char buf[BUFSIZ];

//创建listen socket
{
perror("sockfd\n");
exit(1);
}
setnonblocking(listenfd);
bzero(&local, sizeof(local));
local.sin_family = AF_INET;
local.sin_addr.s_addr = htonl(INADDR_ANY);;
local.sin_port = htons(PORT);
if( bind(listenfd, (struct sockaddr *) &local, sizeof(local)) < 0)
{
perror("bind\n");
exit(1);
}
listen(listenfd, 20);//设置为监听套接字

epfd = epoll_create(MAX_EVENTS);
{
perror("epoll_create");
exit(EXIT_FAILURE);
}

ev.events = EPOLLIN;
ev.data.fd = listenfd;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev) == -1)
{
perror("epoll_ctl: listen_sock");
exit(EXIT_FAILURE);
}

for (;;)
{
nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);//超时时间-1,永久阻塞直到有事件发生
if (nfds == -1)
{
perror("epoll_pwait");
exit(EXIT_FAILURE);
}

for (i = 0; i < nfds; ++i)
{
fd = events[i].data.fd;

if (fd == listenfd) //如果是监听的listenfd,那就是连接来了,保存来的所有连接
{
//每次处理一个连接,while循环直到处理完所有的连接
while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote,
(size_t *)&addrlen)) > 0)
{
setnonblocking(conn_sock);
ev.events = EPOLLIN | EPOLLET;//边沿触发非阻塞模式
ev.data.fd = conn_sock;
//把连接socket加入监听结构体
if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn_sock,
&ev) == -1) {
perror("epoll_ctl: add");
exit(EXIT_FAILURE);
}
}
//已经处理完所有的连:accept返回-1,errno为EAGAIN
//出错:返回-1,errno另有其值
if (conn_sock == -1)
{
if (errno != EAGAIN && errno != ECONNABORTED
&& errno != EPROTO && errno != EINTR)
perror("accept");
}
continue;//直接开始下一次循环,也就是不执行这次循环后面的部分了
}
if (events[i].events & EPOLLIN) //可读事件
{
n = 0;
while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0)
{
n += nread;
}
if (nread == -1 && errno != EAGAIN)
{
perror("read error");
}
ev.data.fd = fd;
ev.events = events[i].events | EPOLLOUT;
//修改该fd监听事件类型,监测是否可写
if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) == -1)
{
perror("epoll_ctl: mod");
}
}
if (events[i].events & EPOLLOUT) //可写事件
{
sprintf(buf, "HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\nHello World", 11);
int nwrite, data_size = strlen(buf);
n = data_size;
while (n > 0)
{
nwrite = write(fd, buf + data_size - n, n);
if (nwrite < n)
{
if (nwrite == -1 && errno != EAGAIN)
{
perror("write error");
}
break;
}
n -= nwrite;
}
//写完就关闭该连接socket
close(fd);
}
}
}

return 0;
}

epoll LT/ET 深入剖析

EPOLL事件有两种模型:

Level Triggered (LT) 水平触发
.socket接收缓冲区不为空 有数据可读 读事件一直触发
.socket发送缓冲区不满 可以继续写入数据 写事件一直触发
符合思维习惯,epoll_wait返回的事件就是socket的状态

Edge Triggered (ET) 边沿触发
.socket的接收缓冲区状态变化时触发读事件,即空的接收缓冲区刚接收到数据时触发读事件
.socket的发送缓冲区状态变化时触发写事件,即满的缓冲区刚空出空间时触发读事件
仅在状态变化时触发事件

ET还是LT?

LT的处理过程:
. accept一个连接,添加到epoll中监听EPOLLIN事件
. 当EPOLLIN事件到达时,read fd中的数据并处理
. 当需要写出数据时,把数据write到fd中;如果数据较大,无法一次性写出,那么在epoll中监听EPOLLOUT事件
. 当EPOLLOUT事件到达时,继续把数据write到fd中;如果数据写出完毕,那么在epoll中关闭EPOLLOUT事件

ET的处理过程:
. accept一个一个连接,添加到epoll中监听EPOLLIN|EPOLLOUT事件
. 当EPOLLIN事件到达时,read fd中的数据并处理,read需要一直读,直到返回EAGAIN为止
. 当需要写出数据时,把数据write到fd中,直到数据全部写完,或者write返回EAGAIN
. 当EPOLLOUT事件到达时,继续把数据write到fd中,直到数据全部写完,或者write返回EAGAIN

从ET的处理过程中可以看到,ET的要求是需要一直读写,直到返回EAGAIN,否则就会遗漏事件。而LT的处理过程中,直到返回EAGAIN不是硬性要求,但通常的处理过程都会读写直到返回EAGAIN,但LT比ET多了一个开关EPOLLOUT事件的步骤

LT的编程与poll/select接近,符合一直以来的习惯,不易出错
ET的编程可以做到更加简洁,某些场景下更加高效,但另一方面容易遗漏事件,容易产生bug

这里有两个简单的例子演示了LT与ET的用法(其中epoll-et的代码比epoll要少10行):
https://github.com/yedf/handy/blob/master/raw-examples/epoll.cc
https://github.com/yedf/handy/blob/master/raw-examples/epoll-et.cc

针对容易触发LT开关EPOLLOUT事件的情景(让服务器返回1M大小的数据),我用ab做了性能测试
测试的结果显示ET的性能稍好,详情如下:
LT 启动命令 ./epoll a
ET 启动命令 ./epoll-et a
ab 命令:ab -n 1000 -k 127.0.0.1/
LT 结果:Requests per second: 42.56 [#/sec] (mean)
ET 结果:Requests per second: 48.55 [#/sec] (mean)

当我把服务器返回的数据大小改为48576时,开关EPOLLOUT更加频繁,性能的差异更大
ab 命令:ab -n 5000 -k 127.0.0.1/
LT 结果:Requests per second: 745.30 [#/sec] (mean)
ET 结果:Requests per second: 927.56 [#/sec] (mean)

对于nginx这种高性能服务器,ET模式是很好的,而其他的通用网络库,更多是使用LT,避免使用的过程中出现bug

epoll的LT和ET的区别

LT:水平触发,效率会低于ET触发,尤其在大并发,大流量的情况下。但是LT对代码编写要求比较低,不容易出现问题。LT模式服务编写上的表现是:只要有数据没有被获取,内核就不断通知你,因此不用担心事件丢失的情况。

ET:边缘触发,效率非常高,在并发,大流量的情况下,会比LT少很多epoll的系统调用,因此效率高。但是对编程要求高,需要细致的处理每个请求,否则容易发生丢失事件的情况。

下面举一个列子来说明LT和ET的区别(都是非阻塞模式,阻塞就不说了,效率太低):
采用LT模式下, 如果accept调用有返回就可以马上建立当前这个连接了,再epoll_wait等待下次通知,和select一样。

但是对于ET而言,如果accpet调用有返回,除了建立当前这个连接外,不能马上就epoll_wait还需要继续循环accpet,直到返回-1,且errno==EAGAIN

epoll模型accept并发问题

服务端使用的是epoll网络模型。在测试的时候发现,单用户的情况下客户端和服务器通信正常。但是在多用户并发的情况下,客户端和服务端通信不正常。此时,客户端能正常的链接,发送数据,但是一直卡在接收数据部分

出现这种问题,是因为不正确的使用了epoll中的ET(edge-trigger)模式。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**************************************************
函数名:acceptConn
功能:接受客户端的链接
参数:srvfd:监听SOCKET
***************************************************/
void acceptConn(int srvfd) {
struct sockaddr_in sin;
socklen_t len = sizeof(struct sockaddr_in);
bzero( & sin, len);

int confd = accept(srvfd, (struct sockaddr * ) & sin, &len);

if (confd < 0) {
printf("%s: bad accept\n");
return;
} else {
printf("Accept Connection: %d", confd);
}

setNonblocking(confd);

//将新建立的连接添加到EPOLL的监听中
struct epoll_event event;
event.data.fd = confd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, confd, &event);
}

注意倒数第二行:event.events = EPOLLIN|EPOLLET; 采用的是ET模式。下面我们来具体说下,问题出在那里。

在epoll中有两种模式:level-trigger模式,简称LT模式,和edge-trigger模式,简称ET模式。其中,LT是默认的工作模式。

这两种模式的工作方式有些不同。在level-trigger模式下只要某个socket处于readable/writable状态,无论什么时候进行epoll_wait都会返回该socket;而edge-trigger模式下只有某个socket从unreadable变为readable或从unwritable变为writable时,epoll_wait才会返回该socket。

在ET模式socket非阻塞的情况下(上面代码中就是这种情况),多个连接同时到达,服务器的TCP就绪队列瞬间积累多个就绪连接,由于是边缘触发模式,epoll只会通知一次,accept只处理一个连接,导致TCP就绪队列中剩下的连接都得不到处理。因此,就出现了上面所提及的问题。

解决办法是用while循环抱住accept调用,处理完TCP就绪队列中的所有连接后再退出循环。如何知道是否处理完就绪队列中的所有连接呢?accept返回-1并且errno设置为EAGAIN就表示所有连接都处理完。

修改后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**************************************************
函数名:acceptConn
功能:接受客户端的链接
参数:srvfd:监听SOCKET
***************************************************/
void acceptConn(int srvfd) {
struct sockaddr_in sin;
socklen_t len = sizeof(struct sockaddr_in);
bzero( & sin, len);
int confd = 0;
while ((confd = accept(srvfd, (struct sockaddr * ) & sin, &len)) > 0) {
printf("Accept Connection: %d", confd);

setNonblocking(confd);

//将新建立的连接添加到EPOLL的监听中
struct epoll_event event;
event.data.fd = confd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, confd, &event);
}
if (confd == -1) {
if (errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR) {
printf("%s: bad accept\n");
return;
}
}
}

同理,接收数据和发送数据时如果是ET模式,且非阻塞,也得用循环。

读:只要可读,就一直读,直到返回0,或者 errno = EAGAIN

写:只要可写,就一直写,直到数据发送完,或者 errno = EAGAIN

正确的读:

1
2
3
4
5
6
7
n = 0;
while ((nread = read(fd, buf + n, BUFSIZ - 1)) > 0) {
n += nread;
}
if (nread == -1 && errno != EAGAIN) {
perror("read error");
}

正确的写:

1
2
3
4
5
6
7
8
9
10
11
12
13
int nwrite,
data_size = strlen(buf);
n = data_size;
while (n > 0) {
nwrite = write(fd, buf + data_size - n, n);
if (nwrite < n) {
if (nwrite == -1 && errno != EAGAIN) {
perror("write error");
}
break;
}
n -= nwrite;
}

epoll中accept的使用细节

accept 要考虑 2 个问题
(1) 阻塞模式 accept 存在的问题
考虑这种情况:TCP连接被客户端夭折,即在服务器调用accept之前,客户端主动发送RST终止连接,导致刚刚建立的连接从就绪队列中移出,如果套接口被设置成阻塞模式,服务器就会一直阻塞在accept调用上,
直到其他某个客户建立一个新的连接为止。但是在此期间,服务器单纯地阻塞在accept调用上,就绪队列中的其他描述符都得不到处理。

解决办法: 把监听套接口设置为非阻塞,当客户在服务器调用accept之前中止某个连接时,accept调用可以立即返回-1,这时源自Berkeley的实现会在内核中处理该事件,并不会将该事件通知给epoll,而其他实现把errno设置为ECONNABORTED或者EPROTO错误,我们应该忽略这两个错误。

(2)ET模式下accept存在的问题
考虑这种情况:多个连接同时到达,服务器的TCP就绪队列瞬间积累多个就绪连接,由于是边缘触发模式,epoll只会通知一次,accept只处理一个连接,导致TCP就绪队列中剩下的连接都得不到处理。

解决办法: 用while循环抱住accept调用,处理完TCP就绪队列中的所有连接后再退出循环。如何知道是否处理完就绪队列中的所有连接呢?accept返回-1并且errno设置为EAGAIN就表示所有连接都处理完。综合以上两种情况,服务器应该使用非阻塞地accept,accept在ET模式下的正确使用方式为:

1
2
3
4
5
6
7
while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote, (size_t *)&addrlen)) > 0) {
handle_client(conn_sock);
}
if (conn_sock == -1) {
if (errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR)
perror("accept");
}

netty为什么选择使用LT

epoll对文件描述符有两种操作模式–LT(level trigger水平模式)和ET(edge trigger边缘模式)

简单来讲,LT是epoll的默认操作模式,当epoll_wait函数检测到有事件发生并将通知应用程序,而应用程序不一定必须立即进行处理,这样epoll_wait函数再次检测到此事件的时候还会通知应用程序,直到事件被处理。

而ET模式,只要epoll_wait函数检测到事件发生,通知应用程序立即进行处理,后续的epoll_wait函数将不再检测此事件。因此ET模式在很大程度上降低了同一个事件被epoll触发的次数,因此效率比LT模式高。

解释为什么epoll默认是LT的原因(超哥解释,个人觉得还是非常不错的)LT(level triggered):LT是缺省的工作方式,并且同时支持block和no-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。

io.netty:netty-transport-native-epoll:linux-x86_64:4.1.49.Final

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
public enum EpollMode {
// Use {@code EPOLLET} (edge-triggered).
EDGE_TRIGGERED,
// Do not use {@code EPOLLET} (level-triggered).
LEVEL_TRIGGERED
}
final class NativeStaticallyReferencedJniMethods {
static native int epollin();
static native int epollout();
static native int epollrdhup();
static native int epollet();
static native int epollerr();
static native long ssizeMax();
static native int tcpMd5SigMaxKeyLen();
static native int iovMax();
static native int uioMaxIov();
static native boolean isSupportingSendmmsg();
static native boolean isSupportingRecvmmsg();
static native boolean isSupportingTcpFastopen();
static native String kernelVersion();
}
public final class Native {
static {
public static final int EPOLLET = epollet();
}
}
public class EpollChannelConfig extends DefaultChannelConfig {
public EpollMode getEpollMode() {
return ((AbstractEpollChannel) channel).isFlagSet(Native.EPOLLET)
? EpollMode.EDGE_TRIGGERED : EpollMode.LEVEL_TRIGGERED;
}

public EpollChannelConfig setEpollMode(EpollMode mode) {
ObjectUtil.checkNotNull(mode, "mode");

try {
switch (mode) {
case EDGE_TRIGGERED:
checkChannelNotRegistered();
((AbstractEpollChannel) channel).setFlag(Native.EPOLLET);
break;
case LEVEL_TRIGGERED:
checkChannelNotRegistered();
((AbstractEpollChannel) channel).clearFlag(Native.EPOLLET);
break;
default:
throw new Error();
}
} catch (IOException e) {
throw new ChannelException(e);
}
return this;
}
}

class EpollEventLoop extends SingleThreadEventLoop {
EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
if (maxEvents == 0) {
allowGrowing = true;
events = new EpollEventArray(4096);
} else {
allowGrowing = false;
events = new EpollEventArray(maxEvents);
}
boolean success = false;
FileDescriptor epollFd = null;
FileDescriptor eventFd = null;
FileDescriptor timerFd = null;
try {
this.epollFd = epollFd = Native.newEpollCreate();
this.eventFd = eventFd = Native.newEventFd();
try {
// It is important to use EPOLLET here as we only want to get the notification once per
// wakeup and don't call eventfd_read(...).
Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
} catch (IOException e) {
throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
}
this.timerFd = timerFd = Native.newTimerFd();
try {
// It is important to use EPOLLET here as we only want to get the notification once per
// wakeup and don't call read(...).
Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
} catch (IOException e) {
throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
}
success = true;
} finally {
if (!success) {
if (epollFd != null) {
try {
epollFd.close();
} catch (Exception e) {
// ignore
}
}
if (eventFd != null) {
try {
eventFd.close();
} catch (Exception e) {
// ignore
}
}
if (timerFd != null) {
try {
timerFd.close();
} catch (Exception e) {
// ignore
}
}
}
}
}
}