協(xié)程的實現(xiàn)之原語操作
問題:協(xié)程的內(nèi)部原語操作有哪些?分別如何實現(xiàn)的?
協(xié)程的核心原語操作:create, resume, yield。協(xié)程的原語操作有create怎么沒有exit?以NtyCo為例,協(xié)程一旦創(chuàng)建就不能有用戶自己銷毀,必須得以子過程執(zhí)行結(jié)束,就會自動銷毀協(xié)程的上下文數(shù)據(jù)。以_exec執(zhí)行入口函數(shù)返回而銷毀協(xié)程的上下文與相關(guān)信息。co->func(co->arg) 是子過程,若用戶需要長久運行協(xié)程,就必須要在func函數(shù)里面寫入循環(huán)等操作。所以NtyCo里面沒有實現(xiàn)exit的原語操作。
create:創(chuàng)建一個協(xié)程。
1. 調(diào)度器是否存在,不存在也創(chuàng)建。調(diào)度器作為全局的單例。將調(diào)度器的實例存儲在線程的私有空間pthread_setspecific。
2. 分配一個coroutine的內(nèi)存空間,分別設(shè)置coroutine的數(shù)據(jù)項,棧空間,棧大小,初始狀態(tài),創(chuàng)建時間,子過程回調(diào)函數(shù),子過程的調(diào)用參數(shù)。
3. 將新分配協(xié)程添加到就緒隊列 ready_queue中
實現(xiàn)代碼如下:
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg) {
assert(pthread_once(&sched_key_once, nty_coroutine_sched_key_creator) == 0);
nty_schedule *sched = nty_coroutine_get_sched();
if (sched == NULL) {
nty_schedule_create(0);
sched = nty_coroutine_get_sched();
if (sched == NULL) {
printf("Failed to create schedulern");
return -1;
}
}
nty_coroutine *co = calloc(1, sizeof(nty_coroutine));
if (co == NULL) {
printf("Failed to allocate memory for new coroutinen");
return -2;
}
//
int ret = posix_memalign(&co->stack, getpagesize(), sched->stack_size);
if (ret) {
printf("Failed to allocate stack for new coroutinen");
free(co);
return -3;
}
co->sched = sched;
co->stack_size = sched->stack_size;
co->status = BIT(NTY_COROUTINE_STATUS_NEW); //
co->id = sched->spawned_coroutines ++;
co->func = func;
co->fd = -1;
co->events = 0;
co->arg = arg;
co->birth = nty_coroutine_usec_now();
*new_co = co;
TAILQ_INSERT_TAIL(&co->sched->ready, co, ready_next);
return 0;
}
yield: 讓出CPU。
void nty_coroutine_yield(nty_coroutine *co)
參數(shù):當前運行的協(xié)程實例
調(diào)用后該函數(shù)不會立即返回,而是切換到最近執(zhí)行resume的上下文。該函數(shù)返回是在執(zhí)行resume的時候,會有調(diào)度器統(tǒng)一選擇resume的,然后再次調(diào)用yield的。resume與yield是兩個可逆過程的原子操作。
resume:恢復(fù)協(xié)程的運行權(quán)
int nty_coroutine_resume(nty_coroutine *co)
參數(shù):需要恢復(fù)運行的協(xié)程實例
調(diào)用后該函數(shù)也不會立即返回,而是切換到運行協(xié)程實例的yield的位置。返回是在等協(xié)程相應(yīng)事務(wù)處理完成后,主動yield會返回到resume的地方。
協(xié)程的實現(xiàn)之切換
問題:協(xié)程的上下文如何切換?切換代碼如何實現(xiàn)?
首先來回顧一下x86_64寄存器的相關(guān)知識。x86_64 的寄存器有16個64位寄存器,分別是:%rax, %rbx, %rcx, %esi, %edi, %rbp, %rsp, %r8, %r9, %r10, %r11, %r12,
%r13, %r14, %r15。
%rax 作為函數(shù)返回值使用的。
%rsp 棧指針寄存器,指向棧頂
%rdi, %rsi, %rdx, %rcx, %r8, %r9 用作函數(shù)參數(shù),依次對應(yīng)第1參數(shù),第2參數(shù)。。。
%rbx, %rbp, %r12, %r13, %r14, %r15 用作數(shù)據(jù)存儲,遵循調(diào)用者使用規(guī)則,換句話說,就是隨便用。調(diào)用子函數(shù)之前要備份它,以防它被修改
%r10, %r11 用作數(shù)據(jù)存儲,就是使用前要先保存原值。
上下文切換,就是將CPU的寄存器暫時保存,再將即將運行的協(xié)程的上下文寄存器,分別mov到相對應(yīng)的寄存器上。此時上下文完成切換。如下圖所示:
切換_switch函數(shù)定義:
int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
參數(shù)1:即將運行協(xié)程的上下文,寄存器列表
參數(shù)2:正在運行協(xié)程的上下文,寄存器列表
我們nty_cpu_ctx結(jié)構(gòu)體的定義,為了兼容x86,結(jié)構(gòu)體項命令采用的是x86的寄存器名字命名。
typedef struct _nty_cpu_ctx {
void *esp; //
void *ebp;
void *eip;
void *edi;
void *esi;
void *ebx;
void *r1;
void *r2;
void *r3;
void *r4;
void *r5;
} nty_cpu_ctx;
_switch返回后,執(zhí)行即將運行協(xié)程的上下文。是實現(xiàn)上下文的切換
_switch的實現(xiàn)代碼:
0: __asm__ (
1: " .text n"
2: " .p2align 4,,15 n"
3: ".globl _switch n"
4: ".globl __switch n"
5: "_switch: n"
6: "__switch: n"
7: " movq %rsp, 0(%rsi) # save stack_pointer n"
8: " movq %rbp, 8(%rsi) # save frame_pointer n"
9: " movq (%rsp), %rax # save insn_pointer n"
10: " movq %rax, 16(%rsi) n"
11: " movq %rbx, 24(%rsi) # save rbx,r12-r15 n"
12: " movq %r12, 32(%rsi) n"
13: " movq %r13, 40(%rsi) n"
14: " movq %r14, 48(%rsi) n"
15: " movq %r15, 56(%rsi) n"
16: " movq 56(%rdi), %r15 n"
17: " movq 48(%rdi), %r14 n"
18: " movq 40(%rdi), %r13 # restore rbx,r12-r15 n"
19: " movq 32(%rdi), %r12 n"
20: " movq 24(%rdi), %rbx n"
21: " movq 8(%rdi), %rbp # restore frame_pointer n"
22: " movq 0(%rdi), %rsp # restore stack_pointer n"
23: " movq 16(%rdi), %rax # restore insn_pointer n"
24: " movq %rax, (%rsp) n"
25: " ret n"
26: );
按照x86_64的寄存器定義,%rdi保存第一個參數(shù)的值,即new_ctx的值,%rsi保存第二個參數(shù)的值,即保存cur_ctx的值。X86_64每個寄存器是64bit,8byte。
Movq %rsp, 0(%rsi) 保存在棧指針到cur_ctx實例的rsp項
Movq %rbp, 8(%rsi)
Movq (%rsp), %rax #將棧頂?shù)刂防锩娴闹荡鎯Φ絩ax寄存器中。Ret后出棧,執(zhí)行棧頂
Movq %rbp, 8(%rsi) #后續(xù)的指令都是用來保存CPU的寄存器到new_ctx的每一項中
Movq 8(%rdi), %rbp #將new_ctx的值
Movq 16(%rdi), %rax #將指令指針rip的值存儲到rax中
Movq %rax, (%rsp) # 將存儲的rip值的rax寄存器賦值給棧指針的地址的值。
Ret # 出棧,回到棧指針,執(zhí)行rip指向的指令。
上下文環(huán)境的切換完成
協(xié)程的實現(xiàn)之定義
問題:協(xié)程如何定義? 調(diào)度器如何定義?
先來一道設(shè)計題:
設(shè)計一個協(xié)程的運行體R與運行體調(diào)度器S的結(jié)構(gòu)體
1. 運行體R:包含運行狀態(tài){就緒,睡眠,等待},運行體回調(diào)函數(shù),回調(diào)參數(shù),棧指針,棧大小,當前運行體
2. 調(diào)度器S:包含執(zhí)行集合{就緒,睡眠,等待}
這道設(shè)計題拆分兩個個問題,一個運行體如何高效地在多種狀態(tài)集合更換。調(diào)度器與運行體的功能界限。
運行體如何高效地在多種狀態(tài)集合更換
新創(chuàng)建的協(xié)程,創(chuàng)建完成后,加入到就緒集合,等待調(diào)度器的調(diào)度;協(xié)程在運行完成后,進行IO操作,此時IO并未準備好,進入等待狀態(tài)集合;IO準備就緒,協(xié)程開始運行,后續(xù)進行sleep操作,此時進入到睡眠狀態(tài)集合。
就緒(ready),睡眠(sleep),等待(wait)集合該采用如何數(shù)據(jù)結(jié)構(gòu)來存儲?
就緒(ready)集合并不沒有設(shè)置優(yōu)先級的選型,所有在協(xié)程優(yōu)先級一致,所以可以使用隊列來存儲就緒的協(xié)程,簡稱為就緒隊列(ready_queue)。
睡眠(sleep)集合需要按照睡眠時長進行排序,采用紅黑樹來存儲,簡稱睡眠樹(sleep_tree)紅黑樹在工程實用為<key, value>, key為睡眠時長,value為對應(yīng)的協(xié)程結(jié)點。
等待(wait)集合,其功能是在等待IO準備就緒,等待IO也是有時長的,所以等待(wait)集合采用紅黑樹的來存儲,簡稱等待樹(wait_tree),此處借鑒Nginx的設(shè)計。
數(shù)據(jù)結(jié)構(gòu)如下圖所示:
Coroutine就是協(xié)程的相應(yīng)屬性,status表示協(xié)程的運行狀態(tài)。sleep與wait兩顆紅黑樹,ready使用的隊列,比如某協(xié)程調(diào)用sleep函數(shù),加入睡眠樹(sleep_tree),status |= S即可。比如某協(xié)程在等待樹(wait_tree)中,而IO準備就緒放入ready隊列中,只需要移出等待樹(wait_tree),狀態(tài)更改status &= ~W即可。有一個前提條件就是不管何種運行狀態(tài)的協(xié)程,都在就緒隊列中,只是同時包含有其他的運行狀態(tài)。
調(diào)度器與協(xié)程的功能界限
每一協(xié)程都需要使用的而且可能會不同屬性的,就是協(xié)程屬性。每一協(xié)程都需要的而且數(shù)據(jù)一致的,就是調(diào)度器的屬性。比如棧大小的數(shù)值,每個協(xié)程都一樣的后不做更改可以作為調(diào)度器的屬性,如果每個協(xié)程大小不一致,則可以作為協(xié)程的屬性。
用來管理所有協(xié)程的屬性,作為調(diào)度器的屬性。比如epoll用來管理每一個協(xié)程對應(yīng)的IO,是需要作為調(diào)度器屬性。
按照前面幾章的描述,定義一個協(xié)程結(jié)構(gòu)體需要多少域,我們描述了每一個協(xié)程有自己的上下文環(huán)境,需要保存CPU的寄存器ctx;需要有子過程的回調(diào)函數(shù)func;需要有子過程回調(diào)函數(shù)的參數(shù) arg;需要定義自己的??臻g stack;需要有自己??臻g的大小 stack_size;需要定義協(xié)程的創(chuàng)建時間 birth;需要定義協(xié)程當前的運行狀態(tài) status;需要定當前運行狀態(tài)的結(jié)點(ready_next, wait_node, sleep_node);需要定義協(xié)程id;需要定義調(diào)度器的全局對象 sched。
協(xié)程的核心結(jié)構(gòu)體如下:
typedef struct _nty_coroutine {
nty_cpu_ctx ctx;
proc_coroutine func;
void *arg;
size_t stack_size;
nty_coroutine_status status;
nty_schedule *sched;
uint64_t birth;
uint64_t id;
void *stack;
RB_ENTRY(_nty_coroutine) sleep_node;
RB_ENTRY(_nty_coroutine) wait_node;
TAILQ_ENTRY(_nty_coroutine) ready_next;
TAILQ_ENTRY(_nty_coroutine) defer_next;
} nty_coroutine;
調(diào)度器是管理所有協(xié)程運行的組件,協(xié)程與調(diào)度器的運行關(guān)系。
調(diào)度器的屬性,需要在保存CPU的寄存器上下文 ctx,可以從協(xié)程運行狀態(tài)yield到調(diào)度器運行的。從協(xié)程到調(diào)度器用yield,從調(diào)度器到協(xié)程用resume
以下為協(xié)程的定義。
typedef struct _nty_coroutine_queue nty_coroutine_queue;
typedef struct _nty_coroutine_rbtree_sleep nty_coroutine_rbtree_sleep;
typedef struct _nty_coroutine_rbtree_wait nty_coroutine_rbtree_wait;
typedef struct _nty_schedule {
uint64_t birth;
nty_cpu_ctx ctx;
struct _nty_coroutine *curr_thread;
int page_size;
int poller_fd;
int eventfd;
struct epoll_event eventlist[NTY_CO_MAX_EVENTS];
int nevents;
int num_new_events;
nty_coroutine_queue ready;
nty_coroutine_rbtree_sleep sleeping;
nty_coroutine_rbtree_wait waiting;
} nty_schedule;
協(xié)程的實現(xiàn)之調(diào)度器
問題:協(xié)程如何被調(diào)度?
調(diào)度器的實現(xiàn),有兩種方案,一種是生產(chǎn)者消費者模式,另一種多狀態(tài)運行。
生產(chǎn)者消費者模式
邏輯代碼如下:
while (1) {
//遍歷睡眠集合,將滿足條件的加入到ready
nty_coroutine *expired = NULL;
while ((expired = sleep_tree_expired(sched)) != ) {
TAILQ_ADD(&sched->ready, expired);
}
//遍歷等待集合,將滿足添加的加入到ready
nty_coroutine *wait = NULL;
int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1);
for (i = 0;i < nready;i ++) {
wait = wait_tree_search(events[i].data.fd);
TAILQ_ADD(&sched->ready, wait);
}
// 使用resume回復(fù)ready的協(xié)程運行權(quán)
while (!TAILQ_EMPTY(&sched->ready)) {
nty_coroutine *ready = TAILQ_POP(sched->ready);
resume(ready);
}
}
多狀態(tài)運行
實現(xiàn)邏輯代碼如下:
while (1) {
//遍歷睡眠集合,使用resume恢復(fù)expired的協(xié)程運行權(quán)
nty_coroutine *expired = NULL;
while ((expired = sleep_tree_expired(sched)) != ) {
resume(expired);
}
//遍歷等待集合,使用resume恢復(fù)wait的協(xié)程運行權(quán)
nty_coroutine *wait = NULL;
int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1);
for (i = 0;i < nready;i ++) {
wait = wait_tree_search(events[i].data.fd);
resume(wait);
}
// 使用resume恢復(fù)ready的協(xié)程運行權(quán)
while (!TAILQ_EMPTY(sched->ready)) {
nty_coroutine *ready = TAILQ_POP(sched->ready);
resume(ready);
}
}