之前我們了解了如何創建多個任務來并發運行程序,方式是通過 asyncio.create_task 將協程包裝成任務,如下所示:
import asyncio, time
async def mAIn():
task1 = asyncio.create_task(asyncio.sleep(3))
task2 = asyncio.create_task(asyncio.sleep(3))
task3 = asyncio.create_task(asyncio.sleep(3))
await task1
await task2
await task3
start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print("總耗時:", end - start)
"""
總耗時: 3.003109625
"""
但這種代碼編寫方式只適用于簡單情況,如果在同時發出數百、數千甚至更多 Web 請求的情況下,這種編寫方式將變得冗長且混亂。所以 asyncio 提供了許多便利的函數,支持我們一次性等待多個任務。
等待一組任務全部完成
一個被廣泛用于等待一組任務的方式是使用 asyncio.gather,這個函數接收一系列的可等待對象,允許我們在一行代碼中同時運行它們。如果傳入的 awaitable 對象是協程,gather 函數會自動將其包裝成任務,以確保它們可以同時運行。這意味著不必像之前那樣,用 asyncio.create_task 單獨包裝,但即便如此,還是建議手動包裝一下。
asyncio.gather 同樣返回一個 awaitable 對象,在 await 表達式中使用它時,它將暫停,直到傳遞給它的所有 awaitable 對象都完成為止。一旦所有任務都完成,asyncio.gather 將返回這些任務的結果所組成的列表。
import asyncio
import time
from aiohttp import ClientSession
async def fetch_status(session: ClientSession, url: str):
async with session.get(url) as resp:
return resp.status
async def main():
async with ClientSession() as session:
# 注意:requests 里面是 100 個協程
# 傳遞給 asyncio.gather 之后會自動被包裝成任務
requests = [fetch_status(session, "http://www.baidu.com")
for _ in range(100)]
# 并發運行 100 個任務,并等待這些任務全部完成
# 相比寫 for 循環再單獨 await,這種方式就簡便多了
status_codes = await asyncio.gather(*requests)
print(f"{len(status_codes)} 個任務已全部完成")
start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print("總耗時:", end - start)
"""
100 個任務已全部完成
總耗時: 0.552532458
"""
完成 100 個請求只需要 0.55 秒鐘,由于網絡問題,測試的結果可能不準確,但異步肯定比同步要快。
另外傳給 gather 的每個 awaitable 對象可能不是按照確定性順序完成的,例如將協程 a 和 b 按順序傳遞給 gather,但 b 可能會在 a 之前完成。不過 gather 的一個很好的特性是,不管 awaitable 對象何時完成,都保證結果會按照傳遞它們的順序返回。
import asyncio
import time
async def main():
# asyncio.sleep 還可以接收一個 result 參數,作為 await 表達式的值
tasks = [asyncio.sleep(second, result=f"我睡了 {second} 秒")
for second in (5, 3, 4)]
print(await asyncio.gather(*tasks))
start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print("總耗時:", end - start)
"""
['我睡了 5 秒', '我睡了 3 秒', '我睡了 4 秒']
總耗時: 5.002968417
"""
然后 gather 還可以實現分組,什么意思呢?
import asyncio
import time
async def main():
gather1 = asyncio.gather(
*[asyncio.sleep(second, result=f"我睡了 {second} 秒")
for second in (5, 3, 4)]
)
gather2 = asyncio.gather(
*[asyncio.sleep(second, result=f"我睡了 {second} 秒")
for second in (3, 3, 3)]
)
results = await asyncio.gather(
gather1, gather2, asyncio.sleep(6, "我睡了 6 秒")
)
print(results)
start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print("總耗時:", end - start)
"""
[['我睡了 5 秒', '我睡了 3 秒', '我睡了 4 秒'],
['我睡了 3 秒', '我睡了 3 秒', '我睡了 3 秒'],
'我睡了 6 秒']
總耗時: 6.002826208
"""
asyncio.gather 里面可以通過繼續接收 asyncio.gather 返回的對象,從而實現分組功能,還是比較強大的。
如果 gather 里面啥都不傳的話,那么會返回一個空列表。
問題來了,在上面的例子中,我們假設所有請求都不會失敗或拋出異常,這是理想情況。但如果請求失敗了呢?我們來看一下,當 gather 里面的任務出現異常時會發生什么?
import asyncio
async def normal_running():
await asyncio.sleep(3)
return "正常運行"
async def raise_error():
raise ValueError("出錯啦")
async def main():
results = await asyncio.gather(normal_running(), raise_error())
print(results)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
Traceback (most recent call last):
......
raise ValueError("出錯啦")
ValueError: 出錯啦
"""
我們看到拋異常了,其實 gather 函數的原理就是等待一組任務運行完畢,當某個任務完成時,就調用它的 result 方法,拿到返回值。但我們之前介紹 Future 和 Task 的時候說過,如果出錯了,調用 result 方法會將異常拋出來。
import asyncio
async def normal_running():
await asyncio.sleep(3)
return "正常運行"
async def raise_error():
raise ValueError("出錯啦")
async def main():
try:
await asyncio.gather(normal_running(), raise_error())
except Exception:
print("執行時出現了異常")
# 但是剩余的任務仍在執行,拿到當前的所有正在執行的任務
all_tasks = asyncio.all_tasks()
# task 相當于對協程做了一個封裝,那么通過 get_coro 方法也可以拿到對應的協程
print(f"當前剩余的任務:", [task.get_coro().__name__ for task in all_tasks])
# 繼續等待剩余的任務完成
results = await asyncio.gather(
*[task for task in all_tasks if task.get_coro().__name__ != "main"]
)
print(results)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
執行時出現了異常
當前剩余的任務: ['main', 'normal_running']
['正常運行']
"""
可以看到在 await asyncio.gather() 的時候,raise_error() 協程拋異常了,那么異常會向上傳播,在 main() 里面 await 處產生 ValueError。我們捕獲之后查看剩余未完成的任務,顯然只剩下 normal_running() 和 main(),因為任務執行出現異常也代表它完成了。
需要注意的是,一個任務出現了異常,并不影響剩余未完成的任務,它們仍在后臺運行。我們舉個例子證明這一點:
import asyncio, time
async def normal_running():
await asyncio.sleep(5)
return "正常運行"
async def raise_error():
await asyncio.sleep(3)
raise ValueError("出錯啦")
async def main():
try:
await asyncio.gather(normal_running(), raise_error())
except Exception:
print("執行時出現了異常")
# raise_error() 會在 3 秒后拋異常,然后向上拋,被這里捕獲
# 而 normal_running() 不會受到影響,它仍然在后臺運行
# 顯然接下來它只需要再過 2 秒就能運行完畢
time.sleep(2) # 注意:此處會阻塞整個線程
# asyncio.sleep 是不耗費 CPU 的,因此即使 time.sleep 將整個線程阻塞了,也不影響
# 因為執行 time.sleep 時,normal_running() 里面的 await asyncio.sleep(5) 已經開始執行了
results = await asyncio.gather(*[task for task in asyncio.all_tasks()
if task.get_coro().__name__ != "main"])
print(results)
loop = asyncio.get_event_loop()
start = time.perf_counter()
loop.run_until_complete(main())
end = time.perf_counter()
print("總耗時:", end - start)
"""
執行時出現了異常
['正常運行']
總耗時: 5.004949666
"""
這里耗時是 5 秒,說明一個任務拋異常不會影響其它任務,因為 time.sleep(2) 執行完畢之后,normal_running() 里面 asyncio.sleep(5) 也已經執行完畢了,說明異常捕獲之后,剩余的任務沒有受到影響。
并且這里我們使用了 time.sleep,在工作中千萬不要這么做,因為它會阻塞整個線程,導致主線程無法再做其他事情了。而這里之所以用 time.sleep,主要是想說明一個任務出錯,那么將異常捕獲之后,其它任務不會受到影響。
那么問題來了,如果發生異常,我不希望它將異常向上拋該怎么辦呢?可能有人覺得這還不簡單,直接來一個異常捕獲不就行了?這是一個解決辦法,但 asyncio.gather 提供了一個參數,可以更優雅的實現這一點。
import asyncio
async def normal_running():
await asyncio.sleep(3)
return "正常運行"
async def raise_error():
raise ValueError("出錯啦")
async def main():
results = await asyncio.gather(
normal_running(), raise_error(),
return_exceptions=True
)
print(results)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
['正常運行', ValueError('出錯啦')]
"""
之前在介紹任務的時候我們說了,不管正常執行結束還是出錯,都代表任務已完成,會將結果和異常都收集起來,只不過其中肯定有一個為 None。然后根據不同的情況,選擇是否將異常拋出來。所以在 asyncio 里面,異常只是一個普通的屬性,會保存在任務對象里面。
對于 asyncio.gather 也是同理,它里面有一個 return_exceptions 參數,默認為 False,當任務出現異常時,會拋給 await 所在的位置。如果該參數設置為 True,那么出現異常時,會直接把異常本身返回(此時任務也算是結束了)。
在 asyncio 里面,異常變成了一個可控的屬性。因為執行是以任務為單位的,當出現異常時,也會作為任務的一個普通的屬性。我們可以選擇將它拋出來,也可以選擇隱式處理掉。
至于我們要判斷哪些任務是正常執行,哪些任務是拋了異常,便可以通過返回值來判斷。如果 isinstance(res, Exception) 為 True,那么證明任務出現了異常,否則正常執行。雖然這有點笨拙,但也能湊合用,因為 API 并不完美。
當然以上這些都不能算是缺點,gather 真正的缺點有兩個:
- 如果我希望所有任務都執行成功,要是有一個任務失敗,其它任務自動取消,該怎么實現呢?比如發送 Web 請求,如果一個請求失敗,其他所有請求也會失敗(要取消請求以釋放資源)。顯然要做到這一點不容易,因為協程被包裝在后臺的任務中;
- 其次,必須等待所有任務執行完成,才能處理結果,如果想要在結果完成后立即處理它們,這就存在問題。例如有一個請求需要 100 毫秒,而另一個請求需要 20 秒,那么在處理 100 毫秒完成的那個請求之前,我們將等待 20 秒。
而 asyncio 也提供了用于解決這兩個問題的 API。
在任務完成時立即處理
如果想在某個結果生成之后就對其進行處理,這是一個問題;如果有一些可以快速完成的等待對象,和一些可能需要很長時間完成的等待對象,這也可能是一個問題。因為 gather 需要等待所有對象執行完畢,這就導致應用程序可能變得無法響應。
想象一個用戶發出 100 個請求,其中兩個很慢,但其余的都很快完成。如果一旦有請求完成,可以向用戶輸出一些信息,來提升用戶的使用體驗。
為處理這種情況,asyncio 公開了一個名為 as_completed 的 API 函數,這個函數接收一個可等待對象(awaitable)組成的列表,并返回一個生成器。通過遍歷,等待它們中的每一個對象都完成,并且哪個先完成,哪個就先被迭代。這意味著將能在結果可用時立即就處理它們,但很明顯此時就沒有所謂的順序了,因為無法保證哪些請求先完成。
import asyncio
import time
async def delay(seconds):
await asyncio.sleep(seconds)
return f"我睡了 {seconds} 秒"
async def main():
# asyncio 提供的用于等待一組 awaitable 對象的 API 都很智能
# 如果檢測到你傳遞的是協程,那么會自動包裝成任務
# 不過還是建議手動包裝一下
tasks = [asyncio.create_task(delay(seconds))
for seconds in (3, 5, 2, 4, 6, 1)]
for finished in asyncio.as_completed(tasks):
print(await finished)
loop = asyncio.get_event_loop()
start = time.perf_counter()
loop.run_until_complete(main())
end = time.perf_counter()
print("總耗時:", end - start)
"""
我睡了 1 秒
我睡了 2 秒
我睡了 3 秒
我睡了 4 秒
我睡了 5 秒
我睡了 6 秒
總耗時: 6.000872417
"""
和 gather 不同,gather 是等待一組任務全部完成之后才返回,并且會自動將結果取出來,結果值的順序和添加任務的順序是一致的。對于 as_completed 而言,它會返回一個生成器,我們遍歷它,哪個任務先完成則哪個就先被處理。
那么問題來了,如果出現異常了該怎么辦?很簡單,直接異常捕獲即可。
然后我們再來思考一個問題,任何基于 Web 的請求都存在花費很長時間的風險,服務器可能處于過重的資源負載下,或者網絡連接可能很差。
之前我們看到了通過 wait_for 函數可以為特定請求添加超時,但如果想為一組請求設置超時怎么辦?as_completed 函數通過提供一個可選的 timeout 參數來處理這種情況,它允許以秒為單位指定超時時間。如果花費的時間超過設定的時間,那么迭代器中的每個可等待對象都會在等待時拋出 TimeoutException。
import asyncio
async def delay(seconds):
await asyncio.sleep(seconds)
return f"我睡了 {seconds} 秒"
async def main():
tasks = [asyncio.create_task(delay(seconds))
for seconds in (1, 5, 6)]
for finished in asyncio.as_completed(tasks, timeout=3):
try:
print(await finished)
except asyncio.TimeoutError:
print("超時啦")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
我睡了 1 秒
超時啦
超時啦
"""
as_completed 非常適合用于盡快獲得結果,但它也有缺點。
第一個缺點是沒有任何方法可快速了解我們正在等待哪個協程或任務,因為運行順序是完全不確定的。如果不關心順序,這可能沒問題,但如果需要以某種方式將結果與請求相關聯,那么將面臨挑戰。
第二個缺點是超時,雖然會正確地拋出異常并繼續運行程序,但創建的所有任務仍將在后臺運行。如果想取消它們,很難確定哪些任務仍在運行,這是我們面臨的另一個挑戰。
import asyncio
async def delay(seconds):
await asyncio.sleep(seconds)
return f"我睡了 {seconds} 秒"
async def main():
tasks = [asyncio.create_task(delay(seconds))
for seconds in (1, 5, 6)]
for finished in asyncio.as_completed(tasks, timeout=3):
try:
print(await finished)
except asyncio.TimeoutError:
print("超時啦")
# tasks[1] 還需要 2 秒運行完畢,tasks[2] 還需要 3 秒運行完畢
print(tasks[1].done(), tasks[2].done())
await asyncio.sleep(2)
# 此時只剩下 tasks[2],還需要 1 秒運行完畢
print(tasks[1].done(), tasks[2].done())
await asyncio.sleep(1)
# tasks[2] 也運行完畢
print(tasks[1].done(), tasks[2].done())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
我睡了 1 秒
超時啦
超時啦
False False
True False
True True
"""
根據輸出結果可以發現,雖然因為抵達超時時間, await 會導致 TimeoutError,但未完成的任務不會受到影響,它們仍然在后臺執行。
但這對于我們來說,有時卻不是一件好事,因為我們希望如果抵達超時時間,那么未完成的任務就別在執行了,這時候如何快速找到那些未完成的任務呢?為處理這種情況,asyncio 提供了另一個 API 函數:wait。
使用 wait 進行細粒度控制
gather 和 as_completed 的缺點之一是,當我們看到異常時,沒有簡單的方法可以取消已經在運行的任務。這在很多情況下可能沒問題,但是想象一個場景:同時發送大批量 Web 請求(參數格式是相同的),如果某個請求的參數格式錯誤(說明所有請求的參數格式都錯了),那么剩余的請求還有必要執行嗎?顯然是沒有必要的,而且還會消耗更多資源。另外 as_completed 的另一個缺點是,由于迭代順序是不確定的,因此很難準確跟蹤已完成的任務。
于是 asyncio 提供了 wait 函數,注意它和 wait_for 的區別,wait_for 針對的是單個任務,而 wait 則針對一組任務(不限數量)。
注:wait 函數接收的是一組 awaitable 對象,但未來的版本改為僅接收任務對象。因此對于 gather、as_completed、wait 等函數,雖然它們會自動包裝成任務,但我們更建議先手動包裝成任務,然后再傳過去。
并且 wait 和 as_completed 接收的都是任務列表,而 gather 則要求將列表打散,以多個位置參數的方式傳遞,因此這些 API 的參數格式不要搞混了。
然后是 wait 函數的返回值,它會返回兩個集合:一個由已完成的任務(執行結束或出現異常)組成的集合,另一個由未完成的任務組成的集合。而 wait 函數的參數,它除了可以接收一個任務列表之外,還可以接收一個 timeout(超時時間)和一個 return_when(用于控制返回條件)。光說很容易亂,我們來實際演示一下。
等待所有任務完成
如果未指定 retun_when,則此選項使用默認值,并且它的行為與 asyncio.gather 最接近,但也存在一些差異。
import asyncio
async def delay(seconds):
await asyncio.sleep(seconds)
return f"我睡了 {seconds} 秒"
async def main():
tasks = [asyncio.create_task(delay(seconds)) for seconds in (3, 2, 4)]
# 和 gather 一樣,默認會等待所有任務都完成
done, pending = await asyncio.wait(tasks)
print(f"已完成的任務數: {len(done)}")
print(f"未完成的任務數: {len(pending)}")
for done_task in done:
print(await done_task)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
已完成的任務數: 3
未完成的任務數: 0
我睡了 2 秒
我睡了 4 秒
我睡了 3 秒
"""
await asynio.wait 時,會返回兩個集合,分別保存已完成的任務和仍然運行的任務。并且由于返回的是集合,所以是無序的。默認情況下,asyncio.wait 會等到所有任務都完成后才返回,所以待處理集合的長度為 0。
然后還是要說一下異常,如果某個任務執行時出現異常了該怎么辦呢?
import asyncio
async def delay(seconds):
await asyncio.sleep(seconds)
if seconds == 3:
raise ValueError("我出錯了(second is 3)")
return f"我睡了 {seconds} 秒"
async def main():
tasks = [asyncio.create_task(delay(seconds)) for seconds in range(1, 6)]
done, pending = await asyncio.wait(tasks)
print(f"已完成的任務數: {len(done)}")
print(f"未完成的任務數: {len(pending)}")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
已完成的任務數: 5
未完成的任務數: 0
Task exception was never retrieved
future: <Task finished ... coro=<delay() done, defined at .../main.py:3>
exception=ValueError('我出錯了(second is 3)')>
......
raise ValueError("我出錯了(second is 3)")
ValueError: 我出錯了(second is 3)
"""
對于 asyncio.gather 而言,如果某個任務出現異常,那么異常會向上拋給 await 所在的位置。如果不希望它拋,那么可以將 gather 里面的 return_exceptions 參數指定為 True,這樣當出現異常時,會將異常返回。
而 asyncio.wait 也是如此,如果任務出現異常了,那么會直接視為已完成,異常同樣不會向上拋。但是從程序開發的角度來講,返回值可以不要,但異常不能不處理。
所以當任務執行出錯時,雖然異常不會向上拋,但 asyncio 會將它打印出來,于是就有了:Task exception was never retrieved。意思就是該任務出現異常了,但你沒有處理它。
import asyncio
async def delay(seconds):
await asyncio.sleep(seconds)
if seconds == 3:
raise ValueError("我出錯了(second is 3)")
return f"我睡了 {seconds} 秒"
async def main():
tasks = [asyncio.create_task(delay(seconds)) for seconds in range(1, 6)]
# done 里面保存的都是已完成的任務
done, pending = await asyncio.wait(tasks)
print(f"已完成的任務數: {len(done)}")
print(f"未完成的任務數: {len(pending)}")
# 所以我們直接遍歷 done 即可
for done_task in done:
# 這里不能使用 await done_task,因為當任務完成時,它就等價于 done_task.result()
# 而任務出現異常時,調用 result() 是會將異常拋出來的,所以我們需要先檢測異常是否為空
exc = done_task.exception()
if exc:
print(exc)
else:
print(done_task.result())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
已完成的任務數: 5
未完成的任務數: 0
我睡了 5 秒
我睡了 2 秒
我出錯了(second is 3)
我睡了 4 秒
我睡了 1 秒
"""
這里調用 result 和 exception 有一個前提,就是任務必須處于已完成狀態,否則會拋異常:InvalidStateError: Result is not ready.。但對于我們當前是沒有問題的,因為 done 里面的都是已完成的任務。
這里能再次看到和 gather 的區別,gather 會幫你把返回值都取出來,放在一個列表中,并且順序就是任務添加的順序。而 wait 返回的是集合,集合里面是任務,我們需要手動拿到返回值。
某個完成出現異常時取消其它任務
從目前來講,wait 的作用和 gather 沒有太大的區別,都是等到任務全部結束再解除等待(出現異常也視作任務完成,并且其它任務不受影響)。那如果我希望當有任務出現異常時,立即取消其它任務該怎么做呢?顯然這就依賴 wait 函數里面的 return_when,它有三個可選值:
- asyncio.ALL_COMPLETED:等待所有任務完成后返回;
- asyncio.FIRST_COMPLETED:有一個任務完成就返回;
- asyncio.FIRST_EXCEPTION:當有任務出現異常時返回;
顯然為完成這個需求,我們應該將 return_when 指定為 FIRST_EXCEPTION。
import asyncio
async def delay(seconds):
await asyncio.sleep(seconds)
if seconds == 3:
raise ValueError("我出錯了(second is 3)")
return f"我睡了 {seconds} 秒"
async def main():
tasks = [asyncio.create_task(delay(seconds), name=f"睡了 {seconds} 秒的任務")
for seconds in range(1, 6)]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
print(f"已完成的任務數: {len(done)}")
print(f"未完成的任務數: {len(pending)}")
print("都有哪些任務完成了?")
for t in done:
print(" " + t.get_name())
print("還有哪些任務沒完成?")
for t in pending:
print(" " + t.get_name())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
已完成的任務數: 3
未完成的任務數: 2
都有哪些任務完成了?
睡了 2 秒的任務
睡了 3 秒的任務
睡了 1 秒的任務
還有哪些任務沒完成?
睡了 4 秒的任務
睡了 5 秒的任務
"""
當 delay(3) 失敗時,顯然 delay(1)、delay(2) 已完成,而 delay(4) 和 delay(5) 未完成。此時集合 done 里面的就是已完成的任務,pending 里面則是未完成的任務。
當 wait 返回時,未完成的任務仍在后臺繼續運行,如果我們希望將剩余未完成的任務取消掉,那么直接遍歷 pending 集合即可。
import asyncio
async def delay(seconds):
await asyncio.sleep(seconds)
if seconds == 3:
raise ValueError("我出錯了(second is 3)")
print(f"我睡了 {seconds} 秒")
async def main():
tasks = [asyncio.create_task(delay(seconds))
for seconds in range(1, 6)]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
print(f"已完成的任務數: {len(done)}")
print(f"未完成的任務數: {len(pending)}")
# 此時未完成的任務仍然在后臺運行,這時候我們可以將它們取消掉
for t in pending:
t.cancel()
# 阻塞 3 秒
await asyncio.sleep(3)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
我睡了 1 秒
我睡了 2 秒
已完成的任務數: 3
未完成的任務數: 2
"""
在 await asyncio.sleep(3) 的時候,剩余兩個任務并沒有輸出,所以任務確實被取消了。注:出現異常的任務會被掛在已完成集合里面,如果沒有任務在執行時出現異常,那么效果等價于 ALL_COMPLETED。
當任務完成時處理結果
ALL_COMPLETED 和 FIRST_EXCEPTION 都有一個缺點,在任務成功且不拋出異常的情況下,必須等待所有任務完成。對于之前的用例,這可能是可以接受的,但如果想要在某個協程成功完成后立即處理結果,那么現在的情況將不能滿足我們的需求。
雖然這個場景可使用 as_completed 實現,但 as_completed 的問題是沒有簡單的方法可以查看哪些任務還在運行,哪些任務已經完成。因為遍歷的時候,我們無法得知哪個任務先完成,所以 as_completed 無法完成我們的需求。
好在 wait 函數的 return_when 參數可以接收 FIRST_COMPLETED 選項,表示只要有一個任務完成就立即返回,而返回的可以是執行出錯的任務,也可以是成功運行的任務(任務失敗也表示已完成)。然后,我們可以取消其他正在運行的任務,或者讓某些任務繼續運行,具體取決于用例。
import asyncio
async def delay(seconds):
await asyncio.sleep(seconds)
if seconds == 3:
raise ValueError("我出錯了(second is 3)")
print(f"我睡了 {seconds} 秒")
async def main():
tasks = [asyncio.create_task(delay(seconds))
for seconds in range(1, 6)]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print(f"已完成的任務數: {len(done)}")
print(f"未完成的任務數: {len(pending)}")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
我睡了 1 秒
已完成的任務數: 1
未完成的任務數: 4
"""
當 return_when 參數為 FIRST_COMPLETED 時,那么只要有一個任務完成就會立即返回,然后我們處理完成的任務即可。至于剩余的任務,它們仍在后臺運行,我們可以繼續對其使用 wait 函數。
import asyncio
async def delay(seconds):
await asyncio.sleep(seconds)
if seconds == 3:
raise ValueError("我出錯了(second is 3)")
return f"我睡了 {seconds} 秒"
async def main():
tasks = [asyncio.create_task(delay(seconds))
for seconds in range(1, 6)]
while True:
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for t in done:
exc = t.exception()
print(exc) if exc else print(t.result())
if pending: # 還有未完成的任務,那么繼續使用 wait
tasks = pending
else:
break
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
我睡了 1 秒
我睡了 2 秒
我出錯了(second is 3)
我睡了 4 秒
我睡了 5 秒
"""
整個行為和 as_completed 是一致的,但這種做法有一個好處,就是我們每一步都可以準確地知曉哪些任務已經完成,哪些任務仍然運行,并且也可以做到精確取消指定任務。
處理超時
除了允許對如何等待協程完成進行更細粒度的控制外,wait 還允許設置超時,以指定我們希望等待完成的時間。要啟用此功能,可將 timeout 參數設置為所需的最大秒數,如果超過了這個超時時間,wait 將立即返回 done 和 pending 任務集。
不過與目前所看到的 wait_for 和 as_completed 相比,超時在 wait 中的行為方式存在一些差異。
1)協程不會被取消。
當使用 wait_for 時,如果任務超時,則引發 TimeouError,并且任務也會自動取消。但使用 wait 的情況并非如此,它的行為更接近我們在 as_completed 中看到的情況。如果想因為超時而取消協程,必須顯式地遍歷任務并取消,否則它們仍在后臺運行。
2)不會引發超時錯誤。
如果發生超時,則 wait 返回所有已完成的任務,以及在發生超時的時候仍處于運行狀態的所有任務。
import asyncio
async def delay(seconds):
await asyncio.sleep(seconds)
return f"我睡了 {seconds} 秒"
async def main():
tasks = [asyncio.create_task(delay(seconds))
for seconds in range(1, 6)]
done, pending = await asyncio.wait(tasks, timeout=3.1)
print(f"已完成的任務數: {len(done)}")
print(f"未完成的任務數: {len(pending)}")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
已完成的任務數: 3
未完成的任務數: 2
"""
wait 調用將在 3 秒后返回 done 和 pending 集合,在 done 集合中,會有三個已完成的任務。而耗時 4 秒和 5 秒的任務,由于仍在運行,因此它們將出現在 pending 集合中。我們可以繼續等待它們完成并提取返回值,也可以將它們取消掉。
需要注意:和之前一樣,pending 集合中的任務不會被取消,并且繼續運行,盡管會超時。對于要終止待處理任務的情況,我們需要顯式地遍歷 pending 集合并在每個任務上調用 cancel。
為什么要先將協程包裝成任務
我們說協程在傳給 wait 的時候會自動包裝成任務,那為什么我們還要手動包裝呢?
import asyncio
async def delay(seconds):
await asyncio.sleep(seconds)
return f"我睡了 {seconds} 秒"
async def main():
tasks = [asyncio.create_task(delay(seconds))
for seconds in range(1, 6)]
done, pending = await asyncio.wait(tasks, timeout=3.1)
print(all(map(lambda t: t in tasks, done)))
print(all(map(lambda t: t in tasks, pending)))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
True
True
"""
如果 wait 函數接收的就是任務,那么 wait 函數就不會再包裝了,所以 done 和 pending 里面的任務和 tasks 里面的任務是相同的。基于這個條件,我們后續可以做一些比較之類的。
比如有很多 Web 請求任務,但如果當未完成的任務是 task1、task2、task3,那么就取消掉,于是可以這么做。
for t in pending:
if t in (task1, task2, task3):
t.cancel()
如果返回的 done 和 pending 里的任務,是在 wait 函數中自動創建的,那么我們就無法進行任何比較來查看 pending 集合中的特定任務。
小結
1)asyncio.gather 函數允許同時運行多個任務,并等待它們完成。一旦傳遞給它的所有任務全部完成,這個函數就會返回。由于 gather 會拿到里面每個任務的返回值,所以它要求每個任務都是成功的,如果有任務執行出錯(沒有返回值),那么獲取返回值的時候就會將異常拋出來,然后向上傳遞給 await asyncio.gather。
為此,可以將 return_exceptions 設置為 True,這將返回成功完成的可等待對象的結果,以及產生的異常(異常會作為一個普通的屬性返回,和返回值是等價的)。
2)可使用 as_completed 函數在可等待對象列表完成時處理它們的結果,它會返回一個可以循環遍歷的生成器。一旦某個協程或任務完成,就能訪問結果并處理它。
3)如果希望同時運行多個任務,并且還希望能了解哪些任務已經完成,哪些任務在運行,則可以使用 wait。這個函數還允許在返回結果時進行更多控制,返回時,我們會得到一組已經完成的任務和一組仍在運行的任務。
然后可以取消任何想要取消的任務,或執行其他任何需要執行的任務。并且 wait 里面的任務出現異常,也不會影響其它任務,異常會作為任務的一個屬性,只是在我們沒有處理的時候會給出警告。至于具體的處理方式,我們直接通過 exception 方法判斷是否發生了異常即可,沒有異常返回 result(),有異常返回 exception()。