日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

淺談linux下基于UDP服務的負載均衡方法

 

UDP在并入TCP/IP族的時候,就是作為IP協議的第四層抽象存在的,IP的協議單元叫做IP數據報,UDP的名字正是來源于此,用戶數據報協議,其中“用戶”便有了端到端的意思。起初UDP只是作為TCP的補充存在,應用于一些無需維護連接,無需維護狀態的場合,然而隨著TCP越來越復雜,隨著復雜性越來越往上層轉移,很多應用程序開始在UDP之上自行處理連接狀態,數據序列等,這樣便可以自行控制好復雜度,比較典型的是DTLS協議(SSL的UDP版本)以及OpenVPN。
OpenVPN原理上講不便于使用TCP作為承載協議,使用UDP將會是比較高效的選擇,然而這種高效卻被其單進程單處理給抵消掉了,特別是當你在8核CPU上運行OpenVPN服務,卻看到始終有7個CPU核心處于空閑狀態的時候。憤慨之余要著手解決問題。我的random redirect NAT+多OpenVPN進程綁定多端口方式已經可以作為一個解決方案,然而它還是利用了很多外圍的東西,最典型的就是它用ip_conntrack為UDP灌入連接的概念,因此它依賴于ip_conntrack!而我希望的是,可以像TCP那樣實現一個多處理,比如預分配worker進程的多處理,比如連接到來后再分配進程的多處理,使用UDP,可以做到嗎?

1.SO_REUSEADDR綁定相同的地址端口

REUSEADDR沒有太多復雜的東西,總之就是要確保四元組的唯一性即可,不管對于TCP還是對于UDP都一樣。對于TCP而言,bind的時候所有潛在的可能沖突(此時還不知道對端是誰)的綁定都會被禁止,對于connect的時候,此時本端對端均已經明確,只有明確不會破壞4元組唯一性的connect才會發送SYN包,對于Listen方,無需考慮4元組唯一性,因為connect方已經可以保證4元組唯一了。
UDP socket的4元組唯一性保證手段比較有趣,由于UDP無連接狀態,沒有辦法通過“將新的連接匹配保存下來的已經有的4元組連接”來判斷是否4元組沖突,那么辦呢?辦法就是按照固定的算法查找目標UDP socket,這樣每次查到的都是UDP socket鏈表(或者hash表,具體存儲形式無關緊要)固定位置的socket,這樣自然而然把其它位置沖突的UDP socket給廢掉了!你可以試著在一個進程或者多個進程內創建多個綁定相同端口,相同IP地址的UDP socket,然后在另外的機器向其發送數據,你會發現,只有最后一個創建的socket會接收到數據,其它的都是默默地等待,孤獨的等待,像小說《十年》里面的那樣...此時如果再創建一個綁定同樣地址端口的UDP socket,這最后一個便開始接收數據了。以上說的“最后一個”并不絕對,取決于算法,關鍵是“固定的位置”,這個UDP socket查找算法很簡單,只是基于目標IP地址和目標端口做查找,找到第一個則返回它,因此具體找到哪個取決于UDP socket在創建的時候是按何種順序插入的。
對于UDP socket的客戶端,也一樣,如果你創建了兩個綁定相同IP地址和相同端口的UDP socket,然后用第一個socket往一個特定的目標sendmsg,可能會是第二個socket接收到數據,到底是哪一個取決于UDP socket在第四層的查找算法。這個原理你可以看一下第四層的UDP socket查找邏輯,其核心在于compute_score函數:

if (inet->daddr) {
    if (inet->daddr != saddr)
        return -1;
    score += 2;
}
if (inet->dport) {
    if (inet->dport != sport)
        return -1;
    score += 2;
}

一般而言,UDP socket如果沒有調用connect,其daddr和dport是不會賦值的,因此關于這兩個字段的判斷相當于沒有結果的判斷,如果對于客戶端socket,且調用了connect,那么就會按照4元組完全匹配的原則來匹配。
因此,創建多個綁定相同IP地址,相同端口的UDP程序,會起到熱備份的作用,不會起到負載均衡的作用。另外如果新創建了同樣綁定的新UDP socket,會改變原有多個socket在鏈表(或者別的什么容器)中的位置,數據到來時,具體會匹配到哪個socket,依然取決于固定的算法。

2.SO_REUSEPORT的不同之處

linux 3.9內核支持了BSD系統早已有之的SO_REUSEPORT這個socket選項,有了它就可以支持UDP的負載均衡啦,哈哈!在給出具體做法和實例之前,我們先看一下它是如何做到的,和之前的版本一樣,在查找目標socket的時候,compute_score的邏輯幾乎沒有什么變化,不同的是,在compute_score之后,數據包的源IP和源端口也參與了運算,而不再像老版本那樣只是嚴格按照目標IP地址和目標端口那樣作為查找鍵參與運算,這樣即使按照目標IP和目標端口已經查找到了一個匹配的socket,也會按照其源IP和源端口繼續下去,以便在多個匹配的socket中選擇一個,算法是固定的,因此可以保證:
a.相同4元組的數據包總是匹配到相同的一個socket;
b.不同4元組的數據包可能會被hash到不同的socket。
以上的沒有什么神奇之處,其實現和之前的非負載均衡版本唯一不同的是就是在查找目標socket的時候讓源IP地址和源端口也參與了進來,而這并不破壞4元組的唯一性,所謂的唯一性,其目的只是要保證同一數據發給特定的目標socket,保證4元組唯一只是實現這一目標的手段而已,目的和手段是萬萬不能混淆的,意識到這一點之后,綁定相同端口相同IP地址的socket同時接收數據就是理所當然的了,只要保證來自同一IP地址和同一端口的數據總是發送給同一socket即可,而算法本身保證了這一點。
socket查找算法是重要的,查找算法沒有使用任何隨機因素比如隨機數之類的,而是使用固定的計算選擇一個目標socket,這足以保證只要是固定的源IP地址和源端口,算出的目標socket位置就是固定的,核心代碼如下:

sk_nulls_for_each_rcu(sk, node, &hslot->head) {
    score = compute_score(sk, net, saddr, hnum, sport,
                     daddr, dport, dif);
    if (score > badness) {
        result = sk;
        badness = score;
        reuseport = sk->sk_reuseport;
        if (reuseport) {
            hash = inet_ehashfn(net, daddr, hnum,
                        saddr, htons(sport));
            matches = 1;
        }
    } else if (score == badness && reuseport) {
        matches++;
        if (((u64)hash * matches) >> 32 == 0)
            result = sk;
        hash = next_pseudo_random32(hash);
    }
}

SO_REUSEPORT是一個其它UNIX平臺古已有之的選項,Linux終于也支持了,本文沒有討論其對TCP的影響,因為其他人討論的已經很多了,畢竟TCP的應用要多得多,TCP的進程綁定機制以及其本身的連接綁定機制也使得SO_REUSEPORT對TCP的影響很好理解,Listen狀態的多個綁定相同IP地址和相同端口的TCP socket很容易根據syn源IP和端口來為其綁定一個流并且記住它。但是SO_REUSEPORT對于UDP的影響,我沒有搜到什么太多的資料,加之我又特別需要UDP服務的負載均衡,因此我決定自己做一個。

需要C/C++ Linux服務器架構師學習資料私信“資料”(資料包括C/C++,Linux,golang技術,Nginx,ZeroMQ,MySQL,redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg等),免費分享

淺談linux下基于UDP服務的負載均衡方法

 

3.實現一個多進程的UDP服務

3.1.只為OpenVPN

OpenVPN,一個充滿矛盾的存在,它沒有多處理,多多少少和UDP有關,它提倡用UDP,多多少少和TCP有關(是一定有關系的,別跟我講TCP的細節,什么按序,重傳,慢啟動,滑動窗口之類的,好好看看OpenVPN的man吧...),然而TCP的多處理,可謂汗牛充棟了。
如今,UDP也可以多處理了,好事兒!迫不及待想對它動個小手術,但是手術之前,還有幾件事情要確認。

3.2.多進程的UDP

有了SO_REUSEPORT支持之后,第一個反應以及第一個動作,那就是試一下效果。初始設計是極其簡單的,那就是創建多個UDP socket,綁定到相同的IP地址和相同的端口,然后poll,poll到誰有POLLIN事件了就recvfrom。此時不斷得使用不同的進程往該這些socket綁定的IP地址和端口發送數據,你會發現,和之前不支持SO_REUSEPORT的時候不一樣了,數據會發送到不同的UDP socket上,并且可以保證只要源地址和源端口是同一個,那么發送到的socket也是同一個,如果換一下源端口,那么就可能發送到另外的socket了。這個實驗的代碼及其簡單,代碼就不貼了。這個實驗證明了通過SO_REUSEPORT選項做UDP的負載均衡是可行的。
可是要充分利用多CPU性能,最重要的還是要使用多線程/多進程結構,特別是在處理UDP“長連接”的時候,說起UDP長連接,可能有人會覺得我把概念弄糊涂了,實際上我清楚得很,只不過沒有更好的詞也不想發明新的詞來形容使用UDP協議時的OpenVPN是怎么收發數據的。至于短連接且不太頻繁的,多進程意義倒不是很大,反而會引入進程創建,進程切換帶來的開銷。那么多進程自然是好的,如何實現呢?自然而然想到的就是預先建立多個進程了,每一個進程分配一個UDP socket來處理數據的收發,涉及到細節有很多種:
a.寫一個程序,創建一個使用SO_REUSEPORT綁定一個IP和端口的UDP socket,然后啟動多個實例,啟動腳本可以獨立于程序以外;
b.在主進程中預先fork多個進程,每一個進程只處理一個UDP socket;
c.在主進程中預先fork多個進程,每一個進程可以處理全部的UDP socket;
d.試圖建立UDP的accept模型

以上的方案中,第一個很明顯沒有什么意思,它也是我著手OpenVPN負載均衡的第一個方案,很傻瓜,很簡單,很實用,沒什么技巧可以炫耀,沒什么“技術含量”,以前的文章中說的多了,本文就不說了,只不過之前不能綁定相同端口(其實可以,但是綁了也沒有用)。另外的3個方案中,每一個都可以講一段故事。在講故事之前,首先給出方案b的代碼,因為它最純粹,也最成功,也是唯一可行的方案。代碼如下:

#include <sys/types.h>
#include <string.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <sys/socket.h>
#include <unistd.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <sys/uio.h>  

#define NUM_PROCESS     10
#define SO_REUSEPORT    15
#define SERV_PORT       12345
#define MAXSIZE         1024

struct worker_arg {
        int sock_fd;
        int process_num;
};

void DEBUG(char *argv) {
      //TODO  
}
void client_echo(int sock, int process_num)
{
        int n;
        char buff[MAXSIZE];
        struct sockaddr clientfrom;
        socklen_t addrlen = sizeof(clientfrom);
        for(;;) {
                memset(buff, 0, MAXSIZE);
                n = recvfrom(sock, buff, MAXSIZE, 0, &clientfrom, &addrlen);
                printf("%dn", process_num);
                n = sendto(sock, buff, n, 0, &clientfrom, addrlen);
        }
}

int create_udp_sock(const char *str_addr)
{
        int sockfd;
        int optval = 1;
        int fdval = 0;
        struct sockaddr_in servaddr, cliaddr;
        sockfd = socket(AF_INET, SOCK_DGRAM|SOCK_CLOEXEC, 0); /* create a socket */
        setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
       //這是關鍵,關鍵中的關鍵。如果沒有這個,此代碼在之前內核照樣完美運行!完美?完美!
        setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));
        bzero(&servaddr, sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        servaddr.sin_addr.s_addr = inet_addr(str_addr);
        servaddr.sin_port = htons(SERV_PORT);
        if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
                perror("bind error");
                exit(1);
        }
                perror("bind ");
        
        return sockfd;
}

void woker_process(int sock, int num)
{        
        int woker_sock = sock;//recv_fd(sock);
        if(woker_sock < 0){
        }
        client_echo(woker_sock, num);
    
}

pid_t create_work(void *arg)
{
        struct worker_arg *warg = (struct worker_arg*)arg;
        pid_t pid = fork();
        if (pid > 0) {
                return pid;
        } else if (pid == 0){
                woker_process(warg->sock_fd, warg->process_num);
                // 不可能運行到此
        } else {
                exit(1);
        }
        // 不可能運行到此
        return pid;
}

void schedule_process(int socks[2], char *str_addr)
{
        pid_t pids[NUM_PROCESS] = {0};
        int udps[NUM_PROCESS] = {0};
        int i = 0;
        for (i = 0; i < NUM_PROCESS; i++) {
                udps[i] = create_udp_sock(str_addr);
        }
        for (i = 0; i < NUM_PROCESS; i++) {
                struct worker_arg warg;
                warg.sock_fd = udps[i];
                warg.process_num = i;
                pid_t pid = create_work(&warg);
                pids[i] = pid;
        }
        while (1) {
                int stat;
                int i;
                pid_t pid = waitpid(-1, &stat, 0);
                for (i = 0; i < NUM_PROCESS; i++) {
                        if (pid == pids[i]) {
                                // 最最關鍵的,那就是”把特定的套接字傳遞到特定的進程中“
                                struct worker_arg warg;
                                warg.sock_fd = udps[i];
                                warg.process_num = i;
                                pid_t pid = create_work(&warg);
                                pids[i] = pid;
                                break;
                        }
                }
        }
}

int main(int argc, char** argv)
{
        int unix_socks[2];
        char *str_addr = argv[1];
        if(socketpair(AF_UNIX, SOCK_STREAM, 0, unix_socks) == -1){
                exit(1);
        }
        schedule_process(unix_socks, str_addr);
        return 0;
}

最最重要的是上面的那個wait邏輯,你要保證,一個進程掛掉以后,要將其迅速拉起來,因為如果不這樣,所有命中(通過第四層的socket查找算法)該socket的數據都將得不到處理。

本來,我的代碼使用的是進程間傳遞文件描述符的方式,也就是經典著作里面講述的那種方式,但是我覺得它太UNIX了,就沒有用,于是使用了fork這種更加UNIX的方式!以上的代碼寫的不好,但是那是可以體現UDP多進程負載均衡的最好方式了,什么東西都在代碼本身!不過這代碼可真夠爛的...

4.問題1:中間有個socket死了會弄亂其它活著socket的位置

我們知道,SO_REUSEPORT選項設定了以后,會根據數據包的源地址和源端口來計算一個值,該值對應所有相同目標IP地址相同目標端口的socket中的一個socket的位置,內核便把這個位置的socket作為接收數據的socket,然而,如果這個socket死掉了的話,即使重新創建了一個新的socket湊足了數,也會使整個socket鏈表重新排序,除非死掉的是最后一個。由于socket查找算法是不變的,計算位置的數據也不變,諸多socket的順序改變意味著數據源和目標處理進程的對應關系會發生改變,從而本應該處理該數據的socket沒有收到數據,反而被別的socket接收,這也是上述多進程方案中a方案的唯一弊端,你必須在外部監控哪個進程掛掉了,進程掛掉意味著socket被關閉,你又沒有辦法(沒有這樣的API)再創建一個socket插入到掛掉進程使用的那個socket所在的位置,唯一的方法就是全部殺掉,重新開始,但是這顯然不是最好的解決方案,最好的方案...
解決:保證UDP不死,即它不被關閉
怎么保證呢?很簡單,那就是socket的創建和關閉全部由主進程來統一管理,工作子進程們只處理網絡IO,不關閉socket,這就要求socket在創建的時候要帶有CLOEXEC標志。這也是方案b和方案c被提出的原因(同時也是結果)。

5.UDP失敗的accept模型-按需建立UDP處理進程

多么想為UDP建立一個accept模型,有了SO_REUSEPORT選項以后,貌似有了一點希望,那就著手寫代碼了。代碼是寫出來了,也能用,看樣子還不錯,可是總是有解決不了的小尾巴,最終由于UDP和4元組沒有必然的對應關系(這也是UDP的本質,否則它就成了有連接協議了,詳見問題3)推論出UDP的accept幾乎是沒有希望的,之所以還可以用SO_REUSEPORT選項來做UDP的多進程負載均衡,其實完全依賴了Linux內核處理UDP socket查找時的一個算法,該算法只是一種實現而已,并不能保證其它的系統或者未來的Linux內核不會改變算法的行為,因此這種UDP的多進程負載均衡并非標準方案,它的使用有賴于你對系統內核行為的熟悉以及特定版本特定算法的副作用的理解。

不管怎么樣,還是給出一個很XX的實現吧,畢竟目標不是為了展示“瞧啊,我實現了一個UDP的accept”,這無異于對別人說,我有更麻煩的解法。醉翁之意不在酒,在于OpenVPN?NO,是為了引出一個排他喚醒的問題。看代碼:

#define _GNU_SOURCE
#include <sys/types.h>
#include <string.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <sys/socket.h>
#include <unistd.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <sys/uio.h>  
#include <poll.h>

#include <pthread.h>

#include "list.h"

#define NUM_PROCESS     10
#define SO_REUSEPORT    15
#define SERV_PORT       12345
#define MAXSIZE         1024

struct client_socket {
        struct list_head list;
        pid_t pid;
        int sock_fd;
        int process_num;
        struct sockaddr *src_addr;
        char init_data[MAXSIZE]; 
};


void DEBUG(char *argv) {
        
}

void client_echo(int sock, int process_num)
{
        int n;
        char buff[MAXSIZE];
        struct sockaddr clientfrom;
        socklen_t addrlen = sizeof(clientfrom);

                struct pollfd pfds[1];
                int pfd[1] = {-1};
                pfds[0].fd = sock;
                pfds[0].events = POLLIN|POLLOUT;
                while (1) {
                        int efds = -1;
                //以下注釋掉的代碼如果不放開,就會有問題!具體還請看我下面的關于”排他喚醒“的分析
                //      if ((efds = poll(pfds,1,-1)) < 0) {
                //              return -1;
                //      }
                //      if(pfds[0].revents & POLLIN) {
                                memset(buff, 0, MAXSIZE);
                                n = recvfrom(sock, buff, MAXSIZE, 0, &clientfrom, &addrlen);
                                printf("recv data:%s   size:%d   num:%d  pid:%dn", buff, n, process_num, getpid());
                                n = sendto(sock, buff, n, 0, &clientfrom, addrlen);
                //      }
                }
}

int create_udp_socks(const char *str_addr)
{
        int sockfd;
        int optval = 1;
        int fdval = 0;
        struct sockaddr_in servaddr, cliaddr;
        sockfd = socket(AF_INET, SOCK_DGRAM|SOCK_CLOEXEC, 0); /* create a socket */
        setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
        setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));
        bzero(&servaddr, sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        servaddr.sin_addr.s_addr = inet_addr(str_addr);
        servaddr.sin_port = htons(SERV_PORT);
        if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
                perror("bind error");
                exit(1);
        }
        
        return sockfd;
}

void woker_process(int sock, int num, char *init_data, pid_t pid)
{        
        int woker_sock = sock;
        client_echo(woker_sock, num);
    
}

pid_t create_worker(void *arg)
{
        struct client_socket *pcs = (struct client_socket*)arg;
        pid_t pid = fork();
        if (pid > 0) {
                pcs->pid = pid;
                return pid;
        } else if (pid == 0){
                woker_process(pcs->sock_fd, pcs->process_num, pcs->init_data, pcs->pid);
                // 不可能運行到此
        } else {
                exit(1);
        }
        // 不可能運行到此
        return pid;
}

struct client_socket* udp_accept(struct pollfd *pfds)
{
        int ret_fd = -1;
        int efds = -1;
        static LIST_HEAD(worker_list);
        int i;
        struct client_socket *pcs = NULL;
        if ((efds = poll(pfds, NUM_PROCESS, -1)) < 0) {
                printf("poll errorn");
                return NULL;
        }
        for (i = 0; i < NUM_PROCESS; i++) {
                // 此處的循環只會處理沒有hash到既有worker的UDP套接字
                if(pfds[i].revents & POLLIN) {
                        struct sockaddr_in *pclientfrom = (struct sockaddr_in *)calloc(1, sizeof(struct sockaddr_in));
                        socklen_t addrlen = sizeof(struct sockaddr_in);
                        char pesud_buf[MAXSIZE];
                        int n = 0;
                        struct list_head *pos;
                        struct client_socket *ocs;
                        n = recvfrom(pfds[i].fd, pesud_buf, MAXSIZE, MSG_PEEK, pclientfrom, &addrlen);
                        printf("accept n:%d   port:%dn", n, ntohs(pclientfrom->sin_port));
                        int flag = 0;
                        // 此處的循環為了防止為同一個“連接”(即相同的五元組)創建兩個或者多個worker進程
                        list_for_each(pos, &worker_list) {
                                ocs = list_entry(pos, struct client_socket, list);
                                if (!memcmp(pclientfrom, ocs->src_addr, sizeof(struct sockaddr_in))) {
                                        flag = 1;
                                        break;
                                }
                        }
                        // 如果該“連接”已經被hash到一個既有的worker,忽略它,該worker自會處理!
                        if (flag) {
                                free (pclientfrom);
                                continue;
                        }
                        ret_fd = pfds[i].fd;
                        pcs = (struct client_socket*)calloc(1, sizeof(struct client_socket));
                        pcs->sock_fd = ret_fd;
                        pcs->process_num = i;
                        pcs->src_addr = pclientfrom;
                        INIT_LIST_HEAD(&pcs->list);
                        list_add(&pcs->list, &worker_list);
                        break;
                }
        }
        return pcs;
}

void schedule_process(int socks[2], char *str_addr, struct pollfd *pfds)
{
        pid_t pids[NUM_PROCESS] = {0};
        int udps[NUM_PROCESS] = {0};
        int i = 0;
        for (i = 0; i < NUM_PROCESS; i++) {
                udps[i] = create_udp_socks(str_addr);
                pfds[i].fd = udps[i];
                pfds[i].events = POLLIN;
        }
        while (1) {
                pid_t pid;
                struct client_sock *pcs = udp_accept(&pfds[0]);
                printf("accept :%pn", pcs);
                if (pcs != NULL) {
                        pid = create_worker(pcs);
                }
        }
}

void *wait_thread(void *arg)
{
        struct pollfd *pfds = (struct pollfd*)arg;
        while (1) {
                int stat;
                int i;
                pid_t pid = waitpid(-1, &stat, 0);
                // 一直以來,我總覺得應該wait一點什么,但是...
        }       
}

int main(int argc, char** argv)
{
        int unix_socks[2];
        char *str_addr = argv[1];
        struct pollfd pfds[NUM_PROCESS] = {0};
        pthread_t tid;
        int ret = -1;
        if(socketpair(AF_UNIX, SOCK_STREAM, 0, unix_socks) == -1){
                exit(1);
        }
        ret = pthread_create(&tid, NULL, wait_thread, &pfds[0]);
        if (ret) {
               exit(1);
        }
        schedule_process(unix_socks, str_addr, &pfds[0]);
        return 0;
}

以上的代碼看似體現了UDP的accept邏輯,但是卻不是!它真實地體現了UDP是不可能實現accept的,除非你使用類似conntrack的附加因素!我們還是尊重UDP的原本的意義吧,如果你非要想為UDP添加一個第四層的conntrack邏輯,何必不直接使用TCP呢?也許你會回答,TCP太低效,太復雜,哦,是的,你不喜歡TCP,是的,我也不喜歡,但是你可以實現一個用戶態的邏輯而不要糾結于第四層,畢竟UDP本身就是無連接的。見過有誰為IP建立過連接機制嗎?是的,太多了,ip_conntrack,MPLS,有狀態Firewall...不一而足,,,這到底是協議設計之初的缺陷呢,還是人們得寸進尺的證明?如果你想做點什么,以上都是例子,另外,還有更好的,那就是DTLS和OpenVPN!

6.問題2:兩個進程帶PEEK以及不帶PEEK同時recv一個socket-排他喚醒問題

說實話,之所以寫下這個失敗的UDP accept模型并不是為了想講UDP怎么怎么地,而是想說一下Linux的排他喚醒問題,甚至更往前的,也就是UDP多進程處理的方案c的提出,即所有進程處理所有的socket,也是想說一下這個排他喚醒的問題。真正和UDP相關的問題,是本小節的子問題。
所謂的排他喚醒機制,簡單的說就是:如果多個進程都睡眠在等待一個事件上,當該事件發生的時候,只能喚醒其中一個。socket在阻塞recv的時候,如果沒有數據,就會睡眠等待,此時如果有數據來,會喚醒socket關聯的進程,該喚醒就是排他喚醒,也就是說,如果有兩個進程都在等待同一個socket上進行阻塞recv,有數據來的時候,只有一個進程會得到數據并返回,另一個進程繼續睡眠等待。具體可以參見內核函數__wake_up_common:

static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
            int nr_exclusive, int wake_flags, void *key)
{
    wait_queue_t *curr, *next;

    list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
        unsigned flags = curr->flags;

        if (curr->func(curr, mode, wake_flags, key) &&
                (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
            break;
    }
}

知道了這一點后,我們來分析一下UDP accept模型中的一段代碼,在主進程中:

while (1) {
    poll
    if (POLLIN) {
        recvfrom(with PEEK)
    }
}

在子進程中:

while (1) {
    recvfrom(...)
}

帶有PEEK標志的recvfrom是不會把數據從接收緩沖去移走的,我們假設數據到來,會有喚醒動作,如果主進程poll時排在子進程recvfrom之前睡眠,那么會先喚醒主進程,poll返回,由于poll被喚醒不是排他的,所以子進程接著也會被喚醒,而此時主進程會去帶有PEEK的讀數據,但是子進程趕在前面把數據讀走了,recvfrom返回,此時主進程的recvfrom撲了個空,進而主進程的recvfrom阻塞,等待數據,此時子進程又一次調用recvfrom,到此為止,兩個進程都阻塞在recvfrom上睡眠,且主進程排在前面(它先睡眠),此時又來了一個數據,主進程被排他喚醒,子進程繼續睡眠等待,主進程的recvfrom返回,由于帶有PEEK標志,數據繼續留在接收隊列里面,下一輪的poll直接返回,進而繼續調用PEEK版的recvfrom也會直接返回,如果再也沒有新的數據或者信號到來的話,子進程將不會被喚醒,它將永遠也得不到那個一直被主進程PEEK的數據...這就要求,子進程必須要用poll+recvfrom,因為poll保證真正有數據的時候才會返回,讓進程繼而去recvfrom,而主進程只是PEEK數據,所以就不會出現上述死循環,另外,本人覺得,帶有PEEK標志的recv系列函數在被喚醒的時候,不應該是排他喚醒的!
是的,是的, PEEK喚醒不應該是排他的,因為之所以recv是排他的,是為了避免“承諾給一個進程”的數據被其他進程取走了,這里的關鍵詞是“取走了”,然而PEEK版的recv是不可能取走任何東西的!但是,不管怎樣,我不想改內核代碼,何況我又不會真的去用這個UDP accept,因此也就用不到PEEK,除了PEEK方式的recv,其他真正的recv是有必要是排他的,否則到底誰執行數據處理呢?鑒于此,UDP多處理方案c被徹底否決。

6.1.子問題:UDP socket和4元組以及處理進程的對應關系

UDP無連接,它無法“記住“對端,因此也就不知道一個數據包的前置和后續,也就是說UDP不知道它的前置是否曾經來過,也不曉得它的后繼還會不會來以及如果還會來的話,還會來多久,因此就無法為其預先分配資源,而數據處理資源的容器是什么?是進程(愛較真兒的又要說還有線程且線程更好了...)或者線程。無法預先分配資源是多處理的一大障礙!對于TCP而言,這很簡單,一個syn包到來,理論上就可以為其分配資源了,典型得就是創建一個進程,最后一個fin或者reset之類的到來就知道該結束進程了,只是說系統沒有這么實現罷了,而是把滿三次握手成功后最為資源分配點,最先分配的是一個socket,也就是accept返回的那個,然后使其實際情況,是要fork呢,要繼續select/poll呢,還是要event poll呢,由調用者決定。對于UDP,很難,一個UDP包的到來,它可能就是唯一的一個,比如DNS查詢,也可能是一次會話的開始,比如DTLS或者OpenVPN連接,但是僅憑第四層,你很難區分。你幾乎不可能為單獨的UDP 4元組分配一個單獨的進程,幾乎不可能。
那么怎么辦?只能當靶子守株待兔了,事先創建多個進程,一個進程處理一個socket,然后靜等,等待有數據包來命中,至于撞到了哪一個socket,由第四層的socket查找算法決定!

7.問題3:連接關閉的問題以及何時結束進程的問題-UDP多路負載均衡方案并不通用

本文的討論,最終選擇了方案b,但是還有一個問題,那就是你無法在外面得知進程什么時候該退出了的消息,因為UDP不像TCP那樣有一系列的諸如FIN,RST之類的標志表示連接斷開,即使像OpenVPN那樣的”有連接的UDP“,也無法在協議層面上得知,它的有連接是第七層的連接,也許你不知道,我可以告訴你,OpenVPN有一個叫做exit-notify的參數,可以代表連接的斷開...即使像底層的ip_conntrack那樣,也無法更好地去對待UDP的”連接狀態“,雖然它真的追蹤了UDP的4元組信息,儼然它已經成了有連接的協議(這難道不是它的主要工作嗎?否則愧對于自己的名字啊)。除了超時這種機制之外,沒有更好的了,畢竟當所有的裁決手段都失效的時候,一切都要靠時間來沖淡。然而即使是時間,也對第七層的邏輯無能為力,在這個意義上,你要么狠一點,它可能很不準,要么準一點,但可能很不狠,狠和準,看你怎么選,對于OpenVPN,我不選,...我在等待官方新版本...
UDP無法對應一個4元組,但是由于Linux內核在第四層查找socket有明確的算法可以保證相同的4元組始終對應一個socket,那么就可以保證相同的4元組總是對應一個進程,只要保證該進程始終處理該socket即可!但是這并不是絕對的,因為按照UDP協議的原始含義,它就是數據報協議,協議假設任意兩個數據報都是沒有任何關聯的,因此協議棧第四層的socket查找邏輯不應該總是保證相同的4元組對應一個socket!然而UDP除了保持其數據報的語義外,還是一種真真切切的傳輸層協議,是一種IP協議的端到端映射,這就意味著你可以在其上面建立任何的”流“,雖然這不是協議棧的職責,但是Linux的socket查找算法幫我們做到了最重要的事,如果在__udp4_lib_lookup調用的inet_ehashfn中:

static inline unsigned int inet_ehashfn(struct net *net,
                    const __be32 laddr, const __u16 lport,
                    const __be32 faddr, const __be16 fport)
{
    return jhash_3words((__force __u32) laddr,
                (__force __u32) faddr,
                ((__u32) lport) << 16 | (__force __u32)fport,
                inet_ehash_secret + net_hash_mix(net));
}

稍微改變了一下方略,引入了一個random,那么以上的所有負載均衡方案全部報廢!

分享到:
標簽:負載均衡
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定