epoll是linux引以為榮的技術,因為相對于select和poll有很大的性能改進。本文主要介紹epoll的實現原理,了解epoll高效背后的魔法。
epoll的使用簡介
1. epoll_create
使用epoll時需要使用epoll_create()創建一個epoll的文件句柄,epoll_create()函數的原型如下:
intepoll_create(int size);
此接口用于創建一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大。
2. epoll_ctl
使用epoll_ctl()可以向epoll句柄添加或者刪除要監聽的文件句柄。epoll_ctl()函數原型如下:
intepoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
要監聽文件是否可讀寫時先要向epoll句柄注冊要監聽的事件類型。第一個參數是epoll_create()返回的epoll句柄,第二個參數表示動作,用三個宏來表示:
- EPOLL_CTL_ADD:注冊新的fd到epfd中。
- EPOLL_CTL_MOD:修改已經注冊的fd的監聽事件。
- EPOLL_CTL_DEL:從epfd中刪除一個fd。
第三個參數是需要監聽的文件句柄,第四個參數是告訴內核需要監聽什么事。
3. epoll_wait
萬事俱備,現在只需要調用epoll_wait()函數就可以開始等待事件發生了。epoll_wailt()函數的原型如下:
intepoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
參數events用來從內核得到事件的集合,maxevents告之內核這個events有多大,這個 maxevents的值不能大于創建epoll_create()時的size,參數timeout是超時時間(毫秒,0會立即返回,-1是永久阻塞)。該函數返回需要處理的事件數目,如返回0表示已超時。
epoll實現原理
前面介紹了epoll的使用,接下來主要介紹epoll在內核的實現原理。
當我們在用戶態調用epoll_create()時,會觸發調用內核的sys_epoll_create()。我們先來看看sys_epoll_create()這個函數:
SYSCALL_DEFINE1(epoll_create1, int, flags) { int error, fd; structeventpoll *ep = NULL; structfile *file; ... error = ep_alloc(&ep); ... fd = get_unused_fd_flags(O_RDWR| (flags & O_CLOEXEC)); ... file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags &O_CLOEXEC)); ... fd_install(fd,file); ep->file = file; return fd; }
這個函數主要調用ep_alloc()申請一個eventpoll結構,eventpoll結構是epoll的核心數據結構,我們來看看這個結構的定義:
structeventpoll { spinlock_t lock; structmutexmtx; wait_queue_head_t wq; wait_queue_head_t poll_wait; structlist_headrdllist; structrb_rootrbr; structepitem *ovflist; structuser_struct *user; structfile *file; intvisited; structlist_headvisited_list_link; };
eventpoll結構有三個字段是比較重要的,分別是:wq、rdllist和rbr。
- wq用于保存有哪些進程在等待這個epoll返回。
- rdllist用于保存可讀寫的文件。
- rbr用于建立一個快速查找文件句柄的紅黑樹。
創建完eventpoll結構后,sys_epoll_create()會調用get_unused_fd_flags()獲取一個空閑的文件句柄fd,接著調anon_inode_getfile()獲取一個空閑的file結構,并且把eventpoll結構與file結構綁定。最后調用fd_install()把文件句柄fd與file結構綁定,返回文件句柄fd。通過一系列的操作后,內核就可以通過文件句柄fd與eventpoll結構進行關聯。
根據epoll的使用流程,使用epoll_create()創建epoll句柄后,可以通過epoll_ctl()函數向epoll句柄添加和刪除要監視的文件句柄。調用epoll_ctl()會觸發內核調用sys_epoll_ctl()函數,我們來看看sys_epoll_ctl()函數的最重要部分:
SYSCALL_DEFINE4(epoll_ctl,int, epfd, int, op, int, fd, struct epoll_event __user *,event) { ... switch (op) { case EPOLL_CTL_ADD: if (!epi) { epds.events |= POLLERR | POLLHUP; error = ep_insert(ep, &epds,tfile, fd); } else error = -EEXIST; clear_tfile_check_list(); break; case EPOLL_CTL_DEL: if (epi) error = ep_remove(ep, epi); else error = -ENOENT; break; case EPOLL_CTL_MOD: if (epi) { epds.events |= POLLERR | POLLHUP; error = ep_modify(ep, epi,&epds); } else error = -ENOENT; break; } ... return error; }
sys_epoll_ctl()會根據我們傳遞的op參數來進行不同的操作,我們主要看看op為EPOLL_CTL_ADD的操作,也就是添加操作。當進行添加操作時,sys_epoll_ctl()最終會調用ep_insert()把文件句柄fd添加到eventpoll結構維護的紅黑樹中,ep_insert()代碼如下:
static int ep_insert(struct eventpoll *ep, struct epoll_event*event, struct file *tfile, int fd) { int error, revents, pwake =0; unsignedlong flags; structepitem *epi; structep_pqueue epq; ... if (!(epi =kmem_cache_alloc(epi_cache, GFP_KERNEL))) return -ENOMEM; ... init_poll_funcptr(&epq.pt,ep_ptable_queue_proc); revents = tfile->f_op->poll(tfile,&epq.pt); ... ep_rbtree_insert(ep, epi); ... return 0; }
ep_insert()函數首先創建一個epitem結構用于管理事件,然后調用文件句柄的poll()接口,根據init_poll_funcptr(&epq.pt,ep_ptable_queue_proc)這行代碼的設置,poll()接口最終會調用ep_ptable_queue_proc()函數。ep_ptable_queue_proc()函數代碼如下:
static void ep_ptable_queue_proc(struct file *file,wait_queue_head_t *whead, poll_table*pt) { structepitem *epi = ep_item_from_epqueue(pt); structeppoll_entry *pwq; if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) { init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); pwq->whead = whead; pwq->base = epi; add_wait_queue(whead, &pwq->wait); list_add_tail(&pwq->llink, &epi->pwqlist); epi->nwait++; } else { epi->nwait = -1; } }
ep_ptable_queue_proc()函數的作用就是把epitem結構添加到文件的等待隊列中,根據上面的代碼可知,當等待隊列被喚醒的時候,將會調用ep_poll_callback()函數。而ep_poll_callback()函數的作用就是把epitem結構放置到eventpoll結構的rdllist隊列中。我們前面分析過,rdllist就是就緒的文件隊列。ep_poll_callback()函數最終會調用wake_up_locked(&ep->wq)喚醒進程。簡化后的代碼如下:
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key) { ... // 把epitem添加到rdllist隊列中 if(!ep_is_linked(&epi->rdllink)) list_add_tail(&epi->rdllink,&ep->rdllist); // 喚醒進程 if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); ... return 1; }
ep_insert()函數最后一個操作就是調用ep_rbtree_insert()把epitem結構添加的eventpoll結構的紅黑樹中。如下圖:
使用紅黑樹管理epitem結構的目的是可以根據文件句柄fd快速查找到對應的epitem結構。紅黑樹是一棵平衡二叉樹,時間復雜度為O(logN)。
添加文件句柄到epoll之后,就可以調用epoll_wait()函數開始監聽文件。epoll_wait()會調用內核的sys_epoll_wait()函數,而sys_epoll_wait()最終會調用ep_poll()函數,代碼如下:
static int ep_poll(struct eventpoll *ep, struct epoll_event __user*events, int maxevents, long timeout) { ... if (list_empty(&ep->rdllist)) { init_waitqueue_entry(&wait,current); wait.flags |= WQ_FLAG_EXCLUSIVE; __add_wait_queue(&ep->wq,&wait); for (;;) { set_current_state(TASK_INTERRUPTIBLE); if (!list_empty(&ep->rdllist)|| !jtimeout) break; if (signal_pending(current)) { res = -EINTR; break; } spin_unlock_irqrestore(&ep->lock, flags); jtimeout = schedule_timeout(jtimeout); spin_lock_irqsave(&ep->lock,flags); } __remove_wait_queue(&ep->wq,&wait); set_current_state(TASK_RUNNING); } ... if (!res && eavail && !(res = ep_send_events(ep, events, maxevents))&& jtimeout) goto retry; return res; }
ep_poll()函數所做的事情很簡單,就是把當前進程設置為可中斷睡眠狀態,然后添加eventpoll結構的等待隊列中,最后調用schedule_timeout()讓出CPU。這樣當前進程就會進入睡眠狀態,當進程醒來的時候會判斷eventpoll結構的rdllist隊列是否為空,然后不為空就調用ep_send_events()函數把可讀寫的文件拷貝到用戶態的events數組中。
那么什么時候當前進程會被喚醒呢?在分析ep_insert()函數的時候,我們提及過當文件狀態發生改變時會調用ep_poll_callback()函數,而ep_poll_callback()函數會把就緒的文件添加到rdllist隊列,并且就會把當前進程喚醒。
總結
本文主要分析了epoll的實現原理,可以知道,epoll并不會對所有文件進行掃描(而select和poll會對所以文件進行掃描),而是使用事件的方式把就緒的文件收集起來,所以epoll的效率非常高。
當然本文還有一些epoll的細節并沒有介紹到,例如水平觸發和邊緣觸發等,有興趣可以自己研究代碼。