在做網絡服務的時候tcp并發服務端程序的編寫必不可少。tcp并發通常有幾種固定的設計模式套路,他們各有優點,也各有應用之處。下面就簡單的討論下這幾種模式的差異:
單進程,單線程
在accept之后,就開始在這一個連接連接上的數據收接收,收到之后處理,發送,不再接收新的連接,除非這個連接的處理結束。
優點: 簡單。
缺點: 因為只給一個客戶端服務,所以不存在并發的可能。
應用: 用在只給一個客戶端服務的時候。
多進程
accept返回成功時候,就給這一個連接fork一個進程,專門處理這個連接上的數據收發,等這個連接處理結束之后就結束這個進程。
優點: 編程相對簡單,不用考慮線程間的數據同步等。
缺點: 資源消耗大。啟動一個進程消耗相對比啟動一個線程要消耗大很多,同時在處理很多的連接時候需要啟動很多的進程多去處理,這時候對系統來說壓力就會比較大。另外系統的進程數限制也需要考慮。
應用: 在客戶端數據不多的時候使用很方便,比如小于10個客戶端。
多線程
類似多進程方式,但是針對一個連接啟動一個線程。
優點: 相對多進程方式,會節約一些資源,會更加高效一些。
缺點: 相對多進程方式,增加了編程的復雜度,因為需要考慮數據同步和鎖保護。另外一個進程中不能啟動太多的線程。在linux系統下線程在系統內部其實就是進程,線程調度按照進程調度的方式去執行的。
應用: 類似于多進程方式,適用于少量的客戶端的時候。
Select+多線程
有一個線程專門用于監聽端口,accept返回之后就把這個描述符放入 描述符集合 fd中,一個線程用select去輪訓描述符集合,在有數據的連接上接收數據,另外一個線程專門發送數據。當然也可以接收和發送用一個線程。描述符可以設置成非阻塞模式,也可以設置成阻塞模式。通常連接設置成非阻塞模式,發送線程獨立出來。
優點:相對前幾種模式,這種模式大大提高了并發量。
缺點:系統一般實現描述符集合是采用一個大數組,每次調用select的時候都會輪詢這個描述符數組,當連接數很多的時候就會導致效率下降。連接數在1000以上時候效率會下降到不能接受。
應用:目前windows 和一般的Unix上的tcp并發都采用select方式,應該說應用還是很廣泛的。
epoll方式
在Linux2.6版本之后,增加了epoll。具體的使用是:一個線程專門進行端口監聽,accept接收到連接的時候,把該連接設置成非阻塞模式,把 epoll事件設置成邊緣觸發方式,加入到epoll管理。接收線程阻塞在epoll的等待事件函數。另外一個線程專門用于數據發送。
優點:由于epoll的實現方式先進,所以這種方式可以大規模的實現并發。我們現在的應用在一個3年前的dell的pc server可以實現2萬個連接的并發,性能也是很好的。
缺點:由于涉及了線程和非阻塞,所以會導致編碼的復雜度增大一些。這種方式只適用于Linux 2.6內核以后。
注意:
1) 如果把epoll事件設置成水平觸發效率就下降到類似采用select的水平。
2) Unix系統下有單個進程打開的描述符數目限制,還有系統內打開的描述符數目限制。系統內打開的描述符數目限制由軟硬限制兩個。硬限制是根據機器的配置而不同。軟限制可以更改,但是必須小于系統的硬限制。在suse Linux下,可以在root用戶下,通過ulimit -n 數目去修改這個限制。
應用: Linux下大規模的tcp并發。
配置開發支持高并發TCP連接的Linux應用程序全攻略
修改用戶進程可打開文件數限制
在Linux平臺上,無論編寫客戶端程序還是服務端程序,在進行高并發TCP連接處理時,最高的并發數量都要受到系統對用戶單一進程同時可打開文件數量的限制(這是因為系統為每個TCP連接都要創建一個socket句柄,每個socket句柄同時也是一個文件句柄)。可使用ulimit命令查看系統允許當前用戶進程打開的文件數限制:
[speng@as4 ~]$ ulimit -n
1024
這表示當前用戶的每個進程最多允許同時打開1024個文件,這1024個文件中還得除去每個進程必然打開的標準輸入,標準輸出,標準錯誤,服務器監聽 socket,進程間通訊的unix域socket等文件,那么剩下的可用于客戶端socket連接的文件數就只有大概1024-10=1014個左右。也就是說缺省情況下,基于Linux的通訊程序最多允許同時1014個TCP并發連接。
對于想支持更高數量的TCP并發連接的通訊處理程序,就必須修改Linux對當前用戶的進程同時打開的文件數量的軟限制(soft limit)和硬限制(hardlimit)。其中軟限制是指Linux在當前系統能夠承受的范圍內進一步限制用戶同時打開的文件數;硬限制則是根據系統硬件資源狀況(主要是系統內存)計算出來的系統最多可同時打開的文件數量。通常軟限制小于或等于硬限制。
修改上述限制的最簡單的辦法就是使用ulimit命令:
[speng@as4 ~]$ ulimit -n <file_num>
上述命令中,在<file_num>中指定要設置的單一進程允許打開的最大文件數。如果系統回顯類似于“Operation notpermitted”之類的話,說明上述限制修改失敗,實際上是因為在<file_num>中指定的數值超過了Linux系統對該用戶打開文件數的軟限制或硬限制。因此,就需要修改Linux系統對用戶的關于打開文件數的軟限制和硬限制。
第一步,修改/etc/security/limits.conf文件,在文件中添加如下行:
speng soft nofile 10240
speng hard nofile 10240
其中speng指定了要修改哪個用戶的打開文件數限制,可用'*'號表示修改所有用戶的限制;soft或hard指定要修改軟限制還是硬限制;10240則指定了想要修改的新的限制值,即最大打開文件數(請注意軟限制值要小于或等于硬限制)。修改完后保存文件。
第二步,修改/etc/pam.d/login文件,在文件中添加如下行:
session required /lib/security/pam_limits.so
這是告訴Linux在用戶完成系統登錄后,應該調用pam_limits.so模塊來設置系統對該用戶可使用的各種資源數量的最大限制(包括用戶可打開的最大文件數限制),而pam_limits.so模塊就會從/etc/security/limits.conf文件中讀取配置來設置這些限制值。修改完后保存此文件。
第三步,查看Linux系統能的最大打開文件數限制,使用如下命令:
[speng@as4 ~]$ cat /proc/sys/fs/file-max
12158
這表明這臺Linux系統最多允許同時打開(即包含所有用戶打開文件數總和)12158個文件,是Linux系統能硬限制,所有用戶級的打開文件數限制都不應超過這個數值。通常這個系統級硬限制是Linux系統在啟動時根據系統硬件資源狀況計算出來的最佳的最大同時打開文件數限制,如果沒有特殊需要,不應該修改此限制,除非想為用戶能打開文件數限制設置超過此限制的值。修改此硬限制的方法是修改/etc/rc.local腳本,在腳本中添加如下行:
echo 22158 > /proc/sys/fs/file-max
這是讓Linux在啟動完成后強行將系統級打開文件數硬限制設置為22158。修改完后保存此文件。
完成上述步驟后重啟系統,一般情況下就可以將Linux系統對指定用戶的單一進程允許同時打開的最大文件數限制設為指定的數值。如果重啟后用 ulimit-n命令查看用戶可打開文件數限制仍然低于上述步驟中設置的最大值,這可能是因為在用戶登錄腳本/etc/profile中使用ulimit -n命令已經將用戶可同時打開的文件數做了限制。由于通過ulimit-n修改系統對用戶可同時打開文件的最大數限制時,新修改的值只能小于或等于上次 ulimit-n設置的值,因此想用此命令增大這個限制值是不可能的。所以,如果有上述問題存在,就只能去打開/etc/profile腳本文件,在文件中查找是否使用了ulimit-n限制了用戶可同時打開的最大文件數量,如果找到,則刪除這行命令,或者將其設置的值改為合適的值,然后保存文件,用戶退出并重新登錄系統即可。
通過上述步驟,就為支持高并發TCP連接處理的通訊處理程序解除關于打開文件數量方面的系統限制。
修改網絡內核對TCP連接的有關限制
在Linux上編寫支持高并發TCP連接的客戶端通訊處理程序時,有時會發現盡管已經解除了系統對用戶同時打開文件數的限制,但仍會出現并發TCP連接數增加到一定數量時,再也無法成功建立新的TCP連接的現象。出現這種現象的原因有多種。
第一種原因可能是因為Linux網絡內核對本地端口號范圍有限制。此時,進一步分析為什么無法建立TCP連接,會發現問題出在connect()調用返回失敗,查看系統錯誤提示消息是“Can't assign requestedaddress”。同時,如果在此時用tcpdump工具監視網絡,會發現根本沒有TCP連接時客戶端發SYN包的網絡流量。這些情況說明問題在于本地Linux系統內核中有限制。其實,問題的根本原因在于Linux內核的TCP/IP協議實現模塊對系統中所有的客戶端TCP連接對應的本地端口號的范圍進行了限制(例如,內核限制本地端口號的范圍為1024~32768之間)。當系統中某一時刻同時存在太多的TCP客戶端連接時,由于每個TCP客戶端連接都要占用一個唯一的本地端口號(此端口號在系統的本地端口號范圍限制中),如果現有的TCP客戶端連接已將所有的本地端口號占滿,則此時就無法為新的TCP客戶端連接分配一個本地端口號了,因此系統會在這種情況下在connect()調用中返回失敗,并將錯誤提示消息設為“Can't assignrequested address”。有關這些控制邏輯可以查看Linux內核源代碼,以linux2.6內核為例,可以查看tcp_ipv4.c文件中如下函數:
static int tcp_v4_hash_connect(struct sock *sk)
請注意上述函數中對變量sysctl_local_port_range的訪問控制。變量sysctl_local_port_range的初始化則是在tcp.c文件中的如下函數中設置:
void __init tcp_init(void)
內核編譯時默認設置的本地端口號范圍可能太小,因此需要修改此本地端口范圍限制。
第一步,修改/etc/sysctl.conf文件,在文件中添加如下行:
net.ipv4.ip_local_port_range = 1024 65000
這表明將系統對本地端口范圍限制設置為1024~65000之間。請注意,本地端口范圍的最小值必須大于或等于1024;而端口范圍的最大值則應小于或等于65535。修改完后保存此文件。
第二步,執行sysctl命令:
[speng@as4 ~]$ sysctl -p
如果系統沒有錯誤提示,就表明新的本地端口范圍設置成功。如果按上述端口范圍進行設置,則理論上單獨一個進程最多可以同時建立60000多個TCP客戶端連接。
第二種無法建立TCP連接的原因可能是因為Linux網絡內核的IP_TABLE防火墻對最大跟蹤的TCP連接數有限制。此時程序會表現為在 connect()調用中阻塞,如同死機,如果用tcpdump工具監視網絡,也會發現根本沒有TCP連接時客戶端發SYN包的網絡流量。由于 IP_TABLE防火墻在內核中會對每個TCP連接的狀態進行跟蹤,跟蹤信息將會放在位于內核內存中的conntrackdatabase中,這個數據庫的大小有限,當系統中存在過多的TCP連接時,數據庫容量不足,IP_TABLE無法為新的TCP連接建立跟蹤信息,于是表現為在connect()調用中阻塞。此時就必須修改內核對最大跟蹤的TCP連接數的限制,方法同修改內核對本地端口號范圍的限制是類似的:
第一步,修改/etc/sysctl.conf文件,在文件中添加如下行:
net.ipv4.ip_conntrack_max = 10240
這表明將系統對最大跟蹤的TCP連接數限制設置為10240。請注意,此限制值要盡量小,以節省對內核內存的占用。
第二步,執行sysctl命令:
[speng@as4 ~]$ sysctl -p
如果系統沒有錯誤提示,就表明系統對新的最大跟蹤的TCP連接數限制修改成功。如果按上述參數進行設置,則理論上單獨一個進程最多可以同時建立10000多個TCP客戶端連接。
使用支持高并發網絡I/O的編程技術
在Linux上編寫高并發TCP連接應用程序時,必須使用合適的網絡I/O技術和I/O事件分派機制。
可用的I/O技術有同步I/O,非阻塞式同步I/O(也稱反應式I/O),以及異步I/O。在高TCP并發的情形下,如果使用同步I/O,這會嚴重阻塞程序的運轉,除非為每個TCP連接的I/O創建一個線程。但是,過多的線程又會因系統對線程的調度造成巨大開銷。因此,在高TCP并發的情形下使用同步 I/O是不可取的,這時可以考慮使用非阻塞式同步I/O或異步I/O。非阻塞式同步I/O的技術包括使用select(),poll(),epoll等機制。異步I/O的技術就是使用AIO。
從I/O事件分派機制來看,使用select()是不合適的,因為它所支持的并發連接數有限(通常在1024個以內)。如果考慮性能,poll()也是不合適的,盡管它可以支持的較高的TCP并發數,但是由于其采用“輪詢”機制,當并發數較高時,其運行效率相當低,并可能存在I/O事件分派不均,導致部分 TCP連接上的I/O出現“饑餓”現象。而如果使用epoll或AIO,則沒有上述問題(早期Linux內核的AIO技術實現是通過在內核中為每個 I/O請求創建一個線程來實現的,這種實現機制在高并發TCP連接的情形下使用其實也有嚴重的性能問題。但在最新的Linux內核中,AIO的實現已經得到改進)。
綜上所述,在開發支持高并發TCP連接的Linux應用程序時,應盡量使用epoll或AIO技術來實現并發的TCP連接上的I/O控制,這將為提升程序對高并發TCP連接的支持提供有效的I/O保證。
網絡編程中并發服務器的設計模式
并發服務器有三種設計模式
1) 多進程。每個進程服務一個客戶端。優勢是有各自獨立的地址空間,可靠性高,但進程調度開銷大,無法資源共享,進程間通信機制復雜。
2) 多線程。每個線程服務一個客戶端。優勢是開銷小,通信機制簡單,可共享內存。但共享地址空間,可靠性低,一個服務器出現問題時可能導致系統崩潰,同時全局共享可能帶來競爭,共享資源需要互斥,對編程要求高。
3) 單進程:占有的進程及線程資源少,通信機制簡單。但監聽服務器及各個子服務器揉和在一起,程序結構復雜不清晰,編程麻煩。
例子:多進程并發服務器
/* 多進程并發服務器。該程序等候客戶連接,一旦連接則顯示客戶的地址,
接著接收該客戶的名字并顯示。然后接收來自該客戶的信息(字符串)。
每當收到一個字符串,則顯示該字符串,并將字符串反轉,再將反轉的字
符發回客戶。之后,繼續等待接收該客戶的信息直至該客戶關閉連接。服
務器具有同時處理多客戶的能力。
*/
#include <stdio.h> /* These are the usual header files */
#include <strings.h> /* for bzero() */
#include <unistd.h> /* for close() */
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define PORT 1234 /* Port that will be opened */
#define BACKLOG 2 /* Number of allowed connections */
#define MAXDATASIZE 1000
void process_cli(int connectfd, struct sockaddr_in client);
main()
{
int listenfd, connectfd; /* socket descriptors */
pid_t pid;
struct sockaddr_in server; /* server's address information */
struct sockaddr_in client; /* client's address information */
int sin_size;
/* Create TCP socket */
if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
/* handle exception */
perror("Creating socket failed.");
exit(1);
}
int opt = SO_REUSEADDR;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
bzero(&server,sizeof(server));
server.sin_family=AF_INET;
server.sin_port=htons(PORT);
server.sin_addr.s_addr = htonl (INADDR_ANY);
if (bind(listenfd, (struct sockaddr *)&server, sizeof(struct sockaddr)) == -1) {
/* handle exception */
perror("Bind error.");
exit(1);
}
if(listen(listenfd,BACKLOG) == -1){ /* calls listen() */
perror("listen() errorn");
exit(1);
}
sin_size=sizeof(struct sockaddr_in);
while(1)
{
/*accept connection.what causes the acceptance? */
if ((connectfd = accept(listenfd,(struct sockaddr *)&client,&sin_size))==-1) {
perror("accept() errorn");
exit(1);
}
/* Create child process to service client */
if ((pid=fork())>0) {
/* parent process */
close(connectfd);
continue;
}
else if (pid==0) {
/*child process*/
close(listenfd);
process_cli(connectfd, client);
exit(0);
}
else {
printf("fork errorn");
exit(0);
}
}
close(listenfd); /* close listenfd */
}
void process_cli(int connectfd, struct sockaddr_in client)
{
int num;
char recvbuf[MAXDATASIZE], sendbuf[MAXDATASIZE], cli_name[MAXDATASIZE];
printf("You got a connection from %s. ",inet_ntoa(client.sin_addr) ); /* prints client's IP */
/* Get client's name from client */
num = recv(connectfd, cli_name, MAXDATASIZE,0);
if (num == 0) {
close(connectfd);
printf("Client disconnected.n");
return;
}
cli_name[num - 1] = '';
printf("Client's name is %s.n",cli_name);
while (num = recv(connectfd, recvbuf, MAXDATASIZE,0))
{
int i = 0;
recvbuf[num] = '';
printf("Received client( %s ) message: %s",cli_name, recvbuf);
for (i = 0; i < num - 1; i++) {
sendbuf[i] = recvbuf[num - i -2];
}
sendbuf[num - 1] = '';
send(connectfd,sendbuf,strlen(sendbuf),0); /* send to the client welcome message */
}
close(connectfd); /* close connectfd */
}
例子:多線程并發服務器
#include <stdio.h> /* These are the usual header files */
#include <string.h> /* for bzero() */
#include <unistd.h> /* for close() */
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <stdlib.h> /* for exit in c++(.C/.cc) ; no need for c ??*/
#define PORT 1234 /* Port that will be opened */
#define BACKLOG 5 /* Number of allowed connections */
#define MAXDATASIZE 1000
//void process_cli(int connectfd, sockaddr_in client); // c only supports struct sockaddr_in, but c++ support sockaddr_in
void process_cli(int connectfd, struct sockaddr_in client);
/* function to be executed by the new thread */
void* start_routine(void* arg);
typedef struct _ARG {
int connfd;
struct sockaddr_in client;
}ARG;
// it's better to use typedef struct
main()
{
int listenfd, connectfd; /* socket descriptors */
pthread_t thread;
//struct ARG *arg; // when no typedef,there should be struct for c code; no need for c++
ARG *arg;
struct sockaddr_in server; /* server's address information */
struct sockaddr_in client; /* client's address information */
int sin_size;
/* Create TCP socket */
if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
/* handle exception */
perror("Creating socket failed.");
exit(1);
}
int opt = SO_REUSEADDR;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
bzero(&server,sizeof(server));
server.sin_family=AF_INET;
server.sin_port=htons(PORT);
server.sin_addr.s_addr = htonl (INADDR_ANY);
if (bind(listenfd, (struct sockaddr *)&server, sizeof(struct sockaddr)) == -1) {
/* handle exception */
perror("Bind error.");
exit(1);
}
if(listen(listenfd,BACKLOG) == -1){ /* calls listen() */
perror("listen() errorn");
exit(1);
}
sin_size=sizeof(struct sockaddr_in);
while(1)
{
/* Accept connection */
// if ((connectfd = accept(listenfd,(struct sockaddr *)&client,&sin_size))==-1) {// no problem for c
if ((connectfd = accept(listenfd,(struct sockaddr *)&client,(socklen_t *)&sin_size))==-1) {
perror("accept() errorn");
exit(1);
}
/* Create thread*/
arg = new ARG;
arg->connfd = connectfd;
//memcpy((void *)&arg->client, &client, sizeof(client)); // both ok!
memcpy(&arg->client, &client, sizeof(client));
if (pthread_create(&thread, NULL, start_routine, (void*)arg)) {
/* handle exception */
perror("Pthread_create() error");
exit(1);
}
}
close(listenfd); /* close listenfd */
}
void process_cli(int connectfd, sockaddr_in client)
{
int num;
char recvbuf[MAXDATASIZE], sendbuf[MAXDATASIZE], cli_name[MAXDATASIZE];
printf("You got a connection from %s. ",inet_ntoa(client.sin_addr) );
/* Get client's name from client */
num = recv(connectfd, cli_name, MAXDATASIZE,0);
if (num == 0) {
close(connectfd);
printf("Client disconnected.n");
return;
}
cli_name[num - 1] = '';
printf("Client's name is %s.n",cli_name);
while (num = recv(connectfd, recvbuf, MAXDATASIZE,0)) {
recvbuf[num] = '';
printf("Received client( %s ) message: %s",cli_name, recvbuf);
for (int i = 0; i < num - 1; i++) {
sendbuf[i] = recvbuf[num - i -2];
}
sendbuf[num - 1] = '';
send(connectfd,sendbuf,strlen(sendbuf),0);
}
close(connectfd); /* close connectfd */
}
void* start_routine(void* arg)
{
ARG *info;
info = (ARG *)arg;
/* handle client's requirement */
process_cli(info->connfd, info->client);
////delete arg will cause warning!the type for deleting should be the same as new allocated
delete info;
pthread_exit(NULL);
}
例子:單線程并發服務器
#include <stdio.h> /* These are the usual header files */
#include <string.h> /* for bzero() */
#include <unistd.h> /* for close() */
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include <stdlib.h>
#define PORT 6888 /* Port that will be opened */
#define BACKLOG 5 /* Number of allowed connections simutaniously*/
#define MAXDATASIZE 1000
typedef struct _CLIENT{
int fd;
struct sockaddr_in addr; /* client's address information */
char data[1024];
}CLIENT;
void process_cli(CLIENT *client, char* recvbuf, int len);
void savedata(char* recvbuf, int len, char* data);
main()
{
int i, maxi, maxfd, sockfd;
int nready;
size_t n;
fd_set rset, allset;
int listenfd, connectfd; /* socket descriptors */
struct sockaddr_in server; /* server's address information */
/* client's information */
CLIENT client[FD_SETSIZE];
char recvbuf[MAXDATASIZE];
int sin_size;
/* Create TCP socket */
if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1 ){
/* handle exception */
perror("Creating socket failed.");
exit(1);
}
int opt = SO_REUSEADDR;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
bzero(&server,sizeof(server));
server.sin_family=AF_INET;
server.sin_port=htons(PORT);
server.sin_addr.s_addr = htonl (INADDR_ANY);
if( bind(listenfd, (struct sockaddr *)&server, sizeof(struct sockaddr)) == -1 ){
/* handle exception */
perror("Bind error.");
exit(1);
}
if(listen(listenfd,BACKLOG) == -1){ /* calls listen() */
perror("listen() errorn");
exit(1);
}
sin_size = sizeof(struct sockaddr_in);
/*initialize for select */
maxfd = listenfd;
maxi = -1;
for( i = 0; i<FD_SETSIZE; i++ ){
client[i].fd = -1;
}
FD_ZERO( &allset );
FD_SET( listenfd, &allset );
while(1){
struct sockaddr_in addr;
rset = allset;
nready = select( maxfd+1, &rset, NULL, NULL, NULL );
printf("select saw rset actions and the readfset num is %d. n",nready );
if( FD_ISSET(listenfd, &rset) ){
/* new client connection */
/* Accept connection */
printf("accept a connection.n");
if(( connectfd = accept(listenfd,(struct sockaddr *)&addr,(socklen_t *)&sin_size))==-1 ){
perror("accept() errorn");
continue;
}
/* Put new fd to client */
for( i = 0; i < FD_SETSIZE; i++ ){
if (client[i].fd < 0) {
client[i].fd = connectfd; /* save descriptor */
client[i].addr = addr;
client[i].data[0] = '';
printf("You got a connection from %s. ",inet_ntoa(client[i].addr.sin_addr) );
break;
}
}
printf("add new connect fd.n");
if(i == FD_SETSIZE ) printf("too many clientsn");
FD_SET( connectfd, &allset ); /* add new descriptor to set */
if( connectfd > maxfd ) maxfd = connectfd;
if( i > maxi ) maxi = i;
if( --nready <= 0 ) continue; /* no more readable descriptors */
}
for( i = 0; i <= maxi; i++ ){
/* check all clients for data */
if( (sockfd = client[i].fd) < 0 ) continue; /* no more connected clients*/
if( FD_ISSET(sockfd, &rset) ){
printf( "recv occured for connect fd[%d].n",i );
if( (n = recv(sockfd, recvbuf, MAXDATASIZE,0) ) == 0 ){
/*connection closed by client */
close(sockfd);
printf("Client( %d ) closed connection.n",client[i].fd );
FD_CLR(sockfd, &allset);
client[i].fd = -1;
}
else{
process_cli( &client[i], recvbuf, n );
}
if (--nready <= 0) break; /* no more readable descriptors */
}
}
}
close(listenfd); /* close listenfd */
}
void process_cli( CLIENT *client, char* recvbuf, int len )
{
send( client->fd, recvbuf, len, 0 );
}
例子:epoll使用
#include <unistd.h>
#include <sys/types.h> /* basic system data types */
#include <sys/socket.h> /* basic socket definitions */
#include <netinet/in.h> /* sockaddr_in{} and other Internet defns */
#include <arpa/inet.h> /* inet(3) functions */
#include <sys/epoll.h> /* epoll function */
#include <fcntl.h> /* nonblocking */
#include <sys/resource.h> /*setrlimit */
#include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#define MAXEPOLLSIZE 10000
#define MAXLINE 10240
int handle(int connfd);
int setnonblocking( int sockfd )
{
if( fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)|O_NONBLOCK) == -1) {
return -1;
}
return 0;
}
int main(int argc, char **argv)
{
int servPort = 6888;
int listenq = 1024;
int listenfd, connfd, kdpfd, nfds, n, nread, curfds,acceptCount = 0;
struct sockaddr_in servaddr, cliaddr;
socklen_t socklen = sizeof(struct sockaddr_in);
struct epoll_event ev;
struct epoll_event events[MAXEPOLLSIZE];
struct rlimit rt;
char buf[MAXLINE];
/*設置每個進程允許打開的最大文件數
rt.rlim_max = rt.rlim_cur = MAXEPOLLSIZE;
if( setrlimit( RLIMIT_NOFILE, &rt ) == -1 ){
perror("setrlimit error");
return -1;
} */
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(servPort);
listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd == -1) {
perror("can't create socket file");
return -1;
}
int opt = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
if( setnonblocking(listenfd) < 0 ){
perror("setnonblock error");
}
if( bind(listenfd, (struct sockaddr *) &servaddr, sizeof(struct sockaddr)) == -1 ){
perror("bind error");
return -1;
}
if( listen(listenfd, listenq) == -1 ){
perror("listen error");
return -1;
}
/*創建epoll句柄,把監聽 socket 加入到 epoll 集合里 */
kdpfd = epoll_create( MAXEPOLLSIZE );
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = listenfd;
if( epoll_ctl( kdpfd, EPOLL_CTL_ADD, listenfd, &ev ) < 0){
fprintf(stderr, "epoll set insertion error: fd=%dn", listenfd );
return -1;
}
curfds = 1;
printf( "epollserver startup, port %d, max connection is %d, backlog is %dn", servPort, MAXEPOLLSIZE, listenq );
for (;;) {
/* 等待有事件發生 */
nfds = epoll_wait( kdpfd, events, curfds, -1 );
if (nfds == -1){
perror("epoll_wait");
continue;
}
printf( "events hAppen %dn", nfds );
/* 處理所有事件 */
for( n = 0; n < nfds; ++n ){
if( events[n].data.fd == listenfd ){
connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &socklen );
if (connfd < 0){
perror("accept error");
continue;
}
sprintf(buf, "accept form %s:%dn", inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port);
printf("%d:%s", ++acceptCount, buf);
if( curfds >= MAXEPOLLSIZE ){
fprintf(stderr, "too many connection, more than %dn", MAXEPOLLSIZE);
close(connfd);
continue;
}
if( setnonblocking(connfd) < 0 ){
perror("setnonblocking error");
}
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = connfd;
if( epoll_ctl(kdpfd, EPOLL_CTL_ADD, connfd, &ev ) < 0 ){
fprintf(stderr, "add socket '%d' to epoll failed: %sn", connfd, strerror(errno));
return -1;
}
curfds++;
continue;
}
// 處理客戶端請求
if( handle( events[n].data.fd ) < 0 ){
epoll_ctl(kdpfd, EPOLL_CTL_DEL, events[n].data.fd, &ev );
curfds--;
}
}
}
close( listenfd );
return 0;
}
int handle( int connfd ) {
int nread;
char buf[MAXLINE];
nread = read(connfd, buf, MAXLINE); //讀取客戶端socket流
if( nread == 0 ){
printf("client close the connectionn");
close(connfd);
return -1;
}
if( nread < 0 ){
perror("read error");
close(connfd);
return -1;
}
write( connfd, buf, nread ); //響應客戶端
return 0;
}