JAVA多線程的實現方式
Java程序中,常見有4種方式實現多線程
①繼承Thread類
②實現Runnable接口
③實現Callable接口
④使用Executor框架
在JDK5之前,創建線程有2種方式,一種是繼承Thread類,另外一種是實現Runnable接口。這2種方式在執行完任務之后都無法獲取執行結果,如果需要獲取執行結果,就必須通過共享變量或者使用線程通信的方式來達到效果,這樣使用起來就比較麻煩。自Java 5起,就提供了Callable和Future,通過它們可以在任務執行完畢之后得到任務執行結果。
方式①舉例:繼承Thread類,實現run()方法,調用start()方法啟動線程
public class ThreadSample extends Thread {
@Override
public void run() {
System.out.println(this.getName() + " do some work...");
}
public static void main(String[] args) {
ThreadSample threadSample = new ThreadSample();
threadSample.setName("thread-a");
threadSample.start();
}
}
start()方法調用后并不是立即執行多線程代碼,而是使得該線程變為Ready狀態,等待CPU分配執行時間。
方式②舉例:實現Runnable接口,實現run()方法,將實例對象傳入Thread構造方法
public class ThreadSample implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " do some work...");
}
public static void main(String[] args) {
Thread threadSample = new Thread(new ThreadSample(), "thread-b");
threadSample.start();
}
}
方式③舉例:實現Callable接口和FutureTask對象組合
public class ThreadSample implements Callable<Integer> {
@Override
public Integer call() {
int result = 0;
for (int i = 0; i <= 10; i++) {
result++;
}
return result;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
//1、實例化Callable對象
ThreadSample callableSample = new ThreadSample();
//2、創建裝載線程的FutureTask對象
FutureTask<Integer> ft = new FutureTask<Integer>(callableSample);
//3、啟動線程
Thread thread = new Thread(ft, "thread-callable");
thread.start();
//4、獲取返回結果
Integer result = ft.get();
System.out.println("result = " + result);
}
}
與使用Runnable相比,Callable功能更強大
- 可以有返回值,支持泛型的返回值,借助FutureTask類獲取返回值;
- 可以捕獲程序執行過程中的異常。
方式④舉例:線程池實現
public class ThreadSample implements Callable<Integer> {
@Override
public Integer call() {
int result = 0;
for (int i = 0; i <= 10; i++) {
result++;
}
return result;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService services = Executors.newSingleThreadExecutor();
Future<Integer> future = services.submit(new ThreadSample());
System.out.println("result = " + future.get());
services.shutdown();
}
}
以上繼承Thread類、實現Runnable接口、實現Callable接口三種方式中,理論上優先選用Runnable接口和Callable接口,如果有需要返回值則選用Callable接口實現方式。此外,無論何時,當看到這種形式的代碼:
new Thread(runnable).start()
并且最終希望有一個更加靈活的執行策略時,都可以認真考慮使用Executor代替Thread。
使用線程池的好處
在線程池中執行任務線程,比起每個任務創建一個線程,有很多優勢。
- 減少系統開銷。重用存在的線程,而不是創建新的線程,這可以在處理多請求時抵消線程創建、銷毀產生的開銷。
- 提升請求響應性。在請求到達時,工作者線程已經存在,可以立即執行,因此提高了響應性。
- 增強線程的可管理性。通過調整線程池的大小,可以充分利用CPU資源,同時可以防止過多的線程相互競爭資源,導致應用程序耗盡內存或者失敗。線程池可以對線程資源進行統一分配、調優和監控。
線程池的工作流程
Java線程池的核心實現類是ThreadPoolExecutor類,任務提交到線程池時,具體處理由ThreadPoolExecutor類的execute()方法執行。當一個新任務提交到線程池時,線程池的處理流程如下:
①判斷核心線程池里的線程是否都在執行任務,如果不是,創建一個新的工作線程來執行任務。如果是,則進行下一步流程。
②判斷阻塞隊列是否已滿,如果沒滿,則將新提交的任務存儲在阻塞隊列中。如果滿,則進行下一步流程。
③判斷線程池中的線程是否都處于工作狀態,如果沒有,則創建一個新的工作線程來執行任務。如果線程池中所有的線程都處于工作狀態,則交給飽和策略來處理這個任務。
ThreadPoolExecutor通用的構造函數為:
參數說明如下
corePoolSize
- 線程池基本大小。提交一個任務到線程池時,線程池會創建一個新的線程來執行任務。新任務提交時,當目前線程數小于corePoolSize,即使有空閑的線程可以執行該任務,也會創建新的線程。
- 如果線程池中的線程數已經大于或等于corePoolSize,則不會創建新的線程。
- 當一個ThreadPoolExecutor被初始創建后,所有核心線程并非立即開始,而是等到有任務提交的時刻。但如果調用了prestartAllCoreThreads()方法,所有核心線程會立即啟動。
maximuPoolSize
- 線程池允許創建的最大線程數。當阻塞隊列滿,且線程數小于maximumPoolSize時,便可以創建新的線程執行任務。
- 如果使用無界阻塞隊列,該參數無效。
keepAliveTime
- 線程池的工作線程空閑后,保持存活的時間。如果任務多且任務執行時間較短,可以調大該值,提高線程利用率。
- 如果一個線程已經閑置的時間超過了存活時間,它將成為一個被回收的候選者,如果當前的池的大小超過了的corePoolSize,線程池會終止它。
unit
- 與keepAliveTime相關聯的時間單位。可選值有DAYS、HOURS、MINUTES、毫秒、微妙、納秒。
workQueue
- 用于保存等待執行的任務的阻塞隊列。ThreadPoolExecutor允許提供一個BlockingQueue來持有等待執行的任務。任務排隊有3種基本方法:無限隊列、有限隊列、同步移交。隊列的選擇和很多其他的配置參數都有關系,比如池的大小等。ThreadPoolExecutor推薦以下幾種阻塞隊列。
- LinkedBlockingQueue:線程安全的阻塞隊列,先進先出(FIFO)。可以指定容量(有限隊列),也可以不指定(無限隊列),不指定的話默認最大是Integer.MAX_VALUE。如果所有的工作者線程都處于忙碌狀態,新提交的任務將會在隊列中等候。如果新的任務持續快速到達,超過了它們被執行的速度,隊列會無限制地增加。線程池中能創建的最大線程數為corePoolSize指定的值。
- ArrayBlockingQueue:數組實現的有界阻塞隊列,先進先出(FIFO)。線程池中能創建的最大線程數為maximumPoolSize指定的值。有界隊列有助于避免資源耗盡,當隊列滿時,如果還有新的任務到達,將根據飽和策略(也稱拒絕策略)進行處理。對于一個有界隊列,隊列的長度與池的長度必須一起調節。一個大隊列和一個小池,可以控制對內存和CPU的使用,也可以減少上下文切換,但相應的吞吐量也會減小。
- SynchronousQueue:SynchronousQueue并不是一個真正的隊列,而是一種管理直接在線程間移交信息的機制。當新任務到達時,如果所有工作線程都處于忙碌狀態,且線程池數量小于maximumPoolSize,就會創建一個新的線程。否則根據飽和策略處理。只有池是無限的,或者可以接受任務被拒絕,SynchronousQueue才是一個有實際價值的選擇。
- PriorityBlokingQueue: 一個支持優先級的無界阻塞隊列 。使用該隊列,線程池中能創建的最大線程數為corePoolSize。
threadFactory
- 線程池創建線程時使用的線程工廠,可以不指定該參數,使用默認的線程工廠Executors.defaultThreadFactory()。
handler
飽和策略,也稱拒絕策略。當有限隊列滿且線程池滿的情況下,新的任務到達后,飽和策略將進行處理。有以下幾種:
- ThreadPoolExecutor.AbortPolicy()
拋出RejectedExecutionException異常。默認策略。調用者可以捕獲拋出的異常,進行相應的處理。
- ThreadPoolExecutor.CallerRunsPolicy()
不會丟棄任務,也不會拋出異常,由向線程池提交任務的線程來執行該任務。
- ThreadPoolExecutor.DiscardPolicy()
丟棄當前的任務
- ThreadPoolExecutor.DiscardOldestPolicy()
丟棄最舊的任務(最先提交而沒有得到執行的任務),并執行當前任務。
ThreadPoolExecutor執行流程如下
①當新任務提交時,如果當前線程池中的線程數小于corePoolSize,則創建新的線程處理。
②如果線程池中的線程大于或等于corePoolSize,且BlockingQueue未滿,則將新任務加入BlockingQueue。
③如果BlockingQueue已滿,且線程池中的線程數小于maximumPoolSize,則創建新的工作線程來執行任務。
④如果當前運行的線程大于或等于maximumPoolSize,將執行飽和策略。即調用
RejectedExecutionHandler.rejectExecution()方法。
幾種常見的線程池
Executors提供了一些靜態工廠方法創建的常見線程池。
- newFixedThreadPool
創建一個固定長度的線程池,每當提交一個任務就創建一個線程,直到達到線程池最大長度,這時線程池長度不再變化。如果一個線程異常退出,線程池會補充一個新的線程。
- newCachedThreadPool
創建一個可緩存的線程池,不會對池的長度做限制。如果線程池長度超過需求,它可以靈活地回收空閑的線程;當需求增加時,它可以靈活地添加新的線程。
- newSingleThreadExecutor
創建一個單線程的executor,只創建唯一的工作者線程來執行任務,如果這個線程異常結束,會有一個新的取代它。
- newScheduledThreadPool
創建一個定長的線程池,支持定時執行任務。
這幾種線程池中,newFixedThreadPool和newSingleThreadExecutor默認使用無線隊列LinkedBlockingQueue。newCachedThreadPool使用了同步移交隊列SynchronousQueue。newScheduledThreadPool使用了DelayedWorkQueue阻塞隊列。
newCachedThreadPool的corePoolSize為0,maximumPoolSize為Integer.MAX_VALUE,其他幾種線程池corePoolSize與maximumPoolSize一樣大。
線程池的狀態與生命周期
線程池有5種狀態,在ThreadPoolExecutor 源碼中有定義。
- RUNNING : 線程池最初創建后的初始狀態,該狀態的線程池既能接受新提交的任務 ,又能處理阻塞隊列中任務。
- SHUTDOWN: 調用shutdown()方法后進入該狀態。該狀態的線程池不能接收新提交的任務 ,但是能處理阻塞隊列中的任務。
- STOP: 調用shutdownNow()方法后進入該狀態。該狀態的線程池不接受新提交的任務 ,也不處理在阻塞隊列中的任務 ,還會中斷正在執行的任務。
- TIDYING: 當所有的任務都已終止,工作線程數為0的狀態。線程池進入該狀態后會調用 terminated() 鉤子方法進入TERMINATED 狀態。
- TERMINATED: 在terminated()鉤子方法執行完后進入該狀態。
調用線程池的shutdown()或者shutdownNow()方法可以關閉線程池,遍歷線程池中工作線程,逐個調用interrupt方法來中斷線程。
Shutdown()方法與shutdownNow()的特點:
Shutdown()方法將線程池的狀態設置為SHUTDOWN狀態,只會中斷空閑的工作線程。
shutdownNow()方法將線程池的狀態設置為STOP狀態,會中斷所有工作線程,不管工作線程是否空閑。
調用兩者中任何一種方法,都會使isShutdown()方法的返回值為true;線程池中所有的任務都關閉后,isTerminated()方法的返回值為true。
通常使用shutdown()方法關閉線程池,如果不要求任務一定要執行完,則可以調用shutdownNow()方法。
確定線程池的大小
線程池合理的長度取決于所要執行的任務特征以及程序所部署的系統環境,一般根據這二者因素使用配置文件提供或者通過CPU核數N:
N = Runtime.getRuntime().availableProcessors();
動態計算而不是硬編碼在代碼中。主要是避免線程池過大或過小這兩種極端情況。如果線程池過大,會導致CPU和內存資源競爭,頻繁的上下文切換,任務延遲,甚至資源耗盡。如果線程池過小,會造成CPU和內存資源未充分利用,任務處理的吞吐量減小。
對于任務特征來說,需要分清楚是計算密集型任務,還是IO密集型任務或是混合型任務。
計算密集型也稱為CPU密集型,意思就是該任務需要大量運算,而沒有阻塞,CPU一直全速運行。CPU密集型任務只有在多核CPU上才可能得到加速,即scale up,通過多線程程序享受到增加CPU核數帶來的好處。
IO密集型,即該任務需要大量的IO操作,例如網絡連接、數據庫連接等等,執行任務過程中會有大量的阻塞。在單線程上運行IO密集型任務會導致浪費大量的CPU運算能力浪費在等待。
對于計算密集型任務,原則是配置盡可能少的線程數,通常建議以下計算方式設置線程池大小來獲得最優利用率:
N(線程數) = N(CPU核數)+ 1
對于IO密集型任務,考慮的因素會多一些,原則是因為較多的時間處于IO阻塞,不能處理新的任務,所有線程數盡可能大一些,通常建議是:
N(線程數) = 2 x N(CPU核數) + 1
或者更精確的:
N(線程數) = N(CPU核數) x U x (1 + W/C)
其中U表示CPU使用率,W/C表示IO等待時間與計算時間的比率,這個不需要太精確,只需要一個估算值。例如,4核CPU,CPU使用率80%,IO等待時間1秒,計算時間0.1秒,那么線程數為:4.8 x 11≈53。一些文章中還提到這種計算方式:
N(線程數) = N(CPU核數) x U / (1 - f)
其中U表示CPU使用率,f表示阻塞系數,即IO等待時間與任務執行總時間的比率:W/(W + C)。根據上面的例子計算出線程數為:4.8/0.09≈53。兩種計算方式的結果是很相近的。
以上的計算方式和建議盡可以作為理論參考值,實際業務中可能并不完全按照這個計算值來設置。可以根據對線程池各項參數的監控,來確定一個合理的值。ThreadPoolExecutor提供的一些可用于獲取監控的參數方法如下:
- getTaskCount():線程池需要執行的任務數量,包括已經執行完的、未執行的和正在執行的。
- getCompletedTaskCount():線程池在運行過程中已完成的任務數量 ,completedTaskCount <= taskCount。
- getLargestPoolSize():線程池曾經創建過的最大線程數量 ,通過這個數據可以知道線程池是否滿過。如等于線程池的最大大小 ,則表示線程池曾經滿了。
- getPoolSize(): 線程池的線程數量。如果線程池不銷毀的話,池里的線程不會自動銷毀,所以線程池的線程數量只增不減 。
- getActiveCount():獲取活動的線程數。
此外,可以通過繼承ThreadPoolExecutor并重寫它的 beforeExecute(),afterExecute() 和 terminated()方法,我們可以在任務執行前,執行后和線程池關閉前做一些統計、日志輸出等等操作,以幫助我們更好地監控到線程池的運行狀態。