日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢(xún)客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

今天我們學(xué)習(xí)下 Queue 的進(jìn)階用法。

生產(chǎn)者消費(fèi)者模型

在并發(fā)編程中,比如爬蟲(chóng),有的線程負(fù)責(zé)爬取數(shù)據(jù),有的線程負(fù)責(zé)對(duì)爬取到的數(shù)據(jù)做處理(清洗、分類(lèi)和入庫(kù))。假如他們是直接交互的,那么當(dāng)二者的速度不匹配時(shí)勢(shì)必出現(xiàn)等待現(xiàn)象,這也就產(chǎn)生了資源的浪費(fèi)。

抽象是一種很重要的通用能力,而生產(chǎn)者消費(fèi)者模型是前人將一系列同類(lèi)型的具體的問(wèn)題抽象出來(lái)的一個(gè)一致的最佳解決方案。

該模型有三個(gè)重要角色,容器,生產(chǎn)者和消費(fèi)者,顧名思義,生產(chǎn)者就是負(fù)責(zé)生產(chǎn)數(shù)據(jù)或任務(wù)的,消費(fèi)者就是負(fù)責(zé)消費(fèi)數(shù)據(jù)或者任務(wù)的(下文統(tǒng)稱(chēng)為任務(wù)),容器是二者進(jìn)行通訊的媒介。在該模型中,生產(chǎn)者和消費(fèi)者不在直接進(jìn)行通訊,而是通過(guò)引入一個(gè)第三者容器(通常都是用阻塞隊(duì)列)來(lái)達(dá)到解耦的目的。這樣生產(chǎn)者不必在因?yàn)橄M(fèi)者速度過(guò)慢而等待,直接將任務(wù)放入容器即可,消費(fèi)者也不必因生產(chǎn)者生產(chǎn)速度過(guò)慢而等待,直接從容器中獲取任務(wù),以此達(dá)到了資源的最大利用。

使用該模型可以解決并發(fā)編程中的絕大部分并發(fā)問(wèn)題。

簡(jiǎn)易版

我們先寫(xiě)一個(gè)單生產(chǎn)者和單消費(fèi)者的簡(jiǎn)易版生產(chǎn)者消費(fèi)者模型。

import threading
import time
import queue

def consume(thread_name, q):
    while True:
        time.sleep(2)
        product = q.get()
        print("%s consume %s" % (thread_name, product))

def produce(thread_name, q):
    for i in range(3):
        product = 'product-' + str(i)
        q.put(product)
        print("%s produce %s" % (thread_name, product))
        time.sleep(1)
    
            
q = queue.Queue()
p = threading.Thread(target=produce, args=("producer",q))
c = threading.Thread(target=consume, args=("consumer",q))

p.start()
c.start()

p.join()

# 輸出如下
producer produce product-0
producer produce product-1
consumer consume product-0
producer produce product-2
consumer consume product-1
consumer consume product-2
...

以上就是最簡(jiǎn)單的生產(chǎn)者消費(fèi)者模型了,生產(chǎn)者生產(chǎn)三個(gè)任務(wù)供消費(fèi)者消費(fèi)。但是上面的寫(xiě)法有個(gè)問(wèn)題,就是生產(chǎn)者將任務(wù)生產(chǎn)完畢之后就和主線程一起退出了,但是消費(fèi)者將所有的任務(wù)消費(fèi)完之后還沒(méi)停止,一直處于阻塞狀態(tài)。

那可不可以將 while True 的判斷改為 while not q.empty()呢,肯定是不行的。因?yàn)?empty() 返回 False ,不保證后續(xù)調(diào)用的 get()不被阻塞。同時(shí),如果用 empty() 函數(shù)來(lái)做判斷的話,那么就要保證消費(fèi)者線程開(kāi)啟之時(shí)生產(chǎn)者一定至少生產(chǎn)了一個(gè)任務(wù),否則消費(fèi)者線程就會(huì)因條件不滿(mǎn)足直接退出程序;同時(shí)如果生產(chǎn)者生產(chǎn)速度比較慢,一旦消費(fèi)者將任務(wù)消費(fèi)完且下次判斷時(shí)還沒(méi)有新的任務(wù)入隊(duì),那么消費(fèi)者線程也會(huì)因條件不滿(mǎn)足直接退出程序。自此以后,生產(chǎn)者生產(chǎn)的任務(wù)就永遠(yuǎn)不會(huì)被消費(fèi)了。

那我們可以做一個(gè)約定,當(dāng)生產(chǎn)者生產(chǎn)完任務(wù)之后,放入一個(gè)標(biāo)志,類(lèi)似于 q.put(None),一旦消費(fèi)者接收到為 None 的任務(wù)時(shí)就意味著結(jié)束,直接退出程序即可。這種做法在上面的程序中是沒(méi)有問(wèn)題的,唯一的缺點(diǎn)就是有 N 個(gè)消費(fèi)者線程就需要放入 N 個(gè) None 標(biāo)志,這對(duì)于多消費(fèi)者類(lèi)型的程序顯然是很不友好的。

最佳實(shí)踐

我們可以結(jié)合隊(duì)列的內(nèi)置函數(shù) task_done() 和 join() 來(lái)達(dá)到我們的目的。

join() 函數(shù)是阻塞的。當(dāng)消費(fèi)者通過(guò) get() 從隊(duì)列獲取一項(xiàng)任務(wù)并處理完成之后,需要調(diào)用且只可以調(diào)用一次 task_done(),該方法會(huì)給隊(duì)列發(fā)送一個(gè)信號(hào),join()函數(shù)則在監(jiān)聽(tīng)這個(gè)信號(hào)。可以簡(jiǎn)單理解為隊(duì)列內(nèi)部維護(hù)了一個(gè)計(jì)數(shù)器,該計(jì)數(shù)器標(biāo)識(shí)未完成的任務(wù)數(shù),每當(dāng)添加任務(wù)時(shí),計(jì)數(shù)器會(huì)增加,調(diào)用 task_done()時(shí)計(jì)數(shù)器則會(huì)減少,直到隊(duì)列為空。而 join() 就是在監(jiān)聽(tīng)隊(duì)列是否為空,一旦條件滿(mǎn)足則結(jié)束阻塞狀態(tài)。

import threading
import time
import queue

def consume(thread_name, q):
    while True:
        time.sleep(2)
        product = q.get()
        print("%s consume %s" % (thread_name, product))
        q.task_done()

def produce(thread_name, q):
    for i in range(3):
        product = 'product-' + str(i)
        q.put(product)
        print("%s produce %s" % (thread_name, product))
        time.sleep(1)
    q.join()
            
q = queue.Queue()
p = threading.Thread(target=produce, args=("producer",q))
c = threading.Thread(target=consume, args=("consumer",q))
c1 = threading.Thread(target=consume, args=("consumer-1",q))

c.setDaemon(True)
c1.setDaemon(True)
p.start()
c.start()
c1.start()

p.join()

# 輸出如下
producer produce product-0
producer produce product-1
consumer-1 consume product-0
consumer consume product-1
producer produce product-2
consumer consume product-2

上述示例中,我們將消費(fèi)者線程設(shè)置為守護(hù)線程,這樣當(dāng)主線程結(jié)束時(shí)消費(fèi)者線程也會(huì)一并結(jié)束。然后主線程最后一句 p.join() 又表示主線程必須等待生產(chǎn)者線程結(jié)束后才可以結(jié)束。

再細(xì)看生產(chǎn)者線程的主函數(shù) produce(),該函數(shù)中出現(xiàn)了我們上面說(shuō)過(guò)的 q.join() 函數(shù)。而 task_done 則是在消費(fèi)者線程的主函數(shù)中調(diào)用的。故當(dāng)生產(chǎn)者線程生產(chǎn)完所有任務(wù)后就會(huì)被阻塞,只有當(dāng)消費(fèi)者線程處理完所有任務(wù)后生產(chǎn)者才會(huì)阻塞結(jié)束。隨著生產(chǎn)者線程的結(jié)束,主線程也一并結(jié)束,守護(hù)線程消費(fèi)者線程也一并結(jié)束,自此所有線程均安全退出。

Queue 總結(jié)

本章節(jié)介紹了隊(duì)列的高級(jí)應(yīng)用,從簡(jiǎn)易版的示例到最佳實(shí)踐,介紹了生產(chǎn)者消費(fèi)者模型的基本用法,在該模型中,隊(duì)列扮演了非常重要的角色,起到了解耦的目的。

本模型有固定的步驟,其中最重要的就是通過(guò) task_done() 和 join() 來(lái)互相通信。 task_done() 僅僅用來(lái)通知隊(duì)列消費(fèi)者已完成一個(gè)任務(wù),至于任務(wù)是什么它毫不關(guān)心,它只關(guān)心隊(duì)列中未完成的任務(wù)數(shù)量。

注意:task_done() 不可以在 put() 之前調(diào)用,否則會(huì)引發(fā) ValueError: task_done() called too many times。同時(shí)在處理完任務(wù)后只可以調(diào)用一次該函數(shù),否則隊(duì)列將不能準(zhǔn)確計(jì)算未完成任務(wù)數(shù)量。

分享到:
標(biāo)簽:Python Queue
用戶(hù)無(wú)頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過(guò)答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定