前言
- 同步I/O模型通常用于實現Reactor模式
- 異步I/O模型則用于實現Proactor模式
- 最后我們會使用同步I/O方式模擬出Proactor模式
一、Reactor模式
- Reactor 釋義“反應堆”,是一種事件驅動機制
- Reactor的回調函數:和普通函數調用的不同之處在于,應用程序不是主動的調用某個 API 完成處理,而是恰恰 相反,Reactor 逆置了事件處理流程,應用程序需要提供相應的接口并注冊到 Reactor 上, 如果相應的時間發生,Reactor 將主動調用應用程序注冊的接口,這些接口又稱為“回調函數”

- Reactor 模式是處理并發I/O比較常見的一種模式,用于同步 I/O,中心思想是將所有要處理的I/O 事件注冊到一個中心I/O多路復用器上,同時主線程/進程阻塞在多路復用器上; 一旦有 I/O 事件到來或是準備就緒(文件描述符或 socket 可讀、寫),多路復用器返回并將事先注冊的相應 I/O 事件分發到對應的處理器中。
- Reactor 模型有三個重要的組件:多路復用器:由操作系統提供,在 linux 上一般是 select, poll, epoll 等系統調用。事件分發器:將多路復用器中返回的就緒事件分到對應的處理函數中事件處理器:負責處理特定事件的處理函數
- 具體流程如下:注冊讀就緒事件和相應的事件處理器事件分離器等待事件事件到來,激活分離器,分離器調用事件對應的處理器事件處理器完成實際的讀操作,處理讀到的數據,注冊新的事件,然后返還控制 權


需要C/C++ Linux服務器架構師學習資料后臺私信“1”免費獲取(資料包括C/C++,Linux,golang技術,Nginx,ZeroMQ,MySQL,redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg等),免費分享
多線程Reactor模式多線程Reactor模式特點:它要求主線程(I/O處理單元)只負責監聽文件描述符上是否有事件發生,有的話就立即將時間通知工作線程(邏輯單元)。除此之外,主線程不做任何其他實質性的工作讀寫數據,接受新的連接,以及處理客戶請求均在工作線程中完成工作流程:①主線程往epoll內核事件表中注冊socket上有數據可讀②主線程調用epoll_wait等待socket上有數據可讀③當socket上有數據可讀時,epoll_wait通知主線程。主線程則將socket可讀事件放入請求隊列④睡眠在請求請求隊列上的某個工作線程被喚醒,它從socket讀取數據,并處理客戶請求,然后往epoll內核事件表中注冊該socket上的寫就緒時間⑤主線程調用epoll_wait等到socket可寫⑥當socket可寫時,epoll_wait通知主線程。主線程將socket可寫事件放入請求隊列⑦睡眠在請求隊列上的某個工作線程被喚醒,它向socket上寫入服務器處理客戶請求的結果
單線程Reactor模式單線程Reactor模式與多線程Reactor模式原理相同。但是工作都是在同一個線程中完成的單線程優缺點:優點:Reactor模型開發效率上比起直接使用IO復用要高,它通常是單線程的,設計目標是希望單線程使用一顆 CPU 的全部資源。優點為每個事件處理中很多時候可以 不考慮共享資源的互斥訪問缺點:可是缺點也是明顯的,現在的硬件發展,已經不再遵循摩爾定 律,CPU 的頻率受制于材料的限制不再有大的提升,而改為是從核數的增加上提升能力單線程Reactor使用多核:如果程序業務很簡單,例如只是簡單的訪問一些提供了并發訪問的服務,就可以直接開啟多個反應堆(Reactor),每個反應堆對應一顆CPU核心這些反應堆上跑的請求互不相關,這是完全可以利用多核的。例如Nginx這樣的http靜態服務器下面是單線程Reactor模式的實現代碼,下載下來之后可以直接編譯運行:
// reactor.c
// 源碼鏈接: https://github.com/dongyusheng/csdn-code/blob/master/server-client/reactor.c
// gcc -o reactor reactor.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <errno.h>
#include <time.h>
#include <libgen.h>
#include <fcntl.h>
#define MAX_EPOLL_EVENTS 1024
#define MAX_BUFFER_SIZE 4096
typedef int NCALLBACK(int, int, void*);
// 事件結構體, 每個套接字都會被封裝為一個事件
struct ntyevent {
int fd; // 事件對應的fd
int events; // 事件類型( 本代碼中我們只處理EPOLL_IN和EPOLL_OUT)
void *arg; // 事件回調函數的參數3, 實際傳入的是一個struct ntyreactor結構體指針
int (*callback)(int fd, int events, void *arg); //事件回調函數
int status; // 當前事件是否位于epoll集合中: 1表示在, 0表示不在
char buffer[MAX_BUFFER_SIZE]; // 讀寫緩沖區
int length; //緩沖區數據的長度
long last_active; // 最后一次活躍的時間
};
// Reactor主體
struct ntyreactor {
int epoll_fd; // epoll套接字
struct ntyevent *events; // reactor當前處理的事件集
};
// 創建一個Tcp Server
int init_server(char *ip, short port);
// 向reactor中添加一個服務器監聽事件
int ntyreactor_addlistener(struct ntyreactor *reactor, int fd, NCALLBACK callback);
/***下面這3個函數是用來對reactor操作的***/
// 初始化reactor
struct ntyreactor *ntyreactor_init();
// 銷毀reactor
int ntyreactor_destroy(struct ntyreactor *reactor);
// reactor運行函數
int ntyreactor_run(struct ntyreactor *reactor);
/***下面這3個函數是用來對ntyevent事件結構操作的***/
// 將一個fd封裝到事件結構中
int nty_event_set(struct ntyevent *ev, int fd, int event, int length, int status, NCALLBACK callback, void *arg);
// 將一個事件添加/更新到epoll的事件表中
int nty_event_add(int epoll_fd, struct ntyevent* ev);
// 將一個事件移出epoll事件表
int nty_event_del(int epoll_fd, struct ntyevent* event);
/***下面這3個函數是ntyevent事件可以使用的回調函數***/
int accept_callback(int fd, int events, void *arg);
int recv_callback(int fd, int events, void *arg);
int send_callback(int fd, int events, void *arg);
int main(int argc, char *argv[])
{
if(argc != 3)
{
printf("usage: ./%s [ip] [port]n", basename(argv[0]));
exit(EXIT_FAILURE);
}
char *ip = argv[1];
short port = atoi(argv[2]);
int sock_fd;
// 1.初始化一個Tcp Server
sock_fd = init_server(ip, port);
// 2.初始化reactor
struct ntyreactor *reactor = ntyreactor_init();
if( reactor == NULL)
{
printf("Error in %s(), ntyreactor_init: create reactor errorn", __func__);
exit(EXIT_FAILURE);
}
// 3.將Tcp Server添加到reactor事件集中
ntyreactor_addlistener(reactor, sock_fd, accept_callback);
// 4.運行reactor
ntyreactor_run(reactor);
// 5.銷毀
ntyreactor_destroy(reactor);
close(sock_fd);
return 0;
}
int init_server(char *ip, short port)
{
// 1.創建套接字
int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if(sock_fd == -1)
{
printf("Error in %s(), socket: %sn", __func__, strerror(errno));
return -1;
}
// 2.初始化服務器地址
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
if(inet_pton(AF_INET, ip, (void*)&server_addr.sin_addr.s_addr) == -1)
{
printf("Error in %s(), inet_pton: %sn", __func__, strerror(errno));
return -1;
}
server_addr.sin_port = htons(port);
// 3.綁定服務器地址
if(bind(sock_fd, (const struct sockaddr*)&server_addr, sizeof(server_addr)) == -1)
{
printf("Error in %s(), bind: %sn", __func__, strerror(errno));
return -1;
}
// 3.監聽
if(listen(sock_fd, 20) == -1)
{
printf("Error in %s(), listen: %sn", __func__, strerror(errno));
return -1;
}
printf("Listen start [%s:%d]...n", inet_ntoa(server_addr.sin_addr), ntohs(server_addr.sin_port));
return sock_fd;
}
struct ntyreactor *ntyreactor_init()
{
// 1.創建一個reactor
struct ntyreactor *reactor = (struct ntyreactor*)malloc(sizeof(struct ntyreactor));
if(reactor == NULL)
return NULL;
memset(reactor, 0, sizeof(struct ntyreactor));
// 2.創建reacotr的epoll_fd
reactor->epoll_fd = epoll_create(1);
if(reactor->epoll_fd == -1)
{
printf("Error in %s(), epoll_create: %sn", __func__, strerror(errno));
free(reactor);
return NULL;
}
// 3.創建reactor的事件集
reactor->events = (struct ntyevent*)malloc(sizeof(struct ntyevent) * MAX_EPOLL_EVENTS);
if(reactor->events == NULL)
{
printf("Error in %s(), malloc: %sn", __func__, strerror(errno));
close(reactor->epoll_fd);
free(reactor);
return NULL;
}
return reactor;
}
int ntyreactor_destroy(struct ntyreactor *reactor)
{
if(reactor == NULL)
{
printf("Error in %s(): %sn", __func__, "reactor arg is NULL");
return -1;
}
// 關閉epoll_fd、銷毀事件集、釋放結構
close(reactor->epoll_fd);
free(reactor->events);
free(reactor);
return 0;
}
int ntyreactor_run(struct ntyreactor *reactor)
{
// 1.判斷參數
if(reactor == NULL || reactor->epoll_fd < 0 || reactor->events == NULL)
{
printf("Error in %s(): %sn", __func__, "reactor arg is error");
return -1;
}
struct epoll_event ep_events[MAX_EPOLL_EVENTS + 1];
// 2.進行epoll_wait()
int nready;
while(1)
{
// 超時檢測
/*
int checkpos = 0, i;
long now = time(NULL);
for (i = 0; i < MAX_EPOLL_EVENTS; i++, checkpos ++) {
if (checkpos == MAX_EPOLL_EVENTS) {
checkpos = 0;
}
// 如果當前索引處的事件status為0, 則不檢測, 進行下一個
if (reactor->events[checkpos].status != 1) {
continue;
}
// 如果超過60秒, 那么就認定為超時, 超時后關閉移除
long duration = now - reactor->events[checkpos].last_active;
if (duration >= 60) {
close(reactor->events[checkpos].fd);
printf("[fd=%d] timeoutn", reactor->events[checkpos].fd);
nty_event_del(reactor->epfd, &reactor->events[checkpos]);
}
}*/
nready = epoll_wait(reactor->epoll_fd, ep_events, MAX_EPOLL_EVENTS, 1000);
// 3.函數出錯
if(nready == -1)
{
// 如果函數在阻塞過程中接收到信號, 那么繼續進行epoll_wait()
if(errno == EAGAIN || errno == EWOULDBLOCK)
continue;
printf("Error in %s(), epoll_wait: %sn", __func__, strerror(errno));
return -1;
}
// 4.函數超時
else if(nready == 0)
continue;
// 5.有事件準備好
else
{
// 遍歷處理已就緒的事件
int i;
for(i = 0; i < nready; ++i)
{
// 獲取事件結構體, 保存在struct epoll_event結構的data.ptr中
struct ntyevent* ev = (struct ntyevent*)ep_events[i].data.ptr;
// 如果事件可讀
if((ep_events[i].events & EPOLLIN) && (ev->events & EPOLLIN))
ev->callback(ev->fd, ev->events, ev->arg);
// 如果事件可寫
if((ep_events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT))
ev->callback(ev->fd, ev->events, ev->arg);
}
}
}
return 0;
}
int ntyreactor_addlistener(struct ntyreactor *reactor, int fd, NCALLBACK callback)
{
if(reactor == NULL || fd <0 || callback == NULL)
{
printf("Error in %s(): %sn", __func__, "arg error");
return -1;
}
// 初始化ntyevent事件結構, 然后添加到reactor的epoll事件表中即可
nty_event_set(&reactor->events[fd], fd, EPOLLIN, 0, 0, callback, reactor);
nty_event_add(reactor->epoll_fd, &reactor->events[fd]);
return 0;
}
int nty_event_set(struct ntyevent *ev, int fd, int event, int length, int status, NCALLBACK callback, void *arg)
{
if(ev == NULL || fd <0 || event <0 || length < 0 || callback == NULL || arg == NULL || status < 0)
{
printf("Error in %s(): %sn", __func__, "arg error");
return -1;
}
// 初始化ntyevent結構的相關內容即可
ev->fd = fd;
ev->events = event;
ev->arg = arg;
ev->callback = callback;
ev->status = status;
ev->length = length;
ev->last_active = time(NULL);
return 0;
}
int nty_event_add(int epoll_fd, struct ntyevent* ev)
{
if(epoll_fd <0 || ev == NULL)
{
printf("Error in %s(): %sn", __func__, "arg error");
return -1;
}
// 1.創建一個epoll事件結構
struct epoll_event ep_event;
memset(&ep_event, 0, sizeof(ep_event));
ep_event.events = ev->events;
ep_event.data.ptr = ev;
//ep_event.data.fd = ev->fd; data成員是一個聯合體, 不能同時使用fd和ptr成員
// 2.如果當前ev已經在epoll事件表中, 那么就修改; 否則就把ev加入到epoll事件表中
int op;
if(ev->status == 0)
{
op = EPOLL_CTL_ADD;
ev->status = 1;
}
else
op = EPOLL_CTL_MOD;
// 3.添加/更新
if(epoll_ctl(epoll_fd, op, ev->fd, &ep_event) == -1)
{
printf("Error in %s(), epoll_ctl: %sn", __func__, strerror(errno));
return -1;
}
return 0;
}
int nty_event_del(int epoll_fd, struct ntyevent* ev)
{
if(epoll_fd < 0 || ev == NULL || ev->status != 1)
{
printf("Error in %s(): %sn", __func__, "ev arg is error");
return -1;
}
// 初始要刪除的epoll事件結構
struct epoll_event ep_event;
memset(&ep_event, 0, sizeof(ep_event));
ep_event.data.ptr = ev;
//ep_event.data.fd = ev->fd; data成員是一個枚舉, 不能同時使用ptr和fd成員
ev->status = 0;
// 從epoll事件表中刪除epoll事件
if(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, ev->fd, &ep_event) == -1)
{
printf("Error in %s(), epoll_ctl: %sn", __func__, strerror(errno));
return -1;
}
return 0;
}
int accept_callback(int fd, int events, void *arg)
{
// 1.獲得reactor結構
struct ntyreactor *reactor = (struct ntyreactor*)arg;
// 2.獲取該fd對應的事件結構
struct ntyevent *ev = reactor->events + fd;
// 3.初始化客戶端地址結構
struct sockaddr_in cli_addr;
memset(&cli_addr, 0 , sizeof(cli_addr));
socklen_t len = sizeof(cli_addr);
// 4.接收客戶端
int cli_fd;
cli_fd = accept(ev->fd, (struct sockaddr*)&cli_addr, &len);
if(cli_fd == -1)
{
printf("Error in %s(), accept: %sn", __func__, strerror(errno));
return -1;
}
int i;
do {
// 5.在reactor事件表中找到第一個空位置, 用i表示新事件存放的位置, 也是其套接字的值
// reactor->events的0、1、2、3、4都被占用了, 客戶端第一個可以使用的套接字為5, 因此此處從5開始遍歷
for(i = 5; i< MAX_EPOLL_EVENTS; ++i)
{
if(reactor->events[i].status == 0)
break;
}
// 6.如果滿了, 就退出
if(i == MAX_EPOLL_EVENTS)
{
printf("Error in %s(): max connect limit[%d]n", __func__, MAX_EPOLL_EVENTS);
return -1;
}
// 7.將套接字設置為非阻塞
int flag = 0;
if ((flag = fcntl(cli_fd, F_SETFL, O_NONBLOCK)) < 0) {
printf("Error in %s(), fcntl: %sn", __func__, strerror(errno));
return -1;
}
// 8.將新事件添加到reactor事件表中
// 此處我們將新客戶端的回調函數首先設置為recv_callback, 事件類型為EPOLLIN, 因為一般都是客戶端向服務器發送數據的
nty_event_set(&reactor->events[cli_fd], cli_fd, EPOLLIN, 0, 0, recv_callback, reactor);
nty_event_add(reactor->epoll_fd, &reactor->events[cli_fd]);
} while(0);
printf("New connect: [%s:%d], [time:%ld], pos[%d]n",
inet_ntoa(cli_addr.sin_addr), ntohs(cli_addr.sin_port), reactor->events[cli_fd].last_active, i);
return 0;
}
int recv_callback(int fd, int events, void *arg)
{
// 1.獲得reactor結構
struct ntyreactor *reactor =(struct ntyreactor*)arg;
// 2.獲取該fd對應的事件結構
struct ntyevent *ev = reactor->events + fd;
// 3.先將事件從epoll事件集移除
nty_event_del(reactor->epoll_fd, ev);
// 3.接收數據
int rc = recv(ev->fd, ev->buffer, MAX_BUFFER_SIZE, 0);
if(rc < 0) //recv出錯
{
//if(errno == EAGAIN || errno == EWOULDBLOCK)
// return rc;
printf("Error in %s(), recv: %sn", __func__, strerror(errno));
// 此處我們不再需要將該nty_event從epoll事件集移除, 因為上面我們已經移除了
close(ev->fd);
}
else if(rc == 0) //對方關閉了
{
printf("Client closed the connection, fd = %dn", ev->fd);
// 此處我們也當做錯誤處理
// 此處我們不再需要將該nty_event從epoll事件集移除, 因為上面我們已經移除了
close(ev->fd);
}
else //接收到數據
{
ev->buffer[rc] = '';
printf("Recv[fd = %d]: %sn", ev->fd, ev->buffer);
// 將事件變為可讀, 然后加入到epoll事件表中
nty_event_set(ev, ev->fd, EPOLLOUT, rc, 0, send_callback, reactor);
nty_event_add(reactor->epoll_fd, ev);
}
return rc;
}
int send_callback(int fd, int events, void *arg)
{
// 1.獲得reactor結構
struct ntyreactor *reactor =(struct ntyreactor*)arg;
// 2.獲取該fd對應的事件結構
struct ntyevent *ev = reactor->events + fd;
// 3.此處我們把接收的內容再回送給對象, 因此使用的是ev->buffer
int rc = send(ev->fd, ev->buffer, ev->length, 0);
if(rc > 0) //send成功
{
printf("Send[fd = %d]: %sn", ev->fd, ev->buffer);
// 移除、添加: 將其變為可讀
nty_event_del(reactor->epoll_fd, ev);
nty_event_set(ev, ev->fd, EPOLLIN, 0, 0, recv_callback, reactor);
nty_event_add(reactor->epoll_fd, ev);
}
else //send失敗
{
printf("Error in %s(), send: %sn", __func__, strerror(errno));
// 關閉、移除
close(ev->fd);
nty_event_del(reactor->epoll_fd, ev);
}
return rc;
}
二、Proactor模式
Proactor模式特點與Reactor不同,Proactor模式將所有的I/O操作都交給主線程和內核來處理,工作線程僅僅負責業務邏輯
Proactor模式的工作流程①主線程調用aio_read函數向內核注冊socket上讀完成事件,并告訴內核用戶讀緩沖區的位置,以及讀操作完成時如何通知應用程序(這里以信號為例)②主線程繼續處理其他邏輯③當socket上的數據被讀入用戶緩沖區后,內核將向應用程序發送一個信號,以通知應用程序數據已經可用④應用程序預先定義好的信號處理函數選擇一個工作線程來處理客戶請求。工作線程處理完客戶請求之后,調用aio_write函數向內核注冊socket上的寫完成事件,并告訴內核用戶寫緩沖區的位置,以及寫操作完成時如何通知應用程序(這里以信號為例)⑤主線程繼續處理其他邏輯⑥當用戶緩沖區的數據被寫入socket之后,內核將向應用程序發送一個信號,以通知應用程序數據已經發送完畢⑦應用程序預先定義好的信號處理函數選擇一個工作線程來做善后處理,比如決定是否關閉socket在上圖中,連接socket上的讀寫事件是通過aio_read/aio_write向內核注冊的,因此內核將通過信號來向應用程序報告連接socket上的讀寫事件。所以,主線程的epoll_wait調用僅能用來檢測監聽socket上的連接請求事件,而不能用來檢測連接socket的讀寫事件
三、使用同步I/O模擬Proactor模式
原理:主線程執行數據讀寫操作,讀寫完成之后,主線程向工作線程通知這一“完成事件”。那么從工作線程的角度來看,它們就直接獲得了數據讀寫的結果,接下來要做的只是對讀寫的結果進行邏輯處理
工作流程:①主線程往epoll內核事件表中注冊socket上的讀就緒事件②主線程調用epoll_wait等待socket上有數據可讀③當socket上有數據可讀時,epoll_wait通知主線程。主線程從socket循環讀取數據,直到沒有更多數據可讀,然后將讀取到的數據封裝成一個請求對象并插入請求隊列④睡眠在請求隊列上的某個工作線程被喚醒,它獲得請求對象并處理客戶請求,然后往epoll內核事件表中注冊socket上的寫就緒事件⑤主線程調用epoll_wait等到socket可寫⑥當socket可寫時,epoll_wait通知主線程。主線程往socket上寫入服務器處理客戶請求的結果
四、幾種開源庫
- 下面是幾種使用到上面技術的開源庫:libevent:名氣最大,應用最廣泛,歷史悠久的跨平臺事件庫libev:較 libevent 而言,設計更簡練,性能更好,但對 windows 支持不夠好;libuv:開發 node 的過程中需要一個跨平臺的事件庫,他們首選了 libev,但又要支持 Windows,故重新封裝了一套,linux 下用 libev 實現,Windows 下用 IOCP 實現
優先級libevent:激活的事件組織在優先級隊列中,各類事件默認的優先級是相同的,可以通過設置 事件的優先級使其優先被處理libev:也是通過優先級隊列來管理激活的時間,也可以設置事件優先級libuv:也是通過優先級隊列來管理激活的時間,也可以設置事件優先級
事件循環libevent:event_base 用于管理事件libev:激活的事件組織在優先級隊列中,各類事件默認的優先級是相同的,libuv:可以通 過設置事件的優先級 使其優先被處理
線程安全event_base 和 loop 都不是線程安全的,一個 event_base 或 loop 實例只能在用戶的一個線程 內訪問(一般是主線程),注冊到 event_base 或者 loop 的 event 都是串行訪問的,即每個執 行過程中,會按照優先級順序訪問已經激活的事件,執行其回調函數。所以在僅使用一個 event_base 或 loop 的情況下,回調函數的執行不存在并行關系