Golang 中跟蹤第三個(gè) Goroutine 中兩個(gè) Goroutine 的完成狀態(tài)的最佳實(shí)踐是什么?
在 Golang 中,要跟蹤兩個(gè) Goroutine 的完成狀態(tài)并在第三個(gè) Goroutine 中處理它們的結(jié)果,最佳實(shí)踐是使用 sync 包中的 WaitGroup。WaitGroup 允許我們?cè)谥?Goroutine 中等待其他 Goroutine 的完成。首先,我們需要?jiǎng)?chuàng)建一個(gè) WaitGroup 對(duì)象,并在主 Goroutine 中調(diào)用 Add 方法來(lái)設(shè)置等待的 Goroutine 數(shù)量。然后,在每個(gè) Goroutine 的末尾調(diào)用 Done 方法來(lái)表示該 Goroutine 的完成。最后,在第三個(gè) Goroutine 中調(diào)用 Wait 方法來(lái)等待所有 Goroutine 的完成。這樣,我們就可以安全地跟蹤并處理兩個(gè) Goroutine 的結(jié)果了。這是 Golang 中跟蹤多個(gè) Goroutine 完成狀態(tài)的最佳實(shí)踐。
問(wèn)題內(nèi)容
我有三個(gè)并發(fā)運(yùn)行的 goroutine。其中兩個(gè)進(jìn)行一些處理并將其結(jié)果發(fā)送到結(jié)果通道。第三個(gè) goroutine 通過(guò)讀取結(jié)果通道來(lái)“統(tǒng)計(jì)”結(jié)果。我可以使用 waitgroup 等待兩個(gè)計(jì)算 goroutine 完成,然后遍歷結(jié)果通道來(lái)統(tǒng)計(jì)結(jié)果,但這無(wú)法擴(kuò)展,并且需要我創(chuàng)建一個(gè)具有巨大緩沖區(qū)大小的緩沖結(jié)果通道,這是不可接受的在生產(chǎn)代碼中。
我想在處理發(fā)生時(shí)統(tǒng)計(jì)結(jié)果,但在所有統(tǒng)計(jì)完成之前我不想退出程序。在 Go 中實(shí)現(xiàn)這一目標(biāo)的最佳實(shí)踐是什么?
這是我目前的方法,效果很好。我想知道是否有更好的方法,因?yàn)檫@看起來(lái)有點(diǎn)笨拙?
package main import ( "fmt" "sync" ) type T struct{} func main() { var widgetInventory int = 1000 transactions := make(chan int, 100) salesDone := make(chan T) purchasesDone := make(chan T) var wg sync.WaitGroup fmt.Println("Starting inventory count = ", widgetInventory) go makeSales(transactions, salesDone) go newPurchases(transactions, purchasesDone) wg.Add(1) go func() { salesAreDone := false purchasesAreDone := false for { select { case transaction := <-transactions: widgetInventory += transaction case <-salesDone: salesAreDone = true case <-purchasesDone: purchasesAreDone = true default: if salesAreDone && purchasesAreDone { wg.Done() return } } } }() wg.Wait() fmt.Println("Ending inventory count = ", widgetInventory) } func makeSales(transactions chan int, salesDone chan T) { for i := 0; i < 3000; i++ { transactions <- -100 } salesDone <- struct{}{} } func newPurchases(transactions chan int, purchasesDone chan T) { for i := 0; i < 3000; i++ { transactions <- 100 } purchasesDone <- struct{}{} }
登錄后復(fù)制
解決方法
不適合任何合理的定義很好。您在這里有一個(gè)熱門的 for
循環(huán):
for { select { case transaction := <-transactions: widgetInventory += transaction case <-salesDone: salesAreDone = true case <-purchasesDone: purchasesAreDone = true default: if salesAreDone && purchasesAreDone { wg.Done() return } } }
登錄后復(fù)制
只要沒(méi)有通道可供讀取,就會(huì)執(zhí)行 default
案例。由于渠道的工作方式,這種情況經(jīng)常發(fā)生。
這個(gè)稍作調(diào)整的代碼版本說(shuō)明了此循環(huán)的“熱度”。確切的結(jié)果會(huì)有所不同,可能會(huì)相當(dāng)高。
Default case ran 27305 times
登錄后復(fù)制
當(dāng) select
ing 來(lái)自通道時(shí),您不希望出現(xiàn) default
情況,除非該默認(rèn)情況也會(huì)阻止其中的某些內(nèi)容。否則你會(huì)得到這樣的熱循環(huán)。
更好的方法:使用 nil
able 通道進(jìn)行選擇
通常在選擇中,您想要識(shí)別關(guān)閉的通道并將通道變量設(shè)置為 nil
; select
永遠(yuǎn)不會(huì)成功地從 nil
通道讀取內(nèi)容,因此這實(shí)際上“禁用”了該選擇。
考慮代碼的此修改版本:
go func(transactions chan int, salesDone <-chan T, purchasesDone <-chan T) { defer wg.Done() for transactions != nil { select { case transaction, ok := <-transactions: if ok { widgetInventory += transaction } else { transactions = nil } case <-salesDone: salesDone = nil if purchasesDone == nil { close(transactions) } case <-purchasesDone: purchasesDone = nil if salesDone == nil { close(transactions) } } } }(transactions, salesDone, purchasesDone)
登錄后復(fù)制
通過(guò)對(duì)消費(fèi)者的這些調(diào)整,我們不再有熱循環(huán);我們總是阻塞直到從通道讀取數(shù)據(jù)。一旦 salesDone
和 purchasesDone
都被“發(fā)出信號(hào)”,我們 close(transactions)
。一旦我們耗盡 transactions
并且它被關(guān)閉,我們將 transactions
設(shè)置為 nil。我們?cè)?transactions
不為 nil 時(shí)循環(huán),在這段代碼中,意味著所有通道都是 nil
。
微妙但重要的一點(diǎn):我將通道傳遞給此函數(shù),因此它的引用不與 main
共享范圍。否則,將 transactions
設(shè)置為 nil
將寫入一個(gè)在 goroutine 之間共享的變量。然而在這種情況下,無(wú)論如何,這并不重要,因?yàn)槲覀儭爸馈蔽覀兪亲詈笠粋€(gè)從 transactions
讀取的內(nèi)容。
更簡(jiǎn)單的選項(xiàng):多個(gè)等待組
如果您考慮一下您在這里所做的事情,您需要等到兩個(gè)生產(chǎn)者都完成對(duì) transactions
的生產(chǎn)。然后你想排空 transactions
。一旦通道關(guān)閉并排空,main
就知道求和已完成。
您不需要 select
來(lái)執(zhí)行此操作。而 select
為每個(gè)“工人”都有一個(gè)案例,可以說(shuō)是相當(dāng)不優(yōu)雅的;您必須對(duì)多個(gè)工作人員進(jìn)行硬編碼并單獨(dú)處理“完成”通道。
您需要做的是:
除了為生產(chǎn)者使用一個(gè) var resultswgsync.WaitGroup
之外,還為消費(fèi)者添加一個(gè)。
生產(chǎn)者 defer wg.Done()
消費(fèi)者 defer resultswg.Done()
在遍歷 transactions
之前:
go func() { defer resultswg.Done() for transaction := range transactions { widgetInventory += transaction } }()
登錄后復(fù)制
main 處理等待生產(chǎn)者、關(guān)閉事務(wù)以結(jié)束范圍,然后等待消費(fèi)者:
wg.Wait() close(transactions) resultswg.Wait()
登錄后復(fù)制
以這種方式編碼,最終會(huì)變得簡(jiǎn)短而甜蜜:
package main import ( "fmt" "sync" ) func main() { var widgetInventory int = 1000 transactions := make(chan int, 100) var wg, resultswg sync.WaitGroup fmt.Println("Starting inventory count = ", widgetInventory) wg.Add(2) go makeSales(transactions, &wg) go newPurchases(transactions, &wg) resultswg.Add(1) go func() { defer resultswg.Done() for transaction := range transactions { widgetInventory += transaction } }() wg.Wait() close(transactions) resultswg.Wait() fmt.Println("Ending inventory count = ", widgetInventory) } func makeSales(transactions chan int, wg *sync.WaitGroup) { defer wg.Done() for i := 0; i < 3000; i++ { transactions <- -100 } } func newPurchases(transactions chan int, wg *sync.WaitGroup) { defer wg.Done() for i := 0; i < 3000; i++ { transactions <- 100 } }
登錄后復(fù)制
您可以在這里看到,在此模式中可以有任意數(shù)量的生產(chǎn)者;您只需為每個(gè)生產(chǎn)者添加 wg.Add(1)
即可。
當(dāng)我不知道每個(gè)工作人員會(huì)返回多少結(jié)果時(shí),我一直使用這種模式來(lái)并行化工作。我發(fā)現(xiàn)它很容易理解,并且比嘗試 select
多個(gè)通道簡(jiǎn)單得多。事實(shí)上,我什至想說(shuō),如果您發(fā)現(xiàn)自己從多個(gè)渠道進(jìn)行 select
ing,您應(yīng)該退后一步,確保它對(duì)您來(lái)說(shuō)確實(shí)有意義。我使用 select
的頻率遠(yuǎn)遠(yuǎn)低于使用等待組的頻率。