Go語言最吸引人的地方是它內建的并發(fā)支持。Go語言并發(fā)體系的理論是C.A.R Hoare在1978年提出的通信順序進程(Communicating Sequential Process,CSP)。CSP有著精確的數學模型,并實際應用在了Hoare參與設計的T9000通用計算機上。從Newsqueak、Alef、Limbo到現在的Go語言,對于對CSP有著20多年實戰(zhàn)經驗的Rob Pike來說,他更關注的是將CSP應用在通用編程語言上產生的潛力。作為Go并發(fā)編程核心的CSP理論的核心概念只有一個:同步通信。關于同步通信的話題我們在前文已經講過,本節(jié)我們將簡單介紹Go語言中常見的并發(fā)模式。
首先要明確一個概念:并發(fā)不是并行。并發(fā)更關注的是程序的設計層面,并發(fā)的程序完全是可以順序執(zhí)行的,只有在真正的多核CPU上才可能真正地同時運行。并行更關注的是程序的運行層面,并行一般是簡單的大量重復,例如,GPU中對圖像處理都會有大量的并行運算。為了更好地編寫并發(fā)程序,從設計之初Go語言就注重如何在編程語言層級上設計一個簡潔安全高效的抽象模型,讓程序員專注于分解問題和組合方案,而且不用被線程管理和信號互斥這些煩瑣的操作分散精力。
在并發(fā)編程中,對共享資源的正確訪問需要精確地控制,在目前的絕大多數語言中,都是通過加鎖等線程同步方案來解決這一困難問題,而Go語言卻另辟蹊徑,它將共享的值通過通道傳遞(實際上多個獨立執(zhí)行的線程很少主動共享資源)。在任意給定的時刻,最好只有一個Goroutine能夠擁有該資源。數據競爭從設計層面上就被杜絕了。為了提倡這種思考方式,Go語言將其并發(fā)編程哲學化為一句口號:“不要通過共享內存來通信,而應通過通信來共享內存。”(Do not communicate by sharing memory; instead, share memory by communicating.)
這是更高層次的并發(fā)編程哲學(通過通道來傳值是Go語言推薦的做法)。雖然像引用計數這類簡單的并發(fā)問題通過原子操作或互斥鎖就能很好地實現,但是通過通道來控制訪問能夠讓你寫出更簡潔正確的程序。
1.6.1 并發(fā)版本的“Hello, World”
先以在一個新的Goroutine中輸出“你好, 世界”,main等待后臺線程輸出工作完成之后退出的簡單的并發(fā)程序作為熱身。
并發(fā)編程的核心概念是同步通信,但是同步的方式卻有多種。先以大家熟悉的互斥量sync.Mutex來實現同步通信。根據文檔,我們不能直接對一個未加鎖狀態(tài)的sync.Mutex進行解鎖,這會導致運行時異常。下面這種方式并不能保證正常工作:
func main() { var mu sync.Mutex go func(){ fmt.Println("你好, 世界") mu.Lock() }() mu.Unlock() }
因為mu.Lock()和mu.Unlock()并不在同一個Goroutine中,所以也就不滿足順序一致性內存模型。同時它們也沒有其他的同步事件可以參考,這兩個事件不可排序也就是可以并發(fā)的。因為可能是并發(fā)的事件,所以main()函數中的mu.Unlock()很有可能先發(fā)生,而這個時刻mu互斥對象還處于未加鎖的狀態(tài),因而會導致運行時異常。
下面是修復后的代碼:
func main() { var mu sync.Mutex mu.Lock() go func(){ fmt.Println("你好, 世界") mu.Unlock() }() mu.Lock() }
修復的方式是在main()函數所在線程中執(zhí)行兩次mu.Lock(),當第二次加鎖時會因為鎖已經被占用(不是遞歸鎖)而阻塞,main()函數的阻塞狀態(tài)驅動后臺線程繼續(xù)向前執(zhí)行。當后臺線程執(zhí)行到mu.Unlock()時解鎖,此時打印工作已經完成了,解鎖會導致main()函數中的第二個mu.Lock()阻塞狀態(tài)取消,此時后臺線程和主線程再沒有其他的同步事件參考,它們退出的事件將是并發(fā)的:在main()函數退出導致程序退出時,后臺線程可能已經退出了,也可能沒有退出。雖然無法確定兩個線程退出的時間,但是打印工作是可以正確完成的。
使用sync.Mutex互斥鎖同步是比較低級的做法。我們現在改用無緩存通道來實現同步:
func main() { done := make(chan int) go func(){ fmt.Println("你好, 世界") <-done }() done <- 1 }
根據Go語言內存模型規(guī)范,對于從無緩存通道進行的接收,發(fā)生在對該通道進行的發(fā)送完成之前。因此,后臺線程<-done接收操作完成之后,main線程的done <- 1發(fā)送操作才可能完成(從而退出main、退出程序),而此時打印工作已經完成了。
上面的代碼雖然可以正確同步,但是對通道的緩存大小太敏感:如果通道有緩存,就無法保證main()函數退出之前后臺線程能正常打印了。更好的做法是將通道的發(fā)送和接收方向調換一下,這樣可以避免同步事件受通道緩存大小的影響:
func main() { done := make(chan int, 1) // 帶緩存通道 go func(){ fmt.Println("你好, 世界") done <- 1 }() <-done }
對于帶緩存的通道,對通道的第K
個接收完成操作發(fā)生在第K+C
個發(fā)送操作完成之前,其中C
是通道的緩存大小。雖然通道是帶緩存的,但是main線程接收完成是在后臺線程發(fā)送開始但還未完成的時刻,此時打印工作也是已經完成的。
基于帶緩存通道,我們可以很容易將打印線程擴展到N
個。下面的例子是開啟10個后臺線程分別打印:
func main() { done := make(chan int, 10) // 帶10個緩存 // 開N個后臺打印線程 for i := 0; i < cap(done); i++ { go func(){ fmt.Println("你好, 世界") done <- 1 }() } // 等待N個后臺線程完成 for i := 0; i < cap(done); i++ { <-done } }
對于這種要等待N
個線程完成后再進行下一步的同步操作有一個簡單的做法,就是使用sync.WaitGroup來等待一組事件:
func main() { var wg sync.WaitGroup // 開N個后臺打印線程 for i := 0; i < 10; i++ { wg.Add(1) go func() { fmt.Println("你好, 世界") wg.Done() }() } // 等待N個后臺線程完成 wg.Wait() }
其中wg.Add(1)用于增加等待事件的個數,必須確保在后臺線程啟動之前執(zhí)行(如果放到后臺線程之中執(zhí)行則不能保證被正常執(zhí)行到)。當后臺線程完成打印工作之后,調用wg.Done()表示完成一個事件。main()函數的wg.Wait()是等待全部的事件完成。
1.6.2 生產者/消費者模型
并發(fā)編程中最常見的例子就是生產者/消費者模型,該模型主要通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。簡單地說,就是生產者生產一些數據,然后放到成果隊列中,同時消費者從成果隊列中來取這些數據。這樣就讓生產和消費變成了異步的兩個過程。當成果隊列中沒有數據時,消費者就進入饑餓的等待中;而當成果隊列中數據已滿時,生產者則面臨因產品積壓導致CPU被剝奪的問題。
Go語言實現生產者和消費者并發(fā)很簡單:
// 生產者:生成factor整數倍的序列 func Producer(factor int, out chan<- int) { for i := 0; ; i++ { out <- i*factor } } // 消費者 func Consumer(in <-chan int) { for v := range in { fmt.Println(v) } } func main() { ch := make(chan int, 64) // 成果隊列 go Producer(3, ch) // 生成3的倍數的序列 go Producer(5, ch) // 生成5的倍數的序列 go Consumer(ch) // 消費生成的隊列 // 運行一定時間后退出 time.Sleep(5 * time.Second) }
我們開啟了兩個Producer生產流水線,分別用于生成3和5的倍數的序列。然后開啟一個Consumer消費者線程,打印獲取的結果。我們通過在main()函數休眠一定的時間來讓生產者和消費者工作一定時間。正如1.6.1節(jié)中說的,這種靠休眠方式是無法保證穩(wěn)定的輸出結果的。
我們可以讓main()函數保存阻塞狀態(tài)不退出,只有當用戶輸入Ctrl+C時才真正退出程序:
func main() { ch := make(chan int, 64) // 成果隊列 go Producer(3, ch) // 生成3的倍數的序列 go Producer(5, ch) // 生成5的倍數的序列 go Consumer(ch) // 消費生成的隊列 // Ctrl+C 退出 sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) fmt.Printf("quit (%v)n", <-sig) }
這個例子中有兩個生產者,并且兩個生產者之間無同步事件可參考,它們是并發(fā)的。因此,消費者輸出的結果序列的順序是不確定的,這并沒有問題,生產者和消費者依然可以相互配合工作。
1.6.3 發(fā)布/訂閱模型
發(fā)布/訂閱(publish-subscribe)模型通常被簡寫為pub/sub模型。在這個模型中,消息生產者成為發(fā)布者(publisher),而消息消費者則成為訂閱者(subscriber),生產者和消費者是M
: N
的關系。在傳統(tǒng)生產者/消費者模型中,是將消息發(fā)送到一個隊列中,而發(fā)布/訂閱模型則是將消息發(fā)布給一個主題。
為此,我們構建了一個名為pubsub的發(fā)布/訂閱模型支持包:
// Package pubsub implements a simple multi-topic pub-sub library. package pubsub import ( "sync" "time" ) type ( subscriber chan interface{} // 訂閱者為一個通道 topicFunc func(v interface{}) bool // 主題為一個過濾器 ) // 發(fā)布者對象 type Publisher struct { m sync.RWMutex // 讀寫鎖 buffer int // 訂閱隊列的緩存大小 timeout time.Duration // 發(fā)布超時時間 subscribers map[subscriber]topicFunc // 訂閱者信息 } // 構建一個發(fā)布者對象,可以設置發(fā)布超時時間和緩存隊列的長度 func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher { return &Publisher{ buffer: buffer, timeout: publishTimeout, subscribers: make(map[subscriber]topicFunc), } } // 添加一個新的訂閱者,訂閱全部主題 func (p *Publisher) Subscribe() chan interface{} { return p.SubscribeTopic(nil) } // 添加一個新的訂閱者,訂閱過濾器篩選后的主題 func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} { ch := make(chan interface{}, p.buffer) p.m.Lock() p.subscribers[ch] = topic p.m.Unlock() return ch } // 退出訂閱 func (p *Publisher) Evict(sub chan interface{}) { p.m.Lock() defer p.m.Unlock() delete(p.subscribers, sub) close(sub) } // 發(fā)布一個主題 func (p *Publisher) Publish(v interface{}) { p.m.RLock() defer p.m.RUnlock() var wg sync.WaitGroup for sub, topic := range p.subscribers { wg.Add(1) go p.sendTopic(sub, topic, v, &wg) } wg.Wait() } // 關閉發(fā)布者對象,同時關閉所有的訂閱者通道 func (p *Publisher) Close() { p.m.Lock() defer p.m.Unlock() for sub := range p.subscribers { delete(p.subscribers, sub) close(sub) } } // 發(fā)送主題,可以容忍一定的超時 func (p *Publisher) sendTopic( sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup, ) { defer wg.Done() if topic != nil && !topic(v) { return } select { case sub <- v: case <-time.After(p.timeout): } }
下面的例子中,有兩個訂閱者分別訂閱了全部主題和含有"golang"的主題:
import "path/to/pubsub" func main() { p := pubsub.NewPublisher(100*time.Millisecond, 10) defer p.Close() all := p.Subscribe() golang := p.SubscribeTopic(func(v interface{}) bool { if s, ok := v.(string); ok { return strings.Contains(s, "golang") } return false }) p.Publish("hello, world!") p.Publish("hello, golang!") go func() { for msg := range all { fmt.Println("all:", msg) } } () go func() { for msg := range golang { fmt.Println("golang:", msg) } } () // 運行一定時間后退出 time.Sleep(3 * time.Second) }
在發(fā)布/訂閱模型中,每條消息都會傳送給多個訂閱者。發(fā)布者通常不會知道,也不關心哪一個訂閱者正在接收主題消息。訂閱者和發(fā)布者可以在運行時動態(tài)添加,它們之間是一種松散的耦合關系,這使得系統(tǒng)的復雜性可以隨時間的推移而增長。在現實生活中,像天氣預報之類的應用就可以應用這種并發(fā)模式。
1.6.4 控制并發(fā)數
很多用戶在適應了Go語言強大的并發(fā)特性之后,都傾向于編寫最大并發(fā)的程序,因為這樣似乎可以提供最高的性能。在現實中我們行色匆匆,但有時卻需要我們放慢腳步享受生活,并發(fā)的程序也是一樣:有時候我們需要適當地控制并發(fā)的程度,因為這樣不僅可給其他的應用/任務讓出/預留一定的CPU資源,也可以適當降低功耗緩解電池的壓力。
在Go語言自帶的godoc程序實現中有一個vfs的包對應虛擬的文件系統(tǒng),在vfs包下面有一個gatefs的子包,gatefs子包的目的就是為了控制訪問該虛擬文件系統(tǒng)的最大并發(fā)數。gatefs包的應用很簡單:
import ( "golang.org/x/tools/godoc/vfs" "golang.org/x/tools/godoc/vfs/gatefs" ) func main() { fs := gatefs.New(vfs.OS("/path"), make(chan bool, 8)) // ... }
其中vfs.OS("/path")基于本地文件系統(tǒng)構造一個虛擬的文件系統(tǒng),然后gatefs.New基于現有的虛擬文件系統(tǒng)構造一個并發(fā)受控的虛擬文件系統(tǒng)。并發(fā)數控制的原理在1.5節(jié)已經講過,就是通過帶緩存通道的發(fā)送和接收規(guī)則來實現最大并發(fā)阻塞:
var limit = make(chan int, 3) func main() { for _, w := range work { go func() { limit <- 1 w() <-limit }() } select{} }
不過gatefs對此做一個抽象類型gate,增加了enter()和leave()方法分別對應并發(fā)代碼的進入和離開。當超出并發(fā)數目限制的時候,enter()方法會阻塞直到并發(fā)數降下來為止。
type gate chan bool func (g gate) enter() { g <- true } func (g gate) leave() { <-g }
gatefs包裝的新的虛擬文件系統(tǒng)就是將需要控制并發(fā)的方法增加了對enter()和leave()的調用而已:
type gatefs struct { fs vfs.FileSystem gate } func (fs gatefs) Lstat(p string) (os.FileInfo, error) { fs.enter() defer fs.leave() return fs.fs.Lstat(p) }
我們不僅可以控制最大的并發(fā)數目,而且可以通過帶緩存通道的使用量和最大容量比例來判斷程序運行的并發(fā)率。當通道為空時可以認為是空閑狀態(tài),當通道滿了時可以認為是繁忙狀態(tài),這對于后臺一些低級任務的運行是有參考價值的。
1.6.5 贏者為王
采用并發(fā)編程的動機有很多:并發(fā)編程可以簡化問題,例如一類問題對應一個處理線程會更簡單;并發(fā)編程還可以提升性能,在一個多核CPU上開兩個線程一般會比開一個線程快一些。其實對提升性能而言,并不是程序運行速度快就表示用戶體驗好,很多時候程序能快速響應用戶請求才是最重要的,當沒有用戶請求需要處理的時候才合適處理一些低優(yōu)先級的后臺任務。
假設我們想快速地搜索“golang”相關的主題,我們可能會同時打開必應、谷歌或百度等多個檢索引擎。當某個搜索最先返回結果后,就可以關閉其他搜索頁面了。因為受網絡環(huán)境和搜索引擎算法的影響,某些搜索引擎可能很快返回搜索結果,某些搜索引擎也可能等到他們公司倒閉也沒有完成搜索。我們可以采用類似的策略來編寫這個程序:
func main() { ch := make(chan string, 32) go func() { ch <- searchByBing("golang") }() go func() { ch <- searchBygoogle("golang") }() go func() { ch <- searchByBaidu("golang") }() fmt.Println(<-ch) }
首先,創(chuàng)建了一個帶緩存通道,通道的緩存數目要足夠大,保證不會因為緩存的容量引起不必要的阻塞。然后開啟了多個后臺線程,分別向不同的搜索引擎提交搜索請求。當任意一個搜索引擎最先有結果之后,都會馬上將結果發(fā)到通道中(因為通道帶了足夠的緩存,這個過程不會阻塞)。但是最終只從通道取第一個結果,也就是最先返回的結果。
通過適當開啟一些冗余的線程,嘗試用不同途徑去解決同樣的問題,最終以贏者為王的方式提升了程序的相應性能。
1.6.6 素數篩
在1.2節(jié)中,為了演示Newsqueak的并發(fā)特性,給出了并發(fā)版本素數篩的實現。并發(fā)版本的素數篩是一個經典的并發(fā)例子,通過它可以更深刻地理解Go語言的并發(fā)特性。“素數篩”的原理如圖1-5所示。
我們需要先生成最初的2, 3, 4,…自然數序列(不包含開頭的0、1):
// 返回生成自然數序列的通道: 2, 3, 4, ... func GenerateNatural() chan int { ch := make(chan int) go func() { for i := 2; ; i++ { ch <- i } }() return ch }
GenerateNatural()函數內部啟動一個Goroutine生產序列,返回對應的通道。
然后為每個素數構造一個篩子:將輸入序列中是素數倍數的數提出,并返回新的序列,是一個新的通道。
// 通道過濾器: 刪除能被素數整除的數 func PrimeFilter(in <-chan int, prime int) chan int { out := make(chan int) go func() { for { if i := <-in; i%prime != 0 { out <- i } } }() return out }
PrimeFilter()函數也是內部啟動一個Goroutine生產序列,返回過濾后序列對應的通道。
現在可以在main()函數中驅動這個并發(fā)的素數篩了:
func main() { ch := GenerateNatural() // 自然數序列: 2, 3, 4, ... for i := 0; i < 100; i++ { prime := <-ch // 新出現的素數 fmt.Printf("%v: %vn", i+1, prime) ch = PrimeFilter(ch, prime) // 基于新素數構造的過濾器 } }
先是調用GenerateNatural()生成最原始的從2開始的自然數序列。然后開始一個100次迭代的循環(huán),希望生成100個素數。在每次循環(huán)迭代開始的時候,通道中的第一個數必定是素數,我們先讀取并打印這個素數。然后基于通道中剩余的數列,并以當前取出的素數為篩子過濾后面的素數。不同的素數篩對應的通道是串聯在一起的。
素數篩展示了一種優(yōu)雅的并發(fā)程序結構。但是因為每個并發(fā)體處理的任務粒度太細微,程序整體的性能并不理想。對于細粒度的并發(fā)程序,CSP模型中固有的消息傳遞的代價太高了(多線程并發(fā)模型同樣要面臨線程啟動的代價)。
1.6.7 并發(fā)的安全退出
有時候需要通知Goroutine停止它正在干的事情,特別是當它工作在錯誤的方向上的時候。Go語言并沒有提供一個直接終止Goroutine的方法,因為這樣會導致Goroutine之間的共享變量處在未定義的狀態(tài)上。但是如果想要退出兩個或者任意多個Goroutine怎么辦呢?
Go語言中不同Goroutine之間主要依靠通道進行通信和同步。要同時處理多個通道的發(fā)送或接收操作,需要使用select關鍵字(這個關鍵字和網絡編程中的select()函數的行為類似)。當select()有多個分支時,會隨機選擇一個可用的通道分支,如果沒有可用的通道分支,則選擇default分支,否則會一直保持阻塞狀態(tài)。
基于select()實現的通道的超時判斷:
select { case v := <-in: fmt.Println(v) case <-time.After(time.Second): return // 超時 }
通過select的default分支實現非阻塞的通道發(fā)送或接收操作:
select { case v := <-in: fmt.Println(v) default: // 沒有數據 }
通過select來阻止main()函數退出:
func main() { // 做一些處理 select{} }
當有多個通道均可操作時,select會隨機選擇一個通道。基于該特性我們可以用select實現一個生成隨機數序列的程序:
func main() { ch := make(chan int) go func() { for { select { case ch <- 0: case ch <- 1: } } }() for v := range ch { fmt.Println(v) } }
我們通過select和default分支可以很容易實現一個Goroutine的退出控制:
func worker(cannel chan bool) { for { select { default: fmt.Println("hello") // 正常工作 case <-cannel: // 退出 } } } func main() { cannel := make(chan bool) go worker(cannel) time.Sleep(time.Second) cannel <- true }
但是通道的發(fā)送操作和接收操作是一一對應的,如果要停止多個Goroutine,那么可能需要創(chuàng)建同樣數量的通道,這個代價太大了。其實我們可以通過close()關閉一個通道來實現廣播的效果,所有從關閉通道接收的操作均會收到一個零值和一個可選的失敗標志。
func worker(cannel chan bool) { for { select { default: fmt.Println("hello") // 正常工作 case <-cannel: // 退出 } } } func main() { cancel := make(chan bool) for i := 0; i < 10; i++ { go worker(cancel) } time.Sleep(time.Second) close(cancel) }
我們通過close()來關閉cancel通道,向多個Goroutine廣播退出的指令。不過這個程序依然不夠穩(wěn)健:當每個Goroutine收到退出指令退出時一般會進行一定的清理工作,但是退出的清理工作并不能保證被完成,因為main線程并沒有等待各個工作Goroutine退出工作完成的機制。我們可以結合sync.WaitGroup來改進:
func worker(wg *sync.WaitGroup, cannel chan bool) { defer wg.Done() for { select { default: fmt.Println("hello") case <-cannel: return } } } func main() { cancel := make(chan bool) var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go worker(&wg, cancel) } time.Sleep(time.Second) close(cancel) wg.Wait() }
現在每個工作者并發(fā)體的創(chuàng)建、運行、暫停和退出都是在main()函數的安全控制之下了。
1.6.8 context包
在Go 1.7發(fā)布時,標準庫增加了一個context包,用來簡化對于處理單個請求的多個Goroutine之間與請求域的數據、超時和退出等操作,官方有博客文章對此做了專門介紹。我們可以用context包來重新實現前面的線程安全退出或超時的控制:
func worker(ctx context.Context, wg *sync.WaitGroup) error { defer wg.Done() for { select { default: fmt.Println("hello") case <-ctx.Done(): return ctx.Err() } } } func main() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go worker(ctx, &wg) } time.Sleep(time.Second) cancel() wg.Wait() }
當并發(fā)體超時或main主動停止工作者Goroutine時,每個工作者都可以安全退出。
Go語言是帶內存自動回收特性的,因此內存一般不會泄漏。在前面素數篩的例子中,GenerateNatural和PrimeFilter()函數內部都啟動了新的Goroutine,當main()函數不再使用通道時,后臺Goroutine有泄漏的風險。我們可以通過context包來避免這個問題,下面是改進的素數篩實現:
// 返回生成自然數序列的通道: 2, 3, 4, ... func GenerateNatural(ctx context.Context) chan int { ch := make(chan int) go func() { for i := 2; ; i++ { select { case <- ctx.Done(): return case ch <- i: } } }() return ch } // 通道過濾器:刪除能被素數整除的數 func PrimeFilter(ctx context.Context, in <-chan int, prime int) chan int { out := make(chan int) go func() { for { if i := <-in; i%prime != 0 { select { case <- ctx.Done(): return case out <- i: } } } }() return out } func main() { // 通過Context控制后臺Goroutine狀態(tài) ctx, cancel := context.WithCancel(context.Background()) ch := GenerateNatural(ctx) // 自然數序列:2, 3, 4, ... for i := 0; i < 100; i++ { prime := <-ch // 新出現的素數 fmt.Printf("%v: %vn", i+1, prime) ch = PrimeFilter(ctx, ch, prime) // 基于新素數構造的過濾器 } cancel() }
當main()函數完成工作前,通過調用cancel()來通知后臺Goroutine退出,這樣就避免了Goroutine的泄漏。
并發(fā)是一個非常大的主題,這里只展示幾個非常基礎的并發(fā)編程的例子。官方文檔也有很多關于并發(fā)編程的討論,國內也有專門討論Go語言并發(fā)編程的書籍。讀者可以根據自己的需求查閱相關的文獻。
本文截選自《Go語言高級編程》