Python/ target=_blank class=infotextkey>Python異步編程全攻略
如果你厭倦了多線程,不妨試試python的異步編程,再引入async, await關(guān)鍵字之后語法變得更加簡潔和直觀,又經(jīng)過幾年的生態(tài)發(fā)展,現(xiàn)在是一個很不錯的并發(fā)模型。
下面介紹一下python異步編程的方方面面。
在python異步編程中,可能出現(xiàn)很多其他的對象,比如Future, Task, 后者繼承自前者,但是為了統(tǒng)一,無論是Future還是Task,本文中統(tǒng)一稱呼為協(xié)程。
與多線程的比較
因為GIL的存在,所以Python的多線程在CPU密集的任務(wù)下顯得無力,但是對于IO密集的任務(wù),多線程還是足以發(fā)揮多線程的優(yōu)勢的,而異步也是為了應(yīng)對IO密集的任務(wù),所以兩者是一個可以相互替代的方案,因為設(shè)計的不同,理論上異步要比多線程快,因為異步的花銷更少, 因為不需要額外系統(tǒng)申請額外的內(nèi)存,而線程的創(chuàng)建跟系統(tǒng)有關(guān),需要分配一定量的內(nèi)存,一般是幾兆,比如linux默認(rèn)是8MB。
雖然異步很好,比如可以使用更少的內(nèi)存,比如更好地控制并發(fā)(也許你并不這么認(rèn)為:))。但是由于async/await 語法的存在導(dǎo)致與之前的語法有些割裂,所以需要適配,需要付出額外的努力,再者就是生態(tài)遠遠沒有同步編程強大,比如很多庫還不支持異步,所以你需要一些額外的適配。
用于測試的web服務(wù)
為了不給其他網(wǎng)站帶來困擾,這里首先在自己電腦啟動web服務(wù)用于測試,代碼很簡單。
# web.py
import asyncio
from random import random
import uvicorn
from fastapi import FastAPI
App = FastAPI()
@app.get("/")
async def index():
await asyncio.sleep(1)
return {"msg": "ok"}
@app.get("/random")
async def index():
await asyncio.sleep(1)
return {"msg": random()}
if __name__ == "__main__":
# uvicorn.run(app)
# 如果需要熱加載(reload), 需要傳入一個字符串而不是application對象
uvicorn.run("web:app", reload=True)import asyncio
import uvicorn
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def index():
await asyncio.sleep(1)
return {"msg": "ok"}
if __name__ == "__main__":
uvicorn.run(app)
本文所有依賴如下:
- Python > 3.7+
- fastapi
- aiohttp
- uvicorn
所有依賴可通過代碼倉庫的requirements.txt一次性安裝。
pip install requirements.txt
并發(fā),并發(fā),并發(fā)
首先看一個錯誤的例子
# test1.py
import asyncio
from datetime import datetime
import aiohttp
async def main(workers: int, url: str):
async with aiohttp.ClientSession() as sess:
for _ in range(workers):
async with sess.get(url) as resp:
print("響應(yīng)內(nèi)容", await resp.json())
if __name__ == "__main__":
loop = asyncio.get_event_loop()
start = datetime.now()
loop.run_until_complete(main(3, "http://127.0.0.1:8000/"))
end = datetime.now()
print("耗時:", end - start)
輸出如下:
$ python test1.py
響應(yīng)內(nèi)容 {'msg': 'ok'}
響應(yīng)內(nèi)容 {'msg': 'ok'}
響應(yīng)內(nèi)容 {'msg': 'ok'}
耗時: 0:00:03.011565
發(fā)現(xiàn)花費了3秒,不符合預(yù)期呀。。。。這是因為雖然用了協(xié)程,但是每個協(xié)程是串行的運行,也就是說后一個等前一個完成之后才開始,那么這樣的異步代碼并沒有并發(fā),所以我們需要讓這些協(xié)程并行起來
# test2.py
import asyncio
from datetime import datetime
import aiohttp
async def run(sess: aiohttp.ClientSession, url: str):
async with sess.get(url) as resp:
print("響應(yīng)內(nèi)容", await resp.json())
async def main(workers: int, url: str):
async with aiohttp.ClientSession() as sess:
for _ in range(workers):
asyncio.ensure_future(run(sess, url))
await asyncio.sleep(1.1)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
start = datetime.now()
loop.run_until_complete(main(3, "http://127.0.0.1:8000/"))
end = datetime.now()
print("耗時:", end - start)
為了讓代碼變動的不是太多,所以這里用了一個笨辦法來等待所有任務(wù)完成, 之所以在main函數(shù)中等待是為了不讓ClientSession關(guān)閉, 如果你移除了main函數(shù)中的等待代碼會發(fā)現(xiàn)報告異常RuntimeError: Session is closed,而代碼里的解決方案非常的不優(yōu)雅,需要手動的等待,為了解決這個問題,我們再次改進代碼。
# test3.py
import asyncio
from datetime import datetime
import aiohttp
async def run(sess: aiohttp.ClientSession, url: str):
async with sess.get(url) as resp:
print("響應(yīng)內(nèi)容", await resp.json())
async def main(workers: int, url: str):
async with aiohttp.ClientSession() as sess:
futures = []
for _ in range(workers):
futures.append(asyncio.ensure_future(run(sess, url)))
done, pending = await asyncio.wait(futures)
print(done, pending)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
start = datetime.now()
loop.run_until_complete(main(3, "http://127.0.0.1:8000/"))
end = datetime.now()
print("耗時:", end - start)
這里解決的方式是通過asyncio.wait方法等待一個協(xié)程列表,默認(rèn)是等待所有協(xié)程結(jié)束后返回,會返回一個完成(done)列表,以及一個待辦(pending)列表。
如果我們不想要協(xié)程對象而是結(jié)果,那么我們可以使用asyncio.gather
# test4.py
import asyncio
from datetime import datetime
import aiohttp
async def run(sess: aiohttp.ClientSession, url: str, id: int):
async with sess.get(url) as resp:
print("響應(yīng)內(nèi)容", await resp.json())
return id
async def main(workers: int, url: str):
async with aiohttp.ClientSession() as sess:
futures = []
for i in range(workers):
futures.append(asyncio.ensure_future(run(sess, url, i)))
# 注意: 這里要講列表解開
rets = await asyncio.gather(*futures)
print(rets)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
start = datetime.now()
loop.run_until_complete(main(3, "http://127.0.0.1:8000/"))
end = datetime.now()
print("耗時:", end - start)
結(jié)果輸出如下:
$ python test4.py
響應(yīng)內(nèi)容 {'msg': 'ok'}
響應(yīng)內(nèi)容 {'msg': 'ok'}
響應(yīng)內(nèi)容 {'msg': 'ok'}
[0, 1, 2]
耗時: 0:00:01.011840
小結(jié)
通過asyncio.ensure_future我們就能創(chuàng)建一個協(xié)程,跟調(diào)用一個函數(shù)差別不大,為了等待所有任務(wù)完成之后退出,我們需要使用asyncio.wait等方法來等待,如果只想要協(xié)程輸出的結(jié)果,我們可以使用asyncio.gather來獲取結(jié)果。
同步
雖然前面能夠隨心所欲的創(chuàng)建協(xié)程,但是就像多線程一樣,我們也需要處理協(xié)程之間的同步問題,為了保持語法及使用情況的一致,多線程中用到的同步功能,asyncio中基本也能找到, 并且用法基本一致,不一致的地方主要是需要用異步的關(guān)鍵字,比如async with/ await等
鎖 lock
通過鎖讓并發(fā)慢下來,讓協(xié)程一個一個的運行。
# test5.py
import asyncio
from datetime import datetime
import aiohttp
lock = asyncio.Lock()
async def run(sess: aiohttp.ClientSession, url: str, id: int):
async with lock:
async with sess.get(url) as resp:
print("響應(yīng)內(nèi)容", await resp.json())
return id
async def main(workers: int, url: str):
async with aiohttp.ClientSession() as sess:
futures = []
for i in range(workers):
futures.append(asyncio.ensure_future(run(sess, url, i)))
# 注意: 這里要講列表解開
rets = await asyncio.gather(*futures)
print(rets)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
start = datetime.now()
loop.run_until_complete(main(3, "http://127.0.0.1:8000/"))
end = datetime.now()
print("耗時:", end - start)
輸出如下:
$ python test5.py
響應(yīng)內(nèi)容 {'msg': 'ok'}
響應(yīng)內(nèi)容 {'msg': 'ok'}
響應(yīng)內(nèi)容 {'msg': 'ok'}
[0, 1, 2]
耗時: 0:00:03.007251
通過觀察很容易發(fā)現(xiàn),并發(fā)的速度因為鎖而慢下來了,因為每次只有一個協(xié)程能獲得鎖,所以并發(fā)變成了串行。
事件 event
通過事件來通知特定的協(xié)程開始工作,假設(shè)有一個任務(wù)是根據(jù)http響應(yīng)結(jié)果選擇是否激活。
# test6.py
import asyncio
from datetime import datetime
import aiohttp
big_event = asyncio.Event()
small_event = asyncio.Event()
async def big_waiter():
await small_event.wait()
print(f"{datetime.now()} big waiter 收到任務(wù)事件")
async def small_waiter():
await big_event.wait()
print(f"{datetime.now()} small waiter 收到任務(wù)事件")
async def run(sess: aiohttp.ClientSession, url: str, id: int):
async with sess.get(url) as resp:
ret = await resp.json()
print("響應(yīng)內(nèi)容", ret)
data = ret["msg"]
if data > 0.5:
big_event.set()
else:
small_event.set()
return data
async def main(workers: int, url: str):
asyncio.ensure_future(big_waiter())
asyncio.ensure_future(big_waiter())
asyncio.ensure_future(small_waiter())
asyncio.ensure_future(small_waiter())
async with aiohttp.ClientSession() as sess:
futures = []
for i in range(workers):
futures.append(asyncio.ensure_future(run(sess, url, i)))
await asyncio.wait(futures)
if not big_event.is_set():
big_event.set()
if not small_event.is_set():
small_event.set()
# 等到其他pending可馬上運行完成的任務(wù)運行結(jié)束
await asyncio.sleep(0)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
start = datetime.now()
loop.run_until_complete(main(3, "http://127.0.0.1:8000/random"))
end = datetime.now()
print("耗時:", end - start)
輸出如下:
響應(yīng)內(nèi)容 {'msg': 0.9879470259657458}
2022-07-11 10:16:51.577579 small waiter 收到任務(wù)事件
2022-07-11 10:16:51.577579 small waiter 收到任務(wù)事件
響應(yīng)內(nèi)容 {'msg': 0.33312954919903903}
2022-07-11 10:16:51.578574 big waiter 收到任務(wù)事件
2022-07-11 10:16:51.578574 big waiter 收到任務(wù)事件
響應(yīng)內(nèi)容 {'msg': 0.41934453838367824}
耗時: 0:00:00.996697
可以看到事件(Event)等待者都是在得到響應(yīng)內(nèi)容之后輸出,并且事件(Event)可以是多個協(xié)程同時等待。
條件 Condition
上面的事件雖然很棒,能夠在不同的協(xié)程之間同步狀態(tài),并且也能夠一次性同步所有的等待協(xié)程,但是還不夠精細(xì)化,比如想通知指定數(shù)量的等待協(xié)程,這個時候Event就無能為力了,所以同步原語中出現(xiàn)了Condition。
# test7.py
import asyncio
from datetime import datetime
import aiohttp
cond = asyncio.Condition()
async def waiter(id):
async with cond:
await cond.wait()
print(f"{datetime.now()} waiter[{id}]等待完成")
async def run(sess: aiohttp.ClientSession, url: str, id: int):
async with sess.get(url) as resp:
ret = await resp.json()
print("響應(yīng)內(nèi)容", ret)
data = ret["msg"]
async with cond:
# cond.notify()
# cond.notify_all()
cond.notify(2)
return data
async def main(workers: int, url: str):
for i in range(workers):
asyncio.ensure_future(waiter(i))
async with aiohttp.ClientSession() as sess:
futures = []
for i in range(workers):
futures.append(asyncio.ensure_future(run(sess, url, i)))
await asyncio.wait(futures)
# 等到其他pending可馬上運行完成的任務(wù)運行結(jié)束
await asyncio.sleep(0)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
start = datetime.now()
loop.run_until_complete(main(3, "http://127.0.0.1:8000/random"))
end = datetime.now()
print("耗時:", end - start)
輸出如下:
$ python test7.py
響應(yīng)內(nèi)容 {'msg': 0.587516452693613}
2022-07-11 10:26:13.482781 waiter[0]等待完成
2022-07-11 10:26:13.483778 waiter[1]等待完成
響應(yīng)內(nèi)容 {'msg': 0.3391774763719556}
響應(yīng)內(nèi)容 {'msg': 0.2653464378663153}
2022-07-11 10:26:13.484771 waiter[2]等待完成
耗時: 0:00:01.013655
可以看到,前面兩個等待的協(xié)程是在同一時刻完成,而不是全部等待完成。
信號量 Semaphore
通過創(chuàng)建協(xié)程的數(shù)量來控制并發(fā)并不是非常優(yōu)雅的方式,所以可以通過信號量的方式來控制并發(fā)。
# test8.py
import asyncio
from datetime import datetime
import aiohttp
semp = asyncio.Semaphore(2)
async def run(sess: aiohttp.ClientSession, url: str, id: int):
async with semp:
async with sess.get(url) as resp:
ret = await resp.json()
print(f"{datetime.now()} worker[{id}] 響應(yīng)內(nèi)容", ret)
data = ret["msg"]
return data
async def main(workers: int, url: str):
async with aiohttp.ClientSession() as sess:
futures = []
for i in range(workers):
futures.append(asyncio.ensure_future(run(sess, url, i)))
await asyncio.wait(futures)
# 等到其他pending可馬上運行完成的任務(wù)運行結(jié)束
await asyncio.sleep(0)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
start = datetime.now()
loop.run_until_complete(main(3, "http://127.0.0.1:8000/random"))
end = datetime.now()
print("耗時:", end - start)
輸出如下:
$ python test8.py
2022-07-11 10:30:40.634801 worker[0] 響應(yīng)內(nèi)容 {'msg': 0.21337652123021056}
2022-07-11 10:30:40.634801 worker[1] 響應(yīng)內(nèi)容 {'msg': 0.7591980200967501}
2022-07-11 10:30:41.636346 worker[2] 響應(yīng)內(nèi)容 {'msg': 0.8282581038608438}
耗時: 0:00:02.011661
可以發(fā)現(xiàn),雖然同時創(chuàng)建了三個協(xié)程,但是同一時刻只有兩個協(xié)程工作,而另外一個協(xié)程需要等待一個協(xié)程讓出信號量才能運行。
小結(jié)
無論是協(xié)程還是線程,任務(wù)之間的狀態(tài)同步還是很重要的,所以有了應(yīng)對各種同步機制的同步原語,因為要保證一個資源同一個時刻只能一個任務(wù)訪問,所以引入了鎖,又因為需要一個任務(wù)等待另一個任務(wù),或者多個任務(wù)等待某個任務(wù),因此引入了事件(Event),但是為了更精細(xì)的控制通知的程度,所以又引入了條件(Condition), 通過條件可以控制一次通知多少的任務(wù)。
有時候的并發(fā)需求是通過一個變量控制并發(fā)任務(wù)的并發(fā)數(shù)而不是通過創(chuàng)建協(xié)程的數(shù)量來控制并發(fā),所以引入了信號量(Semaphore),這樣就可以在創(chuàng)建的協(xié)程數(shù)遠遠大于并發(fā)數(shù)的情況下讓協(xié)程在指定的并發(fā)量情況下并發(fā)。
兼容多線程,多進程
不得不承認(rèn)異步編程相比起同步編程的生態(tài)要小的很多,所以不可能完全異步編程,因此需要一種方式兼容。
多線程是為了兼容同步得代碼。
多進程是為了利用CPU多核的能力。
# test9.py
import time
from datetime import datetime
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
semp = asyncio.Semaphore(2)
def wait_io(id: int):
# 為了簡單起見,直接使用sleep模擬io
time.sleep(1)
return f"threading({id}): done at {datetime.now()}"
def more_cpu(id: int):
sum(i * i for i in range(10 ** 7))
return f"process({id}): done at {datetime.now()}"
async def main(workers: int):
loop = asyncio.get_event_loop()
futures = []
thread_pool = ThreadPoolExecutor(workers+1)
process_pool = ProcessPoolExecutor(workers)
ret = loop.run_in_executor(thread_pool, wait_io, 0, )
for i in range(workers):
futures.append(loop.run_in_executor(thread_pool, wait_io, i))
for i in range(workers):
futures.append(loop.run_in_executor(process_pool, more_cpu, i))
print("n".join(await asyncio.gather(*futures)))
if __name__ == "__main__":
loop = asyncio.get_event_loop()
start = datetime.now()
loop.run_until_complete(main(3))
end = datetime.now()
print("耗時:", end - start)
輸出如下:
threading(0): done at 2022-07-11 15:38:36.073547
threading(1): done at 2022-07-11 15:38:36.074540
threading(2): done at 2022-07-11 15:38:36.074540
process(0): done at 2022-07-11 15:38:36.142233
process(1): done at 2022-07-11 15:38:36.177190
process(2): done at 2022-07-11 15:38:36.162244
耗時: 0:00:01.107643
可以看到總耗時1秒,說明所有的線程跟進程是同時運行的。
生態(tài)
下面是本人使用過的一些異步庫,僅供參考
web框架
- fastapi 超級棒的web框架,使用過就不再想使用其他的了
http客戶端
- httpie
- aiohttp
數(shù)據(jù)庫
- aioredis redis異步庫
- motor mongodb異步庫
ORM
- sqlmodel 超級棒的ORM
雖然異步庫發(fā)展得還算不錯,但是中肯的說并沒有覆蓋方方面面。
總結(jié)
雖然我鼓勵大家嘗試異步編程,但是本文的最后卻是讓大家謹(jǐn)慎的選擇開發(fā)環(huán)境,如果你覺得本文的并發(fā),同步,兼容多線程,多進程不值得一提,那么我十分推薦你嘗試以異步編程的方式開始一個新的項目,如果你對其中一些還有疑問或者你確定了要使用的依賴庫并且大多數(shù)是沒有異步庫替代的,那么我還是建議你直接按照自己擅長的同步編程開始。
異步編程雖然很不錯,不過,也許你并不需要。