文章目录
1.前言2.应用层的体现3.两个重要结构(1)eventpoll(2)epitem
4.四个函数(1)epoll_create源码(2)epoll_ctl源码(3)epoll_wait的源码(4)epoll_event_callback()
5.水平触发和边缘触发1.状态变化2.LT模式3.ET模式
6.epoll中的锁7.epoll的致命缺点(1)多线程扩展性场景一:同一个 listen fd 在多个 CPU 上调用 accept 系统调用
(2)epoll 所注册的 fd(file descriptor) 和实际内核中控制的结构 file description 拥有不同的生命周期
1.前言
好久好久没有更新博客了,最近一直在实习,刷算法找工作,忙里偷闲简单研究了一下epoll的源码。也是由于面试的时候经常被问到,我只会说那一套,什么epoll_create创建红黑树,以O(1)的方式去读取数据,它和poll与select的区别等等。本篇将从epoll的源码层面重新学习epoll。
2.应用层的体现
多路转接(epoll)实现我在这篇文章中实现了一个简单的epoll网络server。感兴趣的同学可以简单阅读一下,我只挑其中关键的代码来讲一下应用层是如何使用epoll的:
#include"Sock.hpp"
using namespace ns_Sock;
#define NUM 128
#include
#include
void Usage(char* proc)
{
cout<<"Usage \n\t"< } int main(int argc,char* argv[]) { if(argc!=2) { Usage(argv[0]); exit(-1); } uint16_t port=(uint16_t)atoi(argv[1]); int listen_sock=Sock::Socket(); Sock::Bind(listen_sock,port); Sock::Listen(listen_sock); //建立epoll模型,获得epfd int epfd=epoll_create(128); //先添加listen_sock和它所关心的事件到内核中 struct epoll_event ev; ev.events=EPOLLIN; ev.data.fd=listen_sock;//虽然epoll_ctl有文件描述符,但是revs数组中的元素是epoll_event没有fd,因此需要将fd添加都epoll_event的data字段中 epoll_ctl(epfd,EPOLL_CTL_ADD,listen_sock,&ev); //事件循环 volatile bool quit=false; struct epoll_event revs[NUM];//由于epoll_wait的数组是输出型参数,因此需要接收 while(!quit) { int timeout=1000; int n=epoll_wait(epfd,revs,NUM,-1);//epoll_wait会将epfd中就绪事件的epoll_event结构体放在revs数组中,返回值表示数组大小 switch(n) { case 0: cout<<"timeout....."< break; case -1: cerr<<"epoll error"< break; default: cout<<"有事件就绪了"< //处理就绪事件 for(int i=0;i { int sock=revs[i].data.fd;//暂时方案 cout<<"文件描述符"< if(revs[i].events&EPOLLIN)//读事件就绪 { cout<<"文件描述符"< if(sock==listen_sock) { int fd=Sock::Accept(listen_sock); if(fd>=0) { cout<<"获取新链接成功了"< struct epoll_event _ev; _ev.events=EPOLLIN; _ev.data.fd=fd; epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&_ev); cout<<"已经把"< } } //正常的读处理 else { cout<<"文件描述符"< char buffer[1024]; ssize_t s=read(sock,buffer,sizeof(buffer)-1); if(s>0) { buffer[s]=0; cout<<"client["< } else if(s==0) { cout<<"client quit "< close(sock); epoll_ctl(epfd,EPOLL_CTL_DEL,sock,nullptr);//将该套接字从epoll空间关注的位置删除 cout<<"Sock:"< } else { cout<<"recv error"< close(sock); epoll_ctl(epfd,EPOLL_CTL_DEL,sock,nullptr); cout<<"delete sock"< } } } } } } } 这是我那篇博客的服务器端的代码,使用telnet是可以直接访问的,通过这段代码我们可以发现调用epoll的过程以及一些细节。首先就是众所周知的: epoll_create创建一个epoll空间。接着调用epoll_ctl将一个文件描述符以及对该文件描述符需要关心的事件放进epoll空间中。然后调用epoll_wait进行等待就好了。事件就绪会使用epoll_wait这个函数来通知我们。 但仔细看代码还是会发现一些细节,在epoll空间建立完成后,添加的第一个文件描述符就是listen_sock,并且关心它的读事件。 在epoll_wait成功的时候,会返回一个就绪事件的数组,通过遍历这个数组去对数据进行操作,但当该数组元素表示的是listen_sock事件就绪的时候,需要在listen_sock中将新监听到的链接accept上来。这是对监听套接字独有的一种处理方式,也说明epoll的回调函数不止一个触发条件(数据就绪orl链接就绪)。 其中这里还涉及到了一个重要的结构体epoll_event,这个结构体在内核源码中也经常使用到,下面给出它在linux系统中的结构: typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; struct epoll_event { uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ } __EPOLL_PACKED; 其中它的events表示的是事件,data又是一个结构体,它其中的一个重要的东西就是fd。它表示该epoll_event是哪个套接字的。 3.两个重要结构 (1)eventpoll 其中一个结构名为eventpoll,当调用epoll_create的时候内核会自动创建它,所以其实所谓的epoll空间仅仅是一个结构体而已。 struct eventpoll { /* struct ep_rb_tree { struct epitem *rbh_root; } */ ep_rb_tree rbr; //rbr指向红黑树的根节点 int rbcnt; //红黑树中节点的数量(也就是添加了多少个TCP连接事件) LIST_HEAD( ,epitem) rdlist; //rdlist指向双向链表的头节点; /* 这个LIST_HEAD等价于 struct { struct epitem *lh_first; }rdlist; */ int rdnum; //双向链表中节点的数量(也就是有多少个TCP连接来事件了) // ...略... }; 它包含了几个内容,有我们熟知的红黑树根节点,还有所谓的就绪队列(是一个双链表),以及红黑树与就绪队列的节点数量。 (2)epitem 这个是比较关键的一个结构体,epoll_ctl将文件描述符以及所关心的事件插入到epoll空间中,插入的就是这么个玩意。 struct epitem { RB_ENTRY(epitem) rbn; /* RB_ENTRY(epitem) rbn等价于 struct { struct type *rbe_left; //指向左子树 struct type *rbe_right; //指向右子树 struct type *rbe_parent; //指向父节点 int rbe_color; //该节点的颜色 } rbn */ LIST_ENTRY(epitem) rdlink; /* LIST_ENTRY(epitem) rdlink等价于 struct { struct type *le_next; //指向下个元素 struct type **le_prev; //前一个元素的地址 }*/ int rdy; //判断该节点是否同时存在与红黑树和双向链表中 int sockfd; //socket句柄 struct epoll_event event; //存放用户填充的事件 }; 首先它包含了一个红黑树节点的结构,其次他又包含了双向链表的结构,到这里我们就可以发现,红黑树和双向链表中插入的是同一个结构体epitem。也很容易想到epoll_ctl的本质其实就是使用传入的参数来构造一个epitem。 因此它包含的参数显而易见:sockfd表示该epitem对应的套接字,epoll_event表示需要关心的该套接字的事件。 4.四个函数 (1)epoll_create源码 int epoll_create(int size) { //size没有什么卵用 if (size <= 0) return -1; //tcp服务,我也不懂,目前不是重点 nty_tcp_manager *tcp = nty_get_tcp_manager(); if (!tcp) return -1; struct _nty_socket *epsocket = nty_socket_allocate(NTY_TCP_SOCK_EPOLL); if (epsocket == NULL) { nty_trace_epoll("malloc failed\n"); return -1; } //1.建立一个eventpoll struct eventpoll *ep = (struct eventpoll*)calloc(1, sizeof(struct eventpoll)); if (!ep) { nty_free_socket(epsocket->id, 0); return -1; } //2.初始化红黑树指针为空,节点数为0 ep->rbcnt = 0; RB_INIT(&ep->rbr); //3.双向链表头指向空 LIST_INIT(&ep->rdlist); //线程操作,暂时不用关心 if (pthread_mutex_init(&ep->mtx, NULL)) { free(ep); nty_free_socket(epsocket->id, 0); return -2; } if (pthread_mutex_init(&ep->cdmtx, NULL)) { pthread_mutex_destroy(&ep->mtx); free(ep); nty_free_socket(epsocket->id, 0); return -2; } if (pthread_cond_init(&ep->cond, NULL)) { pthread_mutex_destroy(&ep->cdmtx); pthread_mutex_destroy(&ep->mtx); free(ep); nty_free_socket(epsocket->id, 0); return -2; } if (pthread_spin_init(&ep->lock, PTHREAD_PROCESS_SHARED)) { pthread_cond_destroy(&ep->cond); pthread_mutex_destroy(&ep->cdmtx); pthread_mutex_destroy(&ep->mtx); free(ep); nty_free_socket(epsocket->id, 0); return -2; } //4.保存ep对象,通过epid可以在系统中找到eventpoll结构 tcp->ep = (void*)ep; epsocket->ep = (void*)ep; return epsocket->id; } 我们发现在epoll_create的时候会传入一个参数,但是这个参数似乎没有意义,从这里就可以很简单的看出了,这个size除了判断一下是不是大于0之外,之后什么都没做。因此这个参数是没有意义的。 它的主要功能我在代码中标注了,大致如下: 1.建立一个eventpoll对象,并为其分配空间。2.将该对象中红黑树根节点指向空,节点个数设为0。3.将该对象中双向链表头节点指向空。4.将eventpoll对象保存起来。以后可以通过epid找到它,并返回这个epid。 (2)epoll_ctl源码 下面来看看epoll_ctl都干了些什么,在讲解这个函数之前,我们还是先回头看一下我们是怎么向它传参的: epoll_ctl(epfd,EPOLL_CTL_ADD,listen_sock,&ev); 这是我们将listen_sock加入到epoll空间中的代码,可以看到epfd用于寻找eventpoll结构体,EPOLL_CTL_ADD表示我是要加入一个epoll节点,listen_sock表示我加入的这个节点的文件描述符是listen_sock,&ev表示我关心的事件(使用地址传参为了节省空间),这一切是不是很明确了呢?下面我们来阅读这部分的源码。 int epoll_ctl(int epid, int op, int sockid, struct epoll_event *event) { //tcp部分,暂时不管 nty_tcp_manager *tcp = nty_get_tcp_manager(); if (!tcp) return -1; nty_trace_epoll(" epoll_ctl --> 1111111:%d, sockid:%d\n", epid, sockid); //1.通过传入进来的epid找到对应的eventpoll结构体 struct _nty_socket *epsocket = tcp->fdtable->sockfds[epid]; //struct _nty_socket *socket = tcp->fdtable->sockfds[sockid]; //nty_trace_epoll(" epoll_ctl --> 1111111:%d, sockid:%d\n", epsocket->id, sockid); if (epsocket->socktype == NTY_TCP_SOCK_UNUSED) { errno = -EBADF; return -1; } if (epsocket->socktype != NTY_TCP_SOCK_EPOLL) { errno = -EINVAL; return -1; } nty_trace_epoll(" epoll_ctl --> eventpoll\n"); struct eventpoll *ep = (struct eventpoll*)epsocket->ep; if (!ep || (!event && op != EPOLL_CTL_DEL)) { errno = -EINVAL; return -1; } //2.判断如果是增加节点 if (op == EPOLL_CTL_ADD) { pthread_mutex_lock(&ep->mtx); //在栈区先创建一个对象,接收sockid struct epitem tmp; tmp.sockfd = sockid; //查找这个节点是否在红黑树中,其实也就是根据epollfd查找的,这查找传入tmp,可能表示的是红黑树节点类型,这是一个小细节 struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp); if (epi) { nty_trace_epoll("rbtree is exist\n"); pthread_mutex_unlock(&ep->mtx); return -1; } //如果不存在才为结构体分配完整空间 epi = (struct epitem*)calloc(1, sizeof(struct epitem)); if (!epi) { pthread_mutex_unlock(&ep->mtx); errno = -ENOMEM; return -1; } //使用参数构建这个epitem节点 epi->sockfd = sockid; memcpy(&epi->event, event, sizeof(struct epoll_event)); epi = RB_INSERT(_epoll_rb_socket, &ep->rbr, epi); assert(epi == NULL); ep->rbcnt ++; pthread_mutex_unlock(&ep->mtx); } else if (op == EPOLL_CTL_DEL) { pthread_mutex_lock(&ep->mtx); struct epitem tmp; //先把fd分配给sockfd tmp.sockfd = sockid; //查找该节点是否已经存在,这也说明是根据sockfd查找的 struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp); if (!epi) { nty_trace_epoll("rbtree no exist\n"); pthread_mutex_unlock(&ep->mtx); return -1; } //存在则删除即可 epi = RB_REMOVE(_epoll_rb_socket, &ep->rbr, epi); if (!epi) { nty_trace_epoll("rbtree is no exist\n"); pthread_mutex_unlock(&ep->mtx); return -1; } ep->rbcnt --; free(epi); pthread_mutex_unlock(&ep->mtx); } else if (op == EPOLL_CTL_MOD) { //修改该节点 struct epitem tmp; tmp.sockfd = sockid; struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp); //找到了就将该节点的事件进行修改,这里还有一个细节,修改之后会默认加入EPOLLERR | EPOLLHUP这两个事件监听,一个表示套接字发生错误,一个表示套接字被挂断。 if (epi) { epi->event.events = event->events; epi->event.events |= EPOLLERR | EPOLLHUP; } else { errno = -ENOENT; return -1; } } else { nty_trace_epoll("op is no exist\n"); assert(0); } return 0; } 每根据一个不同的选择就创建一个结构,并查找它是不是在红黑树中可能有点冗余,感觉直接在上面创建一个即可。 这段代码看起来不难,其实细节有很多的,首先先来说一下大体的步骤: 根据传入的epfd找到对应的eventpoll结构。根据不同的选择类型进行if-else判断 1.首先所有操作,都要先在栈区建立一个epitem的结构,然后将sock传入其中。并使用红黑树查找函数传入该结构查找。 2.根据有没有来进行构造删除,或者修改。 以插入为例,没有则在堆区构建结构体对象,然后向红黑树中插入,使用memcpy来加入事件。 其中的两个细节包括: 都是先建立一个栈区对象,然后红黑树中进行查找的,这里可以优化一下。 在修改的时候,会加入监听事件:文件描述符出错,套接字被挂断。 (3)epoll_wait的源码 这个函数就是用于等待事件就绪,然后将他插入就绪队列中的,其中这里的epoll_event是一个输出型参数,它通常表示一个数组的首地址。这里可以再回顾一下它是怎么进行传参的: int n=epoll_wait(epfd,revs,NUM,-1); 其中epfd显然还是去找eventpoll的,revs是一个数组首元素地址(我们建立一个数组,传入数组名其实就可以了),NUM是一个整数,表示多少个套接字就绪了就可以返回了,-1表示的是只要没有文件描述符就绪,就永久阻塞。 值得注意的是,这里我们没有回调函数的实现,也就是说暂时没有套接字就绪了将它插入等待队列中的操作,它的实现在后面讲。 int epoll_wait(int epid, struct epoll_event *events, int maxevents, int timeout) { //tcp相关,可恶我写完这篇文章一定看看这是个啥东西 nty_tcp_manager *tcp = nty_get_tcp_manager(); if (!tcp) return -1; //nty_socket_map *epsocket = &tcp->smap[epid]; //找到eventpoll struct _nty_socket *epsocket = tcp->fdtable->sockfds[epid]; if (epsocket == NULL) return -1; if (epsocket->socktype == NTY_TCP_SOCK_UNUSED) { errno = -EBADF; return -1; } if (epsocket->socktype != NTY_TCP_SOCK_EPOLL) { errno = -EINVAL; return -1; } struct eventpoll *ep = (struct eventpoll*)epsocket->ep; if (!ep || !events || maxevents <= 0) { errno = -EINVAL; return -1; } //线程相关,先不研究 if (pthread_mutex_lock(&ep->cdmtx)) { if (errno == EDEADLK) { nty_trace_epoll("epoll lock blocked\n"); } assert(0); } //如果rdnum为空,并且等待时间不为0的时候会等待一段时间 while (ep->rdnum == 0 && timeout != 0) { ep->waiting = 1; if (timeout > 0) { struct timespec deadline; clock_gettime(CLOCK_REALTIME, &deadline); if (timeout >= 1000) { int sec; sec = timeout / 1000; deadline.tv_sec += sec; timeout -= sec * 1000; } deadline.tv_nsec += timeout * 1000000; if (deadline.tv_nsec >= 1000000000) { deadline.tv_sec++; deadline.tv_nsec -= 1000000000; } int ret = pthread_cond_timedwait(&ep->cond, &ep->cdmtx, &deadline); if (ret && ret != ETIMEDOUT) { nty_trace_epoll("pthread_cond_timewait\n"); pthread_mutex_unlock(&ep->cdmtx); return -1; } timeout = 0; } else if (timeout < 0) { int ret = pthread_cond_wait(&ep->cond, &ep->cdmtx); if (ret) { nty_trace_epoll("pthread_cond_wait\n"); pthread_mutex_unlock(&ep->cdmtx); return -1; } } ep->waiting = 0; } pthread_mutex_unlock(&ep->cdmtx); pthread_spin_lock(&ep->lock); int cnt = 0; //哪个少将哪个作为事件的数量 int num = (ep->rdnum > maxevents ? maxevents : ep->rdnum); int i = 0; while (num != 0 && !LIST_EMPTY(&ep->rdlist)) { //EPOLLET //每次从双向链表的头节点取得一个一个的节点 struct epitem *epi = LIST_FIRST(&ep->rdlist); //把这个节点从双向链表中删除 LIST_REMOVE(epi, rdlink); //标记这个节点不在双向链表中,但是在红黑树中 epi->rdy = 0;//只有当该节点再次被放入双向链表中的时候,才会置为1 //把标记的信息拷贝出来,拷贝到提供的events参数中 memcpy(&events[i++], &epi->event, sizeof(struct epoll_event)); //--,++的操作 num --; cnt ++; ep->rdnum --; } pthread_spin_unlock(&ep->lock); return cnt; } (4)epoll_event_callback() 1.客户端connect()连入,服务器处于SYN_RCVD状态时 2.三路握手完成,服务器处于ESTABLISHED状态时 3.客户端close()断开连接,服务器处于FIN_WAIT_1和FIN_WAIT_2状态时 4.客户端send/write()数据,服务器可读时 5.服务器可以发送数据时 它的作用是向双向链表中添加一个红黑树的节点。它的参数有一个eventpoll类型,注意这里没有传epid,有一个sockid,有一个event。 int epoll_event_callback(struct eventpoll *ep, int sockid, uint32_t event) { struct epitem tmp; tmp.sockfd = sockid; //首先根据sockid找到红黑树中的节点 struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp); if (!epi) { nty_trace_epoll("rbtree not exist\n"); assert(0); } //根据epi->rdy判断该节点是否在双向链表里 if (epi->rdy) { //如果在就将事件填入events中 epi->event.events |= event; return 1; } //如果不在,要做的就是将这个节点添加到双向链表中的表头位置 nty_trace_epoll("epoll_event_callback --> %d\n", epi->sockfd); pthread_spin_lock(&ep->lock); epi->rdy = 1; LIST_INSERT_HEAD(&ep->rdlist, epi, rdlink); ep->rdnum ++; pthread_spin_unlock(&ep->lock); pthread_mutex_lock(&ep->cdmtx); pthread_cond_signal(&ep->cond); pthread_mutex_unlock(&ep->cdmtx); return 0; } 5.水平触发和边缘触发 1.状态变化 要讲明白水平触发和边缘触发就需要知道都有哪些状态会触发,无非也就这四种: 可读:socket上有数据不可读:socket上没有数据了可写:socket上有空间可写不可写:socket上无空间可写 对于水平触发模式,一个事件只要有,就会一直被触发。对于边缘触发模式,一个事件只要从无到有就会被触发。 2.LT模式 读:只要接收缓冲区上有未读完的数据,就会一直被触发。 写:只要发送缓冲区上还有空间,就会一直被触发。如果程序依赖于可写事件触发去发送数据,要移除可写事件,避免无意义的触发。 在LT模式下,读事件触发后,可以按需收取想要的字节数,不用把本次接收到的数据收取干净。 3.ET模式 读:只有接收缓冲区上的数据从无到有,就会被触发一次。 写:只有发送缓冲区上由不可写到可写,就会触发,(由满到不满) 在ET模式下,当读事件触发后,需要将数据一次性读干净。 6.epoll中的锁 通过阅读上面的代码,我们发现访问红黑树和访问双向链表的时候会加锁。 红黑树:互斥锁,因为红黑树是一个自平衡的二叉搜索树,多线程访问的时候可能改变树的结构,因此加上互斥锁。 双向链表:自旋锁,是一个更轻量级的锁,因为双向链表的结构是不会改变的,通过自旋等待的方式获取锁,避免了切换上下文的开销。 7.epoll的致命缺点 (1)多线程扩展性 场景一:同一个 listen fd 在多个 CPU 上调用 accept 系统调用 水平触发: 1.内核收到了一个链接请求。同时唤醒了A和B两个在epoll_wait上等待的线程。 2.线程A epoll_wait成功,而线程B epoll_wait失败。 3.线程B其实不需要唤醒,造成惊群效应,消耗资源。 边缘触发: 1.内核收到了一个链接请求,由于是边缘触发所以只会唤醒一个线程,假设线程A被唤醒。 2.A正在accept的时候,突然又来了一个链接,此时由于由于监听套接字处于就绪状态,没有产生新事件,所以就不会发起通知。 3.由于不会发起通知,所以必须由A再去处理该链接,这样就造成了线程饥饿的问题。 (2)epoll 所注册的 fd(file descriptor) 和实际内核中控制的结构 file description 拥有不同的生命周期 epoll混淆了上图中的进程的文件描述符和系统中的文件描述符表。当进行EPOLLADD后,epoll其实监听的是系统级的文件描述符,所以当close(fd)的时候,如果对应的description只有它一个descriptor的时候,正常关闭。 但如果它有多个descriptor与之对应的话,就会发生即使将该文件描述符关闭了,但是还是可以接收到事件的情况。更糟糕的是,一旦你 close() 了这个 fd,再也没有机会把这个死掉的 fd 从 epoll 上摘除了。所以在删除之前一定要进行EPOLLDELETE。