前言
大家好,我是jack xu,本篇是并發編程的第二篇,今天跟大家聊一聊線程池的那點事。本篇文章有點長,小伙們靜下心、耐下心來把他看完。。
為什么要使用線程池
1)降低創建線程和銷毀線程的性能開銷
2)提高響應速度,當有新任務需要執行是不需要等待線程創建就可以立馬執行
3)合理的設置線程池大小可以避免因為線程數超過硬件資源瓶頸帶來的問題
我們來看阿里巴巴的代碼規范,在項目中創建線程必須要使用線程池創建,原因就是我說的以上三點
線程池的使用
首先我們來看下UML類圖
- Executor:可以看到最頂層是 Executor 的接口。這個接口很簡單,只有一個 execute 方法。此接口的目的是為了把任務提交和任務執行解耦。
- ExecutorService:這還是一個接口,繼承自 Executor,它擴展了 Executor 接口,定義了更多線程池相關的操作。
- AbstractExecutorService:提供了 ExecutorService 的部分默認實現。
- ThreadPoolExecutor:實際上我們使用的線程池的實現是 ThreadPoolExecutor。它實現了線程池工作的完整機制。也是我們接下來分析的重點對象。
- ForkJoinPool:和ThreadPoolExecutor都繼承自AbstractExecutorService,適合用于分而治之,遞歸計算的算法
- ScheduledExecutorService:這個接口擴展了ExecutorService,定義個延遲執行和周期性執行任務的方法。
- ScheduledThreadPoolExecutor:此接口則是在繼承 ThreadPoolExecutor 的基礎上實現 ScheduledExecutorService 接口,提供定時和周期執行任務的特性。
搞清楚上面的結構很重要,Executors是一個工具類,然后看創建線程的兩種方式,第一種是通過Executors提供的工廠方法來實現,有下面四種方式
Executor executor1 = Executors.newFixedThreadPool(10);
Executor executor2 = Executors.newSingleThreadExecutor();
Executor executor3 = Executors.newCachedThreadPool();
Executor executor4 = Executors.newScheduledThreadPool(10);
復制代碼
第二種是通過構造方法來實現
ExecutorService executor5 = new ThreadPoolExecutor(1,
1,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
復制代碼
其實查看第一種方式創建的源碼就會發現:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
復制代碼
根本上還是通過調用ThreadPoolExecutor的構造方法,創建時傳入不同參數,所以本質上還是只有一種創建線程池的方式,就是用構造方法,這里我不想講用Executors的工廠方法具體幫我們創建了怎樣的線程池,讓我們再來看一條阿里巴巴規范。
看到這里大家都明白了吧,正是因為封裝性太強了,反而小伙們會不知道怎么用,亂用,濫用,有可能會導致OOM,除非你對創建的這四個線程池了如指掌,所以我介紹了也是白介紹,因為就不在用,接下來我們重點看下ThreadPoolExecutor構造方法里各個參數的含義,構造方法有很多個,我選了一個最完整的。
public ThreadPoolExecutor(int corePoolSize, //核心線程數量
int maximumPoolSize, //最大線程數
long keepAliveTime, //超時時間,超出核心線程數量以外的線程空余存活時間
TimeUnit unit, //存活時間單位
BlockingQueue<Runnable> workQueue, //保存執行任務的隊列
ThreadFactory threadFactory,//創建新線程使用的工廠
RejectedExecutionHandler handler //當任務無法執行的時候的處理方式)
復制代碼
- corePoolSize:即線程池的核心線程數量,其實也是最小線程數量。不設置allowCoreThreadTimeOut 的情況下,核心線程數量范圍內的線程一直存活。線程不會自行銷毀,而是以掛起的狀態返回到線程池,直到應用程序再次向線程池發出請求時,線程池里掛起的線程就會再度激活執行任務。
- maximumPoolSize:即線程池的最大線程數量
- keepAliveTime和unit:超出核心線程數后的存活時間和單位
- workQueue:是一個阻塞的 queue,用來保存線程池要執行的所有任務。通??梢匀∠旅嫒N類型:
1)ArrayBlockingQueue:基于數組的先進先出隊列,此隊列創建時必須指定大??;
2)LinkedBlockingQueue:基于鏈表的先進先出隊列,如果創建時沒有指定此隊列大小,則默認為Integer.MAX_VALUE;
3)SynchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。
復制代碼
- ThreadFactory:我們一般用Executors.defaultThreadFactory()默認工廠,為什么要用工廠呢,其實就是規范了生成的Thread。避免調用new Thread創建,導致創建出來的Thread可能存在差異
- handler:當隊列和最大線程池都滿了之后的飽和策略。
1、AbortPolicy:直接拋出異常,默認策略;
2、CallerRunsPolicy:用調用者所在的線程來執行任務;
3、DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,并執行當前任務;
4、DiscardPolicy:直接丟棄任務;
當然也可以根據應用場景實現 RejectedExecutionHandler 接口,自定義飽和策略,如記錄
日志或持久化存儲不能處理的任務
復制代碼
創建完線程池后使用也很簡單,帶返回值和不帶返回值,傳入對應傳入Runnable或者Callable接口的實現
//無返回值
executor5.execute(() -> System.out.println("jack xushuaige"));
//帶返回值
String message = executor5.submit(() -> { return "jack xushuaige"; }).get();
復制代碼
源碼分析
execute方法
基于源碼入口進行分析,先看execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
復制代碼
源代碼中有一段關鍵的注釋我沒有貼進來,下面我先把這段關鍵的注釋翻譯講解下:
分三步做處理:
1、如果運行的線程數量小于 corePoolSize,那么嘗試創建新的線程,并把傳入的 command 作為它的第一個 task 來執行。調用 addWorker 會自動檢查 runState 和 workCount,以此來防止在不應該添加線程時添加線程的錯誤警告;
2、即使 task 可以被成功加入隊列,我們仍舊需要再次確認我們是否應該添加 thread(因為最后一次檢查之后可能有線程已經死掉了)還是線程池在進入此方法后已經停掉了。所以我們會再次檢查狀態,如果有必要的話,可以回滾隊列?;蛘弋敍]有線程時,開啟新的 thread;
3、如果無法將 task 加入 queue,那么可以嘗試添加新的 thread。如果添加失敗,這是因為線程池被關閉或者已經飽和了,所以拒絕這個 task。
如果你看完以后還是一臉懵逼,那沒事,我把這個流程圖畫下來,你品,你細品,好好理解一下
然后介紹一下源碼中ctl是干什么的,點進去查看源碼
我們發現它是一個原子類,主要作用是用來保存線程數量和線程池的狀態,他用到了位運算, 一個int數值是32個 bit 位,這里采用高 3 位來保存運行狀態,低 29 位來保存線程數量。
我們來計算一下ctlOf(RUNNING, 0)方法,其中 RUNNING =-1 << COUNT_BITS ; -1 左移 29 位,-1 的二進制是32個1(1111 1111 1111 1111 1111 1111 1111 1111),左移29位后得到(1110 0000 0000 0000 0000 0000 0000 0000),然后111| 0還是111,同理可得其他狀態的 bit 位。這個位運算很有意思,hashmap源碼中也用到了位運算,小伙們在平時開發中也可以嘗試用下,這樣運算速度會快,而且能夠裝b,介紹下這五種線程池的狀態
- RUNNING:接收新任務,并執行隊列中的任務
- SHUTDOWN:不接收新任務,但是執行隊列中的任務
- STOP:不接收新任務,不執行隊列中的任務,中斷正在執行中的任務
- TIDYING:所有的任務都已結束, 線程數量為 0,處于該狀態的線程池即將調用 terminated()方法
- TERMINATED:terminated()方法執行完成
他們的轉換關系如下:
addWorker方法
我們看到execute流程的核心方法為addWorker,我們繼續分析,源碼看起來比較唬人,其實就做了兩件事,拆分一下
第一步:更新worker的數量,代碼如下:
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
復制代碼
retry是一個標記,和循環配合使用,continue retry 的時候,會跳到 retry 的地方再次執行。如果 break retry,則跳出整個循環體。源碼先獲取到 ctl,然后檢查狀態,然后根據創建線程類型的不同,進行數量的校驗。在通過CAS方式更新 ctl狀態,成功的話則跳出循環。否則再次取得線程池狀態,如果和最初已經不一致,那么從頭開始執行。如果狀態并未改變則繼續更新worker的數量。流程圖如下:
第二步:添加 worker 到 workers 的 set 中。并且啟動 worker 中持有的線程。代碼如下:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
復制代碼
可以看到添加 work 時需要先獲得鎖,這樣確保多線程并發安全。如果添加 worker 成功,那么調用 worker 中線程的 start 方法啟動線程。如果啟動失敗則調用 addWorkerFailed 方法進行回滾??吹竭@里小伙們會發現
1、ThreadPoolExecutor在初始化后并沒有啟動和創建任何線程,在調用 execute方法時才會調用 addWorker創建線程
2、addWorker方法中會創建新的worker,并啟動其持有的線程來執行任務。
上文提到如果線程數量已經達到corePoolSize,則只會把command 加入到 workQueue中,那么加入到 workQueue中的command是如何被執行的呢?下面我們來分析 Worker 的源代碼。
Worker類
Worker封裝了線程,是executor中的工作單元。worker繼承自AbstractQueuedSynchronizer,并實現 Runnable。 Worker 簡單理解其實就是一個線程,里面重新了 run 方法,我們來看他的構造方法:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
復制代碼
再來看下這兩個重要的屬性
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
復制代碼
firstTask 用它來保存傳入的任務;thread 是在調用構造方法時通過 ThreadFactory 來創建的線程,是用來處理任務的線程,這里用的是 ThreadFactory 創建線程,并沒有直接 new,原因上文也提到過,這里看下 newThread 傳入的是 this,因為 Worker 本身繼承了 Runnable 接口,所以 addWork 中調用的 t.start(),實際上運行的是 t 所屬 worker 的 run 方法。worker 的 run 方法如下:
public void run() {
runWorker(this);
}
復制代碼
runWorker源碼再如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
復制代碼
簡單分析一下
1、先取出 worker 中的 firstTask,并清空;
2、如果沒有 firstTask,則調用 getTask 方法,從 workQueue 中獲取task;
3、獲取鎖;
4、執行 beforeExecute。這里是空方法,如有需要在子類實現;
5、執行 task.run;
6、執行 afterExecute。這里是空方法,如有需要在子類實現;
7、清空 task,completedTasks++,釋放鎖;
8、當有異常或者沒有 task 可執行時,會進入外層 finnaly 代碼塊。調用 processWorkerExit 退出當前 worker。從 works 中移除本 worker 后,如果 worker 數量小于 corePoolSize,則創建新的 worker,以維持 corePoolSize 大小的線程數。
這行代碼 while (task != null || (task = getTask()) != null) ,確保了 worker 不停地從 workQueue 中取得 task 執行。getTask 方法會從 BlockingQueue workQueue 中 poll 或者 take 其中的 task 出來。
至此,關于 executor 如何創建并啟動線程執行 task 的過程已經分析的清清楚楚,明明白白,后面還有shutdown()、shutdownNow()等其他方法留給小伙們自行去觀察研究哈。
如何合理配置線程池的大小
線程池大小不是靠猜,也不是說越多越好。
- CPU 密集型任務:主要是執行計算任務,響應時間很快,CPU 一直在運行,這種任務 CPU 的利用率很高,那么線程數的配置應該根據 CPU 核心數來決定,應當分配較少的線程,比如和 CPU 個數相當的大小。
- IO 密集型任務:主要是進行 IO 操作,執行 IO 操作的時間較長,由于線程并不是一直在運行,這時 CPU 處于空閑狀態, 這種情況下可以增加線程池的大小,比如 CPU 個數 * 2
當然這些都是經驗值,最好的方式還是根據實際情況測試得出最佳配置。
線程池的監控
如果在項目中大規模的使用了線程池,那么必須要有一套監控體系,來指導當前線程池的狀 態,當出現問題的時候可以快速定位到問題。我們通過重寫線程池的 beforeExecute、afterExecute 和 shutdown 等方式就可以實現對線程的監控
看這些名稱和定義都知道,這是讓子類來實現的,可以在線程執行前、后、終止狀態執行自定義邏輯。
總結
線程池這東西說簡單也簡單,說難也難,簡單是因為用起來簡單,所以小伙們可能覺得這有啥好講的,難是難在要知道他的底層的源碼,他是如何調度線程的,說兩點吧,第一是本文中用了大量的流程圖,當我們在閱讀源碼或者做復雜業務開發的時候,一定要靜下心來先畫個圖,否則會被繞暈或者被別人打斷后,又得從頭到尾的看一邊,第二是閱讀源碼,剛畢業的小伙伴可能只要會用行了,但是如果你工作五年了,還是只會用,而不知道他里面是如何實現的,那你比剛畢業的優勢在哪里,憑什么工資比剛畢業的高。如果你覺得寫的不錯,請點一個贊!