作為 JAVA 程序員,無論是技術(shù)面試、項(xiàng)目研發(fā)或者是學(xué)習(xí)框架源碼,不徹底掌握 Java 多線程的知識,做不到心中有數(shù),干啥都沒底氣,尤其是技術(shù)深究時(shí)往往略顯發(fā)憷。
坐穩(wěn)扶好,通過今天的分享,能讓你輕松 get 如下幾點(diǎn)。
1. Executor 框架家族簡介;
2. 源碼解讀:線程池狀態(tài)以及狀態(tài)流轉(zhuǎn);
3. 源碼解讀:部分成員變量及方法;
4. 源碼解讀:任務(wù)提交submit方法背后;
5. 源碼揭秘之后的反思;
6. 寄語。
Executor 家族簡介
一圖勝千言,腦中有圖心不慌。
executor 家族簡圖
(一)Executor 接口。
public interface Executor {
void execute(Runnable command);
}
Executor 是一個(gè)接口(主要用于定義規(guī)范),定義了 execute 方法,用于接收 Runnable 對象。
(二)ExecutorService 接口。
public interface ExecutorService extends Executor {
// ... ...
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
// ... ...
}
ExecutorService 也是一個(gè)接口,繼承了 Executor 接口,增加了更多方法,相當(dāng)于擴(kuò)展了 Executor 接口的功能,例如定義了 submit() 系列方法,支持任務(wù)執(zhí)行后得到返回結(jié)果。
(三)AbstractExecutorService 抽象類。
public abstract class AbstractExecutorService implements ExecutorService {
// ... ...
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
// ... ...
}
AbstractExecutorService 是一個(gè)抽象類,實(shí)現(xiàn)了 ExecutorService 接口中的部分方法,例如提供了任務(wù)提交的 submit 方法的默認(rèn)實(shí)現(xiàn),而 submit 方法最終會調(diào)用 execute 方法。
不過 AbstractExecutorService 并沒有實(shí)現(xiàn) execute 方法,相當(dāng)于為子類留了個(gè)口子,讓子類去靈活擴(kuò)展(鉤子函數(shù))。
(四)ScheduledExecutorService 接口。
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
ScheduledExecutorService 接口繼承了 ExecutorService,增加定時(shí)調(diào)度的方法,使其成為一個(gè)可定時(shí)調(diào)度任務(wù)的接口,相當(dāng)于擴(kuò)展了 ExecutorService 的功能。
(五)ScheduledThreadPoolExecutor 類。
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
// ... ...
}
ScheduledThreadPoolExecutor 類繼承自 ThreadPoolExecutor 類,并且實(shí)現(xiàn)了 ScheduledExecutorService 接口,變成一個(gè)可定時(shí)調(diào)度任務(wù)的線程池。
(六)ThreadPoolExecutor 類。
public class ThreadPoolExecutor extends AbstractExecutorService {
// ... ...
}
ThreadPoolExecutor 繼承 AbstractExecutorService 抽象類,并實(shí)現(xiàn)了 execute 等一系列方法。
(七)Executors 類。
public class Executors {
// ... ...
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// ... ...
}
研發(fā)人員可以通過 Executors 工廠類來創(chuàng)建線程池并返回一個(gè)ExecutorService 對象,而內(nèi)部幾乎全是對 ThreadPoolExecutor 的封裝。
通過 Executor 的家族簡單認(rèn)識,應(yīng)該能感覺到 ThreadPoolExecutor 類的重要性,所以接下來要重點(diǎn)對 ThreadPoolExecutor 類的源碼進(jìn)行剖析。
源碼解讀:線程池狀態(tài)以及狀態(tài)流轉(zhuǎn)
上面注釋截圖來源于 ThreadPoolExecutor 的源碼,別懵圈,仔細(xì)看差不多都能懂,能夠看出線程池的五種狀態(tài)以及對應(yīng)的狀態(tài)流轉(zhuǎn)。
不知道你能看懂多少,看不懂也沒關(guān)系,接下來把上面的注釋用圖呈現(xiàn)給大家。通過源碼中的注釋,能夠勾勒出如下線程池的狀態(tài)流轉(zhuǎn)圖(好的注釋是多么的重要啊,感嘆號!)。
源碼解讀:部分成員變量及方法
/**
* ctl 是一個(gè) AtomicInteger 類型的原子對象。
* 其實(shí)設(shè)計(jì)很有意思:ctl 共包括 32 位(高 3 位表示"線程池狀態(tài)",低 29 位表示"線程池中的線程數(shù)量")。
* 個(gè)人感覺:線程池狀態(tài)與線程數(shù)量合二為一,用一個(gè)變量來表示,來減少鎖競爭,提高并發(fā)效率。
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/** 表示線程池線程數(shù)的位數(shù):32 - 3 = 29 位 */
private static final int COUNT_BITS = Integer.SIZE - 3;
/** 表示最大線程容量(000,11111111111111111111111111111)*/
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits(運(yùn)行狀態(tài)保存在 int 值的高 3 位)
/** 111,00000000000000000000000000000 */
private static final int RUNNING = -1 << COUNT_BITS;
/** 000,00000000000000000000000000000 */
private static final int SHUTDOWN = 0 << COUNT_BITS;
/** 001,00000000000000000000000000000 */
private static final int STOP = 1 << COUNT_BITS;
/** 010,00000000000000000000000000000 */
private static final int TIDYING = 2 << COUNT_BITS;
/** 011,00000000000000000000000000000 */
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
/** 獲取線程池的運(yùn)行狀態(tài) */
private static int runStateOf(int c) { return c & ~CAPACITY; }
/** 線程池內(nèi)有效線程的數(shù)量 (workerCount) */
private static int workerCountOf(int c) { return c & CAPACITY; }
/** 線程池的狀態(tài)和線程的數(shù)量組裝,成為 ctl */
private static int ctlOf(int rs, int wc) { return rs | wc; }
仔細(xì)去看上面的代碼,注釋已經(jīng)很清晰啦。重點(diǎn)關(guān)注 ctl 變量,這個(gè)變量將線程池自身狀態(tài)和線程數(shù)量,融合在這一個(gè)變量中,其中高 3 位表示線程池狀態(tài),低 29 位表示線程池中的線程數(shù)量,這樣在多線程環(huán)境下更易保證線程池自身狀態(tài)和線程數(shù)量的統(tǒng)一,不得不佩服源代碼作者 Doug Lea,可謂是設(shè)計(jì)甚妙!
源碼解讀:任務(wù)提交 submit 方法背后
疑問?當(dāng)調(diào)用 submit() 方法,把一個(gè)任務(wù)提交給線程池去處理的時(shí)候,線程池的處理過程是什么樣的呢?
通過開篇對 Executor 的家族簡介,能夠看到 submit() 方法最終會調(diào)用 ThreadPoolExecutor 的 execute 方法,走進(jìn)源碼好好看看 execute 方法都做了啥?
重點(diǎn)關(guān)注源碼中的注釋(紅框圈住部分),若看懂注釋,提交任務(wù)時(shí)線程池對應(yīng)的處理,也就懂了一半啦(感觸:好的編碼規(guī)范真的好重要,業(yè)務(wù)開發(fā)時(shí),核心代碼一定要有注釋)。
若依然很懵逼,一圖勝千言,那就繼續(xù)上圖吧。
了解上圖的整體流程,再去看看源碼就徹悟啦。
public void execute(Runnable command) {
//【Step 0. 如果任務(wù)為空則拋出 NPE 異常】
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//【Step 1. 判斷核心線程是否已滿】
// 1.1. 判斷當(dāng)前線程數(shù)是否已經(jīng)達(dá)到核心線程數(shù)的限制
if (workerCountOf(c) < corePoolSize) {
// 1.2. 如果未達(dá)到核心線程數(shù)的限制,則會直接添加一個(gè)核心線程,并指定首次執(zhí)行的任務(wù),進(jìn)行任務(wù)處理
if (addWorker(command, true))
return;
// 1.3. 如果添加失敗,則刷新線程池的狀態(tài)和線程的數(shù)量對應(yīng)的變量 ctl
c = ctl.get();
}
//【Step 2. 判斷阻塞隊(duì)列是否已滿】
// 2.1. 檢查線程池是否是運(yùn)行狀態(tài),然后將任務(wù)添加到等待隊(duì)列
if (isRunning(c) && workQueue.offer(command)) {
// 2.2. 任務(wù)成功添加到等待隊(duì)列,再次刷新 ctl
int recheck = ctl.get();
// 2.3. 添加任務(wù)到等待隊(duì)列成功后,如果線程池不是運(yùn)行狀態(tài),則將剛添加的任務(wù)從隊(duì)列移除并執(zhí)行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 2.4. 判斷當(dāng)前線程數(shù)量,如果線程數(shù)量為 0,則添加一個(gè)非核心線程,并且不指定首次執(zhí)行任務(wù)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//【Step 3. 判斷最大線程數(shù)量是否已經(jīng)達(dá)到】
// 3.1. 添加非核心線程,指定首次執(zhí)行任務(wù),如果添加失敗,執(zhí)行異常策略
else if (!addWorker(command, false))
reject(command);
}
結(jié)合注釋去讀代碼,應(yīng)該都能搞懂。很顯然 execute 方法中,多處都調(diào)用了 addWorker 方法,接下來簡單剖析一下 addWorker 方法。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// ... ...
for (;;) {
// ... ...
// 通過 CAS 自旋,增加線程數(shù) +1,增加成功跳出雙層循環(huán),繼續(xù)往下執(zhí)行
if (compareAndIncrementWorkerCount(c))
break retry;
// ... ...
}
}
// 到這兒,說明已經(jīng)成功的將線程數(shù) +1 了,但是真正的線程還沒有被添加
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 添加線程,Worker 是繼承了 AQS,實(shí)現(xiàn)了 Runnable 接口的包裝類
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// ... ...
// 添加新增的 Worker
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
// ... ...
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 啟動(dòng) Worker
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
為了簡明扼要,方法酌情進(jìn)行了刪減。addWorker 方法主要是通過雙重 for 循環(huán)進(jìn)行線程數(shù) +1,然后創(chuàng)建 Worker,并進(jìn)行添加到 HashSet<Worker> workers 列表中,然后調(diào)用 t.start() 啟動(dòng) Worker。
那么接下來再一起看看 Worker 里面都做了啥?
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
// ... ...
final Thread thread;
Runnable firstTask;
/**
* 通過指定的 firstTask 任務(wù)創(chuàng)建 Worker 對象
*/
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
// 通過當(dāng)前 Worker 對象創(chuàng)建對應(yīng)的線程對象 t,
// 所以調(diào)用 t.start() 時(shí)最終會調(diào)用 Worker 的 run 方法
this.thread = getThreadFactory().newThread(this);
}
public void run() {
// run 方法最終會調(diào)用 ThreadPoolExecutor 的 runWorker 方法
runWorker(this);
}
// ... ...
}
通過 Worker 的構(gòu)造函數(shù)能夠了解到,會通過創(chuàng)建的 Worker 對象去構(gòu)建線程對象,當(dāng)線程對象啟動(dòng)時(shí)最終會調(diào)用 runWorker 方法。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 取出需要執(zhí)行的任務(wù)
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果 task 不是 null 或者去 workQueue 隊(duì)列中取到待執(zhí)行的任務(wù)不為 null
while (task != null || (task = getTask()) != null) {
// ... ...
try {
// 開始執(zhí)行任務(wù)前的鉤子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
// ... ...
} finally {
// 任務(wù)執(zhí)行后的鉤子方法
afterExecute(task, thrown);
}
} finally {
// ... ...
}
}
completedAbruptly = false;
} finally {
// Worker 退出
processWorkerExit(w, completedAbruptly);
}
}
runWorker 方法,首先會取出要執(zhí)行的任務(wù) task,如果為空則會調(diào)用 getTask 方法從任務(wù)隊(duì)列中獲取,然后調(diào)用任務(wù)對應(yīng)的 run 方法進(jìn)行執(zhí)行,另外預(yù)置了 beforeExecute、afterExecute 兩個(gè)鉤子函數(shù),讓研發(fā)人員監(jiān)控線程執(zhí)行成為可能。
另外,線程池中的線程如何從隊(duì)列中獲取待執(zhí)行的任務(wù)的呢?走進(jìn) getTask 方法看一看。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
// 這塊體現(xiàn)了:線程池的線程是復(fù)用的,通過循環(huán)去獲取隊(duì)列中的任務(wù)去執(zhí)行。
for (;;) {
int c = ctl.get();
// ... ...
int wc = workerCountOf(c);
// allowCoreThreadTimeOut: 是否允許核心線程超時(shí).
// 如果設(shè)置為 false,那么線程池在達(dá)到 corePoolSize 個(gè)工作線程之前,不會讓閑置的工作線程退出。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// ... ...
try {
// 從 workQueue 隊(duì)列中取待執(zhí)行的任務(wù),根據(jù) timed 來選擇等待時(shí)間
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
為了便于理解,源碼做了部分刪減。重點(diǎn)關(guān)注從任務(wù)隊(duì)列中獲取待執(zhí)行任務(wù)的對象的方法調(diào)用:workQueue.poll()、workQueue.take() ,前者是移除并返回隊(duì)列中的頭部元素,如果隊(duì)列為空,則返回 null,而后者是移除并返回隊(duì)列中的頭部元素,如果隊(duì)列為空,則阻塞。
煙未滅,酒過半 ... ... 源碼探討就談到這兒... ...
源碼揭秘之后的反思
(一)鉤子函數(shù)的使用場景
場景一:
如上面自定義的 MyThreadPoolExecutor,可以讓日志打印線程及線程數(shù)等等信息。意味著研發(fā)人員可以擴(kuò)展 ThreadPoolExecutor,對鉤子函數(shù) beforeExecute、afterExecute 進(jìn)行實(shí)現(xiàn),進(jìn)而可以知曉線程池內(nèi)部的調(diào)度細(xì)節(jié),可以有效進(jìn)行監(jiān)控,針對故障排查應(yīng)該很有幫助。
場景二:
AbstractExecutorService 并沒有實(shí)現(xiàn) execute 方法,而是為子類 ThreadPoolExecutor 留了個(gè)口子,讓子類去靈活擴(kuò)展(鉤子函數(shù))。
仔細(xì)想想業(yè)務(wù)開發(fā)時(shí),諸多的使用場景,何嘗不是如此呢?
(二)線程池的 submit 方法與 execute 方法啥區(qū)別呢?
execute 方法,適用于不需要關(guān)注返回值的場景,只需要將線程丟到線程池中去執(zhí)行就可以了。
而 submit() 方法,適用于需要關(guān)注返回值的場景,不過最終會調(diào)用 execute() 方法。
考慮到性能提升,如果不需要關(guān)注返回值,則建議直接調(diào)用 execute() 方法,因?yàn)槟菢訒帘魏芏嘀虚g調(diào)度。
(三)線程池狀態(tài)與線程數(shù)量用一個(gè) ctl 變量表示的好處?
線程池狀態(tài)和線程數(shù)量合二為一,用一個(gè)原子變量來表示,來減少鎖競爭,提高并發(fā)效率。
(四)清晰的注釋是否有必要?
通過探秘源碼,很多圖都是根據(jù)源碼注釋勾勒出來的。可以看出清晰的注釋,對于核心流程而言真的很重要,一定要養(yǎng)成良好的編碼習(xí)慣,關(guān)鍵業(yè)務(wù)邏輯、核心流程,建議一定要寫好注釋,利人又利己,何樂而不為之。
(五)Executor 家族框架,若寫基礎(chǔ)框架時(shí),是否有借鑒意義呢?
個(gè)人感覺很有借鑒意義,因?yàn)闊o論業(yè)務(wù)開發(fā)還是基礎(chǔ)服務(wù),總會看到類似模式框架的身影,總會有大牛模仿著造輪子,所以閑暇之余可以抽象一下。
寄語寫最后
本次,主要對 Executor 家族進(jìn)行了簡單介紹,并著重對線程池背后的 ThreadPoolExecutor 類進(jìn)行深度剖析,知其然知其所以然,希望對大家有幫助。