作者 | 阿文,責編 | 郭芮
頭圖 | CSDN 下載自東方IC
在多線程和高并發場景中,需要創建大量的線程來進行業務處理,我們通常創建線程有兩種方法,一種是通過繼承Thread類,另一種是實現Runnable的接口,但是我們創建這兩種線程在運行結束后都會被虛擬機銷毀,如果數量多的話,頻繁的創建和銷毀線程會大大浪費時間和效率,更重要的是浪費內存,線程執行完畢后變為死亡狀態,線程對象變為垃圾,這個需要依靠虛擬機進行監督和回收,影響系統的性能。這種問題使用線程池便可以很好的解決。通過線程池線程,銷毀及回收等交由線程池進行管理,就可以避免以上的問題。
我們在使用過程中經常會直接使用newSingleThreadExecutor,newCachedThreadPool,newFixedThreadPool(int Threads)等已經封裝好的線程池,但這些都是通過ThreadPoolExecutor類中通過構造函數傳入不同的參數封裝的對象,所以想要了解線程池,我們就要認真研究一下線程池中最重要的ThreadPoolExecutor類。
ThreadPoolExecutor類最重要的構造函數:
publicThreadPoolExecutor( intcorePoolSize,
intmaximumPoolSize,
longkeepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
函數的參數含義如下:
corePoolSize:核心池大小,指定了線程池中的線程數量。
maximumPoolSize :最大池大小,指定了線程池中的最大線程數量。
keepAliveTime :存活時間,當線程池數量超過corePoolSize時,多余的空閑線程的存活時間,即超過corePoolSize的空閑線程,在多長時間內會被銷毀。
unit :keepAliveTime的單位。
workQueue :任務隊列,被提交單尚未被執行的任務。
threadFactory :線程工廠,用于創建線程,一般用于默認的即可。
handler:拒絕策略。當任務太多來不及處理時,如何拒絕任務。
核心池大小,最大池大小和存活時間共同管理這線程的創建與銷毀。核心池大小是目標大小;線程池的實現試圖維護線程池的大小,即是沒有任務執行,池的大小也等于核心池的大小,并且在工作隊列充滿前,線程池都不會創建更多的線程。最大池的大小是可同時活動的線程數的上限。如果一個線程已經閑置的時間超過了存活時間,它將被線程池回收。
構造函數的參數中大部分都很簡單,只有參數workQueue和handler需要進行詳細說明,下面對這兩個參數進行詳細的說明:
參數workQueue指被提交但未執行的任務隊列,它是一個BlockingQueue接口的對象,僅用于存放Runnable對象,根據隊列功能分類,在ThreadPoolExecutor類的構造函數中可以使用以下幾種BlockingQueue接口。
1.直接提交的隊列:該功能由SynchronousQueue對象提供。SynchronousQueue是一個特殊的BlockingQueue。SynchronousQueue沒有容量,每一個插入操作都要等待一個相應的刪除操作,反之,每一個刪除操作都要等待對應的插入操作。如果使用SynchronousQueue,則提交的任務不會被真實的保存,而總是將新任務提交給線程執行,如果沒有空閑線程,則嘗試創建新的線程,如果進程數量已經達到最大值,則執行拒絕策略。因此,使用SynchronousQueue隊列,通常要設置很大的maximumPoolSize值,否則很容易執行拒絕策略。
2.有界的任務隊列:有界的任務隊列可以使用ArrayBlockingQueue類實現。ArrayBlockingQueue類的構造函數必須帶一個容量參數,表示該隊列的最大容量:
當使用有界的任務隊列時,若有新的任務需要執行,如果線程池的實際線程數小于corePoolSize,則會優先創建新的線程,若大于corePoolSize,則會將新任務加入等待隊列。若等待隊列已滿,無法加入。則在總線程數不大于maximumPoolSize的前提下,創建新的進程執行任務。若大于maximumPoolSize,則執行拒絕策略。可見,有界隊列僅當在任務隊列裝滿時,才可能將線程數提升到corePoolSize以上,換言之,除非系統非常繁忙,否則要確保核心線程數維持在corePoolSize。
3.無界的任務隊列:無界任務隊列可以通過LinkedBlockingQueue類實現。與有界隊列相比,除非系統資源耗盡,否則無界的任務隊列不存在任務入隊失敗的情況。當有新的任務到來,系統的線程數小于corePoolSize時,線程池會生成新的線程執行任務,但當系統的線程數達到corePoolSize時,線程就不會繼續增加了。若后續任由新的任務加入,而又沒有空閑的線程資源,則任務直接進入隊列等待。若任務創建和處理的速度差異很大,無界隊列會保持快速增長,直到耗盡系統內存。
4.優先任務隊列:優先任務隊列是帶有執行優先級的任務隊列。它通過PriorityBlockingQueue類實現,可以控制任務的執行先后順序。他是一個特殊的無界隊列。無論是有界隊列ArrayBlockingQueue類,還是未指定大小的無界隊列LinkedBlockingQueue類都是按照先進先出算法處理任務的。而PriorityBlockingQueue類則可以根據任務自身的優先級順序先后執行,在確保系統性能的同時,也能有很好的質量保證(總是確保高優先級的任務先執行)。
拒絕策略:
ThreadPoolExecutor類的最后一個參數指定了拒絕策略。也就是當任務數量超過系統實際承載能力時,就要用到拒絕策略了。拒絕策略可以說是系統超負荷運行時的補救措施,通常由于壓力太大而引起的,也就是線程池中的線程已經用完了,無法繼續為新任務服務,同時,等待隊列中也已經排滿了,再也放不下新任務了。這時,我們就需要有一套機制合理的處理這個問題。
jdk在ThreadPoolExecutor類中定義了四種內置的拒絕策略,其均實現RejectedExecutionHandler接口。其四種拒絕策略為:
1.AbortPolicy策略:該策略會直接拋出異常,阻止系統正常工作。
2.CallRunsPolicy策略:只要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的任務。顯然這樣做不會真的丟棄任務,但是,任務提交線程的性能極有可能會急劇下降。
3.DiscardOldestPolicy策略:該策略將丟棄最老的一個請求,也就是即將被執行的一個任務,并嘗試再次提交當前任務。
4.DiscardPolicy策略:該策略默默地丟棄無法處理的任務,不予任何處理。如果允許任務丟失,我覺得這可能是最好的一種方案了吧。
線程池的主要作用是為了線程復用,也就是避免了線程的頻繁創建。但是,最開始的那些線程從何而來呢?答案就是ThreadFactory。ThreadFactory是一個接口,它只有一個用來創建線程的方法:
當線程池需要新建線程時,就會調用這個方法。
對于核心的幾個線程池,無論是newFixedThreadPool方法,newSingleThreadExecutor方法,還是newCacheThreadPool方法,雖然看起來創建的線程有著完全不同的功能特點,但其內部實現均使用了ThreadPoolExecutor類,下面給出這三個線程池的實現方式
publicstaticExecutorService newFixedThreadPool( intnThreads) {
returnnewThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
newLinkedBlockingQueue<Runnable>);
}
newFixedThreadPool 方法的實現,它返回了一個corePoolSize和maximumPoolSize大小一樣的,并且使用了LinkedBlockingQueue任務隊列的線程池。因為對固定大小的線程池而言,不存在線程數量的動態變化,因此corePoreSize和maximumPoolSize相等。同時,它使用無界隊列存放無法立即執行的任務,當任務提交非常頻繁的時候,該隊列可能迅速膨脹,從而耗盡系統資源。
publicstaticExecutorService newSingleThreadExecutor{
returnnewFinalizableDelegatedExecutorService
( newThreadPoolExecutor( 1, 1,
0L, TimeUnit.MILLISECONDS,
newLinkedBlockingQueue<Runnable>));
}
newSingleThreadExecutor方法返回的單線程線程池,是newFixedThreadPool方法的一種退化,只是簡單的將線程池線程數量設置為1。它的特點在于工作線程數目被限制為1,操作一個無界的工作隊列,所以他能保證了所有任務都是被順序執行,最多會有一個任務處于活動狀態,并且不允許使用者改動線程池實例,因此可以避免其改變線程數目。
publicstaticExecutorService newCachedThreadPool{
returnnewThreadPoolExecutor( 0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
newSynchronousQueue<Runnable>);
}
newCacheThreadPool方法返回corePoolSize為0,maximumPoolSize無窮大的線程池,這意味著在沒有任務時,該線程池內無線程,而當任務被提交時,該線程池會使用空閑的線程執行任務,若無空閑線程,則將任務加入SynchronousQueue隊列,而SynchronousQueue隊列時一種直接提交的隊列,它總會迫使線程池增加新的線程執行任務。當任務執行完畢后,由于corePoolSize為0,因此空閑線程又會在指定時間內60秒內被回收。它是一種用來處理大量短時間工作任務的線程池,具有幾個鮮明特點:它會試圖緩存線程并重用,當無緩存線程可用時,就會創建新的工作線程;如果線程閑置的時間超過60秒,則被終止并移除緩存,長時間閑置時,這種線程池,不會消耗什么資源,其內部使用SynchronousQueue作為工作隊列,無界線程池,可以進行自動線程回收。
在使用自定義線程池時,要根據應用的具體情況,選擇合適的并發隊列作為任務的緩沖。當線程資源緊張時,不同的并發隊列對系統行為和性能的影響也不相同。
importJAVA.util.concurrent.LinkedBlockingQueue;
importjava.util.concurrent.ThreadFactory;
importjava.util.concurrent.ThreadPoolExecutor;
importjava.util.concurrent.TimeUnit;
importjava.util.concurrent.atomic.AtomicInteger;
publicclassThreadPoolDefinedTest{
publicstaticvoidmain(String[] args){
LinkedBlockingQueue<Runnable> blockingQueue = newLinkedBlockingQueue<>( 100);
ThreadFactory threadFactory = newThreadFactory {
// int i = 0; 用并發安全的包裝類
AtomicInteger atomicInteger = newAtomicInteger( 1);
@Override
publicThread newThread(Runnable r){
//創建線程任務傳進來
Thread thread = newThread(r);
// 給線程起個名字
thread.setName( "MyThread"+ atomicInteger.getAndIncrement);
returnthread;
}
};
ThreadPoolExecutor pool = newThreadPoolExecutor( 10, 10, 1, TimeUnit.SECONDS, blockingQueue, threadFactory);
for( inti = 0; i < 5; i++) {
pool.execute( newRunnable {
@Override
publicvoidrun{
try{
method;
} catch(InterruptedException e) {
e.printStackTrace;
}
}
});
}
}
privatestaticvoidmethodthrowsInterruptedException {
System.out.println( "ThreadName"+ Thread.currentThread.getName + "進來了");
Thread.sleep( 2000);
System.out.println( "ThreadName"+ Thread.currentThread.getName + "出去了");
}
}
通過探究ThreadPoolExecutor類中封裝的線程池的構造函數,可以有效的理解創建線程池時的各個參數的作用,從而選擇適合我們業務場景所需要的線程池類型。線程池涵蓋的內容很多很豐富,我們需要不斷通過學習和實踐,增強我們對線程,線程池的理解,希望通過本篇文章對你能有所幫助。