IO 多路復(fù)用
就是我們說(shuō)的select,poll,epoll,有些地方也稱這種IO方式為event driven IO。
select/epoll的好處就在于單個(gè)process就可以同時(shí)處理多個(gè)網(wǎng)絡(luò)連接的IO。
它的基本原理就是select,poll,epoll這個(gè)function會(huì)不斷的輪詢所負(fù)責(zé)的所有socket,當(dāng)某個(gè)socket有數(shù)據(jù)到達(dá)了,就通知用戶進(jìn)程。
epoll簡(jiǎn)單模型
import socket import select # 創(chuàng)建套接字 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 設(shè)置可以重復(fù)使用綁定的信息 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) # 綁定本機(jī)信息 s.bind(("",7788)) # 變?yōu)楸粍?dòng) s.listen(10) # 創(chuàng)建一個(gè)epoll對(duì)象 epoll = select.epoll() # 測(cè)試,用來(lái)打印套接字對(duì)應(yīng)的文件描述符 # print(s.fileno()) # print(select.EPOLLIN|select.EPOLLET) # 注冊(cè)事件到epoll中 # epoll.register(fd[, eventmask]) # 注意,如果fd已經(jīng)注冊(cè)過,則會(huì)發(fā)生異常 # 將創(chuàng)建的套接字添加到epoll的事件監(jiān)聽中 epoll.register(s.fileno(), select.EPOLLIN|select.EPOLLET) connections = {} addresses = {} # 循環(huán)等待客戶端的到來(lái)或者對(duì)方發(fā)送數(shù)據(jù) while True: # epoll 進(jìn)行 fd 掃描的地方 -- 未指定超時(shí)時(shí)間則為阻塞等待 epoll_list = epoll.poll() # 對(duì)事件進(jìn)行判斷 for fd, events in epoll_list: # print fd # print events # 如果是socket創(chuàng)建的套接字被激活 if fd == s.fileno(): new_socket, new_addr = s.accept() print('有新的客戶端到來(lái)%s' % str(new_addr)) # 將 conn 和 addr 信息分別保存起來(lái) connections[new_socket.fileno()] = new_socket addresses[new_socket.fileno()] = new_addr # 向 epoll 中注冊(cè) 新socket 的 可讀 事件 epoll.register(new_socket.fileno(), select.EPOLLIN|select.EPOLLET) # 如果是客戶端發(fā)送數(shù)據(jù) elif events == select.EPOLLIN: # 從激活 fd 上接收 recvData = connections[fd].recv(1024).decode("utf-8") if recvData: print('recv:%s' % recvData) else: # 從 epoll 中移除該 連接 fd epoll.unregister(fd) # server 側(cè)主動(dòng)關(guān)閉該 連接 fd connections[fd].close() print("%s---offline---" % str(addresses[fd])) del connections[fd] del addresses[fd]
說(shuō)明
EPOLLIN (可讀)
EPOLLOUT (可寫)
EPOLLET (ET模式)
epoll對(duì)文件描述符的操作有兩種模式:LT(level trigger)和ET(edge trigger)。LT模式是默認(rèn)模式,LT模式與ET模式的區(qū)別如下:
LT模式:當(dāng)epoll檢測(cè)到描述符事件發(fā)生并將此事件通知應(yīng)用程序,應(yīng)用程序可以不立即處理該事件。下次調(diào)用epoll時(shí),會(huì)再次響應(yīng)應(yīng)用程序并通知此事件。 ET模式:當(dāng)epoll檢測(cè)到描述符事件發(fā)生并將此事件通知應(yīng)用程序,應(yīng)用程序必須立即處理該事件。如果不處理,下次調(diào)用epoll時(shí),不會(huì)再次響應(yīng)應(yīng)用程序并通知此事件。
web靜態(tài)服務(wù)器-epool
以下代碼,支持http的長(zhǎng)連接,即使用了Content-Length
import socket import time import sys import re import select class WSGIServer(object): """定義一個(gè)WSGI服務(wù)器的類""" def __init__(self, port, documents_root): # 1. 創(chuàng)建套接字 self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 2. 綁定本地信息 self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server_socket.bind(("", port)) # 3. 變?yōu)楸O(jiān)聽套接字 self.server_socket.listen(128) self.documents_root = documents_root # 創(chuàng)建epoll對(duì)象 self.epoll = select.epoll() # 將tcp服務(wù)器套接字加入到epoll中進(jìn)行監(jiān)聽 self.epoll.register(self.server_socket.fileno(), select.EPOLLIN|select.EPOLLET) # 創(chuàng)建添加的fd對(duì)應(yīng)的套接字 self.fd_socket = dict() def run_forever(self): """運(yùn)行服務(wù)器""" # 等待對(duì)方鏈接 while True: # epoll 進(jìn)行 fd 掃描的地方 -- 未指定超時(shí)時(shí)間則為阻塞等待 epoll_list = self.epoll.poll() # 對(duì)事件進(jìn)行判斷 for fd, event in epoll_list: # 如果是服務(wù)器套接字可以收數(shù)據(jù),那么意味著可以進(jìn)行accept if fd == self.server_socket.fileno(): new_socket, new_addr = self.server_socket.accept() # 向 epoll 中注冊(cè) 連接 socket 的 可讀 事件 self.epoll.register(new_socket.fileno(), select.EPOLLIN | select.EPOLLET) # 記錄這個(gè)信息 self.fd_socket[new_socket.fileno()] = new_socket # 接收到數(shù)據(jù) elif event == select.EPOLLIN: request = self.fd_socket[fd].recv(1024).decode("utf-8") if request: self.deal_with_request(request, self.fd_socket[fd]) else: # 在epoll中注銷客戶端的信息 self.epoll.unregister(fd) # 關(guān)閉客戶端的文件句柄 self.fd_socket[fd].close() # 在字典中刪除與已關(guān)閉客戶端相關(guān)的信息 del self.fd_socket[fd] def deal_with_request(self, request, client_socket): """為這個(gè)瀏覽器服務(wù)器""" if not request: return request_lines = request.splitlines() for i, line in enumerate(request_lines): print(i, line) # 提取請(qǐng)求的文件(index.html) # GET /a/b/c/d/e/index.html HTTP/1.1 ret = re.match(r"([^/]*)([^ ]+)", request_lines[0]) if ret: print("正則提取數(shù)據(jù):", ret.group(1)) print("正則提取數(shù)據(jù):", ret.group(2)) file_name = ret.group(2) if file_name == "/": file_name = "/index.html" # 讀取文件數(shù)據(jù) try: f = open(self.documents_root+file_name, "rb") except: response_body = "file not found, 請(qǐng)輸入正確的url" response_header = "HTTP/1.1 404 not foundrn" response_header += "Content-Type: text/html; charset=utf-8rn" response_header += "Content-Length: %drn" % len(response_body) response_header += "rn" # 將header返回給瀏覽器 client_socket.send(response_header.encode('utf-8')) # 將body返回給瀏覽器 client_socket.send(response_body.encode("utf-8")) else: content = f.read() f.close() response_body = content response_header = "HTTP/1.1 200 OKrn" response_header += "Content-Length: %drn" % len(response_body) response_header += "rn" # 將數(shù)據(jù)返回給瀏覽器 client_socket.send(response_header.encode("utf-8")+response_body) # 設(shè)置服務(wù)器服務(wù)靜態(tài)資源時(shí)的路徑 DOCUMENTS_ROOT = "./html" def main(): """控制web服務(wù)器整體""" # Python3 xxxx.py 7890 if len(sys.argv) == 2: port = sys.argv[1] if port.isdigit(): port = int(port) else: print("運(yùn)行方式如: python3 xxx.py 7890") return print("http服務(wù)器使用的port:%s" % port) http_server = WSGIServer(port, DOCUMENTS_ROOT) http_server.run_forever() if __name__ == "__main__": main()
小總結(jié)
I/O 多路復(fù)用的特點(diǎn):
通過一種機(jī)制使一個(gè)進(jìn)程能同時(shí)等待多個(gè)文件描述符,而這些文件描述符(套接字描述符)其中的任意一個(gè)進(jìn)入讀就緒狀態(tài),epoll()函數(shù)就可以返回。 所以, IO多路復(fù)用,本質(zhì)上不會(huì)有并發(fā)的功能,因?yàn)槿魏螘r(shí)候還是只有一個(gè)進(jìn)程或線程進(jìn)行工作,它之所以能提高效率是因?yàn)閟electepoll 把進(jìn)來(lái)的socket放到他們的 ‘監(jiān)視’ 列表里面,當(dāng)任何socket有可讀可寫數(shù)據(jù)立馬處理,那如果selectepoll 手里同時(shí)檢測(cè)著很多socket, 一有動(dòng)靜馬上返回給進(jìn)程處理,總比一個(gè)一個(gè)socket過來(lái),阻塞等待,處理高效率。
當(dāng)然也可以多線程/多進(jìn)程方式,一個(gè)連接過來(lái)開一個(gè)進(jìn)程/線程處理,這樣消耗的內(nèi)存和進(jìn)程切換頁(yè)會(huì)耗掉更多的系統(tǒng)資源。 所以我們可以結(jié)合IO多路復(fù)用和多進(jìn)程/多線程 來(lái)高性能并發(fā),IO復(fù)用負(fù)責(zé)提高接受socket的通知效率,收到請(qǐng)求后,交給進(jìn)程池/線程池來(lái)處理邏輯。