相關概念
并發和并行
- 并發:指一個時間段內,在一個CPU(CPU核心)能運行的程序的數量。
- 并行:指在同一時刻,在多個CPU上運行多個程序,跟CPU(CPU核心)數量有關。
因為
計算機CPU(CPU核心)在同一時刻只能運行一個程序。
同步和異步
- 同步是指代碼調用的時候必須等待執行完成才能執行剩余的邏輯。
- 異步是指代碼在調用的時候,不用等待操作完成,直接執行剩余邏輯。
阻塞和非阻塞
- 阻塞是指調用函數的時候當前線程被掛起。
- 非阻塞是指調用函數時當前線程不會被掛起,而是立即返回。
CPU密集型和I/O密集型
CPU密集型(CPU-bound):
CPU密集型又叫做計算密集型,指I/O在很短時間就能完成,CPU需要大量的計算和處理,特點是CPU占用高。
例如:壓縮解壓縮、加密解密、正則表達式搜索。
IO密集型(I/O-bound):
IO密集型是指系統運行時大部分時間時CPU在等待IO操作(硬盤/內存)的讀寫操作,特點是CPU占用較低。
例如:文件讀寫、網絡爬蟲、數據庫讀寫。
多進程、多線程、多協程的對比
類型 |
優點 |
缺點 |
適用 |
多進程 Process(multiprocessing) |
可以利用CPU多核并行運算 |
占用資源最多 可啟動數目比線程少 |
CPU密集型計算 |
多線程 Thread(threading) |
相比進程更輕量占用資源少 |
相比進程,多線程只能并發執行,不能利用多CPU(GIL) 相比協程啟動數目有限制,占用內存資源有線程切換開銷 |
IO密集型計算、同時運行的任務要求不多 |
多協程 Coroutine(asyncio) |
內存開銷最少,啟動協程數量最多 |
支持庫的限制 代碼實現復雜 |
IO密集型計算、同時運行的較多任務 |
GIL全稱Global Interpreter Lock
下圖為GIL的運行
Python/ target=_blank class=infotextkey>Python的多線程是偽多線程,同時只能有一個線程運行。
一個進程能夠啟動N個線程,數量受系統限制。
一個線程能夠啟動N個協程,數量不受限制。
怎么選擇
對于其他語言來說,多線程是能同時利用多CPU(核)的,所以是適用CPU密集型計算的,但是Python由于GIL的限制,只能使用IO密集型計算。
所以對于Python來說:
對于IO密集型來說能用多協程就用多協程,沒有庫支持才用多線程。
對于CPU密集型就只能用多進程了。
協程(異步IO)
簡單示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import asyncio
async def test():
await asyncio.sleep(3)
return "123"
async def main():
result = await test()
print(result)
if __name__ == '__main__':
asyncio.run(main())
|
單次請求查看結果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import threading
import asyncio
async def myfun(index):
print(f'[{index}]({threading.currentThread().name})')
await asyncio.sleep(1)
return index
def getfuture(future):
print(f"結果為:{future.result()}")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(myfun(1))
future.add_done_callback(getfuture)
loop.run_until_complete(future)
loop.close()
|
或者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import threading
import asyncio
async def myfun(index):
print(f'[{index}]({threading.currentThread().name})')
await asyncio.sleep(1)
return index
if __name__ == "__main__":
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(myfun(1))
loop.run_until_complete(future)
print(f"結果為:{future.result()}")
loop.close()
|
多次請求查看結果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
import threading
import asyncio
async def myfun(index):
print(f'線程({threading.currentThread().name}) 傳入參數({index})')
await asyncio.sleep(1)
return index
loop = asyncio.get_event_loop()
future_list = []
for item in range(3):
future = asyncio.ensure_future(myfun(item))
future_list.Append(future)
loop.run_until_complete(asyncio.wait(future_list))
for future in future_list:
print(f"結果為:{future.result()}")
loop.close()
|
asyncio.wait和asyncio.gather
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
import threading
import asyncio
async def myfun(index):
print(f'[{index}]({threading.currentThread().name})')
await asyncio.sleep(1)
loop = asyncio.get_event_loop()
tasks = [myfun(1), myfun(2)]
loop.run_until_complete(asyncio.wait(tasks))
#loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
|
asyncio.gather 和asyncio.wait區別:
在內部wait()使用一個set保存它創建的Task實例。因為set是無序的所以這也就是我們的任務不是順序執行的原因。wait的返回值是一個元組,包括兩個集合,分別表示已完成和未完成的任務。wait第二個參數為一個超時值
達到這個超時時間后,未完成的任務狀態變為pending,當程序退出時還有任務沒有完成此時就會看到如下的錯誤提示。
gather的使用
gather的作用和wait類似不同的是。
- gather任務無法取消。
- 返回值是一個結果列表
- 可以按照傳入參數的 順序,順序輸出。
協程和多線程結合
同時多個請求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
import requests
def myquery(url):
r = requests.get(url)
print(r.text)
return r.text
if __name__ == "__main__":
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(3)
urls = ["https://www.psvmc.cn/userlist.json", "https://www.psvmc.cn/login.json"]
tasks = []
start_time = time.time()
for url in urls:
task = loop.run_in_executor(executor, myquery, url)
tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
print(f"用時{time.time() - start_time}")
|
結果
1
2
3
|
{"code":0,"msg":"success","obj":{"name":"小明","sex":"男","token":"psvmc"}}
{"code":0,"msg":"success","obj":[{"name":"小明","sex":"男"},{"name":"小紅","sex":"女"},{"name":"小剛","sex":"未知"}]}
用時0.11207175254821777
|
單個請求添加回調
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor
import requests
def myquery(url):
print(f"請求所在線程:{threading.current_thread().name}")
r = requests.get(url)
return r.text
def myfuture(future):
print(f"回調所在線程:{threading.current_thread().name}")
print(future.result())
if __name__ == "__main__":
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(3)
url = "https://www.psvmc.cn/userlist.json"
tasks = []
start_time = time.time()
task = loop.run_in_executor(executor, myquery, url)
future = asyncio.ensure_future(task)
future.add_done_callback(myfuture)
loop.run_until_complete(future)
print(f"用時{time.time() - start_time}")
|
多線程與多進程
多線程
引用模塊
1
2
3
4
5
6
7
8
|
from threading import Thread
def func(num):
return num
t = Thread(target=func, args=(100,))
t.start()
t.join()
|
數據通信
1
2
3
4
5
|
import queue
q = queue.Queue()
q.put(1)
item = q.get()
|
鎖
1
2
3
4
5
|
from threading import Lock
lock = Lock()
with lock:
pass
|
池化技術
1
2
3
4
5
6
7
8
|
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
# 方法1
results = executor.map(func, [1, 2, 3])
# 方法2
future = executor.submit(func, 1)
result = future.result()
|
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定義一個準備作為線程任務的函數
def action(num):
print(threading.current_thread().name)
time.sleep(num)
return num + 100
if __name__ == "__main__":
# 創建一個包含3條線程的線程池
with ThreadPoolExecutor(max_workers=3) as pool:
future1 = pool.submit(action, 3)
future1.result()
print(f"單個任務返回:{future1.result()}")
print('------------------------------')
# 使用線程執行map計算
results = pool.map(action, (1, 3, 5))
for r in results:
print(f"多個任務返回:{r}")
|
結果
1
2
3
4
5
6
7
8
9
|
ThreadPoolExecutor-0_0
單個任務返回:103
------------------------------
ThreadPoolExecutor-0_0
ThreadPoolExecutor-0_1
ThreadPoolExecutor-0_2
多個任務返回:101
多個任務返回:103
多個任務返回:105
|
多進程
引用模塊
1
2
3
4
5
6
7
8
|
from multiprocessing import Process
def func(num):
return num
t = Process(target=func, args=(100,))
t.start()
t.join()
|
數據通信
1
2
3
4
|
import multiprocessing
q = multiprocessing.Queue()
q.put(1)
item = q.get()
|
鎖
1
2
3
4
5
|
from multiprocessing import Lock
lock = Lock()
with lock:
pass
|
池化技術
1
2
3
4
5
6
7
8
|
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
# 方法1
results = executor.map(func, [1, 2, 3])
# 方法2
future = executor.submit(func, 1)
result = future.result()
|
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import time
# 定義一個準備作為進程任務的函數
def action(num):
print(multiprocessing.current_process().name)
time.sleep(num)
return num + 100
if __name__ == "__main__":
# 創建一個包含3條進程的進程池
with ProcessPoolExecutor(max_workers=3) as pool:
future1 = pool.submit(action, 3)
future1.result()
print(f"單個任務返回:{future1.result()}")
print('------------------------------')
# 使用線程執行map計算
results = pool.map(action, [1, 3, 5])
for r in results:
print(f"多個任務返回:{r}")
|
結果
1
2
3
4
5
6
7
8
9
|
SpawnProcess-1
單個任務返回:103
------------------------------
SpawnProcess-2
SpawnProcess-3
SpawnProcess-1
多個任務返回:101
多個任務返回:103
多個任務返回:105
|
多進程/多線程/協程對比
異步 IO(asyncio)、多進程(multiprocessing)、多線程(multithreading)
IO 密集型應用CPU等待IO時間遠大于CPU 自身運行時間,太浪費;
常見的 IO 密集型業務包括:瀏覽器交互、磁盤請求、網絡爬蟲、數據庫請求等
Python 世界對于 IO 密集型場景的并發提升有 3 種方法:多進程、多線程、多協程;
理論上講asyncio是性能最高的,原因如下:
- 進程、線程會有CPU上下文切換
- 進程、線程需要內核態和用戶態的交互,性能開銷大;而協程對內核透明的,只在用戶態運行
- 進程、線程并不可以無限創建,最佳實踐一般是 CPU*2;而協程并發能力強,并發上限理論上取決于操作系統IO多路復用(linux下是 epoll)可注冊的文件描述符的極限
那asyncio的實際表現是否如理論上那么強,到底強多少呢?我構建了如下測試場景:
請求10此,并sleep 1s模擬業務查詢
- 方法 1;順序串行執行
- 方法 2:多進程
- 方法 3:多線程
- 方法 4:asyncio
- 方法 5:asyncio+uvloop
最后的asyncio+uvloop和官方asyncio 最大不同是用 Cython+libuv 重新實現了asyncio 的事件循環(event loop)部分,
官方測試性能是 node.js的 2 倍,持平 golang。
順序串行執行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import time
def query(num):
print(num)
time.sleep(1)
def main():
for h in range(10):
query(h)
# main entrance
if __name__ == '__main__':
start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print(f"時間差:{end_time-start_time}")
|
多進程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
from concurrent import futures
import time
def query(num):
print(num)
time.sleep(1)
def main():
with futures.ProcessPoolExecutor() as executor:
for future in executor.map(query, range(10)):
pass
# main entrance
if __name__ == '__main__':
start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print(f"時間差:{end_time-start_time}")
|
多線程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
from concurrent import futures
import time
def query(num):
print(num)
time.sleep(1)
def main():
with futures.ThreadPoolExecutor() as executor:
for future in executor.map(query, range(10)):
pass
# main entrance
if __name__ == '__main__':
start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print(f"時間差:{end_time-start_time}")
|
asyncio
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
import asyncio
import time
async def query(num):
print(num)
await asyncio.sleep(1)
async def main():
tasks = [asyncio.create_task(query(num)) for num in range(10)]
await asyncio.gather(*tasks)
# main entrance
if __name__ == '__main__':
start_time = time.perf_counter()
asyncio.run(main())
end_time = time.perf_counter()
print(f"時間差:{end_time-start_time}")
|
asyncio+uvloop
注意
windows上不支持uvloop。
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
import asyncio
import uvloop
import time
async def query(num):
print(num)
await asyncio.sleep(1)
async def main():
tasks = [asyncio.create_task(query(host)) for host in range(10)]
await asyncio.gather(*tasks)
# main entrance
if __name__ == '__main__':
uvloop.install()
start_time = time.perf_counter()
asyncio.run(main())
end_time = time.perf_counter()
print(f"時間差:{end_time-start_time}")
|
運行時間對比
方式 |
運行時間 |
串行 |
10.0750972s |
多進程 |
1.1638731999999998s |
多線程 |
1.0146456s |
asyncio |
1.0110082s |
asyncio+uvloop |
1.01s |
可以看出: 無論多進程、多線程還是asyncio都能大幅提升IO 密集型場景下的并發,但asyncio+uvloop性能最高!
原文鏈接:
https://www.psvmc.cn/article/2021-11-24-python-async.html