日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

相關概念

Python語法-多進程、多線程、協程(異步IO)

 

并發和并行

  • 并發:指一個時間段內,在一個CPU(CPU核心)能運行的程序的數量。
  • 并行:指在同一時刻,在多個CPU上運行多個程序,跟CPU(CPU核心)數量有關。

因為

計算機CPU(CPU核心)在同一時刻只能運行一個程序。

同步和異步

  • 同步是指代碼調用的時候必須等待執行完成才能執行剩余的邏輯。
  • 異步是指代碼在調用的時候,不用等待操作完成,直接執行剩余邏輯。

阻塞和非阻塞

  • 阻塞是指調用函數的時候當前線程被掛起。
  • 非阻塞是指調用函數時當前線程不會被掛起,而是立即返回。

CPU密集型和I/O密集型

CPU密集型(CPU-bound):

CPU密集型又叫做計算密集型,指I/O在很短時間就能完成,CPU需要大量的計算和處理,特點是CPU占用高。

例如:壓縮解壓縮、加密解密、正則表達式搜索。

IO密集型(I/O-bound):

IO密集型是指系統運行時大部分時間時CPU在等待IO操作(硬盤/內存)的讀寫操作,特點是CPU占用較低。

例如:文件讀寫、網絡爬蟲、數據庫讀寫。

多進程、多線程、多協程的對比

類型

優點

缺點

適用

多進程
Process(multiprocessing)

可以利用CPU多核并行運算

占用資源最多
可啟動數目比線程少

CPU密集型計算

多線程
Thread(threading)

相比進程更輕量占用資源少

相比進程,多線程只能并發執行,不能利用多CPU(GIL)
相比協程啟動數目有限制,占用內存資源有線程切換開銷

IO密集型計算、同時運行的任務要求不多

多協程
Coroutine(asyncio)

內存開銷最少,啟動協程數量最多

支持庫的限制
代碼實現復雜

IO密集型計算、同時運行的較多任務

GIL全稱Global Interpreter Lock

下圖為GIL的運行

Python語法-多進程、多線程、協程(異步IO)

 

Python/ target=_blank class=infotextkey>Python的多線程是偽多線程,同時只能有一個線程運行。

一個進程能夠啟動N個線程,數量受系統限制。

一個線程能夠啟動N個協程,數量不受限制。

怎么選擇

對于其他語言來說,多線程是能同時利用多CPU(核)的,所以是適用CPU密集型計算的,但是Python由于GIL的限制,只能使用IO密集型計算。

所以對于Python來說:

對于IO密集型來說能用多協程就用多協程,沒有庫支持才用多線程。

對于CPU密集型就只能用多進程了。

Python語法-多進程、多線程、協程(異步IO)

 

協程(異步IO)

簡單示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio


async def test():
    await asyncio.sleep(3)
    return "123"


async def main():
    result = await test()
    print(result)


if __name__ == '__main__':
    asyncio.run(main())

 

單次請求查看結果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import threading
import asyncio


async def myfun(index):
    print(f'[{index}]({threading.currentThread().name})')
    await asyncio.sleep(1)
    return index


def getfuture(future):
    print(f"結果為:{future.result()}")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(myfun(1))
    future.add_done_callback(getfuture)
    loop.run_until_complete(future)
    loop.close()

或者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import threading
import asyncio


async def myfun(index):
    print(f'[{index}]({threading.currentThread().name})')
    await asyncio.sleep(1)
    return index

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(myfun(1))
    loop.run_until_complete(future)
    print(f"結果為:{future.result()}")
    loop.close()

 

多次請求查看結果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threading
import asyncio


async def myfun(index):
    print(f'線程({threading.currentThread().name}) 傳入參數({index})')
    await asyncio.sleep(1)
    return index

loop = asyncio.get_event_loop()
future_list = []
for item in range(3):
    future = asyncio.ensure_future(myfun(item))
    future_list.Append(future)
loop.run_until_complete(asyncio.wait(future_list))
for future in future_list:
    print(f"結果為:{future.result()}")
loop.close()

 

asyncio.wait和asyncio.gather

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import threading
import asyncio


async def myfun(index):
    print(f'[{index}]({threading.currentThread().name})')
    await asyncio.sleep(1)


loop = asyncio.get_event_loop()
tasks = [myfun(1), myfun(2)]
loop.run_until_complete(asyncio.wait(tasks))
#loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

asyncio.gather 和asyncio.wait區別:

在內部wait()使用一個set保存它創建的Task實例。因為set是無序的所以這也就是我們的任務不是順序執行的原因。wait的返回值是一個元組,包括兩個集合,分別表示已完成和未完成的任務。wait第二個參數為一個超時值
達到這個超時時間后,未完成的任務狀態變為pending,當程序退出時還有任務沒有完成此時就會看到如下的錯誤提示。

gather的使用
gather的作用和wait類似不同的是。

  1. gather任務無法取消。
  2. 返回值是一個結果列表
  3. 可以按照傳入參數的 順序,順序輸出。
Python語法-多進程、多線程、協程(異步IO)

 

協程和多線程結合

同時多個請求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

import requests


def myquery(url):
    r = requests.get(url)
    print(r.text)
    return r.text


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(3)
    urls = ["https://www.psvmc.cn/userlist.json", "https://www.psvmc.cn/login.json"]
    tasks = []
    start_time = time.time()
    for url in urls:
        task = loop.run_in_executor(executor, myquery, url)
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
    print(f"用時{time.time() - start_time}")

結果

1
2
3
{"code":0,"msg":"success","obj":{"name":"小明","sex":"男","token":"psvmc"}}
{"code":0,"msg":"success","obj":[{"name":"小明","sex":"男"},{"name":"小紅","sex":"女"},{"name":"小剛","sex":"未知"}]}
用時0.11207175254821777

 

單個請求添加回調

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import requests


def myquery(url):
    print(f"請求所在線程:{threading.current_thread().name}")
    r = requests.get(url)
    return r.text


def myfuture(future):
    print(f"回調所在線程:{threading.current_thread().name}")
    print(future.result())


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(3)
    url = "https://www.psvmc.cn/userlist.json"
    tasks = []
    start_time = time.time()
    task = loop.run_in_executor(executor, myquery, url)
    future = asyncio.ensure_future(task)
    future.add_done_callback(myfuture)
    loop.run_until_complete(future)
    print(f"用時{time.time() - start_time}")

 

多線程與多進程

多線程

引用模塊

1
2
3
4
5
6
7
8
from threading import Thread

def func(num):
    return num

t = Thread(target=func, args=(100,))
t.start()
t.join()

數據通信

1
2
3
4
5
import queue

q = queue.Queue()
q.put(1)
item = q.get()

1
2
3
4
5
from threading import Lock

lock = Lock()
with lock:
    pass

 

池化技術

1
2
3
4
5
6
7
8
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
    # 方法1
    results = executor.map(func, [1, 2, 3])
    # 方法2
    future = executor.submit(func, 1)
    result = future.result()

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from concurrent.futures import ThreadPoolExecutor
import threading
import time


# 定義一個準備作為線程任務的函數
def action(num):
    print(threading.current_thread().name)
    time.sleep(num)
    return num + 100


if __name__ == "__main__":
    # 創建一個包含3條線程的線程池
    with ThreadPoolExecutor(max_workers=3) as pool:
        future1 = pool.submit(action, 3)

        future1.result()
        print(f"單個任務返回:{future1.result()}")

        print('------------------------------')
        # 使用線程執行map計算
        results = pool.map(action, (1, 3, 5))
        for r in results:
            print(f"多個任務返回:{r}")

結果

1
2
3
4
5
6
7
8
9
ThreadPoolExecutor-0_0
單個任務返回:103
------------------------------
ThreadPoolExecutor-0_0
ThreadPoolExecutor-0_1
ThreadPoolExecutor-0_2
多個任務返回:101
多個任務返回:103
多個任務返回:105

 

多進程

引用模塊

1
2
3
4
5
6
7
8
from multiprocessing import Process

def func(num):
    return num

t = Process(target=func, args=(100,))
t.start()
t.join()

 

數據通信

1
2
3
4
import multiprocessing
q = multiprocessing.Queue()
q.put(1)
item = q.get()

 

1
2
3
4
5
from multiprocessing import Lock

lock = Lock()
with lock:
    pass

 

池化技術

1
2
3
4
5
6
7
8
from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
    # 方法1
    results = executor.map(func, [1, 2, 3])
    # 方法2
    future = executor.submit(func, 1)
    result = future.result()

 

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import time


# 定義一個準備作為進程任務的函數
def action(num):
    print(multiprocessing.current_process().name)
    time.sleep(num)
    return num + 100


if __name__ == "__main__":
    # 創建一個包含3條進程的進程池
    with ProcessPoolExecutor(max_workers=3) as pool:
        future1 = pool.submit(action, 3)

        future1.result()
        print(f"單個任務返回:{future1.result()}")

        print('------------------------------')
        # 使用線程執行map計算
        results = pool.map(action, [1, 3, 5])
        for r in results:
            print(f"多個任務返回:{r}")

結果

1
2
3
4
5
6
7
8
9
SpawnProcess-1
單個任務返回:103
------------------------------
SpawnProcess-2
SpawnProcess-3
SpawnProcess-1
多個任務返回:101
多個任務返回:103
多個任務返回:105

 

多進程/多線程/協程對比

異步 IO(asyncio)、多進程(multiprocessing)、多線程(multithreading)

IO 密集型應用CPU等待IO時間遠大于CPU 自身運行時間,太浪費;

常見的 IO 密集型業務包括:瀏覽器交互、磁盤請求、網絡爬蟲、數據庫請求等

Python 世界對于 IO 密集型場景的并發提升有 3 種方法:多進程、多線程、多協程;

理論上講asyncio是性能最高的,原因如下:

  1. 進程、線程會有CPU上下文切換
  2. 進程、線程需要內核態和用戶態的交互,性能開銷大;而協程對內核透明的,只在用戶態運行
  3. 進程、線程并不可以無限創建,最佳實踐一般是 CPU*2;而協程并發能力強,并發上限理論上取決于操作系統IO多路復用(linux下是 epoll)可注冊的文件描述符的極限

那asyncio的實際表現是否如理論上那么強,到底強多少呢?我構建了如下測試場景:

請求10此,并sleep 1s模擬業務查詢

  • 方法 1;順序串行執行
  • 方法 2:多進程
  • 方法 3:多線程
  • 方法 4:asyncio
  • 方法 5:asyncio+uvloop

最后的asyncio+uvloop和官方asyncio 最大不同是用 Cython+libuv 重新實現了asyncio 的事件循環(event loop)部分,

官方測試性能是 node.js的 2 倍,持平 golang。

Python語法-多進程、多線程、協程(異步IO)

 

順序串行執行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import time


def query(num):
    print(num)
    time.sleep(1)


def main():
    for h in range(10):
        query(h)


# main entrance
if __name__ == '__main__':
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"時間差:{end_time-start_time}")

 

多進程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from concurrent import futures
import time


def query(num):
    print(num)
    time.sleep(1)


def main():
    with futures.ProcessPoolExecutor() as executor:
        for future in executor.map(query, range(10)):
            pass


# main entrance
if __name__ == '__main__':
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"時間差:{end_time-start_time}")

 

多線程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from concurrent import futures
import time


def query(num):
    print(num)
    time.sleep(1)


def main():
    with futures.ThreadPoolExecutor() as executor:
        for future in executor.map(query, range(10)):
            pass


# main entrance
if __name__ == '__main__':
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"時間差:{end_time-start_time}")

 

asyncio

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio
import time


async def query(num):
    print(num)
    await asyncio.sleep(1)


async def main():
    tasks = [asyncio.create_task(query(num)) for num in range(10)]
    await asyncio.gather(*tasks)


# main entrance
if __name__ == '__main__':
    start_time = time.perf_counter()
    asyncio.run(main())
    end_time = time.perf_counter()
    print(f"時間差:{end_time-start_time}")

 

asyncio+uvloop

注意

windows上不支持uvloop。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
import uvloop
import time


async def query(num):
    print(num)
    await asyncio.sleep(1)


async def main():
    tasks = [asyncio.create_task(query(host)) for host in range(10)]
    await asyncio.gather(*tasks)


# main entrance
if __name__ == '__main__':
    uvloop.install()
    start_time = time.perf_counter()
    asyncio.run(main())
    end_time = time.perf_counter()
    print(f"時間差:{end_time-start_time}")

 

運行時間對比

方式

運行時間

串行

10.0750972s

多進程

1.1638731999999998s

多線程

1.0146456s

asyncio

1.0110082s

asyncio+uvloop

1.01s

 


可以看出: 無論多進程、多線程還是asyncio都能大幅提升IO 密集型場景下的并發,但asyncio+uvloop性能最高!

Python語法-多進程、多線程、協程(異步IO)

 

原文鏈接:
https://www.psvmc.cn/article/2021-11-24-python-async.html

分享到:
標簽:語法 Python
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定