背景
“生產者和消費者模型” 是多線程通信的典型案例,本章節將利用前一節的鎖和條件隊列的知識,來實現一個完整的有界緩沖區,并創建多個線程訪問該有界緩沖區,模擬生產者提供數據、消費者處理數據的過程,正文如下。
生產者消費者模型
生產者和消費者模型中,因為多個線程共享同一個緩沖區,所以就涉及到兩個重要的通信約束:
- 緩沖區滿的時候,生產者不能再添加數據,應該阻塞等待,直到緩沖區有空位;
- 緩沖區空的時候,消費者不能再獲取數據,應該阻塞等待,直到有新的數據加入緩沖區。
要保證上述約束條件,可以用 sleep 空循環,也可以使用鎖和條件隊列。利用鎖和條件隊列實現的思路是,生產者和消費者有各自要等待的條件,一旦條件不滿足,就阻塞在該條件隊列上,直到另一個線程喚醒自己。
實現過程
緩沖區的 “滿” 和 “空” 是兩個條件,如果用內置鎖,對緩沖區的操作由同一把鎖保護,只能共用一個條件隊列;如果使用顯式鎖,則可以定義兩個條件隊列。
這里我們就用內置鎖和內置條件隊列來實現一個通信模型中的共享緩沖區類。設計類圖結構:
抽象有界限緩存
首先,創建一抽象有界緩存類 ABoundedBuffer,提供插入和刪除的基本實現。
/**
* @title :ABoundedBuffer
* @description :有界緩存抽象類
* @update :2019-12-20 上午9:29:33
* @author :172.17.5.73
* @version :1.0.0
* @since :2019-12-20
*/
public abstract class ABoundedBuffer<V> {
private final V[] buf;
private int tail;
private int head;
private int count;
protected ABoundedBuffer(int capacity){
this.buf = (V[]) new Object[capacity];
}
protected synchronized final void doPut(V v){
buf[tail] = v;
if(++tail==buf.length){
tail = 0;
}
++count;
}
protected synchronized final V doTake(){
V v = buf[head];
buf[head] = null;
if(++head==buf.length){
head = 0;
}
--count;
return v;
}
public synchronized final boolean isFull(){
return count == buf.length;
}
public synchronized final boolean isEmpty(){
return count==0;
}
}
定義實現類
其次,利用內置條件隊列,編寫子類實現可阻塞的插入和刪除操作。
插入操作,依賴的條件是緩存非滿,當條件不滿足時,調用 wait 方法掛起線程,一旦插入成功,說明緩存非空,則調用 notifyAll 方法喚醒等待非空的線程。
刪除操作,依賴的條件是非空,當條件不滿足時,同樣掛起等待,一旦刪除成功,說明緩存非滿,喚起等待該條件的線程。
完整的源碼為:
import JAVA.util.Date;
/**
*
* @title :InnerConditionQueue
* @description :使用內置條件隊列,實現簡單的有界緩存
* 通過對象的 wait 和 notify 來實現掛起
* 鎖對象是 this,調用 wait/notify 的對象是同一個對象。
* 三元關系(鎖、wait/notify、條件謂詞)
* 缺陷:
* 線程從 wait 中被喚醒時,并不代碼條件謂詞為真,此時還是需要再判斷條件。所以必須在循環中調用wait
* 每次醒來時都判斷謂詞的真假。
* 謂詞:對客體的描述或說明(是什么、怎么樣、做什么),描述客體的本質、關系、特性等的詞項。
* @update :2019-12-20 下午4:18:06
* @author :172.17.5.73
* @version :1.0.0
* @since :2019-12-20
*/
public class InnerConditionQueue<V> extends ABoundedBuffer<V> {
protected InnerConditionQueue(int capacity) {
super(capacity);
}
public synchronized void put(V v) throws InterruptedException{
while(isFull()){
System.out.println(new Date()+" buffer is Full thread wait:"+Thread.currentThread().getName());
wait();
}
doPut(v);
notifyAll();
}
public synchronized V take() throws InterruptedException{
while(isEmpty()){
System.out.println(new Date()+" buffer is empty thread wait:"+Thread.currentThread().getName());
wait();
}
V v = doTake();
//每當在等待一個條件時,一定要確保在條件謂詞變為真時,通過某種方式發出通知
notifyAll();
System.out.println(new Date()+" "+Thread.currentThread().getName()+" take:"+v);
return v;
}
}
測試類
最后,編寫測試代碼,創建一個大小為 2 的緩沖區對象,同時啟動三個線程執行插入操作,主線程執行四次消費操作。測試代碼如下:
import java.util.Date;
public class Main {
public static void main(String[] args) {
final InnerConditionQueue<String> bu = new InnerConditionQueue<String>(2);
Thread t1 = new Thread(new Runnable(){
@Override
public void run() {
try {
bu.put("hello1");
} catch (InterruptedException execption) {
System.out.println("intercetp1:"+Thread.currentThread().getName());
}
}
});
Thread t2 = new Thread(new Runnable(){
@Override
public void run() {
try {
bu.put("hello2");
} catch (InterruptedException execption) {
System.out.println("intercetp2:"+Thread.currentThread().getName());
}
}
});
Thread t3 = new Thread(new Runnable(){
@Override
public void run() {
try {
bu.put("hello3");
Thread.sleep(50000);
bu.put("last one...");
} catch (InterruptedException execption) {
System.out.println("intercetp3:"+Thread.currentThread().getName());
}
}
});
t1.start();
t2.start();
t3.start();
try {
Thread.sleep(5000);
bu.take();
bu.take();
bu.take();
bu.take();
} catch (InterruptedException execption) {
execption.printStackTrace();
}
System.out.println(new Date()+" main over...");
}
}
測試結果
執行結果:t3 的第一個 put 操作會因為緩存已滿而阻塞,5 秒后主線程刪除兩個操作后,重新被喚醒。主線程的第四個 bu.take() 操作會因為緩存為空而阻塞,直到 t3 在 50 秒后重新插入"last one" 后被喚醒,操作結束。
Tue Dec 20 10:23:53 CST 2019 buffer is Full thread wait:Thread-2
Tue Dec 20 10:23:58 CST 2019 main take:hello1
Tue Dec 20 10:23:58 CST 2019 main take:hello2
Tue Dec 20 10:23:58 CST 2019 buffer is empty thread wait:main
Tue Dec 20 10:23:58 CST 2019 main take:hello3
Tue Dec 20 10:23:58 CST 2019 buffer is empty thread wait:main
Tue Dec 20 10:24:48 CST 2019 main take:last one...
Tue Dec 20 10:24:48 CST 2019 main over...
啟示錄
我們的例子中,“非空” 和 “非滿” 這兩種條件關聯著同一個條件隊列,當一個線程由于其他線程調用了notifyAll 而被喚醒時,并不意味著它等待的條件已經為真了,這也是內置條件隊列的局限所在。
所以代碼中的加固措施是,使用循環判斷條件是否發生,如果發生,則調用 wait 阻塞自己,等待其他線程喚醒:
while(isFull()){
System.out.println(new Date()+" buffer is Full thread wait:"+Thread.currentThread().getName());
wait();
}
同樣的功能,Java 并發包中的 ArrayBlockingQueue 是使用 ReentrantLock 和 ObjectCondition 實現的可阻塞隊列,為什么 JDK 使用顯式鎖和顯式條件隊列呢?
使用內置鎖的局限性在于一把鎖只有一個條件隊列,而這里涉及到兩種等待條件,所以使用 ReentrantLock 更合適,它可以關聯多個條件隊列,這樣就可以巧妙地處理多條件的阻塞和喚醒了!