Kqueue和其他的多路復用IO的核心是,單消費者同時監聽不同種類的生產者,從而提供高性能的單線程IO,減少調度開銷。而Kqueue通過在內核態維持狀態提供了更高的性能。
生產者消費者模型
單Producer和單Consumer
生產者/消費者模型是常見的通信模型,通過共享內核緩沖區環形隊列,實現異步的事件通知。雙方只關注緩沖區內的數據,而不關注彼此,因此常常被用于網絡通信。
信號量
為了避免消費者在緩存區未滿時無意義的輪詢,消費者block直到生產者通知。wait時線程設置信號量并且block,notify時內核通知所有等待信號的線程狀態改為RUNNABLE。
事實上就是linux的pthread_cond_wait和phread_cond_signal原語。consumer之所以要帶鎖wait,是因為在內部進行調度yield_wait前要放掉鎖,否則其他線程無法進入臨界區;喚醒之后重新獲得鎖。(這里指的鎖是外部事務的鎖)
wait和notify需要增加鎖,防止notify先于wait進行。(這里的鎖指的是內部事務的鎖)
wait調用的yield_wait在調度時需要臨時釋放并隨后獲取內部事務鎖,否則會阻塞其他的notify造成全員block。
send(bb, msg):
acquire(bb.lock)
while True:
if bb.in - bb.out < N:
bb.buf[bb.in mod N] <- msg
bb.in <- bb.in + 1
release(bb.lock)
notify(bb.not_empty)
return
wait(bb.not_full, bb.block)
receive(bb):
acquire(bb.lock)
while True:
if bb.in > bb.out:
msg <- bb.buf[bb.out mod N]
bb.out <- bb.out + 1
release(bb.lock)
wait(bb.not_full)
return
wait(bb.not_empty, bb.block)
Eventcount & Sequencer
這是1979年提出的算法,作為信號量的可替換實現。Sequencer的目的是處理多producer。
semaphores
send(Buffer& buffer,Message msg) {
t=TICKET(T);
AWAIT(buffer.in, t);
AWAIT(buffer.out, READ(buffer.in)-N);
buffer[READ(buffer.in)%N]=msg;
ADVANCE(in);
}
receive(Buffer& buffer) {
AWAIT(buffer.in, READ(buffer.out));
msg = buffer[READ(buffer.out)%N];
ADVANCE(buffer.out);
return msg;
}
- AWAIT(event*,val) - 比較event.count和val,如果大于則返回,否則存入線程TCB并yield
- ADVANCE(event*) - 自增event.count并將所有同event且event.count>val的線程喚醒
- TICKET(sequencer*) - 原子性自增序號,目的是處理并發的sender
- READ(event*) - 原子性讀event.count,因為可能讀操作涉及多memory cell
send等待in超過ticket,相當于拿排隊鎖輪到自己。然后等待緩存區未滿時寫入數據。
receive等待緩沖區存在數據時讀取數據。
Kqueue
https://people.freebsd.org/~jlemon/papers/kqueue.pdf
問題在于,上面提到的做法本質上都是監聽著一個事件,如果我們想要處理多個監聽事件,操作系統必須提供新的原語,例如每個socket都對應著一個file descriptor,需要同時監聽所有socket的事件。BSD的Kqueue和Linux的epoll都是解決這種問題的方式,本質上它們就是IPC,但是單純從IO的角度看叫做多路復用IO。目前epoll用于netty的底層,是單線程實現高性能網絡的關鍵。
傳統的select和poll僅僅適用于file descriptor,但是無法關注其他IPC機制,例如信號、文件系統變化、異步IO完成、進程存在;并且也不具備scalability。
第一個問題在于參數傳遞,每次都必須傳遞整個事件組,并且動態在內核中創建和銷毀內存。第二個問題在于內核必須遍歷整個fd列表去找活躍的fd。初始遍歷一次確定沒有active的fd才能沉睡,如果沒有active還要再遍歷一次設定回調來喚醒,最后喚醒時還要再遍歷一次來看是哪個fd喚醒了。
問題出在這個syscall無狀態上,無法利用之前的信息,每次都得重新計算。因此Kqueue的機制就在于內核中維持一個隊列儲存狀態。
int
kqueue(void);
int
kevent(int kq,const struct kevent *changelist, int nchanges,
struct kevent *eventlist, int nevents,
const struct timespec *timeout);
struct kevent{
uintpt t ident; // 事件關注對象的ID,kq,ident,filter確定唯一的event
// 事件類型,ident,fflags,data應該如何被解釋?
u short flags; // 輸入: 增加/減少,使能/禁止, 執行后重置/刪除;輸出: 發生EOF或者ERROR
u int fflags; // 活躍時應該怎么做,是否返回event?
intptr t data; // filter和fflags規定的數據傳輸方式
void *udata; // 自定義的數據傳輸方式
__uint64_t ext[4]; //在末尾增加的額外信息Hint
}
EV_SET(&kev, ident, filter, flags, fflags, data, udata);
kevent()用于創建kqueue并且返回對應的capability(權限控制的抽象)。
kevent()用于注冊event,并設定超時,changelist是指kqueue注冊的event如何變化,eventlist則是返回的event。當event觸發時,會調用內核的回調函數,通知進程。
filter
- EVFILT READ :poll近似的實現,當socket_buffer大于SO_LOWAT時觸發將size寫入data或者斷連時觸發EOF,幫助應用處理數據。
- EVFILT WRITE: 類似READ
- EVFILT AIO: aio_read/write請求后通過事件進行aio_error輪詢,事件返回后aio_return
- EVFILT SIGNAL: id為信號值,返回data為信號計數,通知后clear
- EVFILT VNODE: 監聽文件系統vnode,id為fd, fflags監聽下列事件并返回所有發生事件
NOTE DELETE
NOTE WRITE
NOTE EXTEND
NOTE ATTRIB
NOTE LINK
NOTE RENAME
- EVFILT PROC:監聽進程狀態,id為PID,fflags監聽下列事件
NOTE EXIT/FORK/EXEC 監聽exit,fork,execve等原語
NOTE TRACK 若父進程設定為Track則fork后子進程為CHILD
輸出:
NOTE CHILD 子進程fork后設定child,并且父進程id存入data
NOTE TRACKERR 無法添加子進程事件,通常因為資源限制
sample
handle_events()
{
int i, n;
struct timespec timeout =
{ TMOUT_SEC, TMOUT_NSEC };
n = kevent(kq, ch, nchanges,
evi, nevents, &timeout);
if (n <= 0)
goto error_or_timeout;
for (i = 0; i < n; i++) {
if (evi.flags & EV_ERROR)
/* error */
if (evi.filter == EVFILT_READ)
readable_fd(evi.ident);
if (evi.filter == EVFILT_WRITE)
writeable_fd(evi.ident);
}
...
}
update_fd(int fd, int action,int filter)
{
EV_SET(&chnchanges, fd, filter,action == ADD ?
EV_ADD : EV_DELETE,
0, 0, 0);
nchanges++;
}
Kqueue實現
Knote
- 計算當前節點的活躍度
- 鏈接其他knote
- 存儲自己所在的Kqueue的指針
struct knote {
SLIST_ENTRY(knote) kn_link; /* for kq */
SLIST_ENTRY(knote) kn_selnext; /* for struct selinfo */
struct knlist *kn_knlist; /* f_attach populated */
TAILQ_ENTRY(knote) kn_tqe;
struct kqueue *kn_kq; /* which queue we are on */
struct kevent kn_kevent;
void *kn_hook;
int kn_hookid;
int kn_status; /* protected by kq lock */
#define KN_ACTIVE 0x01 /* event has been triggered */
#define KN_QUEUED 0x02 /* event is on queue */
#define KN_DISABLED 0x04 /* event is disabled */
#define KN_DETACHED 0x08 /* knote is detached */
#define KN_MARKER 0x20 /* ignore this knote */
#define KN_KQUEUE 0x40 /* this knote belongs to a kq */
#define KN_SCAN 0x100 /* flux set in kqueue_scan() */
int kn_influx;
int kn_sfflags; /* saved filter flags */
int64_t kn_sdata; /* saved data field */
union {
struct file *p_fp; /* file data pointer */
struct proc *p_proc; /* proc pointer */
struct kaiocb *p_aio; /* AIO job pointer */
struct aioliojob *p_lio; /* LIO job pointer */
void *p_v; /* generic other pointer */
} kn_ptr;
struct filterops *kn_fop;
#define kn_id kn_kevent.ident
#define kn_filter kn_kevent.filter
#define kn_flags kn_kevent.flags
#define kn_fflags kn_kevent.fflags
#define kn_data kn_kevent.data
#define kn_fp kn_ptr.p_fp
};
Kqueue
- kp_knlist存所有knode用于GC
- kp_head存存儲所有標記為active的knode
- kq_knhash存儲iden->descriptor的映射
- kq_fdp fd索引的數組(同open file table)用于關閉fd時刪除對應的knode
struct kqueue {
struct mtx kq_lock;
int kq_refcnt;
TAILQ_ENTRY(kqueue) kq_list;
TAILQ_HEAD(, knote) kq_head; /* list of pending event */
int kq_count; /* number of pending events */
struct selinfo kq_sel;
struct sigio *kq_sigio;
struct filedesc *kq_fdp;
int kq_state;
#define KQ_SEL 0x01
#define KQ_SLEEP 0x02
#define KQ_FLUXWAIT 0x04 /* waiting for a in flux kn */
#define KQ_ASYNC 0x08
#define KQ_CLOSING 0x10
#define KQ_TASKSCHED 0x20 /* task scheduled */
#define KQ_TASKDRAIN 0x40 /* waiting for task to drain */
int kq_knlistsize; /* size of knlist */
struct klist *kq_knlist; /* list of knotes */
u_long kq_knhashmask; /* size of knhash */
struct klist *kq_knhash; /* hash table for knotes */
struct task kq_task;
struct ucred *kq_cred;
};
Registration
kqueue
kqueue本身作為文件抽象看待,在OFT里注冊entry創建內核對象并賦予descriptor索引。hash和內部的array并不分配。
kevent
int
kevent(int kq, const struct kevent *changelist, int nchanges,
struct kevent *eventlist, int nevents, const struct timespec *timeout)
{
return (((int (*)(int, const struct kevent *, int,
struct kevent *, int, const struct timespec *))
__libc_interposing[INTERPOS_kevent])(kq, changelist, nchanges,
eventlist, nevents, timeout));
}
這里調用了kqueue_register來對changeList進行注冊。首先根據線程和fd獲取文件的FCB,kq對于fp引用計數++,然后調用實際的注冊函數。注冊的代碼太長了,大體就是先根據<Iden,filter>尋找knote節點,找不到如果是EV_ADD則增加knote,否則把事件增加到knote上去。
int
kqfd_register(int fd, struct kevent *kev, struct thread *td, int mflag)
{
struct kqueue *kq;
struct file *fp;
cap_rights_t rights;
int error;
error = fget(td, fd, cap_rights_init(&rights, CAP_KQUEUE_CHANGE), &fp);
if (error != 0)
return (error);
if ((error = kqueue_acquire(fp, &kq)) != 0)
goto noacquire;
error = kqueue_register(kq, kev, td, mflag);
kqueue_release(kq, 0);
noacquire:
fdrop(fp, td);
return (error);
}
Filter
filter的作用就是對于事件源進行過濾,事件源所有的活動都會調用filter,但是只有符合filter規則的事件才會報告給應用,也就是返回布爾值,同時他也會修改fflags和data產生副作用(上面提到的輸出參數)。filter封裝了事件,kqueue只能詢問他是否活躍,而對事件的細節一無所知。因此只需要增加filter,就能拓展事件的內容。
Activity
在所有觸發這些活動的地方插入hook函數,調用knote()函數遍歷自己維護的klist(注冊的時候維護的),調用filter。
如果事件觸發則激活,通過knote找到其所屬的kqueue,并且將knote加入kqueue的active鏈末尾。如果已經在了,那么不用增加knote,但是filter還是會記錄activity(e.g.上文提到的副作用)。
這里有些special case,例如fork需要看是不是TRACK,來判斷是否報告子節點的PID
Additionally, for each knote attached to the parent, check whether user wants to track the new process. If so, attach a new knote to it, and immediately report an event with the child's pid.
首先,激活父進程的knote,然后創建新的knote分配給子節點,并且設置CHILD flag和對應的父進程PID。同時這里還提到了可能存在事件可能改變data,因此為EXIT額外分配一個節點。
/*
* Activate existing knote and register tracking knotes with
* new process.
*
* First register a knote to get just the child notice. This
* must be a separate note from a potential NOTE_EXIT
* notification since both NOTE_CHILD and NOTE_EXIT are defined
* to use the data field (in conflicting ways).
*/
kev.ident = pid;
kev.filter = kn->kn_filter;
kev.flags = kn->kn_flags | EV_ADD | EV_ENABLE | EV_ONESHOT |
EV_FLAG2;
kev.fflags = kn->kn_sfflags;
kev.data = kn->kn_id; /* parent */
kev.udata = kn->kn_kevent.udata;/* preserve udata */
error = kqueue_register(kq, &kev, NULL, M_NOWAIT);
if (error)
kn->kn_fflags |= NOTE_TRACKERR;
/*
* Then register another knote to track other potential events
* from the new process.
*/
kev.ident = pid;
kev.filter = kn->kn_filter;
kev.flags = kn->kn_flags | EV_ADD | EV_ENABLE | EV_FLAG1;
kev.fflags = kn->kn_sfflags;
kev.data = kn->kn_id; /* parent */
kev.udata = kn->kn_kevent.udata;/* preserve udata */
error = kqueue_register(kq, &kev, NULL, M_NOWAIT);
if (error)
kn->kn_fflags |= NOTE_TRACKERR;
if (kn->kn_fop->f_event(kn, NOTE_FORK))
KNOTE_ACTIVATE(kn, 0);
list->kl_lock(list->kl_lockarg);
KQ_LOCK(kq);
kn_leave_flux(kn);
KQ_UNLOCK_FLUX(kq);
Delivery
kqueue_scan在active鏈末尾加入哨兵,如果scan時扔出了哨兵,那么遍歷結束。
每次都從active移除一個節點(注意檢查timeout,過期也要移除,DISABLE也是在這里移除),如果不是ONESHOP,那么filter帶著query hint重新檢查一遍是否激活,防止途中又被修改。
The rationale for this is the case where data arrives for a socket, which causes the knote to be queued, but the Application happens to call read() and empty the socket buffer before calling kevent. If the knote was still queued, then an event would be returned telling the application to read an empty buffer.
確認激活的knote的信息將會拷貝到kevnet通過eventlist返回給應用進行通知。如果ONESHOP則直接從kqueue中移除,否則如果filter看它仍然active,就把它重新放到active鏈末尾(上次掃描的哨兵之后)。直到哨兵被出列,scan完成。
Miscellaneous Notes
1.論文的版本fork的時候不復制kqueue的df除非vfork。如果復制的話需要在fork時進行整個kqueue復制或者標記為COW。(現在不知道是不是這么做的)
2.kqueue是通過維護klist來對整條鏈涉及的所有進程進行通知的,而不是像poll或者select那樣在sellInfo持有pid。下面這段話看不懂了,沒看過poll不知道啥叫collision。
While this may be a natural outcome from the way knotes are implemented, it also means that the kqueue system is not susceptible to select collisions. As each knote is queued in the active list, only processes sleeping on that kqueue are woken up
3.考慮同一個klist有不同類型的filter,調用knode時應該給予額外信息通知他到底是什么事件觸發的(例如PROC和SIGNAL容易混淆),因此利用hint確定activity和哪個相關
4. kevent要經歷兩次拷貝,增加了overhead。因此如果采用AIO更好,kernel直接修改user狀態下的control block。那么為什么不這么做呢?根本原因在于如果內核不允許直接寫用戶態數據的話,bug會更好定位,同時應用也不需要考慮狀態。
總結
精妙之處在于kqueue維持在內核中,因此socket如果滿了可以直接將knote加入進程kqueue的活躍鏈,而不需要等到下次syscall的時候再檢查。例如,即使我長期不kevent,knote()依然會將他們的activity存儲在knote上并且插入active list,下次只需要遍歷active list而不需要重頭遍歷整個queue。
同時因為kqueue有狀態,進行修改也開銷很小,只需要改變變化的那部分就行了。
看的時候還是有些地方比較難理解,加上源代碼也很復雜,如果有糾錯請指正。
附錄
filechange
struct kevent ev;
struct timespec nullts = { 0, 0 };
EV_SET(&ev, fd, EVFILT_VNODE,
EV_ADD | EV_ENABLE | EV_CLEAR,
NOTE_RENAME | NOTE_WRITE |
NOTE_DELETE | NOTE_ATTRIB, 0, 0);
kevent(kq, &ev, 1, NULL, 0, &nullts);
for (;;) {
n = kevent(kq, NULL, 0, &ev, 1, NULL);
if (n > 0) {
printf("The file was");
if (ev.fflags & NOTE_RENAME)
printf(" renamed");
if (ev.fflags & NOTE_WRITE)
printf(" written");
if (ev.fflags & NOTE_DELETE)
printf(" deleted");
if (ev.fflags & NOTE_ATTRIB)
printf(" chmod/chowned");
printf("n");
}
signal
struct kevent ev;
struct timespec nullts = { 0, 0 };
EV_SET(&ev, SIGHUP, EVFILT_SIGNAL,
EV_ADD | EV_ENABLE, 0, 0, 0);
kevent(kq, &ev, 1, NULL, 0, &nullts);
signal(SIGHUP, SIG_IGN);
for (;;) {
n = kevent(kq, NULL, 0, &ev, 1, NULL);
if (n > 0)
printf("signal %d delivered"
" %d timesn",
ev.ident, ev.data);
}
udata
int i, n;
struct timespec timeout =
{ TMOUT_SEC, TMOUT_NSEC };
void (* fcn)(struct kevent *);
n = kevent(kq, ch, nchanges,
ev, nevents, &timeout);
if (n <= 0)
goto error_or_timeout;
for (i = 0; i < n; i++) {
if (evi.flags & EV_ERROR)
/* error */
fcn = evi.udata;
fcn(&evi);
}