大語(yǔ)言模型生成響應(yīng)整個(gè)結(jié)果的過(guò)程是比較漫長(zhǎng)的,但逐token生成的響應(yīng)比較快,ChatGPT將這一特性與SSE技術(shù)充分結(jié)合,一個(gè)字一個(gè)字地彈出回復(fù),在用戶(hù)體驗(yàn)上實(shí)現(xiàn)了質(zhì)的提升。
ChatGPT回答問(wèn)題時(shí),是一個(gè)字一個(gè)字彈出的,給人一種在認(rèn)真思考的感覺(jué)。
這段對(duì)話的基本內(nèi)容是:
問(wèn):大語(yǔ)言生成式模型是如何工作的
答:大型語(yǔ)言生成模型(如GPT-3)是基于深度學(xué)習(xí)技術(shù)的神經(jīng)網(wǎng)絡(luò)模型。這些模型通常使用一種稱(chēng)為"Transformer"的架構(gòu),該架構(gòu)在自然語(yǔ)言處理領(lǐng)域取得了巨大的成功。下面是大型語(yǔ)言生成模型的工作原理概述:
... 省略一段 ...
生成文本:一旦模型經(jīng)過(guò)訓(xùn)練,就可以使用它來(lái)生成文本。給定一個(gè)初始文本或提示,模型會(huì)根據(jù)已經(jīng)學(xué)到的概率分布生成下一個(gè)單詞,然后使用生成的單詞作為輸入,并繼續(xù)生成下一個(gè)單詞,以此類(lèi)推,直到達(dá)到所需的文本長(zhǎng)度或生成終止符號(hào)。
... 省略一段 ...
大語(yǔ)言模型,Large Language Model,簡(jiǎn)稱(chēng)LLM。
從模型的視角來(lái)看,LLM每進(jìn)行一次推理生成一個(gè)token,直到達(dá)到文本長(zhǎng)度限制或生成終止符。
從服務(wù)端的視角來(lái)看,生成的token需要通過(guò)HTTPS協(xié)議逐個(gè)返回到瀏覽器端。
Client-Server 模式下,常規(guī)的交互方式是client端發(fā)送一次請(qǐng)求,接收一次響應(yīng)。顯然,這無(wú)法滿(mǎn)足ChatGPT回復(fù)問(wèn)題的場(chǎng)景。
其次,我們可能想到websocket,它依賴(lài)HTTP實(shí)現(xiàn)握手,升級(jí)成WebSocket。不過(guò)WebSocket需要client和server都持續(xù)占用一個(gè)socket,server側(cè)成本比較高。
ChatGPT使用的是一種折衷方案: server-sent event(簡(jiǎn)稱(chēng)SSE). 我們從AI target=_blank class=infotextkey>OpenAI的 API 文檔可以發(fā)現(xiàn)這一點(diǎn):
SSE 模式下,client只需要向server發(fā)送一次請(qǐng)求,server就能持續(xù)輸出,直到需要結(jié)束。整個(gè)交互過(guò)程如下圖所示:
SSE仍然使用HTTP作為應(yīng)用層傳輸協(xié)議,充分利用HTTP的長(zhǎng)連接能力,實(shí)現(xiàn)服務(wù)端推送能力。
從代碼層面來(lái)看,SSE模式與單次HTTP請(qǐng)求不同的點(diǎn)有:
- client端需要開(kāi)啟 keep-alive,保證連接不會(huì)超時(shí)。
- HTTP響應(yīng)的Header包含 Content-Type=text/event-stream,Cache-Cnotallow=no-cache 等。
- HTTP響應(yīng)的body一般是 "data: ..." 這樣的結(jié)構(gòu)。
- HTTP響應(yīng)里可能有一些空數(shù)據(jù),以避免連接超時(shí)。
以 ChatGPT API 為例,在發(fā)送請(qǐng)求時(shí),將stream參數(shù)設(shè)置為true就啟用了SSE特性,但在讀取數(shù)據(jù)的SDK里需要稍加注意。
在常規(guī)模式下,拿到 http.Response 后,用 ioutil.ReadAll 將數(shù)據(jù)讀出來(lái)即可,代碼如下:
func main() {
payload := strings.NewReader(`{
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "大語(yǔ)言生成式模型是如何工作的"}],
"max_tokens": 1024,
"temperature": 1,
"top_p": 1,
"n": 1,
"stream": false
}`)
client := &http.Client{}
req, _ := http.NewRequest("POST", "https://api.openai.com/v1/chat/completions", payload)
req.Header.Add("Content-Type", "Application/json")
req.Header.Add("Authorization", "Bearer <OpenAI-Token>")
resp, err := client.Do(req)
if err != nil {
fmt.Println(err)
return
}
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
fmt.Println(string(body))
}
執(zhí)行大概耗費(fèi)20s+,得到一個(gè)完整的結(jié)果:
{
"id": "chatcmpl-7KklTf9mag5tyBXLEqM3PWQn4jlfD",
"object": "chat.completion",
"created": 1685180679,
"model": "gpt-3.5-turbo-0301",
"usage": {
"prompt_tokens": 21,
"completion_tokens": 358,
"total_tokens": 379
},
"choices": [
{
"message": {
"role": "assistant",
"content": "大語(yǔ)言生成式模型通常采用神經(jīng)網(wǎng)絡(luò)來(lái)實(shí)現(xiàn),具體工作流程如下:nn1. 數(shù)據(jù)預(yù)處理:將語(yǔ)料庫(kù)中的文本數(shù)據(jù)進(jìn)行預(yù)處理,包括分詞、刪除停用詞(如“的”、“了”等常用詞匯)、去重等操作,以減少冗余信息。nn2. 模型訓(xùn)練:采用遞歸神經(jīng)網(wǎng)絡(luò)(RNN)、長(zhǎng)短期記憶網(wǎng)絡(luò)(LSTM)或變種的Transformers等模型進(jìn)行訓(xùn)練,這些模型都具有一定的記憶能力,可以學(xué)習(xí)到語(yǔ)言的一定規(guī)律,并預(yù)測(cè)下一個(gè)可能出現(xiàn)的詞語(yǔ)。nn3. 模型應(yīng)用:當(dāng)模型完成訓(xùn)練后,可以將其應(yīng)用于實(shí)際的生成任務(wù)中。模型接收一個(gè)輸入文本串,并預(yù)測(cè)下一個(gè)可能出現(xiàn)的詞語(yǔ),直到達(dá)到一定長(zhǎng)度或遇到結(jié)束符號(hào)為止。nn4. 根據(jù)生成結(jié)果對(duì)模型進(jìn)行調(diào)優(yōu):生成的結(jié)果需要進(jìn)行評(píng)估,如計(jì)算生成文本與語(yǔ)料庫(kù)文本的相似度、流暢度等指標(biāo),以此來(lái)調(diào)優(yōu)模型,提高其生成質(zhì)量。nn總體而言,大語(yǔ)言生成式模型通過(guò)對(duì)語(yǔ)言的規(guī)律學(xué)習(xí),從而生成高質(zhì)量的文本。"
},
"finish_reason": "stop",
"index": 0
}
]
}
如果我們將 stream 設(shè)置為 true,不做任何修改,請(qǐng)求總消耗28s+,體現(xiàn)為很多條 stream 消息:
上面這張圖是一張Postman調(diào)用 chatgpt api的圖,走的就是 ioutil.ReadAll 的模式。為了實(shí)現(xiàn)stream讀取,我們可以分段讀取 http.Response.Body。下面是這種方式可行的原因:
- http.Response.Body 的類(lèi)型是 io.ReaderCloser,底層依賴(lài)一個(gè)HTTP連接,支持stream讀。
- SSE 返回的數(shù)據(jù)通過(guò)換行符n進(jìn)行分割
所以修正的方法是通過(guò)bufio.NewReader(resp.Body)包裝起來(lái),并在一個(gè)for-loop里讀取, 代碼如下:
// stream event 結(jié)構(gòu)體定義
type ChatCompletionRspChoiceItem struct {
Delta map[string]string `json:"delta,omitempty"` // 只有 content 字段
Index int `json:"index,omitempty"`
Logprobs *int `json:"logprobs,omitempty"`
FinishReason string `json:"finish_reason,omitempty"`
}
type ChatCompletionRsp struct {
ID string `json:"id"`
Object string `json:"object"`
Created int `json:"created"` // unix second
Model string `json:"model"`
Choices []ChatCompletionRspChoiceItem `json:"choices"`
}
func main() {
payload := strings.NewReader(`{
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "大語(yǔ)言生成式模型是如何工作的"}],
"max_tokens": 1024,
"temperature": 1,
"top_p": 1,
"n": 1,
"stream": true
}`)
client := &http.Client{}
req, _ := http.NewRequest("POST", "https://api.openai.com/v1/chat/completions", payload)
req.Header.Add("Content-Type", "application/json")
req.Header.Add("Authorization", "Bearer "+apiKey)
req.Header.Set("Accept", "text/event-stream")
req.Header.Set("Cache-Control", "no-cache")
req.Header.Set("Connection", "keep-alive")
resp, err := client.Do(req)
if err != nil {
fmt.Println(err)
return
}
defer resp.Body.Close()
reader := bufio.NewReader(resp.Body)
for {
line, err := reader.ReadBytes('n')
if err != nil {
if err == io.EOF {
// 忽略 EOF 錯(cuò)誤
break
} else {
if.NETErr, ok := err.(net.Error); ok && netErr.Timeout() {
fmt.Printf("[PostStream] fails to read response body, timeoutn")
} else {
fmt.Printf("[PostStream] fails to read response body, err=%sn", err)
}
}
break
}
line = bytes.TrimSuffix(line, []byte{'n'})
line = bytes.TrimPrefix(line, []byte("data: "))
if bytes.Equal(line, []byte("[DONE]")) {
break
} else if len(line) > 0 {
var chatCompletionRsp ChatCompletionRsp
if err := json.Unmarshal(line, &chatCompletionRsp); err == nil {
fmt.Printf(chatCompletionRsp.Choices[0].Delta["content"])
} else {
fmt.Printf("ninvalid line=%sn", line)
}
}
}
fmt.Println("the end")
}
看完client端,我們?cè)倏磗erver端。現(xiàn)在我們嘗試mock chatgpt server逐字返回一段文字。這里涉及到兩個(gè)點(diǎn):
- Response Header 需要設(shè)置 Connection 為 keep-alive 和 Content-Type 為 text/event-stream。
- 寫(xiě)入 respnose 以后,需要flush到client端。
代碼如下:
func streamHandler(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
var chatCompletionRsp ChatCompletionRsp
runes := []rune(`大語(yǔ)言生成式模型通常使用深度學(xué)習(xí)技術(shù),例如循環(huán)神經(jīng)網(wǎng)絡(luò)(RNN)或變壓器(Transformer)來(lái)建模語(yǔ)言的概率分布。這些模型接收前面的詞匯序列,并利用其內(nèi)部神經(jīng)網(wǎng)絡(luò)結(jié)構(gòu)預(yù)測(cè)下一個(gè)詞匯的概率分布。然后,模型將概率最高的詞匯作為生成的下一個(gè)詞匯,并遞歸地生成一個(gè)詞匯序列,直到到達(dá)最大長(zhǎng)度或遇到一個(gè)終止符號(hào)。
在訓(xùn)練過(guò)程中,模型通過(guò)最大化生成的文本樣本的概率分布來(lái)學(xué)習(xí)有效的參數(shù)。為了避免模型產(chǎn)生過(guò)于平凡的、重復(fù)的、無(wú)意義的語(yǔ)言,我們通常會(huì)引入一些技巧,如dropout、序列擾動(dòng)等。
大語(yǔ)言生成模型的重要應(yīng)用包括文本生成、問(wèn)答系統(tǒng)、機(jī)器翻譯、對(duì)話建模、摘要生成、文本分類(lèi)等。`)
for _, r := range runes {
chatCompletionRsp.Choices = []ChatCompletionRspChoiceItem{
{Delta: map[string]string{"content": string(r)}},
}
bs, _ := json.Marshal(chatCompletionRsp)
line := fmt.Sprintf("data: %sn", bs)
fmt.Fprintf(w, line)
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
time.Sleep(time.Millisecond * 100)
}
fmt.Fprintf(w, "data: [DONE]n")
}
func main() {
http.HandleFunc("/stream", streamHandler)
http.ListenAndServe(":8088", nil)
}
在真實(shí)場(chǎng)景中,要返回的數(shù)據(jù)來(lái)源于另一個(gè)服務(wù)或函數(shù)調(diào)用,如果這個(gè)服務(wù)或函數(shù)調(diào)用返回時(shí)間不穩(wěn)定,可能導(dǎo)致client端長(zhǎng)時(shí)間收不到消息,所以一般的處理方式是:
- 對(duì)第三方的調(diào)用放到一個(gè) goroutine 中。
- 通過(guò) time.Tick 創(chuàng)建一個(gè)定時(shí)器,向client端發(fā)送空消息。
- 創(chuàng)建一個(gè)timeout channel,避免響應(yīng)時(shí)間太久。
為了能夠從不同的channel讀取數(shù)據(jù),select 是一個(gè)不錯(cuò)的關(guān)鍵字,比如這段演示代碼:
// 聲明一個(gè) event channel
// 聲明一個(gè) time.Tick channel
// 聲明一個(gè) timeout channel
select {
case ev := <-events:
// send data event
case <- timeTick:
// send empty event
case <-timeout:
fmt.Fprintf(w, "[Done]nn")
}
小結(jié)一下
大語(yǔ)言模型生成響應(yīng)整個(gè)結(jié)果的過(guò)程是比較漫長(zhǎng)的,但逐token生成的響應(yīng)比較快,ChatGPT將這一特性與SSE技術(shù)充分結(jié)合,一個(gè)字一個(gè)字地彈出回復(fù),在用戶(hù)體驗(yàn)上實(shí)現(xiàn)了質(zhì)的提升。
縱觀生成式模型,不管是LLAMA/小羊駝 (不能商用),還是Stable Diffusion/Midjourney。在提供線上服務(wù)時(shí),均可利用SSE技術(shù)節(jié)省提升用戶(hù)體驗(yàn),節(jié)省服務(wù)器資源。