- 在之前的線程池的介紹中我們看到了很多阻塞隊(duì)列,這篇文章我們主要來說說阻塞隊(duì)列的事。
- 阻塞隊(duì)列也就是
BlockingQueue
,這個(gè)類是一個(gè)接 - 口,同時(shí)繼承了
Queue
接口,這兩個(gè)接口都是在JDK5
中加入的 。 BlockingQueue
阻塞隊(duì)列是線程安全的,在我們業(yè)務(wù)中是會(huì)經(jīng)常頻繁使用到的,如典型的生產(chǎn)者消費(fèi)的場景,生產(chǎn)者只需要向隊(duì)列中添加,而消費(fèi)者負(fù)責(zé)從隊(duì)列中獲取。
- 如上圖展示,我們生產(chǎn)者線程不斷的
put
元素到隊(duì)列,而消費(fèi)者從中take
出元素處理,這樣實(shí)現(xiàn)了任務(wù)與執(zhí)行任務(wù)類之間的解耦,任務(wù)都被放入到了阻塞隊(duì)列中,這樣生產(chǎn)者和消費(fèi)者之間就不會(huì)直接相互訪問實(shí)現(xiàn)了隔離提高了安全性。
并發(fā)隊(duì)列
- 上面是
JAVA
中隊(duì)列Queue
類的類圖,我們可以看到它分為兩大類,阻塞隊(duì)列與非阻塞隊(duì)列 - 阻塞隊(duì)列的實(shí)現(xiàn)接口是
BlockingQueue
而非阻塞隊(duì)列的接口是ConcurrentLinkedQueue
, 本文主要介紹阻塞隊(duì)列,非阻塞隊(duì)列不再過多闡述 BlockingQueue
主要有下面六個(gè)實(shí)現(xiàn)類,分別是ArrayBlockingQueue
、LinkedBlockingQueue
、SynchronousQueue
、DelayQueue
、PriorityBlockingQueue
、LinkedTransferQueue
。這些阻塞隊(duì)列有著各自的特點(diǎn)和適用場景,后面詳細(xì)介紹。- 非阻塞隊(duì)列的典型例子如
ConcurrentLinkedQueue
, 它不會(huì)阻塞線程,而是利用了CAS
來保證線程的安全。 - 其實(shí)還有一個(gè)隊(duì)列和
Queue
關(guān)系很緊密,那就是Deque
,這其實(shí)是double-ended-queue
的縮寫,意思是雙端隊(duì)列。它的特點(diǎn)是從頭部和尾部都能添加和刪除元素,而我們常見的普通隊(duì)列Queue
則是只能一端進(jìn)一端出,即FIFO
。
阻塞隊(duì)列特點(diǎn)
- 阻塞隊(duì)列的特點(diǎn)就在于阻塞,它可以阻塞線程,讓生產(chǎn)者消費(fèi)者得以平衡,阻塞隊(duì)列中有兩個(gè)關(guān)鍵方法
Put
和Take
方法
take方法
take
方法的功能是獲取并移除隊(duì)列的頭結(jié)點(diǎn),通常在隊(duì)列里有數(shù)據(jù)的時(shí)候是可以正常移除的。可是一旦執(zhí)行take
方法的時(shí)候,隊(duì)列里無數(shù)據(jù),則阻塞,直到隊(duì)列里有數(shù)據(jù)。一旦隊(duì)列里有數(shù)據(jù)了,就會(huì)立刻解除阻塞狀態(tài),并且取到數(shù)據(jù)。過程如圖所示:
put方法
put
方法插入元素時(shí),如果隊(duì)列沒有滿,那就和普通的插入一樣是正常的插入,但是如果隊(duì)列已滿,那么就無法繼續(xù)插入,則阻塞,直到隊(duì)列里有了空閑空間。如果后續(xù)隊(duì)列有了空閑空間,比如消費(fèi)者消費(fèi)了一個(gè)元素,那么此時(shí)隊(duì)列就會(huì)解除阻塞狀態(tài),并把需要添加的數(shù)據(jù)添加到隊(duì)列中。過程如圖所示:
是否有界(容量有多大)
- 此外,阻塞隊(duì)列還有一個(gè)非常重要的屬性,那就是容量的大小,分為有界和無界兩種。
- 無界隊(duì)列意味著里面可以容納非常多的元素,例如
LinkedBlockingQueue
的上限是Integer.MAX_VALUE
,約為 2 的 31 次方,是非常大的一個(gè)數(shù),可以近似認(rèn)為是無限容量,因?yàn)槲覀儙缀鯚o法把這個(gè)容量裝滿。 - 但是有的阻塞隊(duì)列是有界的,例如
ArrayBlockingQueue
如果容量滿了,也不會(huì)擴(kuò)容,所以一旦滿了就無法再往里放數(shù)據(jù)了。
阻塞隊(duì)列常見方法
- 首先我們從常用的方法出發(fā),根據(jù)各自的特點(diǎn)我們可以大致分為三個(gè)大類,如下表所示:
分類 | 方法 | 含義 | 特點(diǎn) |
---|---|---|---|
拋出異常 | add | 添加一個(gè)元素 | 如果隊(duì)列已滿,添加則拋出 IllegalStateException 異常 |
remove | 刪除隊(duì)列頭節(jié)點(diǎn) | 當(dāng)隊(duì)列為空后,刪除則拋出 NoSuchElementException 異常 |
|
element | 獲取隊(duì)列頭元素 | 當(dāng)隊(duì)列為空時(shí),則拋出 NoSuchElementException 異常 |
|
返回?zé)o異常 | offer | 添加一個(gè)元素 | 當(dāng)隊(duì)列已滿,不會(huì)報(bào)異常,返回 false ,如果成功返回 true |
poll | 獲取隊(duì)列頭節(jié)點(diǎn),并且刪除它 | 當(dāng)隊(duì)列空時(shí),返回 Null |
|
peek | 單純獲取頭節(jié)點(diǎn) | 當(dāng)隊(duì)列為空時(shí)反饋 NULL |
|
阻塞 | put | 添加一個(gè)元素 | 如果隊(duì)列已滿則阻塞 |
take | 返回并刪除頭元素 | 如果隊(duì)列為空則阻塞 |
- 如上面所示主要的八個(gè)方法,相對(duì)都比較簡單,下面我們通過實(shí)際代碼演示的方式來認(rèn)識(shí)
拋異常類型[add、remove、element]
add
- 向隊(duì)列中添加一個(gè)元素。如果隊(duì)列是有界隊(duì)列,當(dāng)隊(duì)列已滿時(shí)再添加則拋出異常提示,如下:
BlockingQueue queue = new ArrayBlockingQueue(2);
queue.add(1);
queue.add(2);
queue.add(3);
- 上述代碼中我們創(chuàng)建了一個(gè)阻塞隊(duì)列容量為2,當(dāng)我們使用
add
向其中添加元素,當(dāng)添加到第三個(gè)時(shí)則會(huì)拋出異常如下:
remove
remove
方法是從隊(duì)列中刪除隊(duì)列的頭節(jié)點(diǎn),同時(shí)會(huì)返回該元素。當(dāng)隊(duì)列中為空時(shí)執(zhí)行remove
方法時(shí)則會(huì)拋出異常,代碼如下:
private static void groupRemove() {
BlockingQueue queue = new ArrayBlockingQueue(2);
queue.add("i-code.online");
System.out.println(queue.remove());
System.out.println(queue.remove());
}
- 上述代碼中,我們可以看到,我們想隊(duì)列中添加了一個(gè)元素
i-code.online
, 之后通過remove
方法進(jìn)行刪除,當(dāng)執(zhí)行第二次remove
時(shí)隊(duì)列內(nèi)已無元素,則拋出異常。如下:
element
element
方法是獲取隊(duì)列的頭元素,但是并不是刪除該元素,這也是與remove
的區(qū)別,當(dāng)隊(duì)列中沒有元素后我們?cè)賵?zhí)行element
方法時(shí)則會(huì)拋出異常,代碼如下:
private static void groupElement() {
BlockingQueue queue = new ArrayBlockingQueue(2);
queue.add("i-code.online");
System.out.println(queue.element());
System.out.println(queue.element());
}
private static void groupElement2() {
BlockingQueue queue = new ArrayBlockingQueue(2);
System.out.println(queue.element());
}
- 上面兩個(gè)方法分別演示了在有元素和無元素的情況
element
的使用。在第一個(gè)方法中并不會(huì)報(bào)錯(cuò),因?yàn)槭自匾恢贝嬖诘模诙€(gè)方法中因?yàn)榭盏模話伋霎惓#缦陆Y(jié)果:
無異常類型[offer、poll、peek]
offer
offer
方法是向隊(duì)列中添加元素, 同時(shí)反饋成功與失敗,如果失敗則返回false
,當(dāng)隊(duì)列已滿時(shí)繼續(xù)添加則會(huì)失敗,代碼如下:
private static void groupOffer() {
BlockingQueue queue = new ArrayBlockingQueue(2);
System.out.println(queue.offer("i-code.online"));
System.out.println(queue.offer("云棲簡碼"));
System.out.println(queue.offer("AnonyStar"));
}
- 如上述代碼所示,我們向一個(gè)容量為2的隊(duì)列中通過
offer
添加元素,當(dāng)添加第三個(gè)時(shí),則會(huì)反饋false
,如下結(jié)果:
true
true
false
poll
poll
方法對(duì)應(yīng)上面remove
方法,兩者的區(qū)別就在于是否會(huì)在無元素情況下拋出異常,poll
方法在無元素時(shí)不會(huì)拋出異常而是返回null
,如下代碼:
private static void groupPoll() {
BlockingQueue queue = new ArrayBlockingQueue(2);
System.out.println(queue.offer("云棲簡碼")); //添加元素
System.out.println(queue.poll()); //取出頭元素并且刪除
System.out.println(queue.poll());
}
- 上面代碼中我們創(chuàng)建一個(gè)容量為2的隊(duì)列,并添加一個(gè)元素,之后調(diào)用兩次
poll
方法來獲取并刪除頭節(jié)點(diǎn),發(fā)現(xiàn)第二次調(diào)用時(shí)為null
,因?yàn)殛?duì)列中已經(jīng)為空了,如下:
true
云棲簡碼
null
peek
peek
方法與前面的element
方法是對(duì)應(yīng)的 ,獲取元素頭節(jié)點(diǎn)但不刪除,與其不同的在于peek
方法在空隊(duì)列下并不會(huì)拋出異常,而是返回null
,如下:
private static void groupPeek() {
BlockingQueue queue = new ArrayBlockingQueue(2);
System.out.println(queue.offer(1));
System.out.println(queue.peek());
System.out.println(queue.peek());
}
private static void groupPeek2() {
BlockingQueue queue = new ArrayBlockingQueue(2);
System.out.println(queue.peek());
}
- 如上述代碼所示,我么們分別展示了非空隊(duì)列與空隊(duì)列下
peek
的使用,結(jié)果如下:
阻塞類型[put、take]
put
put
方法是向隊(duì)列中添加一個(gè)元素,這個(gè)方法是阻塞的,也就是說當(dāng)隊(duì)列已經(jīng)滿的情況下,再put
元素時(shí)則會(huì)阻塞,直到隊(duì)列中有空位.
take
take
方法是從隊(duì)列中獲取頭節(jié)點(diǎn)并且將其移除,這也是一個(gè)阻塞方法,當(dāng)隊(duì)列中已經(jīng)沒有元素時(shí),take
方法則會(huì)進(jìn)入阻塞狀態(tài),直到隊(duì)列中有新的元素進(jìn)入。
常見的阻塞隊(duì)列
ArrayBlockingQueue
ArrayBlockingQueue
是一個(gè)我們常用的典型的有界隊(duì)列,其內(nèi)部的實(shí)現(xiàn)是基于數(shù)組來實(shí)現(xiàn)的,我們?cè)趧?chuàng)建時(shí)需要指定其長度,它的線程安全性由ReentrantLock
來實(shí)現(xiàn)的。
public ArrayBlockingQueue(int capacity) {...}
public ArrayBlockingQueue(int capacity, boolean fair) {...}
- 如上所示,
ArrayBlockingQueue
提供的構(gòu)造函數(shù)中,我們需要指定隊(duì)列的長度,同時(shí)我們也可以設(shè)置隊(duì)列是都是公平的,當(dāng)我們?cè)O(shè)置了容量后就不能再修改了,符合數(shù)組的特性,此隊(duì)列按照先進(jìn)先出(FIFO
)的原則對(duì)元素進(jìn)行排序。 - 和
ReentrantLock
一樣,如果ArrayBlockingQueue
被設(shè)置為非公平的,那么就存在插隊(duì)的可能;如果設(shè)置為公平的,那么等待了最長時(shí)間的線程會(huì)被優(yōu)先處理,其他線程不允許插隊(duì),不過這樣的公平策略同時(shí)會(huì)帶來一定的性能損耗,因?yàn)榉枪降耐掏铝客ǔ?huì)高于公平的情況。
LinkedBlockingQueue
- 從它的名字我們可以知道,它是一個(gè)由鏈表實(shí)現(xiàn)的隊(duì)列,這個(gè)隊(duì)列的長度是
Integer.MAX_VALUE
,這個(gè)值是非常大的,幾乎無法達(dá)到,對(duì)此我們可以認(rèn)為這個(gè)隊(duì)列基本屬于一個(gè)無界隊(duì)列(也又認(rèn)為是有界隊(duì)列)。此隊(duì)列按照先進(jìn)先出的順序進(jìn)行排序。
SynchronousQueue
synchronousQueue
是一個(gè)不存儲(chǔ)任何元素的阻塞隊(duì)列,每一個(gè)put
操作必須等待take
操作,否則不能添加元素。同時(shí)它也支持公平鎖和非公平鎖。synchronousQueue
的容量并不是1,而是0。因?yàn)樗旧聿粫?huì)持有任何元素,它是直接傳遞的,synchronousQueue
會(huì)把元素從生產(chǎn)者直接傳遞給消費(fèi)者,在這個(gè)過程中能夠是不需要存儲(chǔ)的- 在我們之前介紹過的線程池
CachedThreadPool
就是利用了該隊(duì)列。Executors.newCachedThreadPool()
,因?yàn)檫@個(gè)線程池它的最大線程數(shù)是Integer.MAX_VALUE
,它是更具需求來創(chuàng)建線程,所有的線程都是臨時(shí)線程,使用完后空閑60秒則被回收,
PriorityBlockingQueue
PriorityBlockingQueue
是一個(gè)支持優(yōu)先級(jí)排序的無界阻塞隊(duì)列,可以通過自定義實(shí)現(xiàn)compareTo()
方法來指定元素的排序規(guī)則,或者通過構(gòu)造器參數(shù)Comparator
來指定排序規(guī)則。但是需要注意插入隊(duì)列的對(duì)象必須是可比較大小的,也就是Comparable
的,否則會(huì)拋出ClassCastException
異常。- 它的
take
方法在隊(duì)列為空的時(shí)候會(huì)阻塞,但是正因?yàn)樗菬o界隊(duì)列,而且會(huì)自動(dòng)擴(kuò)容,所以它的隊(duì)列永遠(yuǎn)不會(huì)滿,所以它的put
方法永遠(yuǎn)不會(huì)阻塞,添加操作始終都會(huì)成功
DelayQueue
DelayQueue
是一個(gè)實(shí)現(xiàn)PriorityBlockingQueue
的延遲獲取的無界隊(duì)列。具有“延遲”的功能。DelayQueue
應(yīng)用場景:1. 緩存系統(tǒng)的設(shè)計(jì):可以用DelayQueue
保存緩存元素的有效期,使用一個(gè)線程循環(huán)查詢DelayQueue
,一旦能從DelayQueue
中獲取元素時(shí),表示緩存有效期到了。2. 定時(shí)任務(wù)調(diào)度。使用DelayQueue
保存當(dāng)天將會(huì)執(zhí)行的任務(wù)和執(zhí)行時(shí)間,一旦從DelayQueue
中獲取到任務(wù)就開始執(zhí)行,從比如TimerQueue
就是使用DelayQueue
實(shí)現(xiàn)的。- 它是無界隊(duì)列,放入的元素必須實(shí)現(xiàn)
Delayed
接口,而Delayed
接口又繼承了Comparable
接口,所以自然就擁有了比較和排序的能力,代碼如下:
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
- 可以看出
Delayed
接口繼承Comparable
,里面有一個(gè)需要實(shí)現(xiàn)的方法,就是getDelay
。這里的getDelay
方法返回的是“還剩下多長的延遲時(shí)間才會(huì)被執(zhí)行”,如果返回 0 或者負(fù)數(shù)則代表任務(wù)已過期。 - 元素會(huì)根據(jù)延遲時(shí)間的長短被放到隊(duì)列的不同位置,越靠近隊(duì)列頭代表越早過期。
本文由AnonyStar 發(fā)布,可轉(zhuǎn)載但需聲明原文出處。
歡迎關(guān)注微信公賬號(hào) :云棲簡碼 獲取更多優(yōu)質(zhì)文章
更多文章關(guān)注筆者博客 :云棲簡碼 i-code.online