有效的提高程序執行效率的兩種方法是異步和并發,Golang,node.js之所以可以有很高執行效率主要是他們的協程和異步并發機制。實際上異步和并發是每一種現代語言都在追求的特性,當然Python也不例外,今天我們就講講Python 3中的異步并發編程。
概述
Python 標準庫提供了許多模塊來處理異步并發和多進程任務,包括:
本文中,我們主要以于asyncio和 concurrent.futures為主講講Python中異步和并發機制及實例。當然你也可能正在使用_thread和threading,但是無疑最應用用multiprocessing和concurrent.futures。
基本理論
異步編程
編寫并發代碼(使用_thread或threading模塊)的是為了解決大家對CPU調用和中斷的問題(因為CPU中一次只能運行一個線程),這涉及了"CPU 上下文切換"的成本,雖然速度很快,但是也需要消耗資源。而且還要處理"條件競爭","死鎖/實時"和"資源不足"(某些線程被過度使用,而其他線程未被充分利用)等等的問題。而這些正是異步任務可以避免的。
asyncio
asyncio是一個標準庫,用于使用asyn/cawait語法編寫并發代碼。asyncio模塊分別提供了高級和低級 API。庫和框架開發人員將使用低級 API,同時鼓勵使用不同于更傳統threading或multiprocess異步代碼執行方法。它利用事件循環的來處理異步"任務"的調度,而不是傳統的線程或子進程。
需要提及的是asyncio是為了解決 I/O 性能,而不是CPU 綁定操作。因此,asyncio不是所有類型的異步執行的替代品。
asyncio基于"合作多任務"的概念設計的,因此可以完全控制 CPU上下文切換發生的時間(即上下文切換發生在應用程序級別,而不是硬件級別)。
使用asyncio,Python調度器負責管理,因此應用程序可能隨時進行上下文切換)。
所以使用asyncio時,還需要使用某種形式的"鎖定"機制來防止多個線程訪問/更改共享內存(否則可能會破壞程序的線程安全)。
concurrent.futures
concurrent.futures 模塊是為異步執行可調用項提供高級接口,為thread 和multiprocessing模塊提供了高級抽象,這就是為什么本文沒有詳細討論這些模塊的原因。事實上,_thread模塊是一個非常低級別的 API,_threading模塊本身是建立在它之上的。
前面我們已經提到,異步可以幫助我們避免使用線程,那么,如果它只是線程(和多處理)上的抽象,那么為什么要使用concurrent.futures呢?嗯,因為并非所有庫/模塊/API都支持異步模型。
例如,如果使用boto3和 AWS S3 并與之交互,就會發現這些是同步操作。可以在多線程代碼中包裝這些調用,但最好使用concurrent.futures,因為這樣不僅受益于傳統線程,而且受益于異步友好特性。該模塊還設計為和異步事件循環互操作,從而更輕松地在異步驅動應用程序中處理線程/子進程池。
此外,當需要線程池或子進程池時,還需要利用concurrent.futures,同時可以使用干凈而現代的 Python API。
綠色線程
實現異步編程的方法有很多種。有事件循環方法(異步實現),這種"回調"風格是 JAVAScript 等單線程語言一直采用的方法。傳統上還有一個稱為"綠色線程"的概念。
從本質上講,綠色線程的外觀和感覺與普通線程完全一樣,只不過線程是由應用程序代碼而不是硬件安排的。因此,可以有效地處理(與事件循環一樣)的確定性上下文切換問題。但是處理共享內存同樣存在問題。
因此,讓我們現在快速看看什么是"事件循環",因為它是使異步工作的基礎,為什么我們可以避免"回調地獄"和與"綠色線程"固有的問題...
事件循環
所有異步應用程序的核心元素是"事件循環"。事件循環是計劃并運行異步任務的內容,它還包括處理網絡 IO 操作和子進程運行。
異步事件循環非常有效,是因為它由Python內部在生成器實現的。生成器使函數能夠部分執行,然后在特定點停止,維護一堆對象和異常,然后再恢復。
可等待性
異步背后的驅動力是計劃異步"任務"的能力。Python 中有幾個不同類型的對象有助于支持此功能,它們通常通過Awaitables(可等待)進行分組。
如果某物可以在表達式中使用,它是可以等待的。await有三種主要的等待類型:
協程、任務和Future。
注意:Future 是一種低級類型,因此,如果不是庫/框架開發人員,則無需過多考慮它。
協程
協程屬于可等待對象,因此可以在其他協程中被等待。協程中兩個重要的概念:協程函數,定義為async def的函數。
協程對象:通過調用協程函數所返回的對象。
asyncio基于生成器的協程函數修飾函數定義的函數將被async/await語法取代,但將繼續支持,直到Python 3.10
任務
任務用于同時安排協同程序。所有異步應用程序通常(至少)具有單個"主"入口點任務,該任務將安排在事件循環上立即運行。這是使用asyncio.run函數完成的。
協同例程函數預計將傳遞給asyncio.run ,而內部異步將使用幫助器函數coroutines.iscoroutine檢查該函數。如果不是協同程序,則引發錯誤,否則協同例程將傳遞給 loop.run_until_complete
run_until_complete函數需要一個Future,并使用另一個幫助futures.isfuture函數來檢查提供的類型。如果檢查不是Future,則低級 API ensure_future用于將協同例程轉換為 Future。
在舊版本的 Python 中,如果要手動創建自己的 Future 并將其安排到事件循環上,那么就要使用asyncio.ensure_future(現在被視為低級 API),但使用 Python 3.7+ 時,它被asyncio.create_task所取代。
另外使用 Python 3.7,直接與事件循環交互(例如獲取事件循環、創建任務并將其傳遞到事件循環)的函數已被替換為create_taskasyncio.run。
我們可以通過下面的 API 查看在事件循環上運行的任務的狀態:
asyncio.current_task
asyncio.all_tasks
Future
Future 是一個低級await對象,表示異步操作的最終結果。該 API 的存在是為了啟用基于回調的代碼與async/await一起使用,而loop.run_in_executor是返回 Future 的異步低級 API 函數的示例。
異步編程實例
運行異步程序
高級 API(根據 Python 3.7+)是:
import asyncio
async def foo():
print("Foo!")
async def hello_cc():
await foo() # waits for `foo()` to complete
print("Hello Chongchong!")
asyncio.run(hello_world())
.run函數始終創建新的事件循環并在末尾關閉它。如果使用的是較低級別的 API,則必須手動處理該API:
loop = asyncio.get_event_loop()
loop.run_until_complete(hello_cc())
loop.close()
在 REPL 中運行異步代碼
在 Python 3.8 之前,我們無法在標準 Python REPL 中執行異步代碼(需要改用 IPython REPL)。
要使用最新版本的 Python 執行此操作,運行python -m asyncio 。REPL 啟動后,就無需再使用asyncio.run(),只需直接使用await語句。
[Clang 10.0.1 (clang-1001.0.46.4)] on darwin
Use "await" directly instead of "asyncio.run()".
Type "help", "copyright", "credits" or "license" for more information.
>>> import asyncio
>>> async def foo():
... await asyncio.sleep(5)
... print("done")
...
>>> await foo()
done
請注意,REPL 在啟動時會自動執行import asyncio,因此我們可以使用任何asyncio函數(如.sleep函數),而無需自行手動鍵入導入語句。
使用其他事件循環模塊
如果由于種種原因我們不想使用asyncio提供的事件循環(這是純 Python 實現),則可以將其交換為另一個事件循環,如uvloop。
uvloop 是內置異步事件循環的快速回放替換。uvloop 在中實現,在引擎中使用libuv。根據uvloop的作者基準測試,它在執行性能上可堪比golang。
uvloop的使用也很簡單,先使用pip install uvloop 安裝,然后添加一個uvloop.install()調用,如下所示:
import asyncio
import uvloop
async def foo():
print("Foo!")
async def hello_cc():
await foo()
print("Hello Chongchong!")
uvloop.install()
asyncio.run(hello_cc())
并發函數
以下函數有助于協調同時運行函數,并提供根據應用程序需求的不同控制。
asyncio.gather:獲取一系列可等待值,返回成功等待的值的聚合列表。
asyncio.shield:防止取消可等待的對象。
asyncio.wait:等待一系列可等待,直到滿足給定的"條件"。
asyncio.wait_for:等待單個等待,直到達到給定的"超時"。
asyncio.as_completed: 類似于gather,但返回在結果準備就緒時填充的 Future。
注意:gather有處理錯誤和取消的特定選項。例如,return_exceptions: False如果隨后由其中一個可等待項引發的第一個異常返回到 gather的調用方,其中似乎設置為True,則異常將連同成功結果一起在列表中聚合。如果gather()
被取消,所有提交的等待(尚未完成)也將被取消。
已棄用函數
@asyncio.coroutine: Python 3.10刪除async def了
asyncio.sleep:將在 Python 3.10 中刪除參數loop
注意:這些大多數這些 API 中都使用了一個參數參數loop,讓其指示要使用的特定事件循環。Python 3.8 中已經棄用了該參數,并計劃在 3.10 中完全刪除它。
gather
下面的示例演示如何等待多個異步任務完成。
import asyncio
async def foo(n):
await asyncio.sleep(5) # wait 5s before continuing
print(f"n: {n}!")
async def main():
tasks = [foo(1), foo(2), foo(3)]
await asyncio.gather(*tasks)
asyncio.run(main())
wait
下面的示例使用FIRST_COMPLETED選項,表示無論任務首先完成什么,都將返回什么。
import asyncio
from random import randrange
async def foo(n):
s = randrange(5)
print(f"{n} will sleep for: {s} seconds")
await asyncio.sleep(s)
print(f"n: {n}!")
async def main():
tasks = [foo(1), foo(2), foo(3)]
result = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print(result)
asyncio.run(main())
此程序的示例輸出是:
1 will sleep for: 4 seconds
2 will sleep for: 2 seconds
3 will sleep for: 1 seconds
n: 3!
({<Task finished coro=<foo() done, defined at await.py:5> result=None>}, {<Task pending coro=<foo() running at await.py:8> wait_for=<Future pending cb=[<TaskWakeupMethWrApper object at 0x10322b468>()]>>, <Task pending coro=<foo() running at await.py:8> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10322b4c8>()]>>})
wait_for
下面的示例演示如何利用超時來防止無休止地等待異步任務完成。
import asyncio
async def foo(n):
await asyncio.sleep(10)
print(f"n: {n}!")
async def main():
try:
await asyncio.wait_for(foo(1), timeout=5)
except asyncio.TimeoutError:
print("timeout!")
asyncio.run(main())
注意: asyncio.TimeoutError不提供任何額外信息,因此嘗試在輸出中使用它沒有意義(例如 except asyncio.TimeoutError as err: print(err))。
as_completed
下面的示例演示如何as_complete生成要完成的第一個任務,然后是下一個最快任務,下一個任務在完成之前。
import asyncio
from random import randrange
async def foo(n):
s = randrange(10)
print(f"{n} will sleep for: {s} seconds")
await asyncio.sleep(s)
return f"{n}!"
async def main():
counter = 0
tasks = [foo("a"), foo("b"), foo("c")]
for future in asyncio.as_completed(tasks):
n = "quickest" if counter == 0 else "next quickest"
counter += 1
result = await future
print(f"the {n} result was: {result}")
asyncio.run(main())
此程序的示例輸出是:
c will sleep for: 9 seconds
a will sleep for: 1 seconds
b will sleep for: 0 seconds
the quickest result was: b!
the next quickest result was: a!
the next quickest result was: c!
create_task
下面的示例演示如何將協同例程轉換為任務并將其安排到事件循環上。
import asyncio
async def foo():
await asyncio.sleep(10)
print("Foo!")
async def hello_cc():
task = asyncio.create_task(foo())
print(task)
await asyncio.sleep(5)
print("Hello Chongchong!")
await asyncio.sleep(10)
print(task)
asyncio.run(hello_cc())
從上面的程序中我們可以看到,我們用create_task將協同例程函數轉換為任務。這將自動安排在下一個可用刻度處在事件循環上運行的任務。
這與較低級別的 API ensure_future(這是創建新任務的首選方法)不同。ensure_future函數具有特定的邏輯分支,使其可用于更多的輸入類型,而不是create_task僅支持將協同例程排到事件循環中并將其包裝到任務中(請參閱:ensure_future源代碼)。
此程序的輸出將是:
<Task pending coro=<foo() running at create_task.py:4>>
Hello Chongchong!
Foo!
<Task finished coro=<foo() done, defined at create_task.py:4> result=None>
讓我們回顧一下代碼,并比較我們可以看到的上述輸出...
我們將轉換foo()到任務,然后在創建任務后立即打印返回的任務。因此,當我們打印任務時,我們可以看到其狀態顯示為"掛起"(因為它尚未執行)。
接下來,我們將sleep五秒鐘,因為這將導致foo任務現在運行(因為當前任務hello_world將被視為繁忙)。
在foo任務中,我們也處于sleep狀態,但時間比hello_world長,因此事件循環現在上下文將切換回hello_world任務,在睡眠時將傳遞該任務,我們將打印輸出字符串 Hello Chongchong。
最后,我們又sleep十秒鐘。這只是為了我們可以給foo任務足夠的時間來完成和打印自己的輸出。如果我們不這樣做,那么hello_world任務將完成并關閉事件循環。最后一行hello_world是打印foo任務,我們將看到foo任務的狀態現在將顯示為"已完成"。
回調
處理任務(實際上是未來)時,一旦 Future 在任務上設置了值,就可以執行"回調"函數。
以下示例通過修改上一create_task示例代碼來演示這一點:
import asyncio
async def foo():
await asyncio.sleep(10)
return "Foo!"
def got_result(future):
print(f"got the result! {future.result()}")
async def hello_cc():
task = asyncio.create_task(foo())
task.add_done_callback(got_result)
print(task)
await asyncio.sleep(5)
print("Hello Chongchong!")
await asyncio.sleep(10)
print(task)
asyncio.run(hello_cc())
請注意,在上面的程序中,我們增加了一個函數,該got_result
函數期望接收"未來"類型,從而調用.result()"未來"。另請注意,要調用此函數,我們會將其傳遞給.add_done_callback()在create_task返回的任務上調用它。
此程序的輸出是:
<Task pending coro=<foo() running at gather.py:4> cb=[got_result() at gather.py:9]>
Hello Chongchong!
got the result! Foo!
<Task finished coro=<foo() done, defined at gather.py:4> result='Foo!'>
任務池
在處理大量并發操作時,最好利用線程(和/或子進程)的"池"來防止耗盡應用程序的主機資源。
這也是concurrent.futures模塊的來歷。它提供了一個稱為執行器的概念來幫助它,它可以獨立運行或集成到現有的異步事件循環中。
執行器
有兩種類型的執行器:ThreadPoolExecutor和ProcessPoolExecutor
讓我們看一下在其中一個執行器中執行代碼的第一種方法,方法是使用異步事件循環來計劃執行器的運行。
為此,需要調用事件循環的.run_in_executor()函數,并將執行器類型作為第一個參數傳遞。如果None被提供,則使用默認執行器(即 ThreadPoolExecutor)下面的實例來自Python官方文檔:
import asyncio
import concurrent.futures
def blocking_io():
with open("/dev/urandom", "rb") as f:
return f.read(100)
def cpu_bound():
return sum(i * i for i in range(10 ** 7))
async def main():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_io)
print("default thread pool", result)
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, blocking_io)
print("custom thread pool", result)
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_bound)
print("custom process pool", result)
asyncio.run(main())
在其中一個執行器中執行代碼的第二種方法是將要執行的代碼直接發送到池。這意味著我們不必獲取當前事件循環將池傳遞到其中(如前面的示例所示),但它附帶了一個警告,即父程序不會等待任務完成,除非您明確告訴它(我接下來將演示)。考慮到這一點,讓我們來看看這個替代方法。它涉及調用執行者的方法submit():
import concurrent.futures
import time
def slow_op(*args):
print(f"arguments: {args}")
time.sleep(5)
print("slow operation complete")
return 123
def do_something():
with concurrent.futures.ProcessPoolExecutor() as pool:
future = pool.submit(slow_op, "a", "b", "c")
for fut in concurrent.futures.as_completed([future]):
assert future.done() and not future.cancelled()
print(f"got the result from slow_op: {fut.result()}")
if __name__ == "__main__":
print("program started")
do_something()
print("program complete")
這里值得注意的一點是,如果我們沒有使用with語句(如上例中所示),則意味著一旦池完成其工作,將不會關閉它,因此(取決于程序是否繼續運行),可能會發現資源沒有被清理。
要解決此問題,可以調用.shutdown()通過其父類向兩種類型的執行器公開的方法。concurrent.futures.Executor
下面是一個這樣做的示例,但現在使用線程池執行器:
import concurrent.futures
THREAD_POOL = concurrent.futures.ThreadPoolExecutor(max_workers=5)
def slow_op(*args):
print(f"arguments: {args}")
print("some kind of slow operation")
return 123
def do_something():
future = THREAD_POOL.submit(slow_op, "a", "b", "c")
THREAD_POOL.shutdown()
assert future.done() and not future.cancelled()
print(f"got the result from slow_op: {future.result()}")
if __name__ == "__main__":
print("program started")
do_something()
print("program complete")
現在,我尚未在示例中還未使用time.sleep(),因為我們使用的是線程池,并且time.sleep()是 CPU 綁定操作,如果直接使用將阻止線程完成。
這意味著我們的示例可能總是導致slow_op()函數完成之前,我們開始檢查 future.done()。所以,是的,這不是最好的例子。通過合并一個不阻止的真正緩慢的操作,可以更現實地測試它。
但想象一下,我們有一個真正緩慢的操作發生,這意味著任務沒有完成,當我們檢查future.done()。
在這種情況下,我們應該注意,調用.shutdown()的位置是在我們明確等待計劃的任務完成之前,然而當我們斷言返回的future是否.done()返回時,我們會發現,無論嘗試關閉線程池,任務都標記為"已完成"。
這是因為關閉方wait=True法的默認行為意味著它將等待所有計劃的任務完成,然后再關閉執行器池。
因此,該方法是同步調用.shutdown()(即,它確保所有任務在關閉之前都已完成,因此我們可以保證所有結果都可用)。
如果我們傳遞.shutdown(wait=False)相反,那么調用future.done()將引發一個異常(因為計劃的任務仍將在線程池關閉時運行),因此在這種情況下,我們需要確保我們使用另一種機制來獲取計劃任務的結果(如 concurrent.futures.as_completed或concurrent.futures.wait )。
asyncio.Future和concurrent.futures.Future
最后一點要提的是,concurrent.futures.Future對象與asyncio.Future
不同。asyncio.Future用于異步事件循環,并且可等待。concurrent.futures.Future不可等待。
使用事件循環的.run_in_executor()方法將通過將concurrent.futures.Future類型包裝到asyncio.wrap_future(有關詳細信息,請參閱下一節)來提供兩種未來類型之間的必要互操作性。
asyncio.wrap_future
由于 Python 3.5,我們可以使用asyncio.wrap_future 將 concurrent.futures.Futur轉換為easyncio.Future 。下面可以看到這方面的示例...
import asyncio
import random
from concurrent.futures import ThreadPoolExecutor
from time import sleep
def return_after_5_secs(message):
sleep(5)
return message
pool = ThreadPoolExecutor(3)
async def doit():
identify = random.randint(1, 100)
future = pool.submit(return_after_5_secs, (f"result: {identify}"))
awaitable = asyncio.wrap_future(future)
print(f"waiting result: {identify}")
return await awaitable
async def app():
# run some stuff multiple times
tasks = [doit(), doit()]
result = await asyncio.gather(*tasks)
print(result)
print("waiting app")
asyncio.run(app())
此程序的輸出將是:
waiting app
waiting result: 62
waiting result: 83
# ...five seconds pass by...
['result: 62', 'result: 83']