今天我們學(xué)習(xí)下 Queue 的進(jìn)階用法。
生產(chǎn)者消費(fèi)者模型
在并發(fā)編程中,比如爬蟲(chóng),有的線程負(fù)責(zé)爬取數(shù)據(jù),有的線程負(fù)責(zé)對(duì)爬取到的數(shù)據(jù)做處理(清洗、分類(lèi)和入庫(kù))。假如他們是直接交互的,那么當(dāng)二者的速度不匹配時(shí)勢(shì)必出現(xiàn)等待現(xiàn)象,這也就產(chǎn)生了資源的浪費(fèi)。
抽象是一種很重要的通用能力,而生產(chǎn)者消費(fèi)者模型是前人將一系列同類(lèi)型的具體的問(wèn)題抽象出來(lái)的一個(gè)一致的最佳解決方案。
該模型有三個(gè)重要角色,容器,生產(chǎn)者和消費(fèi)者,顧名思義,生產(chǎn)者就是負(fù)責(zé)生產(chǎn)數(shù)據(jù)或任務(wù)的,消費(fèi)者就是負(fù)責(zé)消費(fèi)數(shù)據(jù)或者任務(wù)的(下文統(tǒng)稱(chēng)為任務(wù)),容器是二者進(jìn)行通訊的媒介。在該模型中,生產(chǎn)者和消費(fèi)者不在直接進(jìn)行通訊,而是通過(guò)引入一個(gè)第三者容器(通常都是用阻塞隊(duì)列)來(lái)達(dá)到解耦的目的。這樣生產(chǎn)者不必在因?yàn)橄M(fèi)者速度過(guò)慢而等待,直接將任務(wù)放入容器即可,消費(fèi)者也不必因生產(chǎn)者生產(chǎn)速度過(guò)慢而等待,直接從容器中獲取任務(wù),以此達(dá)到了資源的最大利用。
使用該模型可以解決并發(fā)編程中的絕大部分并發(fā)問(wèn)題。
簡(jiǎn)易版
我們先寫(xiě)一個(gè)單生產(chǎn)者和單消費(fèi)者的簡(jiǎn)易版生產(chǎn)者消費(fèi)者模型。
import threading
import time
import queue
def consume(thread_name, q):
while True:
time.sleep(2)
product = q.get()
print("%s consume %s" % (thread_name, product))
def produce(thread_name, q):
for i in range(3):
product = 'product-' + str(i)
q.put(product)
print("%s produce %s" % (thread_name, product))
time.sleep(1)
q = queue.Queue()
p = threading.Thread(target=produce, args=("producer",q))
c = threading.Thread(target=consume, args=("consumer",q))
p.start()
c.start()
p.join()
# 輸出如下
producer produce product-0
producer produce product-1
consumer consume product-0
producer produce product-2
consumer consume product-1
consumer consume product-2
...
以上就是最簡(jiǎn)單的生產(chǎn)者消費(fèi)者模型了,生產(chǎn)者生產(chǎn)三個(gè)任務(wù)供消費(fèi)者消費(fèi)。但是上面的寫(xiě)法有個(gè)問(wèn)題,就是生產(chǎn)者將任務(wù)生產(chǎn)完畢之后就和主線程一起退出了,但是消費(fèi)者將所有的任務(wù)消費(fèi)完之后還沒(méi)停止,一直處于阻塞狀態(tài)。
那可不可以將 while True 的判斷改為 while not q.empty()呢,肯定是不行的。因?yàn)?empty() 返回 False ,不保證后續(xù)調(diào)用的 get()不被阻塞。同時(shí),如果用 empty() 函數(shù)來(lái)做判斷的話,那么就要保證消費(fèi)者線程開(kāi)啟之時(shí)生產(chǎn)者一定至少生產(chǎn)了一個(gè)任務(wù),否則消費(fèi)者線程就會(huì)因條件不滿(mǎn)足直接退出程序;同時(shí)如果生產(chǎn)者生產(chǎn)速度比較慢,一旦消費(fèi)者將任務(wù)消費(fèi)完且下次判斷時(shí)還沒(méi)有新的任務(wù)入隊(duì),那么消費(fèi)者線程也會(huì)因條件不滿(mǎn)足直接退出程序。自此以后,生產(chǎn)者生產(chǎn)的任務(wù)就永遠(yuǎn)不會(huì)被消費(fèi)了。
那我們可以做一個(gè)約定,當(dāng)生產(chǎn)者生產(chǎn)完任務(wù)之后,放入一個(gè)標(biāo)志,類(lèi)似于 q.put(None),一旦消費(fèi)者接收到為 None 的任務(wù)時(shí)就意味著結(jié)束,直接退出程序即可。這種做法在上面的程序中是沒(méi)有問(wèn)題的,唯一的缺點(diǎn)就是有 N 個(gè)消費(fèi)者線程就需要放入 N 個(gè) None 標(biāo)志,這對(duì)于多消費(fèi)者類(lèi)型的程序顯然是很不友好的。
最佳實(shí)踐
我們可以結(jié)合隊(duì)列的內(nèi)置函數(shù) task_done() 和 join() 來(lái)達(dá)到我們的目的。
join() 函數(shù)是阻塞的。當(dāng)消費(fèi)者通過(guò) get() 從隊(duì)列獲取一項(xiàng)任務(wù)并處理完成之后,需要調(diào)用且只可以調(diào)用一次 task_done(),該方法會(huì)給隊(duì)列發(fā)送一個(gè)信號(hào),join()函數(shù)則在監(jiān)聽(tīng)這個(gè)信號(hào)。可以簡(jiǎn)單理解為隊(duì)列內(nèi)部維護(hù)了一個(gè)計(jì)數(shù)器,該計(jì)數(shù)器標(biāo)識(shí)未完成的任務(wù)數(shù),每當(dāng)添加任務(wù)時(shí),計(jì)數(shù)器會(huì)增加,調(diào)用 task_done()時(shí)計(jì)數(shù)器則會(huì)減少,直到隊(duì)列為空。而 join() 就是在監(jiān)聽(tīng)隊(duì)列是否為空,一旦條件滿(mǎn)足則結(jié)束阻塞狀態(tài)。
import threading
import time
import queue
def consume(thread_name, q):
while True:
time.sleep(2)
product = q.get()
print("%s consume %s" % (thread_name, product))
q.task_done()
def produce(thread_name, q):
for i in range(3):
product = 'product-' + str(i)
q.put(product)
print("%s produce %s" % (thread_name, product))
time.sleep(1)
q.join()
q = queue.Queue()
p = threading.Thread(target=produce, args=("producer",q))
c = threading.Thread(target=consume, args=("consumer",q))
c1 = threading.Thread(target=consume, args=("consumer-1",q))
c.setDaemon(True)
c1.setDaemon(True)
p.start()
c.start()
c1.start()
p.join()
# 輸出如下
producer produce product-0
producer produce product-1
consumer-1 consume product-0
consumer consume product-1
producer produce product-2
consumer consume product-2
上述示例中,我們將消費(fèi)者線程設(shè)置為守護(hù)線程,這樣當(dāng)主線程結(jié)束時(shí)消費(fèi)者線程也會(huì)一并結(jié)束。然后主線程最后一句 p.join() 又表示主線程必須等待生產(chǎn)者線程結(jié)束后才可以結(jié)束。
再細(xì)看生產(chǎn)者線程的主函數(shù) produce(),該函數(shù)中出現(xiàn)了我們上面說(shuō)過(guò)的 q.join() 函數(shù)。而 task_done 則是在消費(fèi)者線程的主函數(shù)中調(diào)用的。故當(dāng)生產(chǎn)者線程生產(chǎn)完所有任務(wù)后就會(huì)被阻塞,只有當(dāng)消費(fèi)者線程處理完所有任務(wù)后生產(chǎn)者才會(huì)阻塞結(jié)束。隨著生產(chǎn)者線程的結(jié)束,主線程也一并結(jié)束,守護(hù)線程消費(fèi)者線程也一并結(jié)束,自此所有線程均安全退出。
Queue 總結(jié)
本章節(jié)介紹了隊(duì)列的高級(jí)應(yīng)用,從簡(jiǎn)易版的示例到最佳實(shí)踐,介紹了生產(chǎn)者消費(fèi)者模型的基本用法,在該模型中,隊(duì)列扮演了非常重要的角色,起到了解耦的目的。
本模型有固定的步驟,其中最重要的就是通過(guò) task_done() 和 join() 來(lái)互相通信。 task_done() 僅僅用來(lái)通知隊(duì)列消費(fèi)者已完成一個(gè)任務(wù),至于任務(wù)是什么它毫不關(guān)心,它只關(guān)心隊(duì)列中未完成的任務(wù)數(shù)量。
注意:task_done() 不可以在 put() 之前調(diào)用,否則會(huì)引發(fā) ValueError: task_done() called too many times。同時(shí)在處理完任務(wù)后只可以調(diào)用一次該函數(shù),否則隊(duì)列將不能準(zhǔn)確計(jì)算未完成任務(wù)數(shù)量。