1 問題描述
假設我們正在編寫一個簡單的應用程序,該應用程序從客戶端接收一些輸入,對其進行一些CPU密集型處理,然后記錄輸出。我們編寫的代碼看起來像以下內容:
class ProcessingLibrary {
public void process (Input userInput) {
// 一些CPU密集型邏輯,用于處理用戶輸入
userInput.process();
// 記錄結果
Logger.log(userInput.getResults());
}
}
看起來很簡單,一個普通的函數,它接受用戶輸入,對其進行一些處理,然后返回輸出,我們可以將這個庫提供給客戶端。但是,你的客戶如果正在高并發地調用該進程函數,很快他們可能會抱怨他們的請求處理輸入的時間太長。原因非常簡單,當外部客戶端調用你的函數時,調用線程被阻塞,因為實際處理是由調用線程進行的。
2 初步解決方案
添加另一個要求:
-
調用線程不應該被阻塞。
為了解決這個問題,我們可以考慮利用客戶端的CPU核心并將多線程納入我們的代碼。我們進行一些修改,具體如下:
class ProcessingLibrary {
public void process (Input userInput) {
Runnable runnable = () -> {
userInput.process();
Logger.log(userInput.getResults());
};
Thread thread = new Thread(runnable);
thread.start();
}
}
對于每個請求,最好不要阻塞調用線程,而是生成一個新線程來進行重度處理。調用線程可以從我們的處理方法中提前返回。確實,這比我們早期的版本有很大的改進。我們的客戶開始使用新版本,他們比以前更滿意了。但現在他們開始抱怨CPU使用率變得過高,并且在他們那邊發生了崩潰。發生了什么?如果我們仔細檢查代碼,我們會發現我們沒有限制正在生成的線程數!如果客戶以非常高的速率調用進程函數,可能會生成數百甚至數千個線程,這對CPU來說是很大的開銷。我們還必須限制生成的線程數。
3 線程池
注意!我們這里又添加了一個要求:
-
調用線程不應該被阻塞。
-
我們的邏輯不應生成無限多數量的線程。
我們想要的是,調用線程不應該被阻塞,我們應該生成一定數量的線程,由客戶根據他們的CPU資源和他們想要實現的并行度決定。實際數量取決于請求到達的速率和平均請求時間。線程池是解決這個問題的理想解決方案。線程池是一個簡單的概念,可以并行執行應用程序代碼并利用CPU核心。線程池包含一組固定數量的可重復使用的工作線程,它們執行分配給它們的任務,而不會阻塞調用線程。我們下面看看如何實現一個簡單的線程池。
我們可以得出幾個簡單的觀察:
-
我們的線程應該是可重復使用的,并且應該在請求到達時惰性創建。線程創建是一個昂貴的過程(至少在JAVA中是這樣)。
-
如果請求到達的速率遠高于線程池中的線程數,我們可以在其他請求執行完畢時將請求輸入保持在等待狀態,然后當一個線程完成處理一個請求時,它可以從請求行中獲取另一個請求并開始處理它。通過這種方式,我們仍然可以實現相當高的并行性并獲得更多的請求吞吐量。
-
隊列可以成為存儲我們傳入請求的良好數據結構,而我們的線程可以在完成先前的項目后不斷地從隊列中獲取項目。
考慮以上要求,我們為線程池編寫一個簡單的類。
class ThreadPool {
private BlockingQueue<Runnable> taskQueue;
private Integer poolSize;
private AtomicInteger currentPoolSize;
public ThreadPool(int poolSize) {
this.poolSize = poolSize;
this.taskQueue = new LinkedBlockingQueue<>();
this.currentPoolSize = new AtomicInteger(0);
}
public void submitTask(Runnable runnable) {
this.taskQueue.add(runnable);
if(this.currentPoolSize.get() < this.poolSize) {
// 如果有更多的池大小可用,創建一個新線程
// 這個線程也應該被重新用于未來任務
// 因此,它應該繼續從隊列中尋找更多的任務
this.currentPoolSize.incrementAndGet();
this.createSingleThreadForPool();
}
}
private void createSingleThreadForPool() {
Runnable poolRunner = () -> {
while(true) {
if(this.taskQueue.size() > 0) {
Runnable taskFromQueue = this.taskQueue.poll();
taskFromQueue.run();
}
}
};
new Thread(poolRunner).start();
}
}
從以上實現中可以得到以下幾點:
-
BlockingQueue
是一個線程安全的隊列實現。我們需要確保線程安全,因為多個線程正在訪問共享狀態。AtomicInteger
也是如此,用于線程安全更新我們當前的池大小。 -
池運行者中的
while
循環是為了確保該線程保持活動狀態,以便我們在想要接收更多任務時可以繼續運行。
我們可以更改我們對ProcessingLibrary
的實現,如下:
class ProcessingLibrary {
private ThreadPool threadPool;
public ProcessingLibrary(int poolSize) {
this.threadPool = new ThreadPool(poolSize);
}
public void process (Input userInput) {
Runnable runnable = () -> {
userInput.process();
Logger.log(userInput.getResults());
};
this.threadPool.submitTask(runnable);
}
}
現在,我們已經滿足了對這個問題的兩個要求 :)
在Java中,concurrent庫提供了與此類似的內容,稱為ExecutorService
。雖然我們討論的實現有一些注意事項,例如我們生成的線程一直在等待,但這是一個理解線程池內部工作原理的良好起點。