日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

前言

大家好,我是jack xu,本篇是并發(fā)編程的第二篇,今天跟大家聊一聊線程池的那點(diǎn)事。本篇文章有點(diǎn)長,小伙們靜下心、耐下心來把他看完。。

為什么要使用線程池

1)降低創(chuàng)建線程和銷毀線程的性能開銷

2)提高響應(yīng)速度,當(dāng)有新任務(wù)需要執(zhí)行是不需要等待線程創(chuàng)建就可以立馬執(zhí)行

3)合理的設(shè)置線程池大小可以避免因?yàn)榫€程數(shù)超過硬件資源瓶頸帶來的問題

 

從根兒上認(rèn)識線程池

 

我們來看阿里巴巴的代碼規(guī)范,在項(xiàng)目中創(chuàng)建線程必須要使用線程池創(chuàng)建,原因就是我說的以上三點(diǎn)

 

線程池的使用

首先我們來看下UML類圖

 

從根兒上認(rèn)識線程池

 

 

  • Executor:可以看到最頂層是 Executor 的接口。這個接口很簡單,只有一個 execute 方法。此接口的目的是為了把任務(wù)提交和任務(wù)執(zhí)行解耦。
  • ExecutorService:這還是一個接口,繼承自 Executor,它擴(kuò)展了 Executor 接口,定義了更多線程池相關(guān)的操作。
  • AbstractExecutorService:提供了 ExecutorService 的部分默認(rèn)實(shí)現(xiàn)。
  • ThreadPoolExecutor:實(shí)際上我們使用的線程池的實(shí)現(xiàn)是 ThreadPoolExecutor。它實(shí)現(xiàn)了線程池工作的完整機(jī)制。也是我們接下來分析的重點(diǎn)對象。
  • ForkJoinPool:和ThreadPoolExecutor都繼承自AbstractExecutorService,適合用于分而治之,遞歸計算的算法
  • ScheduledExecutorService:這個接口擴(kuò)展了ExecutorService,定義個延遲執(zhí)行和周期性執(zhí)行任務(wù)的方法。
  • ScheduledThreadPoolExecutor:此接口則是在繼承 ThreadPoolExecutor 的基礎(chǔ)上實(shí)現(xiàn) ScheduledExecutorService 接口,提供定時和周期執(zhí)行任務(wù)的特性。

搞清楚上面的結(jié)構(gòu)很重要,Executors是一個工具類,然后看創(chuàng)建線程的兩種方式,第一種是通過Executors提供的工廠方法來實(shí)現(xiàn),有下面四種方式

        Executor executor1 = Executors.newFixedThreadPool(10);
        Executor executor2 = Executors.newSingleThreadExecutor();
        Executor executor3 = Executors.newCachedThreadPool();
        Executor executor4 = Executors.newScheduledThreadPool(10);
復(fù)制代碼

第二種是通過構(gòu)造方法來實(shí)現(xiàn)

        ExecutorService executor5 = new ThreadPoolExecutor(1,
                1,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
復(fù)制代碼

其實(shí)查看第一種方式創(chuàng)建的源碼就會發(fā)現(xiàn):

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }
復(fù)制代碼

根本上還是通過調(diào)用ThreadPoolExecutor的構(gòu)造方法,創(chuàng)建時傳入不同參數(shù),所以本質(zhì)上還是只有一種創(chuàng)建線程池的方式,就是用構(gòu)造方法,這里我不想講用Executors的工廠方法具體幫我們創(chuàng)建了怎樣的線程池,讓我們再來看一條阿里巴巴規(guī)范。

 

從根兒上認(rèn)識線程池

 

看到這里大家都明白了吧,正是因?yàn)榉庋b性太強(qiáng)了,反而小伙們會不知道怎么用,亂用,濫用,有可能會導(dǎo)致OOM,除非你對創(chuàng)建的這四個線程池了如指掌,所以我介紹了也是白介紹,因?yàn)榫筒辉谟茫酉聛砦覀冎攸c(diǎn)看下ThreadPoolExecutor構(gòu)造方法里各個參數(shù)的含義,構(gòu)造方法有很多個,我選了一個最完整的。

 

public ThreadPoolExecutor(int corePoolSize, //核心線程數(shù)量
                          int maximumPoolSize, //最大線程數(shù)
                          long keepAliveTime, //超時時間,超出核心線程數(shù)量以外的線程空余存活時間
                          TimeUnit unit, //存活時間單位
                          BlockingQueue<Runnable> workQueue, //保存執(zhí)行任務(wù)的隊(duì)列
                          ThreadFactory threadFactory,//創(chuàng)建新線程使用的工廠
                          RejectedExecutionHandler handler //當(dāng)任務(wù)無法執(zhí)行的時候的處理方式)
復(fù)制代碼
  • corePoolSize:即線程池的核心線程數(shù)量,其實(shí)也是最小線程數(shù)量。不設(shè)置allowCoreThreadTimeOut 的情況下,核心線程數(shù)量范圍內(nèi)的線程一直存活。線程不會自行銷毀,而是以掛起的狀態(tài)返回到線程池,直到應(yīng)用程序再次向線程池發(fā)出請求時,線程池里掛起的線程就會再度激活執(zhí)行任務(wù)。
  • maximumPoolSize:即線程池的最大線程數(shù)量
  • keepAliveTime和unit:超出核心線程數(shù)后的存活時間和單位
  • workQueue:是一個阻塞的 queue,用來保存線程池要執(zhí)行的所有任務(wù)。通常可以取下面三種類型:
1)ArrayBlockingQueue:基于數(shù)組的先進(jìn)先出隊(duì)列,此隊(duì)列創(chuàng)建時必須指定大小;  
2)LinkedBlockingQueue:基于鏈表的先進(jìn)先出隊(duì)列,如果創(chuàng)建時沒有指定此隊(duì)列大小,則默認(rèn)為Integer.MAX_VALUE;
3)SynchronousQueue:這個隊(duì)列比較特殊,它不會保存提交的任務(wù),而是將直接新建一個線程來執(zhí)行新來的任務(wù)。
復(fù)制代碼
  • ThreadFactory:我們一般用Executors.defaultThreadFactory()默認(rèn)工廠,為什么要用工廠呢,其實(shí)就是規(guī)范了生成的Thread。避免調(diào)用new Thread創(chuàng)建,導(dǎo)致創(chuàng)建出來的Thread可能存在差異
  • handler:當(dāng)隊(duì)列和最大線程池都滿了之后的飽和策略。
1、AbortPolicy:直接拋出異常,默認(rèn)策略;
2、CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù);
3、DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
4、DiscardPolicy:直接丟棄任務(wù);
當(dāng)然也可以根據(jù)應(yīng)用場景實(shí)現(xiàn) RejectedExecutionHandler 接口,自定義飽和策略,如記錄
日志或持久化存儲不能處理的任務(wù)
復(fù)制代碼

創(chuàng)建完線程池后使用也很簡單,帶返回值和不帶返回值,傳入對應(yīng)傳入Runnable或者Callable接口的實(shí)現(xiàn)

        //無返回值
        executor5.execute(() -> System.out.println("jack xushuaige"));
        //帶返回值
        String message = executor5.submit(() -> { return "jack xushuaige"; }).get();
復(fù)制代碼

源碼分析

execute方法

基于源碼入口進(jìn)行分析,先看execute方法

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
復(fù)制代碼

源代碼中有一段關(guān)鍵的注釋我沒有貼進(jìn)來,下面我先把這段關(guān)鍵的注釋翻譯講解下:

分三步做處理:

1、如果運(yùn)行的線程數(shù)量小于 corePoolSize,那么嘗試創(chuàng)建新的線程,并把傳入的 command 作為它的第一個 task 來執(zhí)行。調(diào)用 addWorker 會自動檢查 runState 和 workCount,以此來防止在不應(yīng)該添加線程時添加線程的錯誤警告;

2、即使 task 可以被成功加入隊(duì)列,我們?nèi)耘f需要再次確認(rèn)我們是否應(yīng)該添加 thread(因?yàn)樽詈笠淮螜z查之后可能有線程已經(jīng)死掉了)還是線程池在進(jìn)入此方法后已經(jīng)停掉了。所以我們會再次檢查狀態(tài),如果有必要的話,可以回滾隊(duì)列。或者當(dāng)沒有線程時,開啟新的 thread;

3、如果無法將 task 加入 queue,那么可以嘗試添加新的 thread。如果添加失敗,這是因?yàn)榫€程池被關(guān)閉或者已經(jīng)飽和了,所以拒絕這個 task。

如果你看完以后還是一臉懵逼,那沒事,我把這個流程圖畫下來,你品,你細(xì)品,好好理解一下

從根兒上認(rèn)識線程池

 

 

然后介紹一下源碼中ctl是干什么的,點(diǎn)進(jìn)去查看源碼

 

從根兒上認(rèn)識線程池

 

我們發(fā)現(xiàn)它是一個原子類,主要作用是用來保存線程數(shù)量和線程池的狀態(tài),他用到了位運(yùn)算, 一個int數(shù)值是32個 bit 位,這里采用高 3 位來保存運(yùn)行狀態(tài),低 29 位來保存線程數(shù)量。

 

我們來計算一下ctlOf(RUNNING, 0)方法,其中 RUNNING =-1 << COUNT_BITS ; -1 左移 29 位,-1 的二進(jìn)制是32個1(1111 1111 1111 1111 1111 1111 1111 1111),左移29位后得到(1110 0000 0000 0000 0000 0000 0000 0000),然后111| 0還是111,同理可得其他狀態(tài)的 bit 位。這個位運(yùn)算很有意思,hashmap源碼中也用到了位運(yùn)算,小伙們在平時開發(fā)中也可以嘗試用下,這樣運(yùn)算速度會快,而且能夠裝b,介紹下這五種線程池的狀態(tài)

  • RUNNING:接收新任務(wù),并執(zhí)行隊(duì)列中的任務(wù)
  • SHUTDOWN:不接收新任務(wù),但是執(zhí)行隊(duì)列中的任務(wù)
  • STOP:不接收新任務(wù),不執(zhí)行隊(duì)列中的任務(wù),中斷正在執(zhí)行中的任務(wù)
  • TIDYING:所有的任務(wù)都已結(jié)束, 線程數(shù)量為 0,處于該狀態(tài)的線程池即將調(diào)用 terminated()方法
  • TERMINATED:terminated()方法執(zhí)行完成

他們的轉(zhuǎn)換關(guān)系如下:

 

從根兒上認(rèn)識線程池

 

 

addWorker方法

我們看到execute流程的核心方法為addWorker,我們繼續(xù)分析,源碼看起來比較唬人,其實(shí)就做了兩件事,拆分一下

第一步:更新worker的數(shù)量,代碼如下:

retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
復(fù)制代碼

retry是一個標(biāo)記,和循環(huán)配合使用,continue retry 的時候,會跳到 retry 的地方再次執(zhí)行。如果 break retry,則跳出整個循環(huán)體。源碼先獲取到 ctl,然后檢查狀態(tài),然后根據(jù)創(chuàng)建線程類型的不同,進(jìn)行數(shù)量的校驗(yàn)。在通過CAS方式更新 ctl狀態(tài),成功的話則跳出循環(huán)。否則再次取得線程池狀態(tài),如果和最初已經(jīng)不一致,那么從頭開始執(zhí)行。如果狀態(tài)并未改變則繼續(xù)更新worker的數(shù)量。流程圖如下:

 

從根兒上認(rèn)識線程池

 

第二步:添加 worker 到 workers 的 set 中。并且啟動 worker 中持有的線程。代碼如下:

 

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;
復(fù)制代碼

可以看到添加 work 時需要先獲得鎖,這樣確保多線程并發(fā)安全。如果添加 worker 成功,那么調(diào)用 worker 中線程的 start 方法啟動線程。如果啟動失敗則調(diào)用 addWorkerFailed 方法進(jìn)行回滾。看到這里小伙們會發(fā)現(xiàn)

1、ThreadPoolExecutor在初始化后并沒有啟動和創(chuàng)建任何線程,在調(diào)用 execute方法時才會調(diào)用 addWorker創(chuàng)建線程

2、addWorker方法中會創(chuàng)建新的worker,并啟動其持有的線程來執(zhí)行任務(wù)。

上文提到如果線程數(shù)量已經(jīng)達(dá)到corePoolSize,則只會把command 加入到 workQueue中,那么加入到 workQueue中的command是如何被執(zhí)行的呢?下面我們來分析 Worker 的源代碼。

Worker類

Worker封裝了線程,是executor中的工作單元。worker繼承自AbstractQueuedSynchronizer,并實(shí)現(xiàn) Runnable。 Worker 簡單理解其實(shí)就是一個線程,里面重新了 run 方法,我們來看他的構(gòu)造方法:

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
復(fù)制代碼

再來看下這兩個重要的屬性

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
復(fù)制代碼

firstTask 用它來保存?zhèn)魅氲娜蝿?wù);thread 是在調(diào)用構(gòu)造方法時通過 ThreadFactory 來創(chuàng)建的線程,是用來處理任務(wù)的線程,這里用的是 ThreadFactory 創(chuàng)建線程,并沒有直接 new,原因上文也提到過,這里看下 newThread 傳入的是 this,因?yàn)?Worker 本身繼承了 Runnable 接口,所以 addWork 中調(diào)用的 t.start(),實(shí)際上運(yùn)行的是 t 所屬 worker 的 run 方法。worker 的 run 方法如下:

public void run() {
    runWorker(this);
}
復(fù)制代碼

runWorker源碼再如下:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
復(fù)制代碼

簡單分析一下

1、先取出 worker 中的 firstTask,并清空;

2、如果沒有 firstTask,則調(diào)用 getTask 方法,從 workQueue 中獲取task;

3、獲取鎖;

4、執(zhí)行 beforeExecute。這里是空方法,如有需要在子類實(shí)現(xiàn);

5、執(zhí)行 task.run;

6、執(zhí)行 afterExecute。這里是空方法,如有需要在子類實(shí)現(xiàn);

7、清空 task,completedTasks++,釋放鎖;

8、當(dāng)有異常或者沒有 task 可執(zhí)行時,會進(jìn)入外層 finnaly 代碼塊。調(diào)用 processWorkerExit 退出當(dāng)前 worker。從 works 中移除本 worker 后,如果 worker 數(shù)量小于 corePoolSize,則創(chuàng)建新的 worker,以維持 corePoolSize 大小的線程數(shù)。

這行代碼 while (task != null || (task = getTask()) != null) ,確保了 worker 不停地從 workQueue 中取得 task 執(zhí)行。getTask 方法會從 BlockingQueue workQueue 中 poll 或者 take 其中的 task 出來。

至此,關(guān)于 executor 如何創(chuàng)建并啟動線程執(zhí)行 task 的過程已經(jīng)分析的清清楚楚,明明白白,后面還有shutdown()、shutdownNow()等其他方法留給小伙們自行去觀察研究哈。

如何合理配置線程池的大小

線程池大小不是靠猜,也不是說越多越好。

  • CPU 密集型任務(wù):主要是執(zhí)行計算任務(wù),響應(yīng)時間很快,CPU 一直在運(yùn)行,這種任務(wù) CPU 的利用率很高,那么線程數(shù)的配置應(yīng)該根據(jù) CPU 核心數(shù)來決定,應(yīng)當(dāng)分配較少的線程,比如和 CPU 個數(shù)相當(dāng)?shù)拇笮 ?/li>
  • IO 密集型任務(wù):主要是進(jìn)行 IO 操作,執(zhí)行 IO 操作的時間較長,由于線程并不是一直在運(yùn)行,這時 CPU 處于空閑狀態(tài), 這種情況下可以增加線程池的大小,比如 CPU 個數(shù) * 2

當(dāng)然這些都是經(jīng)驗(yàn)值,最好的方式還是根據(jù)實(shí)際情況測試得出最佳配置。

線程池的監(jiān)控

如果在項(xiàng)目中大規(guī)模的使用了線程池,那么必須要有一套監(jiān)控體系,來指導(dǎo)當(dāng)前線程池的狀 態(tài),當(dāng)出現(xiàn)問題的時候可以快速定位到問題。我們通過重寫線程池的 beforeExecute、afterExecute 和 shutdown 等方式就可以實(shí)現(xiàn)對線程的監(jiān)控

 

從根兒上認(rèn)識線程池

 

看這些名稱和定義都知道,這是讓子類來實(shí)現(xiàn)的,可以在線程執(zhí)行前、后、終止?fàn)顟B(tài)執(zhí)行自定義邏輯。

 

總結(jié)

線程池這東西說簡單也簡單,說難也難,簡單是因?yàn)橛闷饋砗唵危孕』飩兛赡苡X得這有啥好講的,難是難在要知道他的底層的源碼,他是如何調(diào)度線程的,說兩點(diǎn)吧,第一是本文中用了大量的流程圖,當(dāng)我們在閱讀源碼或者做復(fù)雜業(yè)務(wù)開發(fā)的時候,一定要靜下心來先畫個圖,否則會被繞暈或者被別人打斷后,又得從頭到尾的看一邊,第二是閱讀源碼,剛畢業(yè)的小伙伴可能只要會用行了,但是如果你工作五年了,還是只會用,而不知道他里面是如何實(shí)現(xiàn)的,那你比剛畢業(yè)的優(yōu)勢在哪里,憑什么工資比剛畢業(yè)的高。如果你覺得寫的不錯,請點(diǎn)一個贊!

分享到:
標(biāo)簽:線程
用戶無頭像

網(wǎng)友整理

注冊時間:

網(wǎng)站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學(xué)四六

運(yùn)動步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績評定2018-06-03

通用課目體育訓(xùn)練成績評定