背景
Tomcat 源碼中多處用了JAVA.util.concurrent 包中的類,用以處理多線程環境下的流程控制。近日分析了下NioEndpoint 源碼,本文將以此類為背景,膜拜下 Java 大神們使用 CountDownLatch 并發控制的手法,其實也就是簡單的實際應用,算不上高深。
類圖框架
NIO tailored thread pool, providing the following services:
Socket acceptor thread:Acceptor
Socket poller thread:Poller
Worker threads pool:Executor
以上是該類的注釋,結合源碼我們知道 NioEndpoint 就是一個定制線程池,管理了三種線程:Acceptor、Poller、Worker。
(百來的一張很清晰的結構圖如下:)
初始化
NioEndpoint 類維護了一個 stopLatch 的變量,其類型就是 CountDownLatch。它根據 Poller 線程的個數進行初始化的,源碼如下:
public void bind() throws Exception {
....
if (acceptorThreadCount == 0) {
// FIXME: Doesn't seem to work that well with multiple accept threads
acceptorThreadCount = 1;
}
if (pollerThreadCount <= 0) {
//minimum one poller thread
pollerThreadCount = 1;
}
stopLatch = new CountDownLatch(pollerThreadCount);
....
}
NioEndpont 類初始化時指定了 Poller 和 Accetpor 線程數,而且從上面代碼的注釋信息來看 acceptorThreadCount 的固定 是 1,即 Tomcat 的 NIO 并不支持多個 Accepor 線程,此外也沒有可以修改該屬性的途徑。
stopLatch 控制流程
stopLatch,顧名思義,是控制 Tomcat 的組件停止時使用的鎖,利用 CountDownLatch ,主線程等待一組線程到達某個狀態后,才進行后面的處理。NioEndpoint 的 stopInternal() 方法的流程如下:
public void stopInternal() {
releaseConnectionLatch();
if (!paused) {
pause();
}
if (running) {
running = false;
unlockAccept();
for (int i=0; pollers!=null && i<pollers.length; i++) {
if (pollers[i]==null) continue;
pollers[i].destroy();
pollers[i] = null;
}
try {
stopLatch.await(selectorTimeout + 100, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
}
shutdownExecutor();
eventCache.clear();
nioChannels.clear();
processorCache.clear();
}
}
該方法將導致所有的處理線程都停止工作,其流程為:
- 首先,通知Poller線程停止工作,調用其 destroy,設置 Poller 的 close 標識為 true。
- 其次,設置 running 為 false,通知 Acceptor 線程終止 run 方法的循環處理。
- 第三,當前線程 stopLatch.await ,等待所有的 Poller 線程的 run方法結束。Poller 的 run 方法最后一句是 stopLatch.countDown(),當 stopInternal 的 await 方法被喚醒時,說明所有的Poller 線程都結束了。
- 第四,此處調用 await 操作的超時時間設置為 selectorTimeout,這個值也是Poller處理時的阻塞時間,也就是說:如果Poller的在輪詢過程中調用了selector.select(selectorTimeout);的話,最多等待這么長時間,就能保證所有的Poller都及時結束了。
此處,之所以不用考慮 Acceptor 的結束問題,是因為 Acceptor 線程只有一個,而且它沒有阻塞處理,所以一旦 running 標識為 false,它就會立即結束。
所有的處理線程都結束之后,shutdownExecutor() 操作會關閉工作線程池的調度器,至此,所有的線程都被關閉了。
啟示錄
開發中,如何需要自定義線程池框架,就可以參照這個流程對線程池資源進行關閉,用 JUC 包中的并發工具類,比自己寫同步計數器方便多了!Tomcat 教我們的這一招,你學會了嗎?