1、簡介
我們將討論如何利用Python/ target=_blank class=infotextkey>Python執行多線程和多進程任務。它們提供了在單個進程或多個進程之間執行并發操作的方法。并行和并發執行可以提高系統的速度和效率。
在討論多線程和多進程的基礎知識之后,我們還將討論使用Python庫實現它們的實際方法。
首先簡要討論并行系統的好處。
- 改進的性能:有了并發執行任務的能力,可以減少執行時間并提高系統的整體性能。
- 可擴展性:可以將一個大任務分解為多個較小的子任務,并為它們分配獨立的核心或線程,讓它們獨立執行。這在大規模系統中非常有用。
- 高效的I/O操作:通過并發的幫助,CPU不必等待進程完成其I/O操作。CPU可以立即開始執行下一個進程,直到前一個進程忙于其I/O操作。
- 資源優化:通過分割資源,可以防止單個進程占用所有資源。這可以避免較小進程的Starvation(饑餓)問題。
并行計算的優勢
以上是需要并發或并行執行的一些常見原因。現在,回到主題,即多線程和多進程,并討論它們的主要區別。
2、什么是多線程?
多線程是在單個進程中實現并行性的一種方法,能夠執行同時進行的任務。在單個進程內可以創建多個線程,并在該進程內并行執行較小的任務。
單個進程中的線程共享一個公共內存空間,但它們的堆棧跟蹤和寄存器是獨立的。由于共享內存,它們的計算成本較低。
單線程和多線程Env.
Python中的多線程主要用于執行I/O操作,即如果程序的某個部分正在執行I/O操作,則其余程序可以保持響應。然而,在Python的實現中,由于全局解釋器鎖(GIL)的存在,多線程無法實現真正的并行性。
簡而言之,GIL是一個互斥鎖,一次只允許一個線程與Python字節碼交互,即使在多線程模式下,一次也只能有一個線程執行字節碼。
這樣做是為了在CPython中保持線程安全,但它限制了多線程的性能優勢。為了解決這個問題,Python有一個單獨的多進程庫,我們將在之后進行討論。
什么是守護線程?
不斷在后臺運行的線程稱為守護線程。它們的主要工作是支持主線程或非守護線程。守護線程不會阻塞主線程的執行,甚至會在主線程執行完畢后繼續運行。
在Python中,守護線程主要用作垃圾回收器。它會默認銷毀所有無用的對象并釋放內存,以便主線程可以正常使用和執行。
3、什么是多進程?
多進程用于執行多個進程的并行執行。它可以幫助實現真正的并行性,因為可以同時執行不同的進程,并且每個進程都擁有自己的內存空間。它使用CPU的獨立核心,并且在執行進程間的數據交換時也很有幫助。
與多線程相比,多進程的計算成本更高,因為不使用共享內存空間。不過,它允許進行獨立執行,并克服了全局解釋器鎖的限制。
多進程環境
上圖展示了一個多進程環境,在該環境中,一個主進程創建了兩個獨立的進程,并為它們分配了不同的工作。
4、多線程實現
現在,我們使用Python實現一個基本的多線程示例。Python有一個內置的threading模塊用于多線程實現。
- 導入庫:
import threading
import os
- 計算平方的函數:
這是一個用于計算數字平方的簡單函數,它接受一個數字列表作為輸入,并輸出列表中每個數字的平方,同時輸出使用的線程名稱和與該線程關聯的進程ID。
def calculate_squares(numbers):
for num in numbers:
square = num * num
print(
f"Square of the number {num} is {square} | Thread Name {threading.current_thread().name} | PID of the process {os.getpid()}"
)
- 主函數:
本示例有一個數字列表,將其平均分成兩半,并分別命名為first_half和second_half。現在,將為這些列表分配兩個獨立的線程t1和t2。
Thread函數創建一個新線程,該線程接受一個帶有參數列表的函數作為輸入。還可以為線程分配一個單獨的名稱。
.start()函數將開始執行這些線程,而.join()函數將阻塞主線程的執行,直到給定的線程完全執行完畢。
if __name__ == "__mAIn__":
numbers = [1, 2, 3, 4, 5, 6, 7, 8]
half = len(numbers) // 2
first_half = numbers[:half]
second_half = numbers[half:]
t1 = threading.Thread(target=calculate_squares, name="t1", args=(first_half,))
t2 = threading.Thread(target=calculate_squares, name="t2", args=(second_half,))
t1.start()
t2.start()
t1.join()
t2.join()
輸出:
Square of the number 1 is 1 | Thread Name t1 | PID of the process 345
Square of the number 2 is 4 | Thread Name t1 | PID of the process 345
Square of the number 5 is 25 | Thread Name t2 | PID of the process 345
Square of the number 3 is 9 | Thread Name t1 | PID of the process 345
Square of the number 6 is 36 | Thread Name t2 | PID of the process 345
Square of the number 4 is 16 | Thread Name t1 | PID of the process 345
Square of the number 7 is 49 | Thread Name t2 | PID of the process 345
Square of the number 8 is 64 | Thread Name t2 | PID of the process 345
注意:上述創建的所有線程都是非守護線程。要創建守護線程,需要編寫t1.setDaemon(True),將線程t1設置為守護線程。
現在來了解一下上述代碼生成的輸出結果。可以觀察到兩個線程的進程ID(即PID)保持不變,這意味著這兩個線程屬于同一個進程。
還可以觀察到輸出并非按順序生成。第一行中可以看到是線程1生成的輸出,然后在第三行是線程2生成的輸出,接著在第四行,再次是線程1生成的輸出。這清楚地表明這些線程是同時工作的。
并發并不意味著這兩個線程并行執行,因為一次只有一個線程被執行。它不會減少執行時間,與順序執行所需的時間相同。CPU開始執行一個線程,但在中途離開,并切換到另一個線程,過一段時間后,又回到主線程,并從上次離開的地方開始執行。
5、多進程實現
目前對多線程及其實現方式和限制已經有基本的了解。現在,是時候學習多進程的實現以及如何克服這些限制了。
在這里將沿用相同的示例,但不再創建兩個獨立的線程,而是創建兩個獨立的進程,并討論觀察結果。
- 導入庫:
from multiprocessing import Process
import os
本例將使用multiprocessing模塊來創建獨立的進程。
- 計算平方的函數:
該函數將保持不變。只是在這里刪除了有關線程信息的打印語句。
def calculate_squares(numbers):
for num in numbers:
square = num * num
print(
f"Square of the number {num} is {square} | PID of the process {os.getpid()}"
)
- 主函數:
主函數有一些修改。只是創建了一個獨立的進程,而不是線程。
if __name__ == "__main__":
numbers = [1, 2, 3, 4, 5, 6, 7, 8]
half = len(numbers) // 2
first_half = numbers[:half]
second_half = numbers[half:]
p1 = Process(target=calculate_squares, args=(first_half,))
p2 = Process(target=calculate_squares, args=(second_half,))
p1.start()
p2.start()
p1.join()
p2.join()
輸出:
Square of the number 1 is 1 | PID of the process 1125
Square of the number 2 is 4 | PID of the process 1125
Square of the number 3 is 9 | PID of the process 1125
Square of the number 4 is 16 | PID of the process 1125
Square of the number 5 is 25 | PID of the process 1126
Square of the number 6 is 36 | PID of the process 1126
Square of the number 7 is 49 | PID of the process 1126
Square of the number 8 is 64 | PID of the process 1126
可以觀察到,每個列表都由一個獨立的進程執行。它們具有不同的進程ID。為了檢查進程是否已并行執行,需要創建一個單獨的環境,下面我們將討論這一點。
計算是否使用多進程的運行時間
為了檢查是否獲得了真正的并行性,在這里將計算使用和不使用多進程的算法運行時間。
為此,需要一個包含超過10^6個整數的大型整數列表。可以使用random庫生成一個列表。此處將使用Python的time模塊來計算運行時間。下面是實現的代碼,代碼本身很容易理解,也可以隨時查看代碼注釋。
from multiprocessing import Process
import os
import time
import random
def calculate_squares(numbers):
for num in numbers:
square = num * num
if __name__ == "__main__":
numbers = [
random.randrange(1, 50, 1) for i in range(10000000)
] # 創建一個包含10^7個整數的隨機列表。
half = len(numbers) // 2
first_half = numbers[:half]
second_half = numbers[half:]
# ----------------- 創建單進程環境 ------------------------#
start_time = time.time() # 開始計時(不使用多進程)
p1 = Process(
target=calculate_squares, args=(numbers,)
) # 單進程P1執行整個列表
p1.start()
p1.join()
end_time = time.time() # 結束計時(不使用多進程)
print(f"Execution Time Without Multiprocessing: {(end_time-start_time)*10**3}ms")
# ----------------- 創建多進程環境 ------------------------#
start_time = time.time() # 開始計時(使用多進程)
p2 = Process(target=calculate_squares, args=(first_half,))
p3 = Process(target=calculate_squares, args=(second_half,))
p2.start()
p3.start()
p2.join()
p3.join()
end_time = time.time() # 結束計時(使用多進程)
print(f"Execution Time With Multiprocessing: {(end_time-start_time)*10**3}ms")
輸出:
Execution Time Without Multiprocessing: 619.8039054870605ms
Execution Time With Multiprocessing: 321.70287895202637ms
可以觀察到,使用多進程的時間幾乎是不使用多進程時間的一半。這表明這兩個進程在同一時間內并行執行,并展示了真正的并行性行為。