上篇文章講了部分線程間通信的方式,這篇文章再講3種方式,同時(shí)講4中進(jìn)程間通信的方式
一、 Python/ target=_blank class=infotextkey>Python 中線程間通信的實(shí)現(xiàn)方式
共享變量
共享變量是多個(gè)線程可以共同訪問的變量。在Python中,可以使用threading模塊中的Lock對(duì)象來(lái)確保線程安全,避免多個(gè)線程同時(shí)訪問同一個(gè)變量而導(dǎo)致的數(shù)據(jù)競(jìng)爭(zhēng)問題。
下面是一個(gè)使用共享變量進(jìn)行線程間通信的示例代碼:
import threading
# 共享變量
count = 0
lock = threading.Lock()
# 線程函數(shù)
def increment():
global count
for i in range(1000000):
lock.acquire()
count += 1
lock.release()
# 創(chuàng)建線程
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)
# 啟動(dòng)線程
t1.start()
t2.start()
# 等待線程結(jié)束
t1.join()
t2.join()
# 輸出結(jié)果
print("count = ", count)
在上面的代碼中,我們創(chuàng)建了兩個(gè)線程,它們都會(huì)執(zhí)行increment函數(shù),該函數(shù)會(huì)將count變量增加1000000次。由于多個(gè)線程可能同時(shí)訪問count變量,因此我們需要使用Lock對(duì)象來(lái)確保線程安全。每當(dāng)一個(gè)線程需要訪問count變量時(shí),它必須先獲取鎖,然后執(zhí)行相應(yīng)的操作,最后釋放鎖,以便其他線程可以繼續(xù)訪問count變量。
事件(Event)
事件是一種線程間通信機(jī)制,它可以用于線程之間的通知和等待。一個(gè)線程可以設(shè)置事件,另外一個(gè)線程可以等待該事件的觸發(fā)。
在Python中,可以使用threading模塊中的Event對(duì)象來(lái)實(shí)現(xiàn)事件。Event對(duì)象有兩個(gè)方法:set和wAIt。當(dāng)一個(gè)線程調(diào)用set方法時(shí),它會(huì)將事件設(shè)置為已觸發(fā)狀態(tài),所有等待該事件的線程都會(huì)被喚醒;當(dāng)一個(gè)線程調(diào)用wait方法時(shí),如果事件已經(jīng)被設(shè)置為已觸發(fā)狀態(tài),它會(huì)立即返回;否則,它會(huì)阻塞等待事件的觸發(fā)。
下面是一個(gè)使用事件進(jìn)行線程間通信的示例代碼:
import threading
# 事件對(duì)象
event = threading.Event()
# 線程函數(shù)1
def wait_event():
print("waiting for event...")
event.wait()
print("event has been set.")
# 線程函數(shù)2
def set_event():
print("setting event...")
event.set()
# 創(chuàng)建線程
t1 = threading.Thread(target=wait_event)
t2 = threading.Thread(target=set_event)
# 啟動(dòng)線程
t1.start()
t2.start()
# 等待線程結(jié)束
t1.join()
t2.join()
在上面的代碼中,我們創(chuàng)建了兩個(gè)線程,一個(gè)線程會(huì)等待事件的觸發(fā),另一個(gè)線程會(huì)設(shè)置事件。當(dāng)set_event函數(shù)被調(diào)用時(shí),它會(huì)將事件設(shè)置為已觸發(fā)狀態(tài),然后wait_event函數(shù)會(huì)被喚醒,輸出"event has been set."。在這個(gè)示例中,我們沒有使用Lock對(duì)象來(lái)確保線程安全,因?yàn)槭录?duì)象內(nèi)部已經(jīng)使用了鎖來(lái)實(shí)現(xiàn)線程安全。
queue 模塊中的隊(duì)列
queue 模塊中的隊(duì)列是一種先進(jìn)先出(FIFO)的數(shù)據(jù)結(jié)構(gòu),用于實(shí)現(xiàn)多個(gè)線程之間的通信。在 Python 中,可以使用 queue 模塊中的 Queue 類來(lái)創(chuàng)建隊(duì)列。
queue 模塊中的隊(duì)列類型分為兩種:內(nèi)存隊(duì)列和文件隊(duì)列。
1. 文件隊(duì)列
文件隊(duì)列是一種使用文件作為隊(duì)列的存儲(chǔ)方式,可以用于在不同計(jì)算機(jī)之間傳輸數(shù)據(jù)。在 Python 中,可以使用 queue 模塊中的 FileQueue 類來(lái)創(chuàng)建文件隊(duì)列。
下面是一個(gè)使用文件隊(duì)列實(shí)現(xiàn)線程間通信的示例:
import queue
import threading
def producer(q):
for i in range(5):
q.put(i)
print(f'Produced {i}')
q.put(None)
def consumer(q):
while True:
item = q.get()
if item is None:
break
print(f'Consumed {item}')
if __name__ == '__main__':
q = queue.FileQueue('queue.txt')
t1 = threading.Thread(target=producer, args=(q,))
t2 = threading.Thread(target=consumer, args=(q,))
t1.start()
t2.start()
t1.join()
t2.join()
在上述代碼中,創(chuàng)建了兩個(gè)線程 t1 和 t2,t1 向文件隊(duì)列中寫入數(shù)據(jù),t2 從文件隊(duì)列中讀取并打印數(shù)據(jù)。
2. 內(nèi)存隊(duì)列
內(nèi)存隊(duì)列是一種使用內(nèi)存作為隊(duì)列的存儲(chǔ)方式,可以用于在同一臺(tái)計(jì)算機(jī)上的進(jìn)程間通信。在 Python 中,可以使用 queue 模塊中的 Queue 類來(lái)創(chuàng)建內(nèi)存隊(duì)列。
下面是一個(gè)使用內(nèi)存隊(duì)列實(shí)現(xiàn)線程間通信的示例:
import queue
import threading
def producer(q):
for i in range(5):
q.put(i)
print(f'Produced {i}')
def consumer(q):
while True:
item = q.get()
if item is None:
break
print(f'Consumed {item}')
if __name__ == '__main__':
q = queue.Queue()
t1 = threading.Thread(target=producer, args=(q,))
t2 = threading.Thread(target=consumer, args=(q,))
t1.start()
t2.start()
t1.join()
q.put(None)
t2.join()
在上述代碼中,創(chuàng)建了兩個(gè)線程 t1 和 t2,t1 向內(nèi)存隊(duì)列中寫入數(shù)據(jù),t2 從內(nèi)存隊(duì)列中讀取并打印數(shù)據(jù)。
二、Python 中進(jìn)程間通信的實(shí)現(xiàn)方式
在 Python 中,進(jìn)程間通信可以使用多種方式實(shí)現(xiàn),例如:
- 管道(Pipe)
- 隊(duì)列(Queue)
- 共享內(nèi)存(Shared Memory)
- 套接字(Socket)
下面將詳細(xì)介紹這些方式。
管道的使用及其類型
管道是一種基于內(nèi)存的通信機(jī)制,用于實(shí)現(xiàn)兩個(gè)進(jìn)程之間的通信。在 Python 中,可以使用 multiprocessing 模塊中的 Pipe 類來(lái)創(chuàng)建管道。
管道類型分為兩種:匿名管道和命名管道。
1 匿名管道
匿名管道是一種臨時(shí)的管道,沒有名字,只能用于父進(jìn)程和其創(chuàng)建的子進(jìn)程之間的通信。匿名管道是雙向的,可以同時(shí)進(jìn)行讀寫操作。
下面是一個(gè)使用匿名管道實(shí)現(xiàn)進(jìn)程間通信的示例:
import multiprocessing
def sender(conn):
conn.send('Hello, receiver!')
conn.close()
def receiver(conn):
msg = conn.recv()
print(msg)
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=sender, args=(parent_conn,))
p2 = multiprocessing.Process(target=receiver, args=(child_conn,))
p1.start()
p2.start()
p1.join()
p2.join()
在上述代碼中,創(chuàng)建了兩個(gè)進(jìn)程 p1 和 p2,p1 向 p2 發(fā)送消息,p2 接收并打印消息。
2 命名管道
命名管道是一種持久的管道,有一個(gè)名字,可以用于任意進(jìn)程之間的通信。在 Python 中,可以使用 os.mkfifo 函數(shù)來(lái)創(chuàng)建命名管道。
下面是一個(gè)使用命名管道實(shí)現(xiàn)進(jìn)程間通信的示例:
import os
fifo_path = 'fifo_test'
def sender():
with open(fifo_path, 'w') as f:
f.write('Hello, receiver!')
def receiver():
with open(fifo_path, 'r') as f:
msg = f.read()
print(msg)
if __name__ == '__main__':
if not os.path.exists(fifo_path):
os.mkfifo(fifo_path)
p1 = multiprocessing.Process(target=sender)
p2 = multiprocessing.Process(target=receiver)
p1.start()
p2.start()
p1.join()
p2.join()
在上述代碼中,創(chuàng)建了兩個(gè)進(jìn)程 p1 和 p2,p1 向命名管道中寫入消息,p2 從命名管道中讀取并打印消息。
multiprocessing 模塊中隊(duì)列的使用及其類型
multiprocessing 模塊中的隊(duì)列是一種多進(jìn)程通信機(jī)制,可以用于實(shí)現(xiàn)多個(gè)進(jìn)程之間的數(shù)據(jù)傳輸。在 Python 中,可以使用 multiprocessing 模塊中的 Queue 類來(lái)創(chuàng)建隊(duì)列。
multiprocessing 模塊中的隊(duì)列類型分為兩種:普通隊(duì)列和優(yōu)先級(jí)隊(duì)列。
普通隊(duì)列
普通隊(duì)列是一種先進(jìn)先出(FIFO)的隊(duì)列,可以用于在同一臺(tái)計(jì)算機(jī)上的進(jìn)程間通信。在 Python 中,可以使用 multiprocessing 模塊中的 Queue 類來(lái)創(chuàng)建普通隊(duì)列。
下面是一個(gè)使用普通隊(duì)列實(shí)現(xiàn)進(jìn)程間通信的示例:
import multiprocessing
def producer(q):
for i in range(5):
q.put(i)
print(f'Produced {i}')
def consumer(q):
while True:
item = q.get()
if item is None:
break
print(f'Consumed {item}')
if __name__ == '__main__':
q = multiprocessing.Queue()
p1 = multiprocessing.Process(target=producer, args=(q,))
p2 = multiprocessing.Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
q.put(None)
p2.join()
在上述代碼中,創(chuàng)建了兩個(gè)進(jìn)程 p1 和 p2,p1 向普通隊(duì)列中寫入數(shù)據(jù),p2 從普通隊(duì)列中讀取并打印數(shù)據(jù)。
優(yōu)先級(jí)隊(duì)列
優(yōu)先級(jí)隊(duì)列是一種根據(jù)元素優(yōu)先級(jí)排序的隊(duì)列,可以用于在同一臺(tái)計(jì)算機(jī)上的進(jìn)程間通信。在 Python 中,可以使用 multiprocessing 模塊中的 PriorityQueue 類來(lái)創(chuàng)建優(yōu)先級(jí)隊(duì)列。
下面是一個(gè)使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)進(jìn)程間通信的示例:
import multiprocessing
def producer(q):
q.put((1, 'high-priority message'))
q.put((2, 'low-priority message'))
print('Messages sent')
def consumer(q):
while True:
item = q.get()
if item is None:
break
print(f'Consumed {item[1]} with priority {item[0]}')
if __name__ == '__main__':
q = multiprocessing.PriorityQueue()
p1 = multiprocessing.Process(target=producer, args=(q,))
p2 = multiprocessing.Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
q.put(None)
p2.join()
在上述代碼中,創(chuàng)建了兩個(gè)進(jìn)程 p1 和 p2,p1 向優(yōu)先級(jí)隊(duì)列中寫入數(shù)據(jù),其中一個(gè)消息的優(yōu)先級(jí)高于另一個(gè)消息,p2 從優(yōu)先級(jí)隊(duì)列中讀取并打印數(shù)據(jù)。
以上就是 Python 中文件隊(duì)列、內(nèi)存隊(duì)列、普通隊(duì)列和優(yōu)先級(jí)隊(duì)列在線程和進(jìn)程間通信的方式的完整代碼示例。需要注意的是,在使用隊(duì)列進(jìn)行線程間或進(jìn)程間通信時(shí),需要進(jìn)行同步和互斥操作,以避免數(shù)據(jù)競(jìng)爭(zhēng)和其他并發(fā)問題。因此,在使用隊(duì)列進(jìn)行線程間或進(jìn)程間通信時(shí),需要仔細(xì)設(shè)計(jì)和實(shí)現(xiàn)代碼,確保程序的正確性和穩(wěn)定性。
共享內(nèi)存的使用及其類型
共享內(nèi)存是一種多個(gè)進(jìn)程共享同一塊內(nèi)存的通信機(jī)制,可以用于實(shí)現(xiàn)多個(gè)進(jìn)程之間的高效通信。在 Python 中,可以使用 multiprocessing 模塊中的 Value 和 Array 類來(lái)創(chuàng)建共享內(nèi)存。
共享內(nèi)存類型分為兩種:基本類型和數(shù)組類型。
1 基本類型
基本類型是指 Python 中的基本數(shù)據(jù)類型,例如整數(shù)、浮點(diǎn)數(shù)等。在共享內(nèi)存中,可以使用 Value 類來(lái)創(chuàng)建基本類型的共享內(nèi)存。
下面是一個(gè)使用基本類型共享內(nèi)存實(shí)現(xiàn)進(jìn)程間通信的示例:
import multiprocessing
def sender(value):
value.value = 1
def receiver(value):
print(value.value)
if __name__ == '__main__':
value = multiprocessing.Value('i', 0)
p1 = multiprocessing.Process(target=sender, args=(value,))
p2 = multiprocessing.Process(target=receiver, args=(value,))
p1.start()
p2.start()
p1.join()
p2.join()
在上述代碼中,創(chuàng)建了兩個(gè)進(jìn)程 p1 和 p2,p1 向共享內(nèi)存中寫入整數(shù)值,p2 從共享內(nèi)存中讀取并打印整數(shù)值。
2 數(shù)組類型
數(shù)組類型是指 Python 中的數(shù)組,可以使用 Array 類來(lái)創(chuàng)建數(shù)組類型的共享內(nèi)存。
下面是一個(gè)使用數(shù)組類型共享內(nèi)存實(shí)現(xiàn)進(jìn)程間通信的示例:
import multiprocessing
def sender(arr):
arr[0] = 1
def receiver(arr):
print(arr[:])
if __name__ == '__main__':
arr = multiprocessing.Array('i', range(10))
p1 = multiprocessing.Process(target=sender, args=(arr,))
p2 = multiprocessing.Process(target=receiver, args=(arr,))
p1.start()
p2.start()
p1.join()
p2.join()
在上述代碼中,創(chuàng)建了兩個(gè)進(jìn)程 p1 和 p2,p1 向共享內(nèi)存中寫入整數(shù)數(shù)組,p2 從共享內(nèi)存中讀取并打印整數(shù)數(shù)組。
套接字的使用及其類型
套接字是一種網(wǎng)絡(luò)通信機(jī)制,可以用于不同計(jì)算機(jī)之間的進(jìn)程通信。在 Python 中,可以使用 socket 模塊來(lái)創(chuàng)建套接字。
套接字類型分為兩種:流套接字和數(shù)據(jù)報(bào)套接字。
1 流套接字
流套接字是一種基于 TCP 協(xié)議的套接字,可以實(shí)現(xiàn)可靠的面向連接的數(shù)據(jù)傳輸,適用于大量數(shù)據(jù)傳輸和長(zhǎng)時(shí)間連接。在 Python 中,可以使用 socket 模塊中的 socket 類來(lái)創(chuàng)建流套接字。
下面是一個(gè)使用流套接字實(shí)現(xiàn)進(jìn)程間通信的示例:
import socket
HOST = 'localhost'
PORT = 5000
def sender():
with socket.socket(socket.AF_.NET, socket.SOCK_STREAM) as s:
s.connect((HOST, PORT))
s.sendall(b'Hello, receiver!')
def receiver():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((HOST, PORT))
s.listen()
conn, addr = s.accept()
with conn:
data = conn.recv(1024)
print(data)
if __name__ == '__main__':
p1 = multiprocessing.Process(target=sender)
p2 = multiprocessing.Process(target=receiver)
p1.start()
p2.start()
p1.join()
p2.join()
在上述代碼中,創(chuàng)建了兩個(gè)進(jìn)程 p1 和 p2,p1 向流套接字中寫入消息,p2 從流套接字中讀取并打印消息。
2 數(shù)據(jù)報(bào)套接字
數(shù)據(jù)報(bào)套接字是一種基于 UDP 協(xié)議的套接字,可以實(shí)現(xiàn)無(wú)連接的數(shù)據(jù)傳輸,適用于少量數(shù)據(jù)傳輸和短時(shí)間連接。在 Python 中,可以使用 socket 模塊中的 socket 類來(lái)創(chuàng)建數(shù)據(jù)報(bào)套接字。
下面是一個(gè)使用數(shù)據(jù)報(bào)套接字實(shí)現(xiàn)進(jìn)程間通信的示例:
import socket
HOST = 'localhost'
PORT = 5000
def sender():
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.sendto(b'Hello, receiver!', (HOST, PORT))
def receiver():
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.bind((HOST, PORT))
data, addr = s.recvfrom(1024)
print(data)
if __name__ == '__main__':
p1 = multiprocessing.Process(target=sender)
p2 = multiprocessing.Process(target=receiver)
p1.start()
p2.start()
p1.join()
p2.join()
在上述代碼中,創(chuàng)建了兩個(gè)進(jìn)程 p1 和 p2,p1 向數(shù)據(jù)報(bào)套接字中寫入消息,p2 從數(shù)據(jù)報(bào)套接字中讀取并打印消息。
手動(dòng)實(shí)現(xiàn)進(jìn)程間通信
除了使用 Python 提供的多進(jìn)程通信機(jī)制之外,還可以手動(dòng)實(shí)現(xiàn)進(jìn)程間通信。在 Python 中,可以使用共享內(nèi)存和信號(hào)量來(lái)手動(dòng)實(shí)現(xiàn)進(jìn)程間通信。
下面是一個(gè)使用共享內(nèi)存和信號(hào)量手動(dòng)實(shí)現(xiàn)進(jìn)程間通信的示例:
import multiprocessing
import mmap
import os
import signal
import time
def sender(data, sem):
time.sleep(1)
sem.acquire()
data.seek(0)
data.write(b'Hello, receiver!')
sem.release()
def receiver(data, sem):
sem.acquire()
data.seek(0)
print(data.read())
sem.release()
if __name__ == '__main__':
with multiprocessing.shared_memory() as mem:
with mmap.mmap(mem.fd, mem.size) as data:
data.write(b'' * mem.size)
sem = multiprocessing.Semaphore(1)
p1 = multiprocessing.Process(target=sender, args=(data, sem))
p2 = multiprocessing.Process(target=receiver, args=(data, sem))
p1.start()
p2.start()
p1.join()
p2.join()
在上述代碼中,創(chuàng)建了兩個(gè)進(jìn)程 p1 和 p2,p1 向共享內(nèi)存中寫入消息,p2 從共享內(nèi)存中讀取并打印消息。使用信號(hào)量來(lái)保證共享內(nèi)存的互斥訪問。
總結(jié)
本文介紹了 Python 中常用的多線程和進(jìn)程通信機(jī)制。這些機(jī)制可以滿足不同線程間的數(shù)據(jù)傳輸需要,應(yīng)根據(jù)具體場(chǎng)景選擇合適的通信機(jī)制。