日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

 

何為線程安全的類?

一般來說,我們要設計一個線程安全的類,要從三個方面去考慮:

  1. 構成狀態的所有變量。比如某個域是集合類型,則集合元素也構成該實例的狀態。
  2. 某些操作所隱含的不變性條件。
  3. 變量的所有權,或稱它是否會被發布。

基于條件的同步策略

不變性條件取決于類的語義,比如說計數器類的 counter 屬性被設置為 Integer 類型,雖然其閾值在 Integer.MIN_VALUE 到 Integer.MAX_VALUE 之間,但是它的值必須非負。即:隨著計數的進行, conuter >= 0 總是成立。

除了不變性條件之外,一些操作還需要通過后驗條件,以此判斷狀態的更改是否有效。比如一個計數器計到 17 時,它的下一個狀態只可能是 18。這實際涉及到了對原先狀態的 "讀 - 改 - 寫" 三個連續的步驟,典型的如自增 ++ 等。"無記憶性" 的狀態是不需要后驗條件的,比如每隔一段時間測量的溫度值。

先驗條件可能是更加關注的問題,因為 "先判斷后執行" 的邏輯到處存在。比如說對一個列表執行 remove 操作時,首先需要保證列表是非空的,否則就應該拋出異常。

在并發環境下,這些條件均可能會隨著其它線程的修改而出現失真。

狀態發布與所有權

在許多情況下,所有權和封裝性是相互關聯的。比如對象通過 private 關鍵字封裝了它的狀態,即表明實例獨占對該狀態的所有權 ( 所有權意味控制權 )。反之,則稱該狀態被發布。被發布的實例狀態可能會被到處修改,因此它們在多線程環境中也存在風險。

容器類通常和元素表現出 "所有權" 分離的形式。比如說一個聲明為 final 的列表,客戶端雖然無法修改其本身的引用,但可以自由地修改其元素的狀態。這些事實上被發布的元素必須被安全地共享。這要求元素:

  1. 自身是事實不可變的實例。
  2. 線程安全的實例。
  3. 被鎖保護。

實例封閉

大多數對象都是組合對象,或者說這些狀態也是對象。對組合類的線程安全性分析大致分為兩類:

  1. 如果這些狀態線程不安全,那應該如何安全地使用組合類?
  2. 即使所有的狀態都線程安全,是否可以推斷組合類也線程安全?或者說組合類是否還需要額外的同步策略?

對于第一個問題,見下方的 Bank 代碼,它模擬了一個轉賬業務:

class Bank {
    private Integer amount_A = 100;
    private Integer amount_B = 50;
    public synchronized void transaction(Integer amount){
        var log_0 = amount_A + amount_B;
        amount_A += amount;
        amount_B -= amount;
        var log_1 = amount_A + amount_B;
        assert log_0 == log_1;
    }
}
復制代碼

雖然 amount_A 和 amount_B 本身作為普通的 Integer 類型并不是線程安全的,但是它們具備線程安全的語義:

private
transaction()

也可以理解成: Bank 是為兩個 Integer 狀態提供線程安全性的容器。在此處,同步策略由 synchronized 內置鎖實現。

編譯器會在 synchronized 的代碼區前后安插 monitorenter 和 monitorexit 字節碼表示進入 / 退出同步代碼塊。JAVA 的內置鎖也稱之監視器鎖,或者監視器。

至于第二個問題,答案是:看情況,具體地說是分析是否存在不變性條件,在這里,它指代在轉賬過程當中,a 和 b 兩個賬戶的余額之和應當不變。如果使用原子類型保護 amount_A 和 amount_B 的狀態,那么是否就可以撤下 transaction() 方法上的內置鎖了?

class UnsafeBank {
    private final AtomicInteger amount_A = new AtomicInteger(100);
    private final AtomicInteger amount_B = new AtomicInteger(50);
    public void transaction(Integer amount){
        amount_A.set(amount_A.get() - amount);
        amount_B.set(amount_B.get() + amount);
    }
}
復制代碼

transaction() 方法現在失去了鎖的保護。這樣,某線程 A 在執行交易的過程中,另一個線程 B 也可能會 "趁機" 修改 amount_B 的賬目 —— 這個時機發生在線程 A 執行 amount_B.get() 之后,但在 amount_B.set() 之前。最終,B 線程的修改將被覆蓋而丟失,在它看來,盡管兩個狀態均是原子變量,但不變性條件仍然被破壞了。

由此得到一個結論 —— 就算所有的可變狀態都是原子的,我們可能仍需要在封裝類的層面進一步考慮同步策略,最簡單直接的就是找出封裝類內的所有復合操作:

  1. 對同一個變量 ( 反復 ) 讀-改-寫。
  2. 修改受某個不變性條件約束的多個變量。

正確地拓展同步策略

在大部分情況下,我們不能通過直接修改類源碼的形式補充同步策略。比如,普通的 List<T> 接口不保證底下的各種實現是線程安全的,但我們可以通過類似代理的方式將線程安全委托給第三方。比如:

class ThreadSafeArrayList {
    private final List<Integer> list;
    public ThreadSafeArrayList(List<Integer> l){list =  l;}
    
    // 添加新的方法
    public synchronized boolean putIfAbsent(Integer a){
        if(list.contains(a)) {
            list.add(a);
            return true;
        }
        return false;
    }

    // 代理 add 方法,其它略
    public synchronized boolean add(Integer a) {
        return list.add(a);
    }

    // ...
}
復制代碼

事實上,Java 類庫已經有了對應的線程安全類。通常,我們應當優先重用這些已有的類。在下方的代碼塊中,我們使用
Collection.synchronizedList 工廠方法創建一個線程安全的 list 對象,這樣似乎就只需要為新拓展的 putIfAbsent() 方法加鎖了。

class ThreadUnSafeArrayList {
    private final List<Integer> list = Collections.synchronizedList(new ArrayList<>());

    // 添加新的方法
    public synchronized boolean putIfAbsent(Integer a){
        if(list.contains(a)) {
            list.add(a);
            return true;
        }
        return false;
    }
    
    public boolean add(Integer a){return list.add(a);}
    //...
}
復制代碼

但是,上述的代碼是錯誤的。為什么?問題在于,我們使用了錯誤的鎖進行了同步。當調用的是 add 方法時,使用的是列表對象的內置鎖;但調用 putIfAbsent 方法時,我們使用的卻是 ThreadUnsafeArrayList 對象的內置鎖。這意味著 putIfAbsent 方法對于其它的方法來說不是原子的,因此無法確保一個線程執行 putIfAbsent 方法時,其它線程是否會通過調用其它方法修改列表。

因此,想要讓這個方法正確執行,我們必須要在正確的地方上鎖。

class ThreadUnSafeArrayList {
    private final List<Integer> list = Collections.synchronizedList(new ArrayList<>());
    public boolean putIfAbsent(Integer a){
        synchronized (list){
            if(list.contains(a)) {
                list.add(a);
                return true;
            }
            return false;
        }
    }
}
復制代碼

同步容器

同步容器是安全的,但在某些情況下仍然需要客戶端加鎖。常見的操作如:

  1. 迭代;
  2. 跳轉 ( 比如,尋找下一個元素 );
  3. 條件運算,如 "若沒有則 XX 操作" ( 一種常見的復合操作 );

復合操作不受同步容器保護

這里有兩個線程 T1,T2 分別會以不可預測的次序執行兩個代碼塊,它們負責刪除和讀取 list 中的末尾元素。我們在這里使用的是庫中的同步列表,因此可以確保 size() , remove() , get() 方法全部是原子的。但是,當程序以 x1 , y1 , x2 , y2 的操作次序執行時,主程序最終仍然會拋出 IndexOutOfBoundsException 異常。

class DemoOfConcurrentFail {

    public final List<Integer> list = Collections.synchronizedList(new ArrayList<>());

    {
        Collections.addAll(list, 1, 2, 3, 4, 5);
    }

    public static void main(String[] args) {
        var testList = new DemoOfConcurrentFail().list;

        Runnable t1 = () -> {
            var last = testList.size() - 1;  // x1
            testList.remove(last);  // x2
        };

        Runnable t2 = () -> {
            var last = testList.size() -1;  // y1
            var  r = testList.get(last);  // y2
            System.out.println(r);
        };

        new Thread(t1).start();
        new Thread(t2).start();

    }
}
復制代碼

究其原因,兩個線程 T1,T2 執行的復合操作沒有受鎖保護 ( 實際上就是前文銀行轉賬的例子中犯過的錯誤 )。所以正確的做法是對復合操作整體加鎖。比如:

var mutex = new Object();

Runnable t1 = () -> {
    synchronized (mutex){
        var last = testList.size() - 1;  // x1
        testList.remove(last);  // x2
    }
};

Runnable t2 = () -> {
    synchronized (mutex){
        var last = testList.size() -1;  // y1
        var  r = testList.get(last);  // y2
        System.out.println(r);
    }
};

// ...
復制代碼

同步容器的迭代問題

在迭代操作中,類似的問題也仍然存在。無論是直接的 for 循環還是 for-each 循環,對容器的遍歷方式是使用 Iterator。而使用迭代器本身也是先判斷 ( hasNext ) 再讀取 ( next ) 的復合過程。Java 對同步容器的迭代處理是:假設某一個線程在迭代的過程中發現容器被修改過了,則立刻失敗 ( 也稱及時失敗 ),并拋出一個
ConcurrentModificationException 異常。

// 可能需要運行多次才能拋出 ConcurrentModificationException
Runnable t1 = () -> {
    // 刪除中間的元素
    int mid =  testList.size() / 2;
    testList.remove(mid);
};

Runnable t2 = () -> {
    for(var item : testList){
        System.out.println(item);
    }
};

new Thread(t1).start();
new Thread(t2).start();
復制代碼

類似地,想要不受打擾地迭代容器元素,我們也要在 for 循環的外面加鎖,但是可能并不是一個好的主意。假如容器的規模非常大,或者每個元素的處理時間非常長,那么其它等待容器執行短作業的線程會因此陷入長時間的等待,這會帶來活躍性問題。

一個可行的方法就是實現讀寫分離 —— 一旦有寫操作,則重新拷貝一份新的容器副本,而在此期間所有讀操作則仍在原來的容器中進行,實現 "讀-讀共享"。當讀操作遠多于寫操作時,這種做法無疑可以大幅度地提高程序的吞吐量,見后文的并發容器 CopyOnWriteArrayList 。

警惕隱含迭代的操作

不僅是顯式的 for 循環會觸發迭代。比如容器的 toString 方法在底層調用 StringBuilder.Append() 方法依次將每一個元素的字符串拼接起來。除此之外,包括 equals , containsAll , removeAll , retainAll ,乃至將容器本身作為參數的構造器,都隱含了對容器的迭代過程。這些間接的迭代錯誤都有可能拋出
ConcurrentModificationException 異常。

并發容器

考慮到重量級鎖對性能的影響,Java 后續提供了各種并發容器來改進同步容器的性能問題。同步容器將所有操作完全串行化。當鎖競爭尤其激烈時,程序的吞吐量將大大降低。因此,使用并發容器來替代同步容器,在絕大部分情況下都算是一頓 "免費的午餐"。

ConcurrentHashMap

ConcurrentHashMap 使用了更小的封鎖粒度換取了更大程度的共享,這個封鎖機制稱之為分段鎖 ( Lock Stripping )。簡單點說,就是每一個桶由單獨的鎖來保護,操作不同桶的兩個線程不需要相互等待。好處是,在高并發環境下, ConcurrentHashMap 帶來了更大的吞吐量,但問題是,封鎖粒度的減小削弱了容器的一致性語義,或稱弱一致性 ( Weakly Consistent )。

比如說需要在整個 Map 上計算的 size() 和 isEmpty() 方法,弱一致性會使得這些方法的計算結果是一個過期值。這考慮到是一個權衡,因為在并發環境下,這兩個方法的作用很小,因為其返回值總是不斷變化的。因此,這些操作的需求被弱化了,以換取其它更重要的性能優化,比如 get , put , cotainsKey , remove 等。

因此,除非一部分嚴謹的業務無法容忍弱一致性,否則并發的 HashMap 是要比同步 HashMap 更優的選擇。

CopyOnWriteArrayList

該工具在讀操作遠多于寫操作的場合下能夠提供更好的并發性能,在迭代時不需要對容器進行加鎖或者復制。當發生修改時,該容器會創建并重新發布一個新的容器副本。在新副本創建之前,一切讀操作仍然以舊的容器為準,因此這不會拋出
ConcurrentModificationException 問題。

相對的,如果頻繁調用 add , remove , set 等方法,則該容器的吞吐量會大大降低,因為這些操作需要反復調用系統的 copy 方法復制底層的數組 ( 這也是沒有設計 "CopyOnWriteLinkedList" 的原因,因為拷貝的效率會更低 )。同時,寫入時復制的特性使得 CopyOnWriteArrayList 是弱一致性的。

阻塞隊列 & 生產者 — 消費者模式

阻塞隊列,簡單地說,就是當隊列為空時,執行 take 操作會進入阻塞狀態;當隊列滿時,執行 put 操作也會進入阻塞狀態。阻塞隊列也可以分有界隊列和無界隊列。無界隊列永遠不會充滿,因此執行 put 方法永遠不會進入阻塞狀態。但是,如果生產者的執行效率遠超過消費者,那么無界隊列的無限擴張最終會耗盡內存。有界隊列則可以保證當隊列充滿時,生產者被 put 阻塞,通過這種方式來讓消費者趕上工作進度。

可以用阻塞隊列實現生產者 — 消費者模式,最常見的生產者 — 消費者模式是線程池與工作隊列的組合。這種模式將 "發布任務" 與 "領取任務" 解耦,最大的便捷是簡化了復雜的負載管理,因為生產者和消費者的執行速度并不總是相匹配的。同時,生產者和消費者的角色是相對的。比如處于流水線中游的組件,它們既作為上游的消費者,也作為下游的生產者。

Java 庫已經包含了關于阻塞隊列的多種實現,它自身保證 put 和 take 操作是線程安全的。

  1. LinkedBlockingQueue 和 ArrayBlockingQueue :此兩者的區別可以參考 Link 和 Array,見: ArrayBlockingQueue 和 LinkedBlockingQueue 。兩者均為 FIFO 的隊列。
  2. PriorityBlockingQueue :優先級隊列,當我們希望以一定次序處理任務時,它要比 FIFO 隊列更實用。
  3. SynchronousQueue :譯為同步阻塞隊列。這個隊列事實上沒有緩存空間,而是維護一組可用的線程。當隊列收到消息時,它可以立刻分配一個線程去處理。但是如果沒有多余的工作線程,那么調用 put 或者 take 會立刻陷入阻塞狀態。因此,僅當有足夠多的消費者,并且總是有一個消費者準備好獲取交付的工作時,才適合使用同步隊列。

下方的代碼塊是由 SynchronousQueue 實現的簡易 Demo,每個線程會搶占式消費消息。

var chan = new SynchronousQueue<Integer>();

var worker = new Thread(()->{
    while(true){
        try {
            final var x = chan.take();
            System.out.println("t1 consume: " + x);
        } catch (InterruptedException e) {e.printStackTrace();}
    }
});

var worker2 = new Thread(()->{
    while(true){
        try {
            final var x = chan.take();
            System.out.println("t2 consume: " + x);
        } catch (InterruptedException e) {e.printStackTrace();}
    }
});

worker.start();
worker2.start();

for(var i = 0 ; i < 10; i ++) chan.put(i);
復制代碼

基于所有權的角度去分析,生產者 — 消費者模式和阻塞隊列一起促進了 串行的線程封閉 。線程封閉對象只能由單個對象擁有,但可以通過在執行的最后發布該對象 ( 即表示之后不會再使用它 ),以表示 "轉讓" 所有權。

阻塞隊列簡化了轉移的邏輯。除此之外,還可以通過 ConcurrentMap 的原子方法 remove,或者是 AtomicReference 的 compareAndSet ( 即 CAS 機制 ) 實現安全的串行線程封閉。

雙端隊列和工作竊取

Java 6 之后增加了新的容器類型 —— Deque 和 BlockDeque,它們是對 Queue 以及 BlockingQueue 的拓展。Deque 實現了再隊列頭和隊列尾的高效插入和移除,具體實現包括了 ArrayDeque 和 LinkedBlockingDeque。

雙端隊列適用于另一種工作模式 —— 工作竊取 ( Work Stealing )。比如,一個工作線程已經完成清空了自己的任務隊列,它就可以從其它忙碌的工作線程的任務隊列的尾部獲取隊列。這種模式要比生產者 —— 消費者具備更高的可伸縮性,因為工作線程不會在單個共享的任務隊列上發生競爭。

工作竊取特別適合遞歸的并發問題,即執行一個任務時會產生更多的工作,比如:Web 爬蟲,GC 垃圾回收時的圖搜索算法。

阻塞和中斷方法

線程可能會被阻塞,或者是暫停執行,原因有多種:等待 I/O 結束,等待獲得鎖,等待從 Thread.sleep 中喚醒,等待另一個線程的計算結果。被阻塞的線程必須要在這些 "外因" 被解決之后才有機會繼續執行,即恢復到 RUNNABLE ( 也稱就緒 ) 狀態,等待被再次調度 CPU 執行。

這段描述其實對應了 JVM 線程的兩個狀態:BLOCKING 和 WAITING。

  1. BLOCKING,當線程準備進入一段新的同步代碼塊時,因不能獲得鎖而等待。
  2. WAITING,當線程已經進入同步代碼塊之后,在執行的過程中因不滿足某些條件而暫停。這時可以調用 waiting 方法 釋放已占據的鎖 。其它工作線程得以搶占此鎖并執行,直到滿足先驗條件為真時,其它線程可以通過 notifyAll 方法重新令監視此鎖的所有 WAITING 線程再次爭鎖并繼續工作。 wait / notify / notifyAll 構成了線程之間的協商機制,見下面的代碼塊。
static class Status{public boolean v;}
public static void main(String[] args) throws InterruptedException{

    var status = new Status();
    status.v = false;

    var mutex = new Object();

    new Thread(()->{
        synchronized (mutex){
            System.out.println("get mutex");
            // 此時檢測的狀態為 false, 進入 WAITING 狀態。
            if(!status.v) try {mutex.wait();} catch (InterruptedException e) {e.printStackTrace();}
            // 被喚醒后重新檢測狀態為 true。
            System.out.println(status.v);
        }
    }).start();

    new Thread(()->{
        synchronized (mutex){
            // 將狀態設置為 true,喚醒上面的線程
            status.v = true;
            mutex.notify(); 
        }
    }).start();


}
復制代碼

只有處于 RUNNABLE 狀態的線程才會實際獲得 CPU 使用權。

Java中哪些操作會使線程釋放鎖資源_后端碼匠的博客-CSDN博客_線程釋放鎖資源

JVM中的線程狀態 - 知乎 (zhihu.com)

在 Java 中,一切會發生阻塞的方法都會被要求處理 InterruptedException 受檢異常。調用阻塞方法的方法也會變成阻塞方法。線程內部有一個 boolean 類型的狀態位表示中斷,調用 interrupt 方法可以將該狀態位標識為 true 。但是這不意味著該線程就會立刻中斷:

InterruptedException

同步工具類

Java 還提供了諸如信號量 ( Semaphore ),柵欄 ( Barrier ),以及閉鎖 ( Latch ) 作為同步工具類,它們都包含了一定的結構性屬性:這些狀態將決定執行同步工具類的線程是執行還是等待。

閉鎖

閉鎖是一種同步工具類,可以延遲線程的進度直到閉鎖打開。在此之前,所有的線程必須等待,而在閉鎖結束之后,這個鎖將永久保持打開狀態。這個特性適用于 需要確保某個任務的前序任務 ( 比如初始化 ) 全部完成之后才可以執行的場合,見下方的代碼:Worker 線程等待另兩個初始化線程準備就緒之后輸出 p 的結果。

// class Point{int x,y;}
final var p = new Point();
final var p_latch = new CountDownLatch(2);

// Worker
new Thread(()->{
    try {p_latch.await();} catch (InterruptedException e) {e.printStackTrace();}
    System.out.printf("Point(x=%d,y=%d)",p.x,p.y);
}).start();

// Init x
new Thread(()->{
    p.x = 1;
    p_latch.countDown();
}).start();

// Init y
new Thread(()->{
    p.y = 2;
    p_latch.countDown();
}).start();
復制代碼

FutureTask 也可以拿來做閉鎖,它實現了 Future 的語義,表示一個抽象的可生成結果的計算,一般需要由線程池驅動執行,表示一個異步的任務。

Runnable 接口表示無返回值的計算,Callable<T> 代表有返回值的計算。

final var futurePoint = new FutureTask<>(()->new Point(1,2));

new Thread(futurePoint).start();
new Thread(()->{
    try {
        // 在 Callable 計算出結果之前阻塞
        var p = futurePoint.get();
        System.out.printf("Point(x=%d,y=%d)",p.x,p.y);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}).start();
復制代碼

信號量

計數信號量用于控制某個資源的同時訪問數量,通常用于配置有容量限制的資源池,或稱有界阻塞容器。Semaphore 管理一組許可,線程在需要時首先獲取許可,并在操作結束之后歸還許可。如果許可數量被耗盡,那么線程則必須要阻塞到其它任意線程歸還許可 ( 默認情況下遵循 Non-Fair 策略 ) 為止。特別地,當信號量的許可數為 1 時,則可認為是不可重入的互斥鎖。

下面是一個利用信號量 + 同步容器實現的簡易阻塞隊列:

class BoundedBlockingQueue<E>{
    final private List<E> list = Collections.synchronizedList(new LinkedList<>());
    final private Semaphore se;

    public BoundedBlockingQueue(int cap){
        se = new Semaphore(cap);
    }

    public void enqueue(E e) throws InterruptedException {
        se.acquire();
        list.add(0,e);
    }

    public E dequeue(){
        final var done = list.remove(0);
        se.release();
        return done;
    }

    @Override
    public String toString() {
        return "BoundedBlockingQueue{" +
                "list=" + list +
                '}';
    }
}
復制代碼

柵欄

柵欄 ( Barrier ) 類似于閉鎖,同樣都會阻塞到某一個事件發生。閉鎖強調等待某個事件發生之后再執行動作,而柵欄更強調在某個事件發生之前等待其它線程。它可用于實現一些協議:"所有人在指定的時間去會議室碰頭,等到所有的人到齊之后再開會",比如數據庫事務的兩階段提交。

Java 提供了一個名為 CyclicBarrier 的柵欄,它指定了 N 個工作線程 反復地 在柵欄位置匯集。在某線程執行完畢之后,調用 await() 方法阻塞自身,以等待其它更慢的線程到達柵欄位置。當設定的 N 個線程均調用 await() 之后,柵欄將打開,此時所有的線程將可以繼續向下執行代碼,而柵欄本身的狀態會重置,以便復用 ( 因而命名為 Cyclic- )。

見下面的代碼,4 個線程并行執行初始化工作 ( 以隨機時間的 sleep 模擬延遲 ),并等待所有線程初始化完畢之后同時打印信息。

final int N = 4;
final var barrier =  new CyclicBarrier(N);
final Thread[] workers = new Thread[N];

for(var i : new Integer[]{0,1,2,3}){
    var t = new Thread(()->{
        try {
            // 模擬隨機的延時
            var rdm = new Random().nextInt(1000);
            Thread.sleep(rdm);

            // 在所有其它線程到達之前阻塞
            barrier.await();

            // 所有線程到達之后執行,每個線程打印延時時間
            System.out.printf("prepare for %d millisn",rdm);
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    });
    workers[i] = t;
    t.start();
}
// 等待所有的任務并行執行完畢。
for(var worker : workers){worker.join();}
復制代碼

在不涉及 IO 操作和數據共享的計算問題當中,線程數量為 N CPU 或者 N CPU + 1 時會獲得最優的吞吐量,更多的線程也不會帶來帶來幫助,甚至性能還會下降,因為 CPU 需要頻繁的切換上下文。

一旦線程成功地到達柵欄,則 await() 方法會其標記為 "子線程"。 CyclicBarrier 的構造器還接受額外的 Runnable 接口做回調函數,當所有線程全部到達柵欄之后, CyclicBarrier 會從子線程當中挑選出一個領導線程去執行它 ( 即,每一輪通過柵欄之后,它都會被執行且僅一次 ),我們可以在此實現日志記錄等操作。

final var barrier =  new CyclicBarrier(N,()->{
    System.out.println("all runners ready");
});
復制代碼

在并行任務中構建高效的緩存

為了用簡單的例子說明問題,我們在這里特別強調并行 ( Parallel ) 任務,這些任務的計算過程是純粹 ( Pure ) 的 —— 這樣的函數被稱之純函數。無論它們何時被調用,被哪個線程調用,同樣的輸入永遠得到同樣的輸出。純函數不和外部環境交互,因此自然也就不存在競態條件。

一個非常自然的想法是使用緩存 ( 或稱記憶機制 Memorized ) 避免重復的運算。在純粹的映射關系中,固定的輸入總是對應固定的輸出,因此使用 K-V 鍵值對來記憶結果再好不過了。我們基于 HashMap 給出最簡單的一版實現,然后再探討如何改進它們。

class MapCacheV1 {
    private final HashMap<Integer,String> cache = new HashMap<>();
    public synchronized String getResult(Integer id){
        var v = cache.get(id);
        if (v == null){
            // 設定中,這個靜態方法具有 500ms 左右的延遲。
            v = PURE.slowOperation(id);
            cache.put(id,v);
        }
        return v;
    }
}
復制代碼

盡管我們打算將 MapCache 用于無競態條件的并行任務,但 getResult() 方法仍然加上了同步鎖,因為 HashMap 本身不是線程安全的, cache 需要以安全的方式被并發訪問。然而,這種做法無疑會使得 getResult() 方法變得十分笨重,因為原本可以并行的慢操作 PURE.slowOperation() 也被鎖在了代碼塊內部。

最先想到的是使用更加高效的 ConcurrentHashMap 類取代線程不安全的 HashMap ,以獲得免費的多線程性能提升:

class MapCacheV2 {
    private final ConcurrentHashMap<Integer,String> cache = new ConcurrentHashMap<>();
    public String getResult(Integer id){
        var v = cache.get(id);
        if(v == null){
            v = PURE.slowOperation(id);
            cache.put(id,v);
        }
        return v;
    }
}
復制代碼

同時,我們這一次取消掉了 getResult() 上的同步鎖。這樣,多線程可以并行地執行慢操作,只在修改 cache 時發生競爭。但這個緩存仍有一些不足 —— 當某個線程 A 在計算新值時 ( 即這 500ms 之內 ),其它線程并不知道。因此,多個線程有可能會計算同一個新值,甚至導致其它的計算任務無法進行。

針對這個問題,我們再一次提出改進。不妨讓 cache 保存 "計算過程",而非值。這樣,工作線程將有三種行為:

  1. 緩存中沒有此計算任務,注冊并執行。
  2. 緩存中有此計算任務,但未完畢,當前線程阻塞 ( 將 CPU 讓給其它需要計算的線程 )。
  3. 緩存中有此計算任務,且已計算完畢,直接返回。

回顧前文在閉鎖中提到的 FutureTask<V> 類型,它適合用于當前的實現,見下方的代碼:

class MapCacheV3 {

    private final ConcurrentHashMap<Integer,FutureTask<String>> cache = new ConcurrentHashMap<>();
    public String getResult(Integer id) throws ExecutionException, InterruptedException {
        // 獲取一個計算任務,而非值
        final var task = cache.get(id);
        if(task == null){
            final var newTask = new FutureTask<>(()-> PURE.slowOperation(id));
            // cache.putIfAbsent()
            cache.put(id,newTask);
            newTask.run();
            // 提交并執行任務。
            return newTask.get();
        }else return task.get();
    }
}
復制代碼

MapCacheV3 的實現已經近乎完美了。唯一不足的是:我們對 cache 的操作仍然是 "先判斷后執行" 的復合操作,但現在 getResult 并沒有同步鎖的保護。兩個線程仍然同時調用 cache.get() 并判空,并開始執行重復的計算。

下面的版本給出了最終的解決方案:使用 ConcurrentMap 的 putIfAbsent() 原子方法修復可能重復添加計算任務的問題。

public String getResult(Integer id) throws ExecutionException, InterruptedException {
        // 獲取一個計算任務,而非值
        final var task = cache.get(id);
        if(task == null){
            final var newTask = new FutureTask<>(()-> PURE.slowOperation(id));
            // put 和 putIfAbsent 方法均會返回此 Key 對應的上一個舊值 Value。
            // 如果 put 的是一個新的 Key,則返回值為 null。
            final var maybeNull = cache.putIfAbsent(id,newTask);
            if(maybeNull == null) newTask.run();
            return newTask.get();
        }else return task.get();
    }
復制代碼

值得注意的是,一旦 cache 存儲的是計算任務而非值,那么就可能存在緩存污染的問題。一旦某個 FutureTask 的計算被取消,或者失敗,應當及時將它從緩存中移除以保證將來的計算成功,而不是放任其駐留在緩存內部返回失敗的結果。

緩存思想幾乎應用在各個地方。比如在 Web 服務中,用戶的數據往往不會總是直接來自數據庫,而是 redis 這樣的消息中間件。在實際的應用環境下,還有更加復雜的問題需要被考慮到,比如緩存內容過時 ( expired ),或者是定期清理緩存空間等。

原文鏈接:
https://juejin.cn/post/7138285297208393741?utm_source=tuicool&utm_medium=referral

分享到:
標簽:Java
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定