JAVA 的BlockingQueue接口, java.util.concurrent.BlockingQueue, 代表著一個隊列可以安全的插入和取元素.換句話說,多線程通過BlockingQueue安全的插入或者取元素,不會有任何并發問題發生。
BlockingQueue用法
BlockingQueue典型的應用是一個生產者多個消費者,下面一張圖描述了其原理:
一個線程放入元素到BlockingQueue,其他線程從中取元素
生產者不斷的生產新對象并且加到隊列BlockingQueue中,直到隊列加滿,也就是說如果隊列滿了,那么生產者就會一直阻塞,直到消費者取出元素。
消費者不斷的從BlockingQueue 中取元素,如果隊列為空,那么一直阻塞直到生產者往隊列中加入元素。
BlockingQueue的方法
BlockingQueue接口中分別有4種方法不同的插入和移除元素的方法,已經兩種檢查元素的方法。如果請求的操作無法執行,則每組方法的行為不同。
下面的表格是4種方法:
下面是4種方法的解釋:
Throws Exception:
嘗試操作失敗,則會拋出異常
Special Value:
嘗試操作,成功會返回true或者失敗返回false
Blocks:
如果不能立馬操作,則會阻塞,直到可以操作
Times Out:
嘗試操作,如果不能立馬操作,則會在指定的時間內返回成功true或者失敗false.
不能在BlockingQueue中加入null,否則會拋NullPointerException。
可以訪問BlockingQueue中的所有元素,而不僅僅是頭和尾。例如,假設您已將一個對象排隊等待處理,但您的應用程序決定取消它。然后可以調用remove(o)來刪除隊列中的特定對象。但是,這并不是很有效,所以除非您真的需要,否則不應該使用這些 Collection方法.
BlockingQueue的實現:
BlockingQueue是個接口,所以需要用它的實現類, java.util.concurrent 包下實現了BlockingQueue的類:
ArrayBlockingQueue
DelayQueue
LinkedBlockingQueue
PriorityBlockingQueue
SynchronousQueue
這些實現類我們后面會一一講解。
BlockingQueue的簡單例子:
這兒用了 BlockingQueue 的實現類ArrayBlockingQueue .
首先在BlockingQueueExample 分別啟動生產者和消費者線程,生產者往BlockingQueue 中插入String元素,消費者從中取出。
public class BlockingQueueExample {
public static void main(String[] args) throws Exception {
BlockingQueue queue = new ArrayBlockingQueue(1024);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
Thread.sleep(4000);
}
}
這是生產者類,注意每次put()之前都休眠了1s,這回使得消費者阻塞,等待往隊列里面加入元素:
public class Producer implements Runnable{
protected BlockingQueue queue = null;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
queue.put("1");
Thread.sleep(1000);
queue.put("2");
Thread.sleep(1000);
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
這是消費者類,就是從隊列中取元素打印:.
public class Consumer implements Runnable{
protected BlockingQueue queue = null;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}