一、阻塞隊(duì)列簡(jiǎn)介
阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者場(chǎng)景,生產(chǎn)者往往是往隊(duì)列里添加元素的線程,消費(fèi)者
是從隊(duì)列里拿元素的線程嗎,阻塞隊(duì)列就是生產(chǎn)者存放元素的容器,是消費(fèi)者拿元素的容器
1.常見阻塞場(chǎng)景
當(dāng)前隊(duì)列中沒有數(shù)據(jù)的情況下,消費(fèi)端的所有線程都會(huì)被自動(dòng)阻塞(掛起),直到有數(shù)據(jù)放入隊(duì)列
當(dāng)隊(duì)列種數(shù)據(jù)填充滿的情況下,生產(chǎn)者端的所有線程都會(huì)被自動(dòng)阻塞(掛起),直到隊(duì)列中
有空的位置,線程被自動(dòng)喚醒
2.BlockingQueue
2.1.放入數(shù)據(jù):
- offer(anobject):表示如果可以將anobject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false(本方法不阻塞當(dāng)前執(zhí)行方法的線程)
- offer(e o,long timeout,TimeUnit unit):可以設(shè)定等待的時(shí)間,如果在指定的時(shí)間還不能往隊(duì)列中加入blockQuene,則返回失敗
- put(anobject):將anobject加到BlockingQueue里,如果BlockQueue沒有控件,則調(diào)用此方法的線程被阻塞直到BlockingQueue里面有空間再繼續(xù)
2.2.獲取數(shù)據(jù):
- poll(long timeout,TimeUnit unit):從BlockQueue中取出一個(gè)隊(duì)首的對(duì)象,如果在指定時(shí)間內(nèi)隊(duì)列一旦有時(shí)間可以取,則立即返回隊(duì)列中的數(shù)據(jù),否則直到時(shí)間超過還沒有數(shù)據(jù)可取,返回失敗
- take():取走BlockingQueue排在位首的對(duì)象,若BlockingQueue為空,則阻塞進(jìn)入等待狀態(tài),直到BlockingQueue有新的數(shù)據(jù)加入
- drainTo():一次性從BlockingQueue獲取所有可用的數(shù)據(jù)對(duì)象(還可以指定獲取數(shù)據(jù)的個(gè)數(shù))通過該方法,可以提升獲取數(shù)據(jù)的效率,無需分多次分批加鎖或釋放鎖。
二、JAVA中的阻塞隊(duì)列
1.ArrayBlockingQueue:由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列
他是用數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列,并按照先進(jìn)先出的原則對(duì)元素進(jìn)行排序,默認(rèn)情況下不保證線程公平的訪問隊(duì)列,公平的訪問隊(duì)列就是指阻塞的所有生產(chǎn)者線程或消費(fèi)者線程當(dāng)隊(duì)列不可用時(shí),可以按照阻塞的先后順序訪問隊(duì)列,即先阻塞的生產(chǎn)者先生產(chǎn),先阻塞的消費(fèi)者線程可以先從隊(duì)列里獲取元素,通常情況下為了保證公平性會(huì)降低吞吐量。
2.LinkedBlockingQueue:由鏈表結(jié)構(gòu)組成的有限阻塞隊(duì)列
他是基于鏈表的阻塞隊(duì)列,同ArrayListBlockingQueue類似,此隊(duì)列按照先進(jìn)先出的原則對(duì)元素進(jìn)行排序,其內(nèi)部也會(huì)維持著一個(gè)數(shù)據(jù)緩沖隊(duì)列,當(dāng)生產(chǎn)者往隊(duì)列中放入一個(gè)數(shù)據(jù)時(shí),隊(duì)列會(huì)從生產(chǎn)者手中獲取數(shù)據(jù)并緩存在隊(duì)列內(nèi)部,而生產(chǎn)者立即返回,只有當(dāng)隊(duì)列緩沖區(qū)達(dá)到緩存容量的最大值時(shí)(可以指定該值),才會(huì)阻塞生產(chǎn)者線程,直到消費(fèi)者從隊(duì)列中消費(fèi)掉一份數(shù)據(jù),生產(chǎn)者線程會(huì)被喚醒,反之,對(duì)于消費(fèi)者這段的處理也基于同樣的原理,而LinkedBlockingQueue之所以能夠高效的處理處理并發(fā)數(shù)據(jù),還因?yàn)槠鋵?duì)于生產(chǎn)者端和消費(fèi)者端分別采用獨(dú)立的鎖來控制數(shù)據(jù)同步。
以上兩個(gè)常用的阻塞隊(duì)列,還有五種不再詳細(xì)介紹。
下面分析ArrayBlockingQueue的源代碼:
private static final long serialVersionUID = -817911632652898426L; final Object[] items;//阻塞隊(duì)列維護(hù)的一個(gè)object類型的數(shù)組 int takeIndex;//隊(duì)首元素 int putIndex;//隊(duì)尾元素 int count;//隊(duì)列中的元素 final ReentrantLock lock;//重入鎖 private final Condition notEmpty;//條件對(duì)象判斷數(shù)組不是滿的 private final Condition notFull;//條件對(duì)象判斷數(shù)組不是空的 transient Itrs itrs; final int dec(int i) { return ((i == 0) ? items.length : i) - 1; } /** * Returns item at index i. */ @SuppressWarnings("unchecked") final E itemAt(int i) { return (E) items[i]; } /** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } /** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */ private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; } //取元素 public void put(E e) throws InterruptedException { Objects.requireNonNull(e); final ReentrantLock lock = this.lock;//鎖 lock.lockInterruptibly(); try { while (count == items.length) notFull.await();//阻塞線程,等待notFull.signalAll()喚醒 enqueue(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await();//阻塞線程,等待notEmpty.await()喚醒 return dequeue(); } finally { lock.unlock(); } }
使用阻塞隊(duì)列就無需考慮同步和線程間通信的問題。
public class VolatikeDemo { private int queueSize=10; private ArrayBlockingQueue<Integer> queue=new ArrayBlockingQueue<>(queueSize); public static void main(String args[]){ VolatikeDemo demo=new VolatikeDemo(); Consumer consumer=demo.new Consumer(); Producer producer=demo.new Producer(); consumer.start(); producer.start(); } class Consumer extends Thread{ @Override public void run() { while(true){ try { int res=queue.take(); System.out.println(res); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Producer extends Thread{ @Override public void run() { while(true){ try { this.sleep(200); queue.put(1); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
三、線程池
在編程中經(jīng)常會(huì)使用線程來異步處理任務(wù),但是每個(gè)線程的創(chuàng)建和銷毀都需要一定的 開銷。如果每次執(zhí)行一個(gè)任務(wù)都需要打開一個(gè)新線程去執(zhí)行,則這些線程的創(chuàng)建和銷毀 將消耗大量的資源,并且線程都是各自為政的,很難對(duì)其進(jìn)行控制,更何況還有一堆的 線程在執(zhí)行,這時(shí)就需要線程池來對(duì)線程進(jìn)行管理,在java 1.5中提供了Executor框架用于 把任務(wù)的提交和執(zhí)行解耦,任務(wù)的提交交給Runnable或者Callable,而Executor框架用來 處理任務(wù),Executor框架中的核心成員就是ThreadPoolExecutor,他是線程池的核心類。
1.ThreadPoolExecutor
可以通過創(chuàng)建ThreadPoolExecutor來創(chuàng)建一個(gè)線程池,ThreadPoolExecutor類一共有四個(gè)構(gòu)造方法
其中擁有最多參數(shù)的構(gòu)造方法:
ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(?,?,?,?,?);
- corePoolSize:核心線程數(shù),默認(rèn)情況下線程池是空的,只有任務(wù)提交是才會(huì)線程,如果當(dāng)前運(yùn)行的線程數(shù)少于corePoolSize,則創(chuàng)建新線程來處理任務(wù),如果等于或者多余corepoolsize則不會(huì)創(chuàng)建新線程,如果調(diào)用prestartAllcoreThread()方法:線程池會(huì)提前創(chuàng)建并啟動(dòng)所有的核心線程來等待任務(wù)。
- maximumPoolSize:線程池允許創(chuàng)建的最大線程數(shù),如果任務(wù)隊(duì)列滿了,并且線程數(shù)小于maximumPoolSize,則線程仍會(huì)創(chuàng)建新線程來處理任務(wù)。
- keepAliveTime:非核心線程閑置的超時(shí)時(shí)間,超過這個(gè)時(shí)間則回收,如果任務(wù)很多,并且每個(gè)任務(wù)的執(zhí)行時(shí)間的時(shí)間很短,則可以調(diào)大keepAliveTime來提高線程的利用率。
- TimeUnit:keepAliveTime參數(shù)的時(shí)間單位,可選的單位有天,小時(shí),分鐘,秒,毫秒等
- workQueue:任務(wù)隊(duì)列,如果當(dāng)前線程數(shù)大與corePoolSize則將任務(wù)添加到此任務(wù)隊(duì)列中,該任務(wù)隊(duì)列是BlockingQuenu類型的,也就是阻塞隊(duì)列
- ThreadFactory:線程工廠,可以線程工廠給每個(gè)創(chuàng)建出來的線程池設(shè)置名字,一般情況下無需要設(shè)置此參數(shù)
- RejectedExecutionHandler:飽和策略,這是當(dāng)任務(wù)隊(duì)列和線程池都滿了時(shí)所采取的應(yīng)對(duì)策略默認(rèn)是無法處理新任務(wù)(AbordPolicy)并拋出RejectedExecutionException異常,此外還有三種策略,分別如下:1.CallersRunsPolicy:用調(diào)查者所在的線程來處理任務(wù),此策略提供簡(jiǎn)單的反饋控制機(jī)制,能夠減緩新任務(wù)的提交速度(簡(jiǎn)言之,降低提交速度)
- DiscardPolicy:不能執(zhí)行的任務(wù),并將該任務(wù)刪除
- DiscardOldestPolicy:丟棄隊(duì)列最近的任務(wù),并執(zhí)行當(dāng)前的任務(wù)。
2.線程池的處理流程和原理
2.1.提交任務(wù)后,線程池先判斷線程數(shù)是否達(dá)到了核心線程數(shù)(corepoolSize)
如果還沒有達(dá)到核心線程數(shù),則創(chuàng)建核心線程處理任務(wù),否則執(zhí)行下一步。
2.2.線程池判斷任務(wù)隊(duì)列是否滿了,如果沒滿,將任務(wù)加入任務(wù)隊(duì)列,否則執(zhí)行
下一步。
2.3.線程池判斷線程數(shù)是否達(dá)到最大線程數(shù),如果未達(dá)到,則創(chuàng)建非核心線程處理任務(wù),
否則就執(zhí)行飽和策略,默認(rèn)會(huì)拋出RejectedExecutionExeception異常。
通過線程池的執(zhí)行示意圖我們可以看出,如果我們執(zhí)行ThreadPoolExecutor的execute方法,
會(huì)遇到各種情況
- 如果線程池中的線程數(shù)沒有達(dá)到核心線程數(shù),則創(chuàng)建核心線程執(zhí)行任務(wù)。
- 如果線程池中的線程數(shù)大于或等于核心線程數(shù),則加入任務(wù)隊(duì)列,線程池中的空閑線程會(huì)不斷的從任務(wù)隊(duì)列中取任務(wù)執(zhí)行。
- 如果任務(wù)隊(duì)列滿了,并且線程數(shù)沒有達(dá)到最大線程數(shù),則創(chuàng)建非核心線程去處理任務(wù)。
- 如果線程數(shù)超過了最大線程數(shù),則執(zhí)行飽和策略。
3.四種常用的線程池
3.1.FixedThreadPool:他是可重用固定線程數(shù)的線程池,他的主要特點(diǎn)如下:
- 只有核心線程,沒有非核心線程,并且數(shù)量是固定的,keepAliveTime設(shè)置為0意味著多余的線程會(huì)被立即終止,因此不會(huì)產(chǎn)生多余的線程,采用了無界阻塞隊(duì)列LinkedBlockingQueue。
- 當(dāng)執(zhí)行execute方法時(shí):如果當(dāng)前運(yùn)行的線程數(shù)未達(dá)到核心線程數(shù),則創(chuàng)建一個(gè)新線程來處理任務(wù),如果運(yùn)行線程數(shù)等于核心線程數(shù),則將任務(wù)添加到阻塞隊(duì)列中,F(xiàn)ixThread就是擁有固定數(shù)量核心線程的線程池,并且這些核心線程不會(huì)被回收,當(dāng)線程池中有空閑線程就會(huì)去任務(wù)隊(duì)列取任務(wù)
4.CacheThreadPool
4.1.CacheThreadPool:他是一個(gè)根據(jù)需要?jiǎng)?chuàng)建線程的線程池,他的主要特點(diǎn)如下:
- CacheThreadPool的corePoolSize為0,maximumPoolSize設(shè)置為最大值,他沒有核心線程,非核心線程是無界的,keepAliveTime設(shè)置為60s,并使用了阻塞隊(duì)列SynchronousQueue,他是一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)插入操作必須要等待另外一個(gè)線程的移除操作,同樣一個(gè)移除操作也需要等待插入操作。
- 當(dāng)執(zhí)行execute方法時(shí):首先會(huì)執(zhí)行synchroniusQueue的offer方法來提交任務(wù),并且查詢線程池中是否有空閑的線程執(zhí)行SynchronousQueue的poll方法來移除任務(wù),如果有則配對(duì)成功,將任務(wù)交給這個(gè)線程去處理,如果沒有則配對(duì)失敗,創(chuàng)建新的線程去執(zhí)行任務(wù)當(dāng)線程池中的線程空閑時(shí),他會(huì)執(zhí)行SynchronusQueue的poll方法,等待synchronoudQueue提交的新任務(wù),如果60s沒有新任務(wù)提交到synchronousQueue,則這個(gè)線程就會(huì)終止。cacheThreadPool適合大量需要立即處理并且耗時(shí)少的任務(wù)。
- SingleThreadExecutor:他是使用單個(gè)工作線程的線程池,corePoolSize和maximumPoolSize都為1,意味著SingleThreadExecutor只有一個(gè)核心線程,其他核心參數(shù)都和FixThreadPool一樣,SingleThreadExecutor執(zhí)行execute方法時(shí),如果當(dāng)前運(yùn)行的線程數(shù)未達(dá)到核心線程數(shù),也就是當(dāng)前沒有運(yùn)行的線程,則創(chuàng)建一個(gè)新線程來處理任務(wù),如果當(dāng)前有運(yùn)行的線程,則將任務(wù)添加到阻塞隊(duì)列中,因此SingleThreadExecutor能確保所有的任務(wù)都在一個(gè)線程中按照順序逐一執(zhí)行
最后
如果你看到了這里,覺得文章寫得不錯(cuò)就給個(gè)贊唄!歡迎大家評(píng)論討論!如果你覺得那里值得改進(jìn)的,請(qǐng)給我留言。一定會(huì)認(rèn)真查詢,修正不足,定期免費(fèi)分享技術(shù)干貨。喜歡的小伙伴可以關(guān)注一下哦。謝謝!