日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

有一次去面試服務器端崗位,面試官問我有一個連接過來,你該怎么編程處理它。我答道:“主線程收到請求后,創建一個子線程處理。” 面試官接著問,那如果有一千個連接同時來呢?我說“那就多創建一點線程,搞個線程池”。面試官繼續追問如果一萬個呢?我答道:“......不會...”。

事實上,服務器端只需要單線程可以達到非常高的處理能力,redis 就是一個非常好的例子。僅僅靠單線程就可以支撐起每秒數萬 QPS 的高處理能力。今天我們就來帶大家看看 Redis 核心網絡模塊的內部實現,學習下 Redis 是如何做到如此的高性能的!

一、理解多路復用原理

在開始介紹 Redis 之前,我想有必要先來簡單介紹下 epoll。

在傳統的同步阻塞網絡編程模型里(沒有協程以前),性能上不來的根本原因在于進程線程都是笨重的家伙。讓一個進(線)程只處理一個用戶請求確確實實是有點浪費了。

深度解析單線程的 Redis 如何做到每秒數萬 QPS 的超高處理能力

 

先拋開高內存開銷不說,在海量的網絡請求到來的時候,光是頻繁的進程線程上下文就讓 CPU 疲于奔命了。

深度解析單線程的 Redis 如何做到每秒數萬 QPS 的超高處理能力

 

如果把進程比作牧羊人,一個進(線)程同時只能處理一個用戶請求,相當于一個人只能看一只羊,放完這一只才能放下一只。如果同時來了 1000 只羊,那就得 1000 個人去放,這人力成本是非常高的。

深度解析單線程的 Redis 如何做到每秒數萬 QPS 的超高處理能力

 

性能提升思路很簡單,就是讓很多的用戶連接來復用同一個進(線)程,這就是多路復用多路指的是許許多多個用戶的網絡連接。復用指的是對進(線)程的復用。換到牧羊人的例子里,就是一群羊只要一個牧羊人來處理就行了。

不過復用實現起來是需要特殊的 socket 事件管理機制的,最典型和高效的方案就是 epoll。放到牧羊人的例子來,epoll 就相當于一只牧羊犬。

在 epoll 的系列函數里, epoll_create 用于創建一個 epoll 對象,epoll_ctl 用來給 epoll 對象添加或者刪除一個 socket。epoll_wait 就是查看它當前管理的這些 socket 上有沒有可讀可寫事件發生。

深度解析單線程的 Redis 如何做到每秒數萬 QPS 的超高處理能力

 

當網卡上收到數據包后,linux 內核進行一系列的處理后把數據放到 socket 的接收隊列。然后會檢查是否有 epoll 在管理它,如果是則在 epoll 的就緒隊列中插入一個元素。epoll_wait 的操作就非常的簡單了,就是到 epoll 的就緒隊列上來查詢有沒有事件發生就行了。

在基于 epoll 的編程中,和傳統的函數調用思路不同的是,我們并不能主動調用某個 API 來處理。因為無法知道我們想要處理的事件啥時候發生。所以只好提前把想要處理的事件的處理函數注冊到一個事件分發器上去。當事件發生的時候,由這個事件分發器調用回調函數進行處理。這類基于實現注冊事件分發器的開發模式也叫 Reactor 模型。

【文章福利】:小編整理了一些個人覺得比較好的學習書籍、視頻資料共享在qun文件里面,有需要的可以自行添加哦!832218493(需要自取)

深度解析單線程的 Redis 如何做到每秒數萬 QPS 的超高處理能力

 

二、Redis 服務啟動初始化

理解了 epoll 原理后,我們再來實際看 Redis 具體是如何使用 epoll 的。直接在 Github 上就可以非常方便地獲取 Redis 的源碼。我們切到 5.0.0 版本來看單線程版本的實現(多線程我們改天再講)。

# git clone https://github.com/redis/redis
# cd redis
# git checkout -b 5.0.0 5.0.0

其中整個 Redis 服務的代碼總入口在 src/server.c 文件中,我把入口函數的核心部分摘了出來,如下。

//file: src/server.c
int main(int argc, char **argv) {
    ......
    // 啟動初始化
    initServer();
    // 運行事件處理循環,一直到服務器關閉為止
    aeMain(server.el);
}

其實整個 Redis 的工作過程,就只需要理解清楚 main 函數中調用的 initServer 和 aeMain 這兩個函數就足夠了。

本節中我們重點介紹 initServer,在下一節介紹事件處理循環 aeMain。在 initServer 這個函數內,Redis 做了這么三件重要的事情。

深度解析單線程的 Redis 如何做到每秒數萬 QPS 的超高處理能力

 

  • 創建一個 epoll 對象
  • 對配置的監聽端口進行 listen
  • 把 listen socket 讓 epoll 給管理起來
//file: src/server.c
void initServer() {
    // 2.1.1 創建 epoll
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

    // 2.1.2 綁定監聽服務端口
    listenToPort(server.port,server.ipfd,&server.ipfd_count);

    // 2.1.3 注冊 accept 事件處理器
    for (j = 0; j < server.ipfd_count; j++) {
        aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL);
    }
    ...
}

接下來我們分別來看。

2.1 創建 epoll 對象

本小節的邏輯看起來貌似不短,但其實只是創建了一個 epoll 對象出來而已。

創建 epoll 對象的邏輯在 aeCreateEventLoop 中,在創建完后,Redis 將其保存在 redisServer 的 aeEventLoop 成員中,以備后續使用。

struct redisServer {
    ...
    aeEventLoop *el;
}

我們來看 aeCreateEventLoop 詳細邏輯。Redis 在操作系統提供的 epoll 對象基礎上又封裝了一個 eventLoop 出來,所以創建的時候是先申請和創建 eventLoop。

//file:src/ae.c
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    eventLoop = zmalloc(sizeof(*eventLoop);

    //將來的各種回調事件就都會存在這里
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    ......

    aeApiCreate(eventLoop);
    return eventLoop;
}

在 eventLoop 里,我們稍微注意一下 eventLoop->events,將來在各種事件注冊的時候都會保存到這個數組里。

//file:src/ae.h
typedef struct aeEventLoop {
    ......
    aeFileEvent *events; /* Registered events */
}

具體創建 epoll 的過程在 ae_epoll.c 文件下的 aeApiCreate 中。在這里,真正調用了 epoll_create

//file:src/ae_epoll.c
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));
    state->epfd = epoll_create(1024); 
    eventLoop->apidata = state;
    return 0;
}

2.2 綁定監聽服務端口

我們再來看 Redis 中的 listen 過程,它在 listenToPort 函數中。雖然調用鏈條很長,但其實主要就是執行了個簡單 listen 而已。

//file: src/redis.c
int listenToPort(int port, int *fds, int *count) {
    for (j = 0; j < server.bindaddr_count || j == 0; j++) {
        fds[*count] = anetTcpServer(server.neterr,port,NULL,
                server.tcp_backlog);
    }
}

Redis 是支持開啟多個端口的,所以在 listenToPort 中我們看到是啟用一個循環來調用 anetTcpServer。在 anetTcpServer 中,逐步會展開調用,直到執行到 bind 和 listen 系統調用。

//file:src/anet.c
int anetTcpServer(char *err, int port, char *bindaddr, int backlog)
{
    return _anetTcpServer(err, port, bindaddr, AF_INET, backlog);
}
static int _anetTcpServer(......)
{
    // 設置端口重用
    anetSetReuseAddr(err,s)
    // 監聽
    anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog)
}
static int anetListen(......) {
    bind(s,sa,len);
    listen(s, backlog);
    ......
}

2.3 注冊事件回調函數

我們回頭再看一下 initServer,它調用 aeCreateEventLoop 創建了 epoll,調用 listenToPort 進行了服務端口的 bind 和 listen。接著就開始調用 aeCreateFileEvent 來注冊一個 accept 事件處理器。

//file: src/server.c
void initServer() {
    // 2.1.1 創建 epoll
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

    // 2.1.2 監聽服務端口
    listenToPort(server.port,server.ipfd,&server.ipfd_count);

    // 2.1.3 注冊 accept 事件處理器
    for (j = 0; j < server.ipfd_count; j++) {
        aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL);
    }
    ...
}

我們來注意看調用 aeCreateFileEvent 時傳的重要參數是 acceptTcpHandler,它表示將來在 listen socket 上有新用戶連接到達的時候,該函數將被調用執行。我們來看 aeCreateFileEvent 具體代碼。

//file: src/ae.c
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    // 取出一個文件事件結構
    aeFileEvent *fe = &eventLoop->events[fd];

    // 監聽指定 fd 的指定事件
    aeApiAddEvent(eventLoop, fd, mask);

    // 設置文件事件類型,以及事件的處理器
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;

    // 私有數據
    fe->clientData = clientData;
}

函數 aeCreateFileEvent 一開始,從 eventLoop->events 獲取了一個 aeFileEvent 對象。在 2.1 中我們介紹過 eventLoop->events 數組,注冊的各種事件處理器會保存在這個地方。

接下來調用 aeApiAddEvent。這個函數其實就是對 epoll_ctl 的一個封裝。主要就是實際執行 epoll_ctl EPOLL_CTL_ADD。

//file:src/ae_epoll.c
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    // add or mod
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    ......

    // epoll_ctl 添加事件
    epoll_ctl(state->epfd,op,fd,&ee);
    return 0;
}

每一個 eventLoop->events 元素都指向一個 aeFileEvent 對象。在這個對象上,設置了三個關鍵東西

  • rfileProc:讀事件回調
  • wfileProc:寫事件回調
  • clientData:一些額外的擴展數據

將來 當 epoll_wait 發現某個 fd 上有事件發生的時候,這樣 redis 首先根據 fd 到 eventLoop->events 中查找 aeFileEvent 對象,然后再看 rfileProc、wfileProc 就可以找到讀、寫回調處理函數。

回頭看 initServer 調用 aeCreateFileEvent 時傳參來看。

//file: src/server.c
void initServer() {
    ......
    // 2.1.3 注冊 accept 事件處理器
    for (j = 0; j < server.ipfd_count; j++) {
        aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL);
    }
}

listen fd 對應的讀回調函數 rfileProc 事實上就被設置成了 acceptTcpHandler,寫回調沒有設置,私有數據 client_data 也為 null。

三、Redis 事件處理循環

在上一節介紹完了 Redis 的啟動初始化過程,創建了 epoll,也進行了綁定監聽,也注冊了 accept 事件處理函數為 acceptTcpHandler。

//file: src/server.c
int main(int argc, char **argv) {
    ......
    // 啟動初始化
    initServer();
    // 運行事件處理循環,一直到服務器關閉為止
    aeMain(server.el);
}

接下來,Redis 就會進入 aeMain 開始進行真正的用戶請求處理了。在 aeMain 函數中,是一個無休止的循環。在每一次的循環中,要做如下幾件事情。

深度解析單線程的 Redis 如何做到每秒數萬 QPS 的超高處理能力

 

  • 通過 epoll_wait 發現 listen socket 以及其它連接上的可讀、可寫事件
  • 若發現 listen socket 上有新連接到達,則接收新連接,并追加到 epoll 中進行管理
  • 若發現其它 socket 上有命令請求到達,則讀取和處理命令,把命令結果寫到緩存中,加入寫任務隊列
  • 每一次進入 epoll_wait 前都調用 beforesleep 來將寫任務隊列中的數據實際進行發送
  • 如若有首次未發送完畢的,當寫事件發生時繼續發送
//file:src/ae.c
void aeMain(aeEventLoop *eventLoop) {

    eventLoop->stop = 0;
    while (!eventLoop->stop) {

        // 如果有需要在事件處理前執行的函數,那么運行它
        // 3.4 beforesleep 處理寫任務隊列并實際發送之
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);

        // 開始等待事件并處理
        // 3.1 epoll_wait 發現事件
        // 3.2 處理新連接請求
        // 3.3 處理客戶連接上的可讀事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

以上就是 aeMain 函數的核心邏輯所在,接下來我們分別對如上提到的四件事情進行詳細的闡述。

3.1 epoll_wait 發現事件

Redis 不管有多少個用戶連接,都是通過 epoll_wait 來統一發現和管理其上的可讀(包括 liisten socket 上的 accept事件)、可寫事件的。甚至連 timer,也都是交給 epoll_wait 來統一管理的。

深度解析單線程的 Redis 如何做到每秒數萬 QPS 的超高處理能力

 

每當 epoll_wait 發現特定的事件發生的時候,就會調用相應的事先注冊好的事件處理函數進行處理。我們來詳細看 aeProcessEvents 對 epoll_wait 的封裝。

//file:src/ae.c
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    // 獲取最近的時間事件
    tvp = xxx

    // 處理文件事件,阻塞時間由 tvp 決定
    numevents = aeApiPoll(eventLoop, tvp);
    for (j = 0; j < numevents; j++) {
        // 從已就緒數組中獲取事件
        aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

        //如果是讀事件,并且有讀回調函數
        fe->rfileProc()

        //如果是寫事件,并且有寫回調函數
        fe->wfileProc()
    }
}

//file: src/ae_epoll.c
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    // 等待事件
    aeApiState *state = eventLoop->apidata;
    epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    ...
}

aeProcessEvents 就是調用 epoll_wait 來發現事件。當發現有某個 fd 上事件發生以后,則調為其事先注冊的事件處理器函數 rfileProc 和 wfileProc。

3.2 處理新連接請求

我們假設現在有新用戶連接到達了。前面在我們看到 listen socket 上的 rfileProc 注冊的是 acceptTcpHandler。也就是說,如果有連接到達的時候,會回調到 acceptTcpHandler。

在 acceptTcpHandler 中,主要做了幾件事情

深度解析單線程的 Redis 如何做到每秒數萬 QPS 的超高處理能力

 

  • 調用 accept 系統調用把用戶連接給接收回來
  • 為這個新連接創建一個唯一 redisClient 對象
  • 將這個新連接添加到 epoll,并注冊一個讀事件處理函數

接下來讓我們看上面這三件事情都分別是如何被處理的。

//file:src/networking.c
void acceptTcpHandler(aeEventLoop *el, int fd, ...) {
    cfd = anetTcpAccept(server.neterr, fd, cip, ...);
    acceptCommonHandler(cfd,0);
}

在 anetTcpAccept 中執行非常的簡單,就是調用 accept 把連接接收回來。

//file: src/anet.c
int anetTcpAccept(......) {
    anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)
}
static int anetGenericAccept(......) {
    fd = accept(s,sa,len)
}

接下來在 acceptCommonHandler 為這個新的客戶端連接 socket,創建一個 redisClient 對象。

//file: src/networking.c
static void acceptCommonHandler(int fd, int flags) {
    // 創建 redisClient 對象
    redisClient *c;
    c = createClient(fd);
    ......
}

在 createClient 中,創建 client 對象,并且為該用戶連接注冊了讀事件處理器。

//file:src/networking.c
redisClient *createClient(int fd) {

    // 為用戶連接創建 client 對象
    redisClient *c = zmalloc(sizeof(redisClient));

    if (fd != -1) {
        ...

        // 為用戶連接注冊讀事件處理器
        aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c)
    }
    ...
}

關于 aeCreateFileEvent 的處理過程這里就不贅述了,詳情參見 2.3 節。其效果就是將該用戶連接 socket fd 對應的讀處理函數設置為 readQueryFromClient, 并且設置私有數據為 redisClient c。

3.3 處理客戶連接上的可讀事件

現在假設該用戶連接有命令到達了,就假設用戶發送了GET XXXXXX_KEY 命令。那么在 Redis 的時間循環中調用 epoll_wait 發現該連接上有讀時間后,會調用在上一節中討論的為其注冊的讀處理函數 readQueryFromClient。

深度解析單線程的 Redis 如何做到每秒數萬 QPS 的超高處理能力

 

在讀處理函數 readQueryFromClient 中主要做了這么幾件事情。

  • 解析并查找命令
  • 調用命令處理
  • 添加寫任務到隊列
  • 將輸出寫到緩存等待發送

我們來詳細地看 readQueryFromClient 的代碼。在 readQueryFromClient 中會調用 processInputBuffer,然后進入 processCommand 對命令進行處理。其調用鏈如下:

//file: src/networking.c
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, ...) {
    redisClient *c = (redisClient*) privdata;
    processInputBufferAndReplicate(c);
}

void processInputBufferAndReplicate(client *c) {
    ...
    processInputBuffer(c);
}

// 處理客戶端輸入的命令內容
void processInputBuffer(redisClient *c) {
    // 執行命令,
    processCommand(c);
}

我們再來詳細看 processCommand 。

//file:
int processCommand(redisClient *c) { 
    // 查找命令,并進行命令合法性檢查,以及命令參數個數檢查
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);

    ......

    // 處理命令
    // 如果是 MULTI 事務,則入隊,否則調用 call 直接處理
    if (c->flags & CLIENT_MULTI && ...)
    {
        queueMultiCommand(c);
    } else {
        call(c,CMD_CALL_FULL);
        ...
    }
    return C_OK;
}

我們先忽略 queueMultiCommand,直接看核心命令處理方法 call。

//file:src/server.c
void call(client *c, int flags) {
    // 查找處理命令,
    struct redisCommand *real_cmd = c->cmd;
    // 調用命令處理函數
    c->cmd->proc(c);
    ......
}

在 server.c 中定義了每一個命令對應的處理函數

//file:src/server.c
struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
    ......

    {"mget",mgetCommand,-2,"rF",0,NULL,1,-1,1,0,0},
    {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"rpushx",rpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    ......
}

對于 get 命令來說,其對應的命令處理函數就是 getCommand。也就是說當處理 GET 命令執行到 c->cmd->proc 的時候會進入到 getCommand 函數中來。

//file: src/t_string.c
void getCommand(client *c) {
    getGenericCommand(c);
}
int getGenericCommand(client *c) {
    robj *o;

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL)
        return C_OK;
    ...
    addReplyBulk(c,o);
    return C_OK;
}

getGenericCommand 方法會調用 lookupKeyReadOrReply 來從內存中查找對應的 key值。如果找不到,則直接返回 C_OK;如果找到了,調用 addReplyBulk 方法將值添加到輸出緩沖區中。

//file: src/networking.c
void addReplyBulk(client *c, robj *obj) {
    addReplyBulkLen(c,obj);
    addReply(c,obj);
    addReply(c,shared.crlf);
}

其主題是調用 addReply 來設置回復數據。在 addReply 方法中做了兩件事情:

  • prepareClientToWrite 判斷是否需要返回數據,并且將當前 client 添加到等待寫返回數據隊列中。
  • 調用 _addReplyToBuffer 和 _addReplyObjectToList 方法將返回值寫入到輸出緩沖區中,等待寫入 socekt
//file:src/networking.c
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
    } else {
        ......        
    }
}

先來看 prepareClientToWrite 的詳細實現,

//file: src/networking.c
int prepareClientToWrite(client *c) {
    ......
    if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
        clientInstallWriteHandler(c);
}

//file:src/networking.c
void clientInstallWriteHandler(client *c) {
    c->flags |= CLIENT_PENDING_WRITE;
    listAddNodeHead(server.clients_pending_write,c);
}

其中 server.clients_pending_write 就是我們說的任務隊列,隊列中的每一個元素都是有待寫返回數據的 client 對象。在 prepareClientToWrite 函數中,把 client 添加到任務隊列 server.clients_pending_write 里就算完事。

接下再來 _addReplyToBuffer,該方法是向固定緩存中寫,如果寫不下的話就繼續調用 _addReplyStringToList 往鏈表里寫。簡單起見,我們只看 _addReplyToBuffer 的代碼。

//file:src/networking.c
int _addReplyToBuffer(client *c, const char *s, size_t len) {
    ......
    // 拷貝到 client 對象的 Response buffer 中
    memcpy(c->buf+c->bufpos,s,len);
    c->bufpos+=len;
    return C_OK;
}

3.4 beforesleep 處理寫任務隊列

回想在 aeMain 函數中,每次在進入 aeProcessEvents 前都需要先進行 beforesleep 處理。這個函數名字起的怪怪的,但實際上大有用處。

//file:src/ae.c
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        // beforesleep 處理寫任務隊列并實際發送之
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);

        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

該函數處理了許多工作,其中一項便是遍歷發送任務隊列,并將 client 發送緩存區中的處理結果通過 write 發送到客戶端手中。

深度解析單線程的 Redis 如何做到每秒數萬 QPS 的超高處理能力

 

我們來看下 beforeSleep 的實際源碼。

//file:src/server.c
void beforeSleep(struct aeEventLoop *eventLoop) {
    ......
    handleClientsWithPendingWrites();
}
//file:src/networking.c
int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);

    //遍歷寫任務隊列 server.clients_pending_write
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);

        //實際將 client 中的結果數據發送出去
        writeToClient(c->fd,c,0)

        //如果一次發送不完則準備下一次發送
        if (clientHasPendingReplies(c)) {
            //注冊一個寫事件處理器,等待 epoll_wait 發現可寫后再處理 
            aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c);
        }
        ......
    }
}

在 handleClientsWithPendingWrites 中,遍歷了發送任務隊列 server.clients_pending_write,并調用 writeToClient 進行實際的發送處理。

值得注意的是,發送 write 并不總是能一次性發送完的。假如要發送的結果太大,而系統為每個 socket 設置的發送緩存區又是有限的。

在這種情況下,clientHasPendingReplies 判斷仍然有未發送完的數據的話,就需要注冊一個寫事件處理函數到 epoll 上。等待 epoll 發現該 socket 可寫的時候再次調用 sendReplyToClient進行發送。

//file:src/networking.c
int writeToClient(int fd, client *c, int handler_installed) {
    while(clientHasPendingReplies(c)) {
        // 先發送固定緩沖區
        if (c->bufpos > 0) {
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
            if (nwritten <= 0) break;
            ......

        // 再發送回復鏈表中數據
        } else {
            o = listNodeValue(listFirst(c->reply));
            nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
            ......
        }
    }
}

writeToClient 中的主要邏輯就是調用 write 系統調用讓內核幫其把數據發送出去即可。由于每個命令的處理結果大小是不固定的。所以 Redis 采用的做法用固定的 buf + 可變鏈表來儲存結果字符串。這里自然發送的時候就需要分別對固定緩存區和鏈表來進行發送了。

四、高性能 Redis 網絡原理總結

Redis 服務器端只需要單線程可以達到非常高的處理能力,每秒可以達到數萬 QPS 的高處理能力。如此高性能的程序其實就是對 Linux 提供的多路復用機制 epoll 的一個較為完美的運用而已。

在 Redis 源碼中,核心邏輯其實就是兩個,一個是 initServer 啟動服務,另外一個就是 aeMain 事件循環。把這兩個函數弄懂了,Redis 就吃透一大半了。

//file: src/server.c
int main(int argc, char **argv) {
    ......
    // 啟動初始化
    initServer();
    // 運行事件處理循環,一直到服務器關閉為止
    aeMain(server.el);
}

在 initServer 這個函數內,Redis 做了這么三件重要的事情。

  • 創建一個 epoll 對象
  • 對配置的監聽端口進行 listen
  • 把 listen socket 讓 epoll 給管理起來

在 aeMain 函數中,是一個無休止的循環,它是 Redis 中最重要的部分。在每一次的循環中,要做的事情可以總結為如下圖。

深度解析單線程的 Redis 如何做到每秒數萬 QPS 的超高處理能力

 

  • 通過 epoll_wait 發現 listen socket 以及其它連接上的可讀、可寫事件
  • 若發現 listen socket 上有新連接到達,則接收新連接,并追加到 epoll 中進行管理
  • 若發現其它 socket 上有命令請求到達,則讀取和處理命令,把命令結果寫到緩存中,加入寫任務隊列
  • 每一次進入 epoll_wait 前都調用 beforesleep 來將寫任務隊列中的數據實際進行發送

其實事件分發器還處理了一個不明顯的邏輯,那就是如果 beforesleep 在將結果寫回給客戶端的時候,如果由于內核 socket 發送緩存區過小而導致不能一次發送完畢的時候,也會注冊一個寫事件處理器。等到 epoll_wait 發現對應的 socket 可寫的時候,再執行 write 寫處理。

整個 Redis 的網絡核心模塊就在咱們這一篇文章中都敘述透了(剩下的 Redis 就是對各種數據結構的建立和處理了)。相信吃透這一篇對于你對網絡編程的理解會有極大的幫助!

還等什么,快把這篇文章也分享給你身邊和你一樣愛好深度技術的好友吧!

分享到:
標簽:Redis
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定