日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長提供免費收錄網(wǎng)站服務(wù),提交前請做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

前言

  • 同步I/O模型通常用于實現(xiàn)Reactor模式
  • 異步I/O模型則用于實現(xiàn)Proactor模式
  • 最后我們會使用同步I/O方式模擬出Proactor模式

一、Reactor模式

  • Reactor 釋義“反應(yīng)堆”,是一種事件驅(qū)動機(jī)制
  • Reactor的回調(diào)函數(shù):和普通函數(shù)調(diào)用的不同之處在于,應(yīng)用程序不是主動的調(diào)用某個 API 完成處理,而是恰恰 相反,Reactor 逆置了事件處理流程,應(yīng)用程序需要提供相應(yīng)的接口并注冊到 Reactor 上, 如果相應(yīng)的時間發(fā)生,Reactor 將主動調(diào)用應(yīng)用程序注冊的接口,這些接口又稱為“回調(diào)函數(shù)”
Linux兩種處理模式reactor模式proactor模式

 

  • Reactor 模式是處理并發(fā)I/O比較常見的一種模式,用于同步 I/O,中心思想是將所有要處理的I/O 事件注冊到一個中心I/O多路復(fù)用器上,同時主線程/進(jìn)程阻塞在多路復(fù)用器上; 一旦有 I/O 事件到來或是準(zhǔn)備就緒(文件描述符或 socket 可讀、寫),多路復(fù)用器返回并將事先注冊的相應(yīng) I/O 事件分發(fā)到對應(yīng)的處理器中。
  • Reactor 模型有三個重要的組件:多路復(fù)用器:由操作系統(tǒng)提供,在 linux 上一般是 select, poll, epoll 等系統(tǒng)調(diào)用。事件分發(fā)器:將多路復(fù)用器中返回的就緒事件分到對應(yīng)的處理函數(shù)中事件處理器:負(fù)責(zé)處理特定事件的處理函數(shù)
  • 具體流程如下:注冊讀就緒事件和相應(yīng)的事件處理器事件分離器等待事件事件到來,激活分離器,分離器調(diào)用事件對應(yīng)的處理器事件處理器完成實際的讀操作,處理讀到的數(shù)據(jù),注冊新的事件,然后返還控制 權(quán)
Linux兩種處理模式reactor模式proactor模式

 


Linux兩種處理模式reactor模式proactor模式

 

需要C/C++ Linux服務(wù)器架構(gòu)師學(xué)習(xí)資料后臺私信“1”免費獲取(資料包括C/C++,Linux,golang技術(shù),Nginx,ZeroMQ,MySQL,redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協(xié)程,DPDK,ffmpeg等),免費分享

多線程Reactor模式多線程Reactor模式特點:它要求主線程(I/O處理單元)只負(fù)責(zé)監(jiān)聽文件描述符上是否有事件發(fā)生,有的話就立即將時間通知工作線程(邏輯單元)。除此之外,主線程不做任何其他實質(zhì)性的工作讀寫數(shù)據(jù),接受新的連接,以及處理客戶請求均在工作線程中完成工作流程:①主線程往epoll內(nèi)核事件表中注冊socket上有數(shù)據(jù)可讀②主線程調(diào)用epoll_wait等待socket上有數(shù)據(jù)可讀③當(dāng)socket上有數(shù)據(jù)可讀時,epoll_wait通知主線程。主線程則將socket可讀事件放入請求隊列④睡眠在請求請求隊列上的某個工作線程被喚醒,它從socket讀取數(shù)據(jù),并處理客戶請求,然后往epoll內(nèi)核事件表中注冊該socket上的寫就緒時間⑤主線程調(diào)用epoll_wait等到socket可寫⑥當(dāng)socket可寫時,epoll_wait通知主線程。主線程將socket可寫事件放入請求隊列⑦睡眠在請求隊列上的某個工作線程被喚醒,它向socket上寫入服務(wù)器處理客戶請求的結(jié)果

單線程Reactor模式單線程Reactor模式與多線程Reactor模式原理相同。但是工作都是在同一個線程中完成的單線程優(yōu)缺點:優(yōu)點:Reactor模型開發(fā)效率上比起直接使用IO復(fù)用要高,它通常是單線程的,設(shè)計目標(biāo)是希望單線程使用一顆 CPU 的全部資源。優(yōu)點為每個事件處理中很多時候可以 不考慮共享資源的互斥訪問缺點:可是缺點也是明顯的,現(xiàn)在的硬件發(fā)展,已經(jīng)不再遵循摩爾定 律,CPU 的頻率受制于材料的限制不再有大的提升,而改為是從核數(shù)的增加上提升能力單線程Reactor使用多核:如果程序業(yè)務(wù)很簡單,例如只是簡單的訪問一些提供了并發(fā)訪問的服務(wù),就可以直接開啟多個反應(yīng)堆(Reactor),每個反應(yīng)堆對應(yīng)一顆CPU核心這些反應(yīng)堆上跑的請求互不相關(guān),這是完全可以利用多核的。例如Nginx這樣的http靜態(tài)服務(wù)器下面是單線程Reactor模式的實現(xiàn)代碼,下載下來之后可以直接編譯運行:

 

// reactor.c

// 源碼鏈接: https://github.com/dongyusheng/csdn-code/blob/master/server-client/reactor.c

// gcc -o reactor reactor.c

#include <stdio.h>

#include <stdlib.h>

#include <unistd.h>

#include <string.h>

#include <arpa/inet.h>

#include <netinet/in.h>

#include <sys/socket.h>

#include <sys/types.h>

#include <sys/epoll.h>

#include <errno.h>

#include <time.h>

#include <libgen.h>

#include <fcntl.h>

 

#define MAX_EPOLL_EVENTS    1024

#define MAX_BUFFER_SIZE     4096

 

typedef int NCALLBACK(int, int, void*);

 

// 事件結(jié)構(gòu)體, 每個套接字都會被封裝為一個事件

struct ntyevent {

 int fd;           // 事件對應(yīng)的fd

 int events;       // 事件類型(  本代碼中我們只處理EPOLL_IN和EPOLL_OUT)

 

 void *arg;        // 事件回調(diào)函數(shù)的參數(shù)3, 實際傳入的是一個struct ntyreactor結(jié)構(gòu)體指針

 int (*callback)(int fd, int events, void *arg); //事件回調(diào)函數(shù)

 

 int status;       // 當(dāng)前事件是否位于epoll集合中: 1表示在, 0表示不在

 

 char buffer[MAX_BUFFER_SIZE]; // 讀寫緩沖區(qū)

 int length;       //緩沖區(qū)數(shù)據(jù)的長度

 

 long last_active; // 最后一次活躍的時間

};

 

 

// Reactor主體

struct ntyreactor {

 int epoll_fd;             // epoll套接字

 struct ntyevent *events; // reactor當(dāng)前處理的事件集

};

 

// 創(chuàng)建一個Tcp Server

int init_server(char *ip, short port);

// 向reactor中添加一個服務(wù)器監(jiān)聽事件

int ntyreactor_addlistener(struct ntyreactor *reactor, int fd, NCALLBACK callback);

 

 

/***下面這3個函數(shù)是用來對reactor操作的***/

// 初始化reactor

struct ntyreactor *ntyreactor_init();

// 銷毀reactor

int ntyreactor_destroy(struct ntyreactor *reactor);

// reactor運行函數(shù)

int ntyreactor_run(struct ntyreactor *reactor);

 

 

 

/***下面這3個函數(shù)是用來對ntyevent事件結(jié)構(gòu)操作的***/

// 將一個fd封裝到事件結(jié)構(gòu)中

int nty_event_set(struct ntyevent *ev, int fd, int event, int length, int status, NCALLBACK callback, void *arg);

// 將一個事件添加/更新到epoll的事件表中

int nty_event_add(int epoll_fd, struct ntyevent* ev);

// 將一個事件移出epoll事件表

int nty_event_del(int epoll_fd, struct ntyevent* event);

 

 

/***下面這3個函數(shù)是ntyevent事件可以使用的回調(diào)函數(shù)***/

int accept_callback(int fd, int events, void *arg);

int recv_callback(int fd, int events, void *arg);

int send_callback(int fd, int events, void *arg);

 

 

 

int main(int argc, char *argv[])

{

 if(argc != 3)

    {

 printf("usage: ./%s [ip] [port]n", basename(argv[0]));

 exit(EXIT_FAILURE);

    }

 

 char *ip = argv[1];

 short port = atoi(argv[2]);

 

 int sock_fd;

 

 // 1.初始化一個Tcp Server

    sock_fd = init_server(ip, port);

 

 // 2.初始化reactor

 struct ntyreactor *reactor = ntyreactor_init();

 if( reactor == NULL)

    {

 printf("Error in %s(), ntyreactor_init: create reactor errorn", __func__);

 exit(EXIT_FAILURE);

    }

 

 // 3.將Tcp Server添加到reactor事件集中

    ntyreactor_addlistener(reactor, sock_fd, accept_callback);

 

 // 4.運行reactor

    ntyreactor_run(reactor);

 

 // 5.銷毀

    ntyreactor_destroy(reactor);

 

    close(sock_fd);

 

 return 0;

}

 

int init_server(char *ip, short port)

{

 // 1.創(chuàng)建套接字

 int sock_fd = socket(AF_INET, SOCK_STREAM, 0);

 if(sock_fd == -1)

    {

 printf("Error in %s(), socket: %sn", __func__, strerror(errno));

 return -1;

    }

 

 // 2.初始化服務(wù)器地址

 struct sockaddr_in server_addr;

 memset(&server_addr, 0, sizeof(server_addr));

    server_addr.sin_family = AF_INET;

 if(inet_pton(AF_INET, ip, (void*)&server_addr.sin_addr.s_addr) == -1)

    {

 printf("Error in %s(), inet_pton: %sn", __func__, strerror(errno));

 return -1;

    }

    server_addr.sin_port = htons(port);

 

 // 3.綁定服務(wù)器地址

 if(bind(sock_fd, (const struct sockaddr*)&server_addr, sizeof(server_addr)) == -1)

    {

 printf("Error in %s(), bind: %sn", __func__, strerror(errno));

 return -1;

    }

 

 // 3.監(jiān)聽

 if(listen(sock_fd, 20) == -1)

    {

 printf("Error in %s(), listen: %sn", __func__, strerror(errno));

 return -1;

    }

 

 printf("Listen start [%s:%d]...n", inet_ntoa(server_addr.sin_addr), ntohs(server_addr.sin_port));

 

 return sock_fd;

}

 

struct ntyreactor *ntyreactor_init()

{

 // 1.創(chuàng)建一個reactor

 struct ntyreactor *reactor = (struct ntyreactor*)malloc(sizeof(struct ntyreactor));

 if(reactor == NULL)

 return NULL;

 memset(reactor, 0, sizeof(struct ntyreactor));

 

 // 2.創(chuàng)建reacotr的epoll_fd

    reactor->epoll_fd = epoll_create(1);

 if(reactor->epoll_fd == -1)

    {

 printf("Error in %s(), epoll_create: %sn", __func__, strerror(errno));

 free(reactor);

 return NULL;

    }

 

 // 3.創(chuàng)建reactor的事件集

    reactor->events = (struct ntyevent*)malloc(sizeof(struct ntyevent) * MAX_EPOLL_EVENTS);

 if(reactor->events == NULL)

    {

 printf("Error in %s(), malloc: %sn", __func__, strerror(errno));

        close(reactor->epoll_fd);

 free(reactor);

 return NULL;

    }

 

 return reactor;

}

 

int ntyreactor_destroy(struct ntyreactor *reactor)

{

 if(reactor == NULL)

    {

 printf("Error in %s(): %sn", __func__, "reactor arg is NULL");

 return -1;

    }

 

 // 關(guān)閉epoll_fd、銷毀事件集、釋放結(jié)構(gòu)

    close(reactor->epoll_fd);

 free(reactor->events);

 

 free(reactor);

 

 return 0;

}

 

int ntyreactor_run(struct ntyreactor *reactor)

{

 // 1.判斷參數(shù)

 if(reactor == NULL || reactor->epoll_fd < 0 || reactor->events == NULL)

    {

 printf("Error in %s(): %sn", __func__, "reactor arg is error");

 return -1;

    }

 

 

 struct epoll_event ep_events[MAX_EPOLL_EVENTS + 1];

 

 // 2.進(jìn)行epoll_wait()

 int nready;

 while(1)

    {

 // 超時檢測

 /*

        int checkpos = 0, i;

        long now = time(NULL);

		for (i = 0; i < MAX_EPOLL_EVENTS; i++, checkpos ++) {

			if (checkpos == MAX_EPOLL_EVENTS) {

				checkpos = 0;

			}

            // 如果當(dāng)前索引處的事件status為0, 則不檢測, 進(jìn)行下一個

			if (reactor->events[checkpos].status != 1) {

				continue;

			}



            // 如果超過60秒, 那么就認(rèn)定為超時, 超時后關(guān)閉移除

			long duration = now - reactor->events[checkpos].last_active;

			if (duration >= 60) {

				close(reactor->events[checkpos].fd);

				printf("[fd=%d] timeoutn", reactor->events[checkpos].fd);

				nty_event_del(reactor->epfd, &reactor->events[checkpos]);

			}

		}*/

 

        nready = epoll_wait(reactor->epoll_fd, ep_events, MAX_EPOLL_EVENTS, 1000);

 // 3.函數(shù)出錯

 if(nready == -1)

        {

 // 如果函數(shù)在阻塞過程中接收到信號, 那么繼續(xù)進(jìn)行epoll_wait()

 if(errno == EAGAIN || errno == EWOULDBLOCK)

 continue;

 printf("Error in %s(), epoll_wait: %sn", __func__, strerror(errno));

 return -1;

        }

 // 4.函數(shù)超時

 else if(nready == 0)

 continue;

 // 5.有事件準(zhǔn)備好

 else

        {

 // 遍歷處理已就緒的事件

 int i;

 for(i = 0; i < nready; ++i)

            {

 // 獲取事件結(jié)構(gòu)體, 保存在struct epoll_event結(jié)構(gòu)的data.ptr中

 struct ntyevent* ev = (struct ntyevent*)ep_events[i].data.ptr;

 

 // 如果事件可讀

 if((ep_events[i].events & EPOLLIN) && (ev->events & EPOLLIN))

                    ev->callback(ev->fd, ev->events, ev->arg);

 

 // 如果事件可寫

 if((ep_events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT))

                    ev->callback(ev->fd, ev->events, ev->arg);

            }

        }

    }

 

 return 0;

}

 

int ntyreactor_addlistener(struct ntyreactor *reactor, int fd, NCALLBACK callback)

{

 if(reactor == NULL || fd <0 || callback == NULL)

    {

 printf("Error in %s(): %sn", __func__, "arg error");

 return -1;

    }

 

 // 初始化ntyevent事件結(jié)構(gòu), 然后添加到reactor的epoll事件表中即可

    nty_event_set(&reactor->events[fd], fd, EPOLLIN, 0, 0, callback, reactor);

    nty_event_add(reactor->epoll_fd, &reactor->events[fd]);

 

 return 0;

}

 

int nty_event_set(struct ntyevent *ev, int fd, int event, int length, int status, NCALLBACK callback, void *arg)

{

 if(ev == NULL || fd <0 || event <0 || length < 0 || callback == NULL || arg == NULL || status < 0)

    {

 printf("Error in %s(): %sn", __func__, "arg error");

 return -1;

    }

 

 // 初始化ntyevent結(jié)構(gòu)的相關(guān)內(nèi)容即可

    ev->fd = fd;

    ev->events = event;

    ev->arg = arg;

    ev->callback = callback;

    ev->status = status;

    ev->length = length;

    ev->last_active = time(NULL);

 

 return 0;

}

 

int nty_event_add(int epoll_fd, struct ntyevent* ev)

{

 if(epoll_fd <0 || ev == NULL)

    {

 printf("Error in %s(): %sn", __func__, "arg error");

 return -1;

    }

 

 // 1.創(chuàng)建一個epoll事件結(jié)構(gòu)

 struct epoll_event ep_event;

 memset(&ep_event, 0, sizeof(ep_event));

    ep_event.events = ev->events;

    ep_event.data.ptr = ev;

 //ep_event.data.fd = ev->fd; data成員是一個聯(lián)合體, 不能同時使用fd和ptr成員

 

 // 2.如果當(dāng)前ev已經(jīng)在epoll事件表中, 那么就修改; 否則就把ev加入到epoll事件表中

 int op;

 if(ev->status == 0)

    {

        op = EPOLL_CTL_ADD;

        ev->status = 1;

    } 

 else

        op = EPOLL_CTL_MOD;

 

 // 3.添加/更新 

 if(epoll_ctl(epoll_fd, op, ev->fd, &ep_event) == -1)

    {

 printf("Error in %s(), epoll_ctl: %sn", __func__, strerror(errno));

 return -1;

    }

 

 return 0;

}

 

int nty_event_del(int epoll_fd, struct ntyevent* ev)

{

 if(epoll_fd < 0 || ev == NULL || ev->status != 1)

    {

 printf("Error in %s(): %sn", __func__, "ev arg is error");

 return -1;

    }

 

 // 初始要刪除的epoll事件結(jié)構(gòu)

 struct epoll_event ep_event;

 memset(&ep_event, 0, sizeof(ep_event));

    ep_event.data.ptr = ev;

 //ep_event.data.fd = ev->fd; data成員是一個枚舉, 不能同時使用ptr和fd成員

    ev->status = 0;

 

 // 從epoll事件表中刪除epoll事件

 if(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, ev->fd, &ep_event) == -1)

    {

 printf("Error in %s(), epoll_ctl: %sn", __func__, strerror(errno));

 return -1;

    }

 

 return 0;

}

 

int accept_callback(int fd, int events, void *arg)

{

 // 1.獲得reactor結(jié)構(gòu)

 struct ntyreactor *reactor = (struct ntyreactor*)arg;

 // 2.獲取該fd對應(yīng)的事件結(jié)構(gòu)

 struct ntyevent *ev = reactor->events + fd;

 

 // 3.初始化客戶端地址結(jié)構(gòu)

 struct sockaddr_in cli_addr;

 memset(&cli_addr, 0 , sizeof(cli_addr));

 socklen_t len = sizeof(cli_addr);

 

 // 4.接收客戶端

 int cli_fd;

    cli_fd = accept(ev->fd, (struct sockaddr*)&cli_addr, &len);

 if(cli_fd == -1)

    {

 printf("Error in %s(), accept: %sn", __func__, strerror(errno));

 return -1;

    }

 

 int i;

 do {

 // 5.在reactor事件表中找到第一個空位置, 用i表示新事件存放的位置, 也是其套接字的值

 // reactor->events的0、1、2、3、4都被占用了, 客戶端第一個可以使用的套接字為5, 因此此處從5開始遍歷

 for(i = 5; i< MAX_EPOLL_EVENTS; ++i)

        {

 if(reactor->events[i].status == 0)

 break;

        }

 

 // 6.如果滿了, 就退出

 if(i == MAX_EPOLL_EVENTS)

        {

 printf("Error in %s(): max connect limit[%d]n", __func__, MAX_EPOLL_EVENTS);

 return -1;

        }

 

 // 7.將套接字設(shè)置為非阻塞

 int flag = 0;

 if ((flag = fcntl(cli_fd, F_SETFL, O_NONBLOCK)) < 0) {

 printf("Error in %s(), fcntl: %sn", __func__, strerror(errno));

 return -1;

		}

 

 // 8.將新事件添加到reactor事件表中

 // 此處我們將新客戶端的回調(diào)函數(shù)首先設(shè)置為recv_callback, 事件類型為EPOLLIN, 因為一般都是客戶端向服務(wù)器發(fā)送數(shù)據(jù)的

        nty_event_set(&reactor->events[cli_fd], cli_fd, EPOLLIN, 0, 0, recv_callback, reactor);

        nty_event_add(reactor->epoll_fd, &reactor->events[cli_fd]);

    } while(0);

 

 printf("New connect: [%s:%d], [time:%ld], pos[%d]n", 

        inet_ntoa(cli_addr.sin_addr), ntohs(cli_addr.sin_port), reactor->events[cli_fd].last_active, i);

 

 return 0;

}

 

int recv_callback(int fd, int events, void *arg)

{

 // 1.獲得reactor結(jié)構(gòu)

 struct ntyreactor *reactor =(struct ntyreactor*)arg;

 // 2.獲取該fd對應(yīng)的事件結(jié)構(gòu)

 struct ntyevent *ev = reactor->events + fd;

 

 // 3.先將事件從epoll事件集移除

    nty_event_del(reactor->epoll_fd, ev);

 

 // 3.接收數(shù)據(jù)

 int rc = recv(ev->fd, ev->buffer, MAX_BUFFER_SIZE, 0);

 if(rc < 0)        //recv出錯

    {

 //if(errno == EAGAIN || errno == EWOULDBLOCK)

 //    return rc;

 

 printf("Error in %s(), recv: %sn", __func__, strerror(errno));

 

 // 此處我們不再需要將該nty_event從epoll事件集移除, 因為上面我們已經(jīng)移除了

        close(ev->fd);

    }

 else if(rc == 0)  //對方關(guān)閉了

    {

 printf("Client closed the connection, fd = %dn", ev->fd);

 

 // 此處我們也當(dāng)做錯誤處理

 // 此處我們不再需要將該nty_event從epoll事件集移除, 因為上面我們已經(jīng)移除了

        close(ev->fd);

    } 

 else //接收到數(shù)據(jù)

    {

        ev->buffer[rc] = '';

 printf("Recv[fd = %d]: %sn", ev->fd, ev->buffer);

 

 // 將事件變?yōu)榭勺x, 然后加入到epoll事件表中

        nty_event_set(ev, ev->fd, EPOLLOUT, rc, 0, send_callback, reactor);

        nty_event_add(reactor->epoll_fd, ev);

    }

 

 return rc;

}

 

int send_callback(int fd, int events, void *arg)

{

 // 1.獲得reactor結(jié)構(gòu)

 struct ntyreactor *reactor =(struct ntyreactor*)arg;

 // 2.獲取該fd對應(yīng)的事件結(jié)構(gòu)

 struct ntyevent *ev = reactor->events + fd;

 

 // 3.此處我們把接收的內(nèi)容再回送給對象, 因此使用的是ev->buffer

 int rc = send(ev->fd, ev->buffer, ev->length, 0);

 if(rc > 0) //send成功

    {

 printf("Send[fd = %d]: %sn", ev->fd, ev->buffer);

 

 // 移除、添加: 將其變?yōu)榭勺x

        nty_event_del(reactor->epoll_fd, ev);

        nty_event_set(ev, ev->fd, EPOLLIN, 0, 0, recv_callback, reactor);

        nty_event_add(reactor->epoll_fd, ev);

    }

 else //send失敗

    {

 printf("Error in %s(), send: %sn", __func__, strerror(errno));

 

 // 關(guān)閉、移除

        close(ev->fd);

        nty_event_del(reactor->epoll_fd, ev);

    }

 

 return rc;

}

二、Proactor模式

Proactor模式特點與Reactor不同,Proactor模式將所有的I/O操作都交給主線程和內(nèi)核來處理,工作線程僅僅負(fù)責(zé)業(yè)務(wù)邏輯

Proactor模式的工作流程①主線程調(diào)用aio_read函數(shù)向內(nèi)核注冊socket上讀完成事件,并告訴內(nèi)核用戶讀緩沖區(qū)的位置,以及讀操作完成時如何通知應(yīng)用程序(這里以信號為例)②主線程繼續(xù)處理其他邏輯③當(dāng)socket上的數(shù)據(jù)被讀入用戶緩沖區(qū)后,內(nèi)核將向應(yīng)用程序發(fā)送一個信號,以通知應(yīng)用程序數(shù)據(jù)已經(jīng)可用④應(yīng)用程序預(yù)先定義好的信號處理函數(shù)選擇一個工作線程來處理客戶請求。工作線程處理完客戶請求之后,調(diào)用aio_write函數(shù)向內(nèi)核注冊socket上的寫完成事件,并告訴內(nèi)核用戶寫緩沖區(qū)的位置,以及寫操作完成時如何通知應(yīng)用程序(這里以信號為例)⑤主線程繼續(xù)處理其他邏輯⑥當(dāng)用戶緩沖區(qū)的數(shù)據(jù)被寫入socket之后,內(nèi)核將向應(yīng)用程序發(fā)送一個信號,以通知應(yīng)用程序數(shù)據(jù)已經(jīng)發(fā)送完畢⑦應(yīng)用程序預(yù)先定義好的信號處理函數(shù)選擇一個工作線程來做善后處理,比如決定是否關(guān)閉socket在上圖中,連接socket上的讀寫事件是通過aio_read/aio_write向內(nèi)核注冊的,因此內(nèi)核將通過信號來向應(yīng)用程序報告連接socket上的讀寫事件。所以,主線程的epoll_wait調(diào)用僅能用來檢測監(jiān)聽socket上的連接請求事件,而不能用來檢測連接socket的讀寫事件

三、使用同步I/O模擬Proactor模式

原理:主線程執(zhí)行數(shù)據(jù)讀寫操作,讀寫完成之后,主線程向工作線程通知這一“完成事件”。那么從工作線程的角度來看,它們就直接獲得了數(shù)據(jù)讀寫的結(jié)果,接下來要做的只是對讀寫的結(jié)果進(jìn)行邏輯處理

工作流程:①主線程往epoll內(nèi)核事件表中注冊socket上的讀就緒事件②主線程調(diào)用epoll_wait等待socket上有數(shù)據(jù)可讀③當(dāng)socket上有數(shù)據(jù)可讀時,epoll_wait通知主線程。主線程從socket循環(huán)讀取數(shù)據(jù),直到?jīng)]有更多數(shù)據(jù)可讀,然后將讀取到的數(shù)據(jù)封裝成一個請求對象并插入請求隊列④睡眠在請求隊列上的某個工作線程被喚醒,它獲得請求對象并處理客戶請求,然后往epoll內(nèi)核事件表中注冊socket上的寫就緒事件⑤主線程調(diào)用epoll_wait等到socket可寫⑥當(dāng)socket可寫時,epoll_wait通知主線程。主線程往socket上寫入服務(wù)器處理客戶請求的結(jié)果

四、幾種開源庫

  • 下面是幾種使用到上面技術(shù)的開源庫:libevent:名氣最大,應(yīng)用最廣泛,歷史悠久的跨平臺事件庫libev:較 libevent 而言,設(shè)計更簡練,性能更好,但對 windows 支持不夠好;libuv:開發(fā) node 的過程中需要一個跨平臺的事件庫,他們首選了 libev,但又要支持 Windows,故重新封裝了一套,linux 下用 libev 實現(xiàn),Windows 下用 IOCP 實現(xiàn)

優(yōu)先級libevent:激活的事件組織在優(yōu)先級隊列中,各類事件默認(rèn)的優(yōu)先級是相同的,可以通過設(shè)置 事件的優(yōu)先級使其優(yōu)先被處理libev:也是通過優(yōu)先級隊列來管理激活的時間,也可以設(shè)置事件優(yōu)先級libuv:也是通過優(yōu)先級隊列來管理激活的時間,也可以設(shè)置事件優(yōu)先級

事件循環(huán)libevent:event_base 用于管理事件libev:激活的事件組織在優(yōu)先級隊列中,各類事件默認(rèn)的優(yōu)先級是相同的,libuv:可以通 過設(shè)置事件的優(yōu)先級 使其優(yōu)先被處理

線程安全event_base 和 loop 都不是線程安全的,一個 event_base 或 loop 實例只能在用戶的一個線程 內(nèi)訪問(一般是主線程),注冊到 event_base 或者 loop 的 event 都是串行訪問的,即每個執(zhí) 行過程中,會按照優(yōu)先級順序訪問已經(jīng)激活的事件,執(zhí)行其回調(diào)函數(shù)。所以在僅使用一個 event_base 或 loop 的情況下,回調(diào)函數(shù)的執(zhí)行不存在并行關(guān)系

分享到:
標(biāo)簽:reactor
用戶無頭像

網(wǎng)友整理

注冊時間:

網(wǎng)站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨大挑戰(zhàn)2018-06-03

數(shù)獨一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學(xué)四六

運動步數(shù)有氧達(dá)人2018-06-03

記錄運動步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績評定2018-06-03

通用課目體育訓(xùn)練成績評定