日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

作為 JAVA 程序員,無論是技術(shù)面試、項(xiàng)目研發(fā)或者是學(xué)習(xí)框架源碼,不徹底掌握 Java 多線程的知識,做不到心中有數(shù),干啥都沒底氣,尤其是技術(shù)深究時(shí)往往略顯發(fā)憷。

坐穩(wěn)扶好,通過今天的分享,能讓你輕松 get 如下幾點(diǎn)。

1. Executor 框架家族簡介;

2. 源碼解讀:線程池狀態(tài)以及狀態(tài)流轉(zhuǎn);

3. 源碼解讀:部分成員變量及方法;

4. 源碼解讀:任務(wù)提交submit方法背后;

5. 源碼揭秘之后的反思;

6. 寄語。

Executor 家族簡介

一圖勝千言,腦中有圖心不慌。

Java線程池深度揭秘

executor 家族簡圖

(一)Executor 接口。

public interface Executor {
    void execute(Runnable command);
}

Executor 是一個(gè)接口(主要用于定義規(guī)范),定義了 execute 方法,用于接收 Runnable 對象。

(二)ExecutorService 接口。

public interface ExecutorService extends Executor {
    // ... ...
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    // ... ...
}

ExecutorService 也是一個(gè)接口,繼承了 Executor 接口,增加了更多方法,相當(dāng)于擴(kuò)展了 Executor 接口的功能,例如定義了 submit() 系列方法,支持任務(wù)執(zhí)行后得到返回結(jié)果。

(三)AbstractExecutorService 抽象類。

public abstract class AbstractExecutorService implements ExecutorService {
    // ... ...
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    // ... ...
}

AbstractExecutorService 是一個(gè)抽象類,實(shí)現(xiàn)了 ExecutorService 接口中的部分方法,例如提供了任務(wù)提交的 submit 方法的默認(rèn)實(shí)現(xiàn),而 submit 方法最終會調(diào)用 execute 方法。

不過 AbstractExecutorService 并沒有實(shí)現(xiàn) execute 方法,相當(dāng)于為子類留了個(gè)口子,讓子類去靈活擴(kuò)展(鉤子函數(shù))。

(四)ScheduledExecutorService 接口。

public interface ScheduledExecutorService extends ExecutorService {
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

ScheduledExecutorService 接口繼承了 ExecutorService,增加定時(shí)調(diào)度的方法,使其成為一個(gè)可定時(shí)調(diào)度任務(wù)的接口,相當(dāng)于擴(kuò)展了 ExecutorService 的功能。

(五)ScheduledThreadPoolExecutor 類。

public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
    // ... ...
}

ScheduledThreadPoolExecutor 類繼承自 ThreadPoolExecutor 類,并且實(shí)現(xiàn)了 ScheduledExecutorService 接口,變成一個(gè)可定時(shí)調(diào)度任務(wù)的線程池。

(六)ThreadPoolExecutor 類。

public class ThreadPoolExecutor extends AbstractExecutorService {
    // ... ...
}

ThreadPoolExecutor 繼承 AbstractExecutorService 抽象類,并實(shí)現(xiàn)了 execute 等一系列方法。

(七)Executors 類。

public class Executors {
    // ... ...
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }    
    // ... ...
}

研發(fā)人員可以通過 Executors 工廠類來創(chuàng)建線程池并返回一個(gè)ExecutorService 對象,而內(nèi)部幾乎全是對 ThreadPoolExecutor 的封裝。

通過 Executor 的家族簡單認(rèn)識,應(yīng)該能感覺到 ThreadPoolExecutor 類的重要性,所以接下來要重點(diǎn)對 ThreadPoolExecutor 類的源碼進(jìn)行剖析。

源碼解讀:線程池狀態(tài)以及狀態(tài)流轉(zhuǎn)

Java線程池深度揭秘

 

上面注釋截圖來源于 ThreadPoolExecutor 的源碼,別懵圈,仔細(xì)看差不多都能懂,能夠看出線程池的五種狀態(tài)以及對應(yīng)的狀態(tài)流轉(zhuǎn)。

不知道你能看懂多少,看不懂也沒關(guān)系,接下來把上面的注釋用圖呈現(xiàn)給大家。通過源碼中的注釋,能夠勾勒出如下線程池的狀態(tài)流轉(zhuǎn)圖(好的注釋是多么的重要啊,感嘆號!)。

Java線程池深度揭秘

 

源碼解讀:部分成員變量及方法

/**
 * ctl 是一個(gè) AtomicInteger 類型的原子對象。
 * 其實(shí)設(shè)計(jì)很有意思:ctl 共包括 32 位(高 3 位表示"線程池狀態(tài)",低 29 位表示"線程池中的線程數(shù)量")。
 * 個(gè)人感覺:線程池狀態(tài)與線程數(shù)量合二為一,用一個(gè)變量來表示,來減少鎖競爭,提高并發(fā)效率。
 */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/** 表示線程池線程數(shù)的位數(shù):32 - 3 = 29 位 */
private static final int COUNT_BITS = Integer.SIZE - 3;
/** 表示最大線程容量(000,11111111111111111111111111111)*/
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits(運(yùn)行狀態(tài)保存在 int 值的高 3 位)
/** 111,00000000000000000000000000000 */
private static final int RUNNING    = -1 << COUNT_BITS;
/** 000,00000000000000000000000000000 */
private static final int SHUTDOWN   =  0 << COUNT_BITS;
/** 001,00000000000000000000000000000 */
private static final int STOP       =  1 << COUNT_BITS;
/** 010,00000000000000000000000000000 */
private static final int TIDYING    =  2 << COUNT_BITS;
/** 011,00000000000000000000000000000 */
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
/** 獲取線程池的運(yùn)行狀態(tài) */
private static int runStateOf(int c)     { return c & ~CAPACITY; }
/** 線程池內(nèi)有效線程的數(shù)量 (workerCount) */
private static int workerCountOf(int c)  { return c & CAPACITY; }
/** 線程池的狀態(tài)和線程的數(shù)量組裝,成為 ctl */
private static int ctlOf(int rs, int wc) { return rs | wc; }

仔細(xì)去看上面的代碼,注釋已經(jīng)很清晰啦。重點(diǎn)關(guān)注 ctl 變量,這個(gè)變量將線程池自身狀態(tài)和線程數(shù)量,融合在這一個(gè)變量中,其中高 3 位表示線程池狀態(tài),低 29 位表示線程池中的線程數(shù)量,這樣在多線程環(huán)境下更易保證線程池自身狀態(tài)和線程數(shù)量的統(tǒng)一,不得不佩服源代碼作者 Doug Lea,可謂是設(shè)計(jì)甚妙!

源碼解讀:任務(wù)提交 submit 方法背后

疑問?當(dāng)調(diào)用 submit() 方法,把一個(gè)任務(wù)提交給線程池去處理的時(shí)候,線程池的處理過程是什么樣的呢?

通過開篇對 Executor 的家族簡介,能夠看到 submit() 方法最終會調(diào)用 ThreadPoolExecutor 的 execute 方法,走進(jìn)源碼好好看看 execute 方法都做了啥?

Java線程池深度揭秘

 

重點(diǎn)關(guān)注源碼中的注釋(紅框圈住部分),若看懂注釋,提交任務(wù)時(shí)線程池對應(yīng)的處理,也就懂了一半啦(感觸:好的編碼規(guī)范真的好重要,業(yè)務(wù)開發(fā)時(shí),核心代碼一定要有注釋)。

若依然很懵逼,一圖勝千言,那就繼續(xù)上圖吧。

Java線程池深度揭秘

 

了解上圖的整體流程,再去看看源碼就徹悟啦。

public void execute(Runnable command) {
    //【Step 0. 如果任務(wù)為空則拋出 NPE 異常】
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    //【Step 1. 判斷核心線程是否已滿】
    // 1.1. 判斷當(dāng)前線程數(shù)是否已經(jīng)達(dá)到核心線程數(shù)的限制
    if (workerCountOf(c) < corePoolSize) {
        // 1.2. 如果未達(dá)到核心線程數(shù)的限制,則會直接添加一個(gè)核心線程,并指定首次執(zhí)行的任務(wù),進(jìn)行任務(wù)處理
        if (addWorker(command, true))
            return;
        // 1.3. 如果添加失敗,則刷新線程池的狀態(tài)和線程的數(shù)量對應(yīng)的變量 ctl
        c = ctl.get();
    }
    //【Step 2. 判斷阻塞隊(duì)列是否已滿】
    // 2.1. 檢查線程池是否是運(yùn)行狀態(tài),然后將任務(wù)添加到等待隊(duì)列
    if (isRunning(c) && workQueue.offer(command)) {
        // 2.2. 任務(wù)成功添加到等待隊(duì)列,再次刷新 ctl
        int recheck = ctl.get();
        // 2.3. 添加任務(wù)到等待隊(duì)列成功后,如果線程池不是運(yùn)行狀態(tài),則將剛添加的任務(wù)從隊(duì)列移除并執(zhí)行拒絕策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 2.4. 判斷當(dāng)前線程數(shù)量,如果線程數(shù)量為 0,則添加一個(gè)非核心線程,并且不指定首次執(zhí)行任務(wù)
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //【Step 3. 判斷最大線程數(shù)量是否已經(jīng)達(dá)到】
    // 3.1. 添加非核心線程,指定首次執(zhí)行任務(wù),如果添加失敗,執(zhí)行異常策略
    else if (!addWorker(command, false))
        reject(command);
}

結(jié)合注釋去讀代碼,應(yīng)該都能搞懂。很顯然 execute 方法中,多處都調(diào)用了 addWorker 方法,接下來簡單剖析一下 addWorker 方法。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // ... ...
        for (;;) {
            // ... ...
            // 通過 CAS 自旋,增加線程數(shù) +1,增加成功跳出雙層循環(huán),繼續(xù)往下執(zhí)行
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // ... ...
        }
    }
    // 到這兒,說明已經(jīng)成功的將線程數(shù) +1 了,但是真正的線程還沒有被添加
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 添加線程,Worker 是繼承了 AQS,實(shí)現(xiàn)了 Runnable 接口的包裝類
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // ... ...
                // 添加新增的 Worker
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
                // ... ...
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 啟動(dòng) Worker
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

為了簡明扼要,方法酌情進(jìn)行了刪減。addWorker 方法主要是通過雙重 for 循環(huán)進(jìn)行線程數(shù) +1,然后創(chuàng)建 Worker,并進(jìn)行添加到 HashSet<Worker> workers 列表中,然后調(diào)用 t.start() 啟動(dòng) Worker。

那么接下來再一起看看 Worker 里面都做了啥?

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    // ... ...
    final Thread thread;
    Runnable firstTask;

    /**
     * 通過指定的 firstTask 任務(wù)創(chuàng)建 Worker 對象
     */
    Worker(Runnable firstTask) {
        setState(-1);
        this.firstTask = firstTask;
        // 通過當(dāng)前 Worker 對象創(chuàng)建對應(yīng)的線程對象 t,
        // 所以調(diào)用 t.start() 時(shí)最終會調(diào)用 Worker 的 run 方法
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        // run 方法最終會調(diào)用 ThreadPoolExecutor 的 runWorker 方法
        runWorker(this);
    }
    // ... ...
}

通過 Worker 的構(gòu)造函數(shù)能夠了解到,會通過創(chuàng)建的 Worker 對象去構(gòu)建線程對象,當(dāng)線程對象啟動(dòng)時(shí)最終會調(diào)用 runWorker 方法。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 取出需要執(zhí)行的任務(wù)
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 如果 task 不是 null 或者去 workQueue 隊(duì)列中取到待執(zhí)行的任務(wù)不為 null
        while (task != null || (task = getTask()) != null) {
            // ... ...
            try {
                // 開始執(zhí)行任務(wù)前的鉤子方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                    // ... ...
                } finally {
                    // 任務(wù)執(zhí)行后的鉤子方法
                    afterExecute(task, thrown);
                }
            } finally {
                // ... ...
            }
        }
        completedAbruptly = false;
    } finally {
        // Worker 退出
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker 方法,首先會取出要執(zhí)行的任務(wù) task,如果為空則會調(diào)用 getTask 方法從任務(wù)隊(duì)列中獲取,然后調(diào)用任務(wù)對應(yīng)的 run 方法進(jìn)行執(zhí)行,另外預(yù)置了 beforeExecute、afterExecute 兩個(gè)鉤子函數(shù),讓研發(fā)人員監(jiān)控線程執(zhí)行成為可能。

另外,線程池中的線程如何從隊(duì)列中獲取待執(zhí)行的任務(wù)的呢?走進(jìn) getTask 方法看一看。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    // 這塊體現(xiàn)了:線程池的線程是復(fù)用的,通過循環(huán)去獲取隊(duì)列中的任務(wù)去執(zhí)行。
    for (;;) {
        int c = ctl.get();
        // ... ...
        int wc = workerCountOf(c);
        // allowCoreThreadTimeOut: 是否允許核心線程超時(shí).
        // 如果設(shè)置為 false,那么線程池在達(dá)到 corePoolSize 個(gè)工作線程之前,不會讓閑置的工作線程退出。
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // ... ...
        try {
            // 從 workQueue 隊(duì)列中取待執(zhí)行的任務(wù),根據(jù) timed 來選擇等待時(shí)間
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

為了便于理解,源碼做了部分刪減。重點(diǎn)關(guān)注從任務(wù)隊(duì)列中獲取待執(zhí)行任務(wù)的對象的方法調(diào)用:workQueue.poll()、workQueue.take() ,前者是移除并返回隊(duì)列中的頭部元素,如果隊(duì)列為空,則返回 null,而后者是移除并返回隊(duì)列中的頭部元素,如果隊(duì)列為空,則阻塞。

煙未滅,酒過半 ... ... 源碼探討就談到這兒... ...

源碼揭秘之后的反思

(一)鉤子函數(shù)的使用場景

場景一:

Java線程池深度揭秘

 

如上面自定義的 MyThreadPoolExecutor,可以讓日志打印線程及線程數(shù)等等信息。意味著研發(fā)人員可以擴(kuò)展 ThreadPoolExecutor,對鉤子函數(shù) beforeExecute、afterExecute 進(jìn)行實(shí)現(xiàn),進(jìn)而可以知曉線程池內(nèi)部的調(diào)度細(xì)節(jié),可以有效進(jìn)行監(jiān)控,針對故障排查應(yīng)該很有幫助。

場景二:

AbstractExecutorService 并沒有實(shí)現(xiàn) execute 方法,而是為子類 ThreadPoolExecutor 留了個(gè)口子,讓子類去靈活擴(kuò)展(鉤子函數(shù))。

仔細(xì)想想業(yè)務(wù)開發(fā)時(shí),諸多的使用場景,何嘗不是如此呢?

(二)線程池的 submit 方法與 execute 方法啥區(qū)別呢?

execute 方法,適用于不需要關(guān)注返回值的場景,只需要將線程丟到線程池中去執(zhí)行就可以了。

而 submit() 方法,適用于需要關(guān)注返回值的場景,不過最終會調(diào)用 execute() 方法。

考慮到性能提升,如果不需要關(guān)注返回值,則建議直接調(diào)用 execute() 方法,因?yàn)槟菢訒帘魏芏嘀虚g調(diào)度。

(三)線程池狀態(tài)與線程數(shù)量用一個(gè) ctl 變量表示的好處?

線程池狀態(tài)和線程數(shù)量合二為一,用一個(gè)原子變量來表示,來減少鎖競爭,提高并發(fā)效率。

(四)清晰的注釋是否有必要?

通過探秘源碼,很多圖都是根據(jù)源碼注釋勾勒出來的。可以看出清晰的注釋,對于核心流程而言真的很重要,一定要養(yǎng)成良好的編碼習(xí)慣,關(guān)鍵業(yè)務(wù)邏輯、核心流程,建議一定要寫好注釋,利人又利己,何樂而不為之。

(五)Executor 家族框架,若寫基礎(chǔ)框架時(shí),是否有借鑒意義呢?

個(gè)人感覺很有借鑒意義,因?yàn)闊o論業(yè)務(wù)開發(fā)還是基礎(chǔ)服務(wù),總會看到類似模式框架的身影,總會有大牛模仿著造輪子,所以閑暇之余可以抽象一下。

寄語寫最后

本次,主要對 Executor 家族進(jìn)行了簡單介紹,并著重對線程池背后的 ThreadPoolExecutor 類進(jìn)行深度剖析,知其然知其所以然,希望對大家有幫助。

分享到:
標(biāo)簽:線程 Java
用戶無頭像

網(wǎng)友整理

注冊時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績評定2018-06-03

通用課目體育訓(xùn)練成績評定