何為線程安全的類?
一般來說,我們要設計一個線程安全的類,要從三個方面去考慮:
- 構成狀態的所有變量。比如某個域是集合類型,則集合元素也構成該實例的狀態。
- 某些操作所隱含的不變性條件。
- 變量的所有權,或稱它是否會被發布。
基于條件的同步策略
不變性條件取決于類的語義,比如說計數器類的 counter 屬性被設置為 Integer 類型,雖然其閾值在 Integer.MIN_VALUE 到 Integer.MAX_VALUE 之間,但是它的值必須非負。即:隨著計數的進行, conuter >= 0 總是成立。
除了不變性條件之外,一些操作還需要通過后驗條件,以此判斷狀態的更改是否有效。比如一個計數器計到 17 時,它的下一個狀態只可能是 18。這實際涉及到了對原先狀態的 "讀 - 改 - 寫" 三個連續的步驟,典型的如自增 ++ 等。"無記憶性" 的狀態是不需要后驗條件的,比如每隔一段時間測量的溫度值。
先驗條件可能是更加關注的問題,因為 "先判斷后執行" 的邏輯到處存在。比如說對一個列表執行 remove 操作時,首先需要保證列表是非空的,否則就應該拋出異常。
在并發環境下,這些條件均可能會隨著其它線程的修改而出現失真。
狀態發布與所有權
在許多情況下,所有權和封裝性是相互關聯的。比如對象通過 private 關鍵字封裝了它的狀態,即表明實例獨占對該狀態的所有權 ( 所有權意味控制權 )。反之,則稱該狀態被發布。被發布的實例狀態可能會被到處修改,因此它們在多線程環境中也存在風險。
容器類通常和元素表現出 "所有權" 分離的形式。比如說一個聲明為 final 的列表,客戶端雖然無法修改其本身的引用,但可以自由地修改其元素的狀態。這些事實上被發布的元素必須被安全地共享。這要求元素:
- 自身是事實不可變的實例。
- 線程安全的實例。
- 被鎖保護。
實例封閉
大多數對象都是組合對象,或者說這些狀態也是對象。對組合類的線程安全性分析大致分為兩類:
- 如果這些狀態線程不安全,那應該如何安全地使用組合類?
- 即使所有的狀態都線程安全,是否可以推斷組合類也線程安全?或者說組合類是否還需要額外的同步策略?
對于第一個問題,見下方的 Bank 代碼,它模擬了一個轉賬業務:
class Bank {
private Integer amount_A = 100;
private Integer amount_B = 50;
public synchronized void transaction(Integer amount){
var log_0 = amount_A + amount_B;
amount_A += amount;
amount_B -= amount;
var log_1 = amount_A + amount_B;
assert log_0 == log_1;
}
}
復制代碼
雖然 amount_A 和 amount_B 本身作為普通的 Integer 類型并不是線程安全的,但是它們具備線程安全的語義:
private
transaction()
也可以理解成: Bank 是為兩個 Integer 狀態提供線程安全性的容器。在此處,同步策略由 synchronized 內置鎖實現。
編譯器會在 synchronized 的代碼區前后安插 monitorenter 和 monitorexit 字節碼表示進入 / 退出同步代碼塊。JAVA 的內置鎖也稱之監視器鎖,或者監視器。
至于第二個問題,答案是:看情況,具體地說是分析是否存在不變性條件,在這里,它指代在轉賬過程當中,a 和 b 兩個賬戶的余額之和應當不變。如果使用原子類型保護 amount_A 和 amount_B 的狀態,那么是否就可以撤下 transaction() 方法上的內置鎖了?
class UnsafeBank {
private final AtomicInteger amount_A = new AtomicInteger(100);
private final AtomicInteger amount_B = new AtomicInteger(50);
public void transaction(Integer amount){
amount_A.set(amount_A.get() - amount);
amount_B.set(amount_B.get() + amount);
}
}
復制代碼
transaction() 方法現在失去了鎖的保護。這樣,某線程 A 在執行交易的過程中,另一個線程 B 也可能會 "趁機" 修改 amount_B 的賬目 —— 這個時機發生在線程 A 執行 amount_B.get() 之后,但在 amount_B.set() 之前。最終,B 線程的修改將被覆蓋而丟失,在它看來,盡管兩個狀態均是原子變量,但不變性條件仍然被破壞了。
由此得到一個結論 —— 就算所有的可變狀態都是原子的,我們可能仍需要在封裝類的層面進一步考慮同步策略,最簡單直接的就是找出封裝類內的所有復合操作:
- 對同一個變量 ( 反復 ) 讀-改-寫。
- 修改受某個不變性條件約束的多個變量。
正確地拓展同步策略
在大部分情況下,我們不能通過直接修改類源碼的形式補充同步策略。比如,普通的 List<T> 接口不保證底下的各種實現是線程安全的,但我們可以通過類似代理的方式將線程安全委托給第三方。比如:
class ThreadSafeArrayList {
private final List<Integer> list;
public ThreadSafeArrayList(List<Integer> l){list = l;}
// 添加新的方法
public synchronized boolean putIfAbsent(Integer a){
if(list.contains(a)) {
list.add(a);
return true;
}
return false;
}
// 代理 add 方法,其它略
public synchronized boolean add(Integer a) {
return list.add(a);
}
// ...
}
復制代碼
事實上,Java 類庫已經有了對應的線程安全類。通常,我們應當優先重用這些已有的類。在下方的代碼塊中,我們使用
Collection.synchronizedList 工廠方法創建一個線程安全的 list 對象,這樣似乎就只需要為新拓展的 putIfAbsent() 方法加鎖了。
class ThreadUnSafeArrayList {
private final List<Integer> list = Collections.synchronizedList(new ArrayList<>());
// 添加新的方法
public synchronized boolean putIfAbsent(Integer a){
if(list.contains(a)) {
list.add(a);
return true;
}
return false;
}
public boolean add(Integer a){return list.add(a);}
//...
}
復制代碼
但是,上述的代碼是錯誤的。為什么?問題在于,我們使用了錯誤的鎖進行了同步。當調用的是 add 方法時,使用的是列表對象的內置鎖;但調用 putIfAbsent 方法時,我們使用的卻是 ThreadUnsafeArrayList 對象的內置鎖。這意味著 putIfAbsent 方法對于其它的方法來說不是原子的,因此無法確保一個線程執行 putIfAbsent 方法時,其它線程是否會通過調用其它方法修改列表。
因此,想要讓這個方法正確執行,我們必須要在正確的地方上鎖。
class ThreadUnSafeArrayList {
private final List<Integer> list = Collections.synchronizedList(new ArrayList<>());
public boolean putIfAbsent(Integer a){
synchronized (list){
if(list.contains(a)) {
list.add(a);
return true;
}
return false;
}
}
}
復制代碼
同步容器
同步容器是安全的,但在某些情況下仍然需要客戶端加鎖。常見的操作如:
- 迭代;
- 跳轉 ( 比如,尋找下一個元素 );
- 條件運算,如 "若沒有則 XX 操作" ( 一種常見的復合操作 );
復合操作不受同步容器保護
這里有兩個線程 T1,T2 分別會以不可預測的次序執行兩個代碼塊,它們負責刪除和讀取 list 中的末尾元素。我們在這里使用的是庫中的同步列表,因此可以確保 size() , remove() , get() 方法全部是原子的。但是,當程序以 x1 , y1 , x2 , y2 的操作次序執行時,主程序最終仍然會拋出 IndexOutOfBoundsException 異常。
class DemoOfConcurrentFail {
public final List<Integer> list = Collections.synchronizedList(new ArrayList<>());
{
Collections.addAll(list, 1, 2, 3, 4, 5);
}
public static void main(String[] args) {
var testList = new DemoOfConcurrentFail().list;
Runnable t1 = () -> {
var last = testList.size() - 1; // x1
testList.remove(last); // x2
};
Runnable t2 = () -> {
var last = testList.size() -1; // y1
var r = testList.get(last); // y2
System.out.println(r);
};
new Thread(t1).start();
new Thread(t2).start();
}
}
復制代碼
究其原因,兩個線程 T1,T2 執行的復合操作沒有受鎖保護 ( 實際上就是前文銀行轉賬的例子中犯過的錯誤 )。所以正確的做法是對復合操作整體加鎖。比如:
var mutex = new Object();
Runnable t1 = () -> {
synchronized (mutex){
var last = testList.size() - 1; // x1
testList.remove(last); // x2
}
};
Runnable t2 = () -> {
synchronized (mutex){
var last = testList.size() -1; // y1
var r = testList.get(last); // y2
System.out.println(r);
}
};
// ...
復制代碼
同步容器的迭代問題
在迭代操作中,類似的問題也仍然存在。無論是直接的 for 循環還是 for-each 循環,對容器的遍歷方式是使用 Iterator。而使用迭代器本身也是先判斷 ( hasNext ) 再讀取 ( next ) 的復合過程。Java 對同步容器的迭代處理是:假設某一個線程在迭代的過程中發現容器被修改過了,則立刻失敗 ( 也稱及時失敗 ),并拋出一個
ConcurrentModificationException 異常。
// 可能需要運行多次才能拋出 ConcurrentModificationException
Runnable t1 = () -> {
// 刪除中間的元素
int mid = testList.size() / 2;
testList.remove(mid);
};
Runnable t2 = () -> {
for(var item : testList){
System.out.println(item);
}
};
new Thread(t1).start();
new Thread(t2).start();
復制代碼
類似地,想要不受打擾地迭代容器元素,我們也要在 for 循環的外面加鎖,但是可能并不是一個好的主意。假如容器的規模非常大,或者每個元素的處理時間非常長,那么其它等待容器執行短作業的線程會因此陷入長時間的等待,這會帶來活躍性問題。
一個可行的方法就是實現讀寫分離 —— 一旦有寫操作,則重新拷貝一份新的容器副本,而在此期間所有讀操作則仍在原來的容器中進行,實現 "讀-讀共享"。當讀操作遠多于寫操作時,這種做法無疑可以大幅度地提高程序的吞吐量,見后文的并發容器 CopyOnWriteArrayList 。
警惕隱含迭代的操作
不僅是顯式的 for 循環會觸發迭代。比如容器的 toString 方法在底層調用 StringBuilder.Append() 方法依次將每一個元素的字符串拼接起來。除此之外,包括 equals , containsAll , removeAll , retainAll ,乃至將容器本身作為參數的構造器,都隱含了對容器的迭代過程。這些間接的迭代錯誤都有可能拋出
ConcurrentModificationException 異常。
并發容器
考慮到重量級鎖對性能的影響,Java 后續提供了各種并發容器來改進同步容器的性能問題。同步容器將所有操作完全串行化。當鎖競爭尤其激烈時,程序的吞吐量將大大降低。因此,使用并發容器來替代同步容器,在絕大部分情況下都算是一頓 "免費的午餐"。
ConcurrentHashMap
ConcurrentHashMap 使用了更小的封鎖粒度換取了更大程度的共享,這個封鎖機制稱之為分段鎖 ( Lock Stripping )。簡單點說,就是每一個桶由單獨的鎖來保護,操作不同桶的兩個線程不需要相互等待。好處是,在高并發環境下, ConcurrentHashMap 帶來了更大的吞吐量,但問題是,封鎖粒度的減小削弱了容器的一致性語義,或稱弱一致性 ( Weakly Consistent )。
比如說需要在整個 Map 上計算的 size() 和 isEmpty() 方法,弱一致性會使得這些方法的計算結果是一個過期值。這考慮到是一個權衡,因為在并發環境下,這兩個方法的作用很小,因為其返回值總是不斷變化的。因此,這些操作的需求被弱化了,以換取其它更重要的性能優化,比如 get , put , cotainsKey , remove 等。
因此,除非一部分嚴謹的業務無法容忍弱一致性,否則并發的 HashMap 是要比同步 HashMap 更優的選擇。
CopyOnWriteArrayList
該工具在讀操作遠多于寫操作的場合下能夠提供更好的并發性能,在迭代時不需要對容器進行加鎖或者復制。當發生修改時,該容器會創建并重新發布一個新的容器副本。在新副本創建之前,一切讀操作仍然以舊的容器為準,因此這不會拋出
ConcurrentModificationException 問題。
相對的,如果頻繁調用 add , remove , set 等方法,則該容器的吞吐量會大大降低,因為這些操作需要反復調用系統的 copy 方法復制底層的數組 ( 這也是沒有設計 "CopyOnWriteLinkedList" 的原因,因為拷貝的效率會更低 )。同時,寫入時復制的特性使得 CopyOnWriteArrayList 是弱一致性的。
阻塞隊列 & 生產者 — 消費者模式
阻塞隊列,簡單地說,就是當隊列為空時,執行 take 操作會進入阻塞狀態;當隊列滿時,執行 put 操作也會進入阻塞狀態。阻塞隊列也可以分有界隊列和無界隊列。無界隊列永遠不會充滿,因此執行 put 方法永遠不會進入阻塞狀態。但是,如果生產者的執行效率遠超過消費者,那么無界隊列的無限擴張最終會耗盡內存。有界隊列則可以保證當隊列充滿時,生產者被 put 阻塞,通過這種方式來讓消費者趕上工作進度。
可以用阻塞隊列實現生產者 — 消費者模式,最常見的生產者 — 消費者模式是線程池與工作隊列的組合。這種模式將 "發布任務" 與 "領取任務" 解耦,最大的便捷是簡化了復雜的負載管理,因為生產者和消費者的執行速度并不總是相匹配的。同時,生產者和消費者的角色是相對的。比如處于流水線中游的組件,它們既作為上游的消費者,也作為下游的生產者。
Java 庫已經包含了關于阻塞隊列的多種實現,它自身保證 put 和 take 操作是線程安全的。
- LinkedBlockingQueue 和 ArrayBlockingQueue :此兩者的區別可以參考 Link 和 Array,見: ArrayBlockingQueue 和 LinkedBlockingQueue 。兩者均為 FIFO 的隊列。
- PriorityBlockingQueue :優先級隊列,當我們希望以一定次序處理任務時,它要比 FIFO 隊列更實用。
- SynchronousQueue :譯為同步阻塞隊列。這個隊列事實上沒有緩存空間,而是維護一組可用的線程。當隊列收到消息時,它可以立刻分配一個線程去處理。但是如果沒有多余的工作線程,那么調用 put 或者 take 會立刻陷入阻塞狀態。因此,僅當有足夠多的消費者,并且總是有一個消費者準備好獲取交付的工作時,才適合使用同步隊列。
下方的代碼塊是由 SynchronousQueue 實現的簡易 Demo,每個線程會搶占式消費消息。
var chan = new SynchronousQueue<Integer>();
var worker = new Thread(()->{
while(true){
try {
final var x = chan.take();
System.out.println("t1 consume: " + x);
} catch (InterruptedException e) {e.printStackTrace();}
}
});
var worker2 = new Thread(()->{
while(true){
try {
final var x = chan.take();
System.out.println("t2 consume: " + x);
} catch (InterruptedException e) {e.printStackTrace();}
}
});
worker.start();
worker2.start();
for(var i = 0 ; i < 10; i ++) chan.put(i);
復制代碼
基于所有權的角度去分析,生產者 — 消費者模式和阻塞隊列一起促進了 串行的線程封閉 。線程封閉對象只能由單個對象擁有,但可以通過在執行的最后發布該對象 ( 即表示之后不會再使用它 ),以表示 "轉讓" 所有權。
阻塞隊列簡化了轉移的邏輯。除此之外,還可以通過 ConcurrentMap 的原子方法 remove,或者是 AtomicReference 的 compareAndSet ( 即 CAS 機制 ) 實現安全的串行線程封閉。
雙端隊列和工作竊取
Java 6 之后增加了新的容器類型 —— Deque 和 BlockDeque,它們是對 Queue 以及 BlockingQueue 的拓展。Deque 實現了再隊列頭和隊列尾的高效插入和移除,具體實現包括了 ArrayDeque 和 LinkedBlockingDeque。
雙端隊列適用于另一種工作模式 —— 工作竊取 ( Work Stealing )。比如,一個工作線程已經完成清空了自己的任務隊列,它就可以從其它忙碌的工作線程的任務隊列的尾部獲取隊列。這種模式要比生產者 —— 消費者具備更高的可伸縮性,因為工作線程不會在單個共享的任務隊列上發生競爭。
工作竊取特別適合遞歸的并發問題,即執行一個任務時會產生更多的工作,比如:Web 爬蟲,GC 垃圾回收時的圖搜索算法。
阻塞和中斷方法
線程可能會被阻塞,或者是暫停執行,原因有多種:等待 I/O 結束,等待獲得鎖,等待從 Thread.sleep 中喚醒,等待另一個線程的計算結果。被阻塞的線程必須要在這些 "外因" 被解決之后才有機會繼續執行,即恢復到 RUNNABLE ( 也稱就緒 ) 狀態,等待被再次調度 CPU 執行。
這段描述其實對應了 JVM 線程的兩個狀態:BLOCKING 和 WAITING。
- BLOCKING,當線程準備進入一段新的同步代碼塊時,因不能獲得鎖而等待。
- WAITING,當線程已經進入同步代碼塊之后,在執行的過程中因不滿足某些條件而暫停。這時可以調用 waiting 方法 釋放已占據的鎖 。其它工作線程得以搶占此鎖并執行,直到滿足先驗條件為真時,其它線程可以通過 notifyAll 方法重新令監視此鎖的所有 WAITING 線程再次爭鎖并繼續工作。 wait / notify / notifyAll 構成了線程之間的協商機制,見下面的代碼塊。
static class Status{public boolean v;}
public static void main(String[] args) throws InterruptedException{
var status = new Status();
status.v = false;
var mutex = new Object();
new Thread(()->{
synchronized (mutex){
System.out.println("get mutex");
// 此時檢測的狀態為 false, 進入 WAITING 狀態。
if(!status.v) try {mutex.wait();} catch (InterruptedException e) {e.printStackTrace();}
// 被喚醒后重新檢測狀態為 true。
System.out.println(status.v);
}
}).start();
new Thread(()->{
synchronized (mutex){
// 將狀態設置為 true,喚醒上面的線程
status.v = true;
mutex.notify();
}
}).start();
}
復制代碼
只有處于 RUNNABLE 狀態的線程才會實際獲得 CPU 使用權。
Java中哪些操作會使線程釋放鎖資源_后端碼匠的博客-CSDN博客_線程釋放鎖資源
JVM中的線程狀態 - 知乎 (zhihu.com)
在 Java 中,一切會發生阻塞的方法都會被要求處理 InterruptedException 受檢異常。調用阻塞方法的方法也會變成阻塞方法。線程內部有一個 boolean 類型的狀態位表示中斷,調用 interrupt 方法可以將該狀態位標識為 true 。但是這不意味著該線程就會立刻中斷:
InterruptedException
同步工具類
Java 還提供了諸如信號量 ( Semaphore ),柵欄 ( Barrier ),以及閉鎖 ( Latch ) 作為同步工具類,它們都包含了一定的結構性屬性:這些狀態將決定執行同步工具類的線程是執行還是等待。
閉鎖
閉鎖是一種同步工具類,可以延遲線程的進度直到閉鎖打開。在此之前,所有的線程必須等待,而在閉鎖結束之后,這個鎖將永久保持打開狀態。這個特性適用于 需要確保某個任務的前序任務 ( 比如初始化 ) 全部完成之后才可以執行的場合,見下方的代碼:Worker 線程等待另兩個初始化線程準備就緒之后輸出 p 的結果。
// class Point{int x,y;}
final var p = new Point();
final var p_latch = new CountDownLatch(2);
// Worker
new Thread(()->{
try {p_latch.await();} catch (InterruptedException e) {e.printStackTrace();}
System.out.printf("Point(x=%d,y=%d)",p.x,p.y);
}).start();
// Init x
new Thread(()->{
p.x = 1;
p_latch.countDown();
}).start();
// Init y
new Thread(()->{
p.y = 2;
p_latch.countDown();
}).start();
復制代碼
FutureTask 也可以拿來做閉鎖,它實現了 Future 的語義,表示一個抽象的可生成結果的計算,一般需要由線程池驅動執行,表示一個異步的任務。
Runnable 接口表示無返回值的計算,Callable<T> 代表有返回值的計算。
final var futurePoint = new FutureTask<>(()->new Point(1,2));
new Thread(futurePoint).start();
new Thread(()->{
try {
// 在 Callable 計算出結果之前阻塞
var p = futurePoint.get();
System.out.printf("Point(x=%d,y=%d)",p.x,p.y);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}).start();
復制代碼
信號量
計數信號量用于控制某個資源的同時訪問數量,通常用于配置有容量限制的資源池,或稱有界阻塞容器。Semaphore 管理一組許可,線程在需要時首先獲取許可,并在操作結束之后歸還許可。如果許可數量被耗盡,那么線程則必須要阻塞到其它任意線程歸還許可 ( 默認情況下遵循 Non-Fair 策略 ) 為止。特別地,當信號量的許可數為 1 時,則可認為是不可重入的互斥鎖。
下面是一個利用信號量 + 同步容器實現的簡易阻塞隊列:
class BoundedBlockingQueue<E>{
final private List<E> list = Collections.synchronizedList(new LinkedList<>());
final private Semaphore se;
public BoundedBlockingQueue(int cap){
se = new Semaphore(cap);
}
public void enqueue(E e) throws InterruptedException {
se.acquire();
list.add(0,e);
}
public E dequeue(){
final var done = list.remove(0);
se.release();
return done;
}
@Override
public String toString() {
return "BoundedBlockingQueue{" +
"list=" + list +
'}';
}
}
復制代碼
柵欄
柵欄 ( Barrier ) 類似于閉鎖,同樣都會阻塞到某一個事件發生。閉鎖強調等待某個事件發生之后再執行動作,而柵欄更強調在某個事件發生之前等待其它線程。它可用于實現一些協議:"所有人在指定的時間去會議室碰頭,等到所有的人到齊之后再開會",比如數據庫事務的兩階段提交。
Java 提供了一個名為 CyclicBarrier 的柵欄,它指定了 N 個工作線程 反復地 在柵欄位置匯集。在某線程執行完畢之后,調用 await() 方法阻塞自身,以等待其它更慢的線程到達柵欄位置。當設定的 N 個線程均調用 await() 之后,柵欄將打開,此時所有的線程將可以繼續向下執行代碼,而柵欄本身的狀態會重置,以便復用 ( 因而命名為 Cyclic- )。
見下面的代碼,4 個線程并行執行初始化工作 ( 以隨機時間的 sleep 模擬延遲 ),并等待所有線程初始化完畢之后同時打印信息。
final int N = 4;
final var barrier = new CyclicBarrier(N);
final Thread[] workers = new Thread[N];
for(var i : new Integer[]{0,1,2,3}){
var t = new Thread(()->{
try {
// 模擬隨機的延時
var rdm = new Random().nextInt(1000);
Thread.sleep(rdm);
// 在所有其它線程到達之前阻塞
barrier.await();
// 所有線程到達之后執行,每個線程打印延時時間
System.out.printf("prepare for %d millisn",rdm);
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
workers[i] = t;
t.start();
}
// 等待所有的任務并行執行完畢。
for(var worker : workers){worker.join();}
復制代碼
在不涉及 IO 操作和數據共享的計算問題當中,線程數量為 N CPU 或者 N CPU + 1 時會獲得最優的吞吐量,更多的線程也不會帶來帶來幫助,甚至性能還會下降,因為 CPU 需要頻繁的切換上下文。
一旦線程成功地到達柵欄,則 await() 方法會其標記為 "子線程"。 CyclicBarrier 的構造器還接受額外的 Runnable 接口做回調函數,當所有線程全部到達柵欄之后, CyclicBarrier 會從子線程當中挑選出一個領導線程去執行它 ( 即,每一輪通過柵欄之后,它都會被執行且僅一次 ),我們可以在此實現日志記錄等操作。
final var barrier = new CyclicBarrier(N,()->{
System.out.println("all runners ready");
});
復制代碼
在并行任務中構建高效的緩存
為了用簡單的例子說明問題,我們在這里特別強調并行 ( Parallel ) 任務,這些任務的計算過程是純粹 ( Pure ) 的 —— 這樣的函數被稱之純函數。無論它們何時被調用,被哪個線程調用,同樣的輸入永遠得到同樣的輸出。純函數不和外部環境交互,因此自然也就不存在競態條件。
一個非常自然的想法是使用緩存 ( 或稱記憶機制 Memorized ) 避免重復的運算。在純粹的映射關系中,固定的輸入總是對應固定的輸出,因此使用 K-V 鍵值對來記憶結果再好不過了。我們基于 HashMap 給出最簡單的一版實現,然后再探討如何改進它們。
class MapCacheV1 {
private final HashMap<Integer,String> cache = new HashMap<>();
public synchronized String getResult(Integer id){
var v = cache.get(id);
if (v == null){
// 設定中,這個靜態方法具有 500ms 左右的延遲。
v = PURE.slowOperation(id);
cache.put(id,v);
}
return v;
}
}
復制代碼
盡管我們打算將 MapCache 用于無競態條件的并行任務,但 getResult() 方法仍然加上了同步鎖,因為 HashMap 本身不是線程安全的, cache 需要以安全的方式被并發訪問。然而,這種做法無疑會使得 getResult() 方法變得十分笨重,因為原本可以并行的慢操作 PURE.slowOperation() 也被鎖在了代碼塊內部。
最先想到的是使用更加高效的 ConcurrentHashMap 類取代線程不安全的 HashMap ,以獲得免費的多線程性能提升:
class MapCacheV2 {
private final ConcurrentHashMap<Integer,String> cache = new ConcurrentHashMap<>();
public String getResult(Integer id){
var v = cache.get(id);
if(v == null){
v = PURE.slowOperation(id);
cache.put(id,v);
}
return v;
}
}
復制代碼
同時,我們這一次取消掉了 getResult() 上的同步鎖。這樣,多線程可以并行地執行慢操作,只在修改 cache 時發生競爭。但這個緩存仍有一些不足 —— 當某個線程 A 在計算新值時 ( 即這 500ms 之內 ),其它線程并不知道。因此,多個線程有可能會計算同一個新值,甚至導致其它的計算任務無法進行。
針對這個問題,我們再一次提出改進。不妨讓 cache 保存 "計算過程",而非值。這樣,工作線程將有三種行為:
- 緩存中沒有此計算任務,注冊并執行。
- 緩存中有此計算任務,但未完畢,當前線程阻塞 ( 將 CPU 讓給其它需要計算的線程 )。
- 緩存中有此計算任務,且已計算完畢,直接返回。
回顧前文在閉鎖中提到的 FutureTask<V> 類型,它適合用于當前的實現,見下方的代碼:
class MapCacheV3 {
private final ConcurrentHashMap<Integer,FutureTask<String>> cache = new ConcurrentHashMap<>();
public String getResult(Integer id) throws ExecutionException, InterruptedException {
// 獲取一個計算任務,而非值
final var task = cache.get(id);
if(task == null){
final var newTask = new FutureTask<>(()-> PURE.slowOperation(id));
// cache.putIfAbsent()
cache.put(id,newTask);
newTask.run();
// 提交并執行任務。
return newTask.get();
}else return task.get();
}
}
復制代碼
MapCacheV3 的實現已經近乎完美了。唯一不足的是:我們對 cache 的操作仍然是 "先判斷后執行" 的復合操作,但現在 getResult 并沒有同步鎖的保護。兩個線程仍然同時調用 cache.get() 并判空,并開始執行重復的計算。
下面的版本給出了最終的解決方案:使用 ConcurrentMap 的 putIfAbsent() 原子方法修復可能重復添加計算任務的問題。
public String getResult(Integer id) throws ExecutionException, InterruptedException {
// 獲取一個計算任務,而非值
final var task = cache.get(id);
if(task == null){
final var newTask = new FutureTask<>(()-> PURE.slowOperation(id));
// put 和 putIfAbsent 方法均會返回此 Key 對應的上一個舊值 Value。
// 如果 put 的是一個新的 Key,則返回值為 null。
final var maybeNull = cache.putIfAbsent(id,newTask);
if(maybeNull == null) newTask.run();
return newTask.get();
}else return task.get();
}
復制代碼
值得注意的是,一旦 cache 存儲的是計算任務而非值,那么就可能存在緩存污染的問題。一旦某個 FutureTask 的計算被取消,或者失敗,應當及時將它從緩存中移除以保證將來的計算成功,而不是放任其駐留在緩存內部返回失敗的結果。
緩存思想幾乎應用在各個地方。比如在 Web 服務中,用戶的數據往往不會總是直接來自數據庫,而是 redis 這樣的消息中間件。在實際的應用環境下,還有更加復雜的問題需要被考慮到,比如緩存內容過時 ( expired ),或者是定期清理緩存空間等。
原文鏈接:
https://juejin.cn/post/7138285297208393741?utm_source=tuicool&utm_medium=referral