我們知道目前的 HTTP/1.1 采用的是標準的請求-響應模型,客戶端主動發請求,服務端被動地返回響應。這種模型在客戶端需要實時獲取結果的場景下是不合適的,因為這意味著客戶端需要不斷地輪詢,所以最好的做法是服務端生成結果之后,主動推送給客戶端。
比如 ChatGPT,它在生成內容時,也是生成一部分,就主動向客戶端推送一部分。而在這個過程中,客戶端不需要做任何事情,只需等待 ChatGPT 服務端返回內容即可。
說到這兒,你肯定想到了 WebSocket,沒錯這是一種解決方案。但 WebSocket 太重了,它和 HTTP 都是基于 TCP 的應用層傳輸協議,只不過在握手的時候搭了 HTTP 的便車,利用 HTTP 本身的協議升級特性,偽裝成 HTTP,這樣就能繞過瀏覽器沙箱、網絡防火墻等限制。
當完成握手之后,后續傳輸的數據就不再是 HTTP 報文,而是 WebSocket 格式的二進制幀。所以這兩者完全是不同的協議,那有沒有一種辦法,我們仍然使用 HTTP 協議,同時還能讓服務端主動推送數據呢?
答案是有的,也就是本文將要介紹的 SSE 技術,它的英文全稱是 Server-Sent Events(服務端推送事件)。通過 SSE 可以讓服務端即時推送數據到客戶端,而不需要客戶端輪詢服務端以獲取更新。
圖片
到這你可能會問,那 WebSocket 和 SSE 有什么區別呢?
1)通信方式
WebSocket 提供全雙工通信,服務端和客戶端都可以在同一個連接上同時發送和接收數據。最重要的是,WebSocket 獨立于 HTTP 協議,盡管它開始于一個 HTTP 握手。
SSE 僅提供服務端到客戶端的單向通信,客戶端不能通過 SSE 給服務端發信息。
2)協議和實現
WebSocket 使用自己的協議(ws:// 或 wss://),需要服務端和客戶端都支持,并且協議比較復雜。
SSE 則是使用標準的 HTTP 協議,實現起來更簡單,尤其是在服務端。
3)適用場景
WebSocket 適用于服務端和客戶端之間雙向實時通信的場景,如在線游戲、聊天應用等。
SSE 適用于服務端向客戶端單向推送數據的場景,如消息通知、數據更新。并且 SSE 自動支持斷線重連,而 WebSocket 則需要額外部署。
4)復雜性和資源使用
WebSocket 由于其雙向通信的能力,通常比 SSE 更復雜,可能需要更多的資源來維護和管理連接。
SSE 因為其單向性和基于 HTTP 的特性,它可以利用現有的網絡基礎設施,如代理服務器、負載均衡器和防火墻等等,通常更容易實現和維護。
相信現在你已經明白 SSE 是做什么的了,它的目的就是讓服務端能夠主動推送數據給客戶端。如果不需要和服務端動態交互,只是希望服務端在有數據的時候推過來,那么 WebSocket 就有些太重了,因為這意味著要替換 HTTP 協議,而使用 SSE 無疑是更好的選擇。
SSE 是什么我們已經知道了,那它是怎么實現的呢?原理是什么呢?
1)建立連接
客戶端發起一個標準的 HTTP 請求來開啟 SSE 會話,這個請求的特殊之處在于它包含一個頭字段。
Accept: text/event-stream
相當于客戶端告訴服務端,期望接收 SSE 消息流。而服務端在看到該字段時,也知道這是一個 SSE 請求,于是立即向客戶端返回響應頭,注意:返回的只有響應頭,里面會包含如下頭字段。
Content-Type: text/event-stream
響應頭返回之后標志著 SSE 連接成功建立,并且連接會保持開放狀態,服務端后續可以隨時通過此連接向客戶端發送數據。此外當連接不小心斷開時,客戶端也會自動進行重連。
所以在普通的 HTTP 請求中,一旦服務端返回,那么請求結束了。雖然可以將 Connection 頭字段設置為 keep-alive 保證連接不斷開,但每次訪問都包含了 HTTP 請求/響應的完整過程。
而在 SSE 中,服務端會保持一個開放的連接,只要有新數據可用,就會直接發送給客戶端。所以服務端會將響應以流的形式發送給客戶端,每次發送的消息都是響應流的一部分,而不是獨立的 HTTP 響應。
因此 SSE 的服務端在發送數據時,并不遵循傳統的一次請求,一次響應模式。它在建立連接之后會保持連接開放,并通過這個持續的連接流式地發送數據,這種方式就使得 SSE 非常適合實時數據推送的場景。
2)發送消息
客戶端發送請求,服務端返回響應頭之后,SSE 連接就建立成功了。此時客戶端只需要躺平,安靜地等待服務端的輸出即可。所以現在的關鍵就在于服務端要返回什么格式的數據呢?很簡單,一個基本的消息由以下幾部分組成:
- data:實際的消息數據;
- id:可選,消息的唯一標識符,用于在連接重新建立時同步消息;
- event:可選,定義事件類型,用于客戶端區分消息的類型;
- retry:可選,自動重連的時間(毫秒),如果連接中斷,客戶端在自動重新連接之前,需要等待多長時間;
注意:每個消息要以兩個換行符(nn)結束,舉個例子,我們發送一個 Hello World。
data: Hello Worldnn
也可以發送帶有事件類型的消息:
event: userUpdate
data: {"username": "Serpen", "age": 18}nn
還是比較簡單的,服務端可以保持連接并隨時發送更多數據。然后客戶端在收到時會進行處理,但不需要(也不能)對服務端作出任何回應,它只需要被動地接收來自服務端的數據即可。當服務端認為數據已經全部發送完畢、無需再發時,那么便可以主動斷開連接。
關于 SSE 的原理我們就解釋清楚了,下面來實際編程實現它,這里我們先使用原生的 asyncio 實現 SSE。
import asyncio
from asyncio import StreamReader, StreamWriter
class SSE:
def __init__(self, host="0.0.0.0", port=9999):
self.host = host
self.port = port
@staticmethod
def parse_request_headers(data: bytes) -> dict:
"""
此函數負責從原始字節流中解析出請求頭
"""
headers = data.split(b"rnrn")[0].split(b"rn")
header_dict = {}
for header in headers[1:]:
key, val = header.decode("utf-8").split(":", 1)
header_dict[key.lower()] = val.strip()
return header_dict
async def handler_requests(self,
reader: StreamReader,
writer: StreamWriter):
"""
負責處理來自客戶端的請求
每來一個客戶端連接,就會基于此函數創建一個協程
并且自動傳遞兩個參數:reader 和 writer
reader.read 負責讀取數據,等價于 socket.recv
writer.write 負責發送數據,等價于 socket.send
"""
# 獲取客戶端的請求報文,這里對請求方法、請求地址不做限制
data = awAIt reader.readuntil(b"rnrn")
# 解析出請求頭
request_headers = self.parse_request_headers(data)
# 簡單檢測一下 accept 字段,如果不是建立 SSE,那么直接關閉連接
if request_headers.get("accept") != "text/event-stream":
writer.close()
return await writer.wait_closed()
# 如果是 SSE 連接,那么返回響應頭
response_header = (
b"HTTP/1.1 200 OKrn"
b"Content-Type: text/event-streamrn"
b"Cache-Control: no-cachern"
b"Connection: keep-alivern"
b'Access-Control-Allow-Origin: *rn'
b"rn"
)
writer.write(response_header)
await writer.drain()
# 然后便可以不斷地向客戶端返回數據了
for _ in range(5):
# 每隔 1 秒返回數據
data = "data: 高老師總能分享出好東西rnrn".encode("utf-8")
writer.write(data)
await writer.drain()
await asyncio.sleep(1)
# 數據傳輸完畢
writer.close()
await writer.wait_closed()
async def __create_server(self):
# 創建服務,第一個參數是一個回調函數
# 當連接過來的時候就會根據此函數創建一個協程
# 后面是綁定的 ip 和 端口
server = await asyncio.start_server(self.handler_requests,
self.host,
self.port)
# 然后開啟無限循環
async with server:
await server.serve_forever()
def run_server(self):
loop = asyncio.get_event_loop()
loop.run_until_complete(self.__create_server())
if __name__ == '__main__':
sse = SSE()
sse.run_server()
服務端代碼編寫完畢,下面編寫前端代碼。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
<style>
#data {
font-weight: bold;
color: cadetblue;
font-size: large;
}
</style>
</head>
<body>
<h1>SSE Test</h1>
<div id="data"></div>
<script>
document.addEventListener("DOMContentLoaded", function () {
// 和服務端建立 SSE 連接
var eventSource = new EventSource("http://localhost:9999");
eventSource.onmessage = function (e) {
// 將數據渲染在 <div id="data"></div> 的內部
var data = e.data + "n";
document.getElementById('data').innerText += data;
};
eventSource.onerror = function (e) {
console.error('Error occurred:', e);
eventSource.close();
};
});
</script>
</body>
</html>
代碼編寫完畢,我們用瀏覽器打開 HTML 文件,便可看到如下效果。
以上我們就簡單實現了 SSE,當然為了加深印象,這里的后端是使用原生的 asyncio 編寫的,但在工作中,我們會使用現成的 Web 框架,比如 FastAPI,Blacksheep 等等。
需要說明的是,雖然通過 SSE 技術可以實現類似 ChatGPT 的效果,但 ChatGPT 內部并沒有用到 SSE,它內部是基于 HTTP 的分塊傳輸實現的。因為 SSE 只能通過 GET 請求發出,并且無法自定義請求頭。
如果想實現 ChatGPT 的效果,需要使用 HTTP 的分塊傳輸。而像 FastAPI、BlackSheep 等框架提供的流式響應,便是基于 HTTP 的分塊傳輸實現的,比如 FastAPI:
import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
App = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
async def event_generator():
for _ in range(5):
# 每隔 1 秒返回數據
data = "data: 高老師總能分享出好東西rnrn".encode("utf-8")
yield data
await asyncio.sleep(1)
@app.get("/")
async def sse():
return StreamingResponse(event_generator(),
media_type="text/event-stream")
if __name__ == '__main__':
uvicorn.run(app, host="0.0.0.0", port=9999)
首先之前的前端代碼依舊可以正常訪問,通過修改數據格式和 Content-Type 可以讓其支持 SSE。但最正確的做法是直接訪問 localhost:9999,效果如下:
圖片
所以基于 StreamingResponse 可以實現 SSE,也可以直接訪問。而直接訪問的話,此時里面的 data: 和 rn 就是實體數據的一部分。并且這種方式和 ChatGPT 的工作機制是相似的,都使用了 HTTP 的分塊傳輸,支持所有的請求方法,而 SSE 只支持 GET 請求。
BlackSheep 也是類似的,它同樣也支持流式響應。
import asyncio
from blacksheep import Application, Response, StreamedContent
import uvicorn
app = Application()
app.use_cors(
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
async def event_generator():
for _ in range(5):
# 每隔 1 秒返回數據
data = "data: 高老師總能分享出好東西rnrn".encode("utf-8")
yield data
await asyncio.sleep(1)
@app.router.get("/")
async def sse():
return Response(
200,
cnotallow=StreamedContent(b"text/event-stream", event_generator),
)
if __name__ == '__main__':
uvicorn.run(app, host="0.0.0.0", port=9999)
可以測試一下,效果是一樣的。如果你不想實現 SSE,只是希望固定的數據以流的形式一點一點返回,那么記得將數據中多余的 data: 和 rn 給去掉,并最好修改 Content-Type 為合適的類型。
所以 SSE 一般用于需要服務端推數據,但數據不知道什么時候會過來,于是通過 SSE 保持連接開放。后續當服務端有數據了,直接通過連接發送給客戶端即可。
而 FastAPI 和 BlackSheep 提供的流式響應更像是,返回的數據比較龐大,如果全部準備好再一次性返回,會讓用戶陷入長時間的等待,造成不好的體驗。于是通過分塊傳輸,準備好一部分就返回一部分。雖然整體時間沒變,但可以讓用戶立刻獲取到數據,從而提升用戶體驗。
比如 ChatGPT,當它回答的內容比較多的時候,那么整個過程耗費幾十秒鐘是常有的事情,假設 30 秒。相比讓用戶等待 30 秒,然后內容一下子刷出來,顯然生成一部分返回一部分這種方式更讓人喜歡。
因此使用 SSE 還是流式響應,則取決于你當前的業務。如果你返回的數據是確定的,只是準備的時間比較長,或者數據量比較大,那么推薦使用流式響應。
至于 SSE,在這些現成的 Web 框架里面,也可以通過流式響應來實現,只需要將 Content-Type 設置為 text/event-stream,并將數據加上前綴 data: 和后綴 rnrn。
但說實話,如果想實現 SSE,不建議通過流式響應來實現,而是使用專門的庫。以 FastAPI 為例:
from sse_starlette.sse import EventSourceResponse
FastAPI 其實就是在 starlette 的基礎上套了一層殼,通過安裝 sse_starlette 可以讓 FastAPI 更好地支持 SSE。
以上就是本文的內容,如果對你有幫助,就點個贊吧。