日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

Version:1.0 Starthtml:000000206 EndHTML:000175716 StartFragment:000004583 EndFragment:000175611 StartSelection:000004679 EndSelection:000175592 SourceURL:https://zhuanlan.zhihu.com/p/210471606/edit

現如今,服務器性能日益增長,并發(concurrency)編程已經“深入人心”,但由于馮諾依式計算機“指令存儲,順序執行”的特性,使得編寫跨越時間維度的并發程序異常困難,所以現代編程語言都對并發編程提供了一定程度的支持,像 Golang 里面的 Goroutines、Clojure 里面的 STM(Software Transactional Memory)、Erlang 里面的 Actor。

JAVA 對于并發編程的解決方案是多線程(Multi-threaded programming),而且 Java 中的線程 與 native 線程一一對應,多線程也是早期操作系統支持并發的方案之一(其他方案:多進程、IO多路復用)。

本文著重介紹 Java 中線程同步的原理、實現機制,更側重操作系統層面,部分原理參考 openjdk 源碼。閱讀本文需要對 CyclicBarrier、CountDownLatch 有基本的使用經驗。

JUC

在 Java 1.5 版本中,引入 JUC 并發編程輔助包,很大程度上降低了并發編程的門檻,JUC 里面主要包括:

  • 線程調度的 Executors
  • 緩沖任務的 Queues
  • 超時相關的 TimeUnit
  • 并發集合(如 ConcurrentHashMap)
  • 線程同步類(Synchronizers,如 CountDownLatch )

個人認為其中最重要也是最核心的是線程同步這一塊,因為并發編程的難點就在于如何保證「共享區域(專業術語:臨界區,Critical Section)的訪問時序問題」。

AbstractQueuedSynchronizer

JUC 提供的同步類主要有如下幾種:

  • Semaphore is a classic concurrency tool.
  • CountDownLatch is a very simple yet very common utility for blocking until a given number of signals, events, or conditions hold.
  • A CyclicBarrier is a resettable multiway synchronization point useful in some styles of parallel programming.
  • A Phaser provides a more flexible form of barrier that may be used to control phased computation among multiple threads.
  • An Exchanger allows two threads to exchange objects at a rendezvous(約會) point, and is useful in several pipeline designs.

通過閱讀其源碼可以發現,其實現都基于 AbstractQueuedSynchronizer 這個抽象類(一般簡寫 AQS),正如其 javadoc 開頭所說:

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state.

也就是說,AQS 通過維護內部的 FIFO 隊列和具備原子更新的整型 state 這兩個屬性來實現各種鎖機制,包括:是否公平,是否可重入,是否共享,是否可中斷(interrupt),并在這基礎上,提供了更方便實用的同步類,也就是一開始提及的 Latch、Barrier 等。

這里暫時不去介紹 AQS 實現細節與如何基于 AQS 實現各種同步類(挖個坑),感興趣的可以移步美團的一篇文章《不可不說的Java“鎖”事》 第六部分“獨享鎖 VS 共享鎖”。

在學習 Java 線程同步這一塊時,對我來說困擾最大的是「線程喚醒」,試想一個已經 wait/sleep/block 的線程,是如何響應 interrupt 的呢?當調用 Object.wait() 或 lock.lock() 時,JVM 究竟做了什么事情能夠在調用 Object.notify 或 lock.unlock 時重新激活相應線程?

帶著上面的問題,我們從源碼中尋找答案。

Java 如何實現堵塞、通知

wait/notify

public final native void wait(long timeout) throws 
InterruptedException;public final native void notify();

在 JDK 源碼中,上述兩個方法均用 native 實現(即 cpp 代碼),追蹤相關代碼

// java.base/share/native/libjava/Object.cstatic
 JNINativeMethod methods[] = {    {"hashCode",    "()I",                   
 (void *)&JVM_IHashCode},    {"wait",        "(J)V",               
    (void *)&JVM_MonitorWait},    {"notify",      "()V",                    (void *)&JVM_MonitorNotify},  
  {"notifyAll",   "()V",                    (void *)&JVM_MonitorNotifyAll},  
  {"clone",       "()Ljava/lang/Object;",   (void *)&JVM_Clone},
};

通過上面的 cpp 代碼,我們大概能猜出 JVM 是使用 monitor 來實現的 wait/notify 機制,至于這里的 monitor 是何種機制,這里暫時跳過,接著看 lock 相關實現

lock/unlock

LockSupport 是用來實現堵塞語義模型的基礎輔助類,主要有兩個方法:park 與 unpark。(在英文中,park 除了“公園”含義外,還有“停車”的意思)

// LockSupport.java    
public static void unpark(Thread thread) {        if (thread != null) 
           UNSAFE.unpark(thread);    }    public static void park(Object blocker) {        Thread t = Thread.currentThread();        setBlocker(t, blocker);        UNSAFE.park(false, 0L);        setBlocker(t, null);    }// Unsafe.java    /**     * Unblocks the given thread blocked on {@code park}, or, if it is  
   * not blocked, causes the subsequent call to {@code park} not to   
  * block.  Note: this operation is "unsafe" solely because the  
   * caller must somehow ensure that the thread has not been   
  * destroyed. Nothing special is usually required to ensure this     * when called from Java (in which there will ordinarily be a live  
   * reference to the thread) but this is not nearly-automatically   
  * so when calling from native code.   
  *     * @param thread the thread to unpark.     */    @HotSpotIntrinsicCandidate    public native void unpark(Object thread);
    /**   
  * Blocks current thread, returning when a balancing   
  * {@code unpark} occurs, or a balancing {@code unpark} has 
    * already occurred, or the thread is interrupted, or, if not   
  * absolute and time is not zero, the given time nanoseconds have    
 * elapsed, or if absolute, the given deadline in milliseconds    
 * since Epoch has passed, or spuriously (i.e., returning for no   
  * "reason"). Note: This operation is in the Unsafe class only    
 * because {@code unpark} is, so it would be strange to place it     * elsewhere.     */    
@HotSpotIntrinsicCandidate    public native void park(boolean isAbsolute, long time);
// hotspot/share/prims/unsafe.cppUNSAFE_ENTRY(void, Unsafe_Park(JNIEnv
 *env, jobject unsafe, jboolean isAbsolute, jlong time)) {  
HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time); 
 EventThreadPark event;  JavaThreadParkedState jtps(thread, time != 0); 
 thread->parker()->park(isAbsolute != 0, time); 
 if (event.should_commit()) {  
  post_thread_park_event(&event, thread->current_park_blocker(), time); 
 }  HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker());} UNSAFE_END

通過上述 unsafe.cpp 可以看到每個 thread 都會有一個 Parker 對象,所以我們需要查看 parker 對象的定義

// hotspot/share/runtime/park.hppclass Parker : 
public os::PlatformParker...public: 
 // For simplicity of interface with Java, all forms of park (indefinite, 
 // relative, and absolute) 
are multiplexed into one call.  void park(bool isAbsolute, jlong time); 
 void unpark();
// hotspot/os/posix/os_posix.hppclass PlatformParker :
 public CHeapObj<mtInternal> { protected:  enum {   
 REL_INDEX = 0,    ABS_INDEX = 1 
 };  int _cur_index;  // which cond is in use: -1, 0, 1  pthread_mutex_t _mutex[1];
  pthread_cond_t  _cond[2];
 // one for relative times and one for absolute  
...};

看到這里大概就能知道 park 是使用 pthread_mutex_t 與 pthread_cond_t 實現。好了,到目前為止,就引出了 Java 中與堵塞相關的實現,不難想象,都是依賴底層操作系統的功能。

OS 支持的同步原語

Semaphore

并發編程領域的先鋒人物 Edsger Dijkstra(也是最短路徑算法的作者)在 1965 年首次提出了信號量( Semaphores) 這一概念來解決線程同步的問題。信號量是一種特殊的變量類型,為非負整數,只有兩個特殊操作PV:

  • P(s) 如果 s!=0,將 s-1;否則將當前線程掛起,直到 s 變為非零
  • V(s) 將 s+1,如果有線程堵塞在 P 操作等待 s 變成非零,那么 V 操作會重啟這些線程中的任意一個

注:Dijkstra 為荷蘭人,名字 P 和 V 來源于荷蘭單詞 Proberen(測試)和Verhogen(增加),為方便理解,后文會用 Wait 與 Signal 來表示。

struct semaphore {    
 int val;    
 thread_list waiting;  // List of threads waiting for semaphore
}wait(semaphore Sem):  
  // Wait until > 0 then decrement 
 // 這里用的是 while 而不是 if
  // 這是因為在 wait 過程中,其他線程還可能繼續調用 wait  
while (Sem.val <= 0) { 
   add this thread to Sem.waiting;    block(this thread);  }  Sem.val = Sem.val - 1;return;
signal(semaphore Sem):// Increment value and wake up next thread    
 Sem.val = Sem.val + 1;  
   if (Sem.waiting is nonempty) {   
      remove a thread T from Sem.waiting;         wakeup(T);     }

有兩點注意事項:

  1. wait 中的「測試和減 1 操作」,signal 中的「加 1 操作」需要保證原子性。一般來說是使用硬件支持的 read-modify-write 原語,比如 test-and-set/fetch-and-add/compare-and-swap,除了硬件支持外,還可以用 busy wait 的軟件方式來模擬。
  2. signal 中沒有定義重新啟動的線程順序,也即多個線程在等待同一信號量時,無法預測重啟哪一個線程

使用場景

信號量為控制并發程序的執行提供了強有力工具,這里列舉兩個場景:

互斥

信號量提供了了一種很方便的方法來保證對共享變量的互斥訪問,基本思想是

將每個共享變量(或一組相關的共享變量)與一個信號量 s (初始化為1)聯系起來,然后用 wait/signal 操作將相應的臨界區包圍起來。

二元信號量也被稱為互斥鎖(mutex,mutual exclusve, 也稱為 binary semaphore),wait 操作相當于加鎖,signal 相當于解鎖。一個被用作一組可用資源的計數器的信號量稱為計數信號量(counting semaphore)

調度共享資源

除了互斥外,信號量的另一個重要作用是調度對共享資源的訪問,比較經典的案例是生產者消費者,偽代碼如下:

emptySem = NfullSem = 0
// Producerwhile(whatever) {    locally generate item    wait(emptySem)    fill empty buffer with item    signal(fullSem)
}// Consumerwhile(whatever) {    wait(fullSem)   
 get item from full buffer    signal(emptySem)    use item
}

POSIX 實現

POSIX 標準中有定義信號量相關的邏輯,在 semaphore.h 中,為 sem_t 類型,相關 API:

// Intialize: sem_init(&theSem, 0, initialVal);
// Wait: sem_wait(&theSem);
// Signal: sem_post(&theSem);
// Get the current value of the semaphore:     
  sem_getvalue(&theSem, &result);

信號量主要有兩個缺點:

  • Lack of structure,在設計大型系統時,很難保證 wait/signal 能以正確的順序成對出現,順序與成對缺一不可,否則就會出現死鎖!
  • Global visiblity,一旦程序出現死鎖,整個程序都需要去檢查

解決上述兩個缺點的新方案是監控器(monitor)。

Monitors

C. A. R. Hoare(也是 Quicksort 的作者) 在 1974 年的論文 Monitors: an operating system structuring concept 首次提出了「監控器」概念,它提供了對信號量互斥和調度能力的更高級別的抽象,使用起來更加方便,一般形式如下:

monitor1 . . . monitorM
process1 . . . processN

我們可以認為監控器是這么一個對象:

  • 所有訪問同一監控器的線程通過條件變量(condition variables)間接通信
  • 某一個時刻,只能有一個線程訪問監控器

Condition variables

上面提到監控器通過條件變量(簡寫 cv)來協調線程間的通信,那么條件變量是什么呢?它其實是一個 FIFO 的隊列,用來保存那些因等待某些條件成立而被堵塞的線程,對于一個條件變量 c 來說,會關聯一個斷言(assertion) P。線程在等待 P 成立的過程中,該線程不會鎖住該監控器,這樣其他線程就能夠進入監控器,修改監控器狀態;在 P 成立時,其他線程會通知堵塞的線程,因此條件變量上主要有三個操作:

  1. wait(cv, m) 等待 cv 成立,m 表示與監控器關聯的一 mutex 鎖
  2. signal(cv) 也稱為 notify(cv) 用來通知 cv 成立,這時會喚醒等待的線程中的一個執行。根據喚醒策略,監控器分為兩類:Hoare vs. Mesa,后面會介紹
  3. broadcast(cv) 也稱為 notifyAll(cv) 喚醒所有等待 cv 成立的線程

POSIX 實現

在 pthreads 中,條件變量的類型是 pthread_cond_t,主要有如下幾個方法:

// initializepthread_cond_init() 
pthread_cond_wait(&theCV, &someLock);
pthread_cond_signal(&theCV);
pthread_cond_broadcast(&theCV);

使用方式

在 pthreads 中,所有使用條件變量的地方都必須用一個 mutex 鎖起來,這是為什么呢?看下面一個例子:

pthread_mutex_t myLock;pthread_cond_t myCV;int count = 0;
// Thread Apthread_mutex_lock(&myLock);while(count < 0) {   
 pthread_cond_wait(&myCV, &myLock);}pthread_mutex_unlock(&myLock);
// Thread B
pthread_mutex_lock(&myLock);
count ++;
while(count == 10) {  
  pthread_cond_signal(&myCV);
}
pthread_mutex_unlock(&myLock);

如果沒有鎖,那么

  • 線程 A 可能會在其他線程將 count 賦值為10后繼續等待
  • 線程 B 無法保證加一操作與測試 count 是否為零 的原子性

這里的關鍵點是,在進行條件變量的 wait 時,會釋放該鎖,以保證其他線程能夠將之喚醒。不過需要注意的是,在線程 B 通知(signal) myCV 時,線程 A 無法立刻恢復執行,這是因為 myLock 這個鎖還被線程 B 持有,只有在線程 B unlock(&myLock) 后,線程 A 才可恢復。總結一下:

  1. wait 時會釋放鎖
  2. signal 會喚醒等待同一 cv 的線程
  3. 被喚醒的線程需要重新獲取鎖,然后才能從 wait 中返回

Hoare vs. Mesa 監控器語義

在上面條件變量中,我們提到 signal 在調用時,會去喚醒等待同一 cv 的線程,根據喚醒策略的不同,監控器也分為兩類:

  • Hoare 監控器(1974),最早的監控器實現,在調用 signal 后,會立刻運行等待的線程,這時調用 signal 的線程會被堵塞(因為鎖被等待線程占有了)
  • Mesa 監控器(Xerox PARC, 1980),signal 會把等待的線程重新放回到監控的 ready 隊列中,同時調用 signal 的線程繼續執行。這種方式是現如今 pthreads/Java/C# 采用的

這兩類監控器的關鍵區別在于等待線程被喚醒時,需要重新檢查 P 是否成立。

Java 并發基礎之內存模型

 

 

上圖表示藍色的線程在調用監控器的 get 方式時,數據為空,因此開始等待 emptyFull 條件;緊接著,紅色線程調用監控器的 set 方法改變 emptyFull 條件,這時

  • 按照 Hoare 思路,藍色線程會立刻執行,并且紅色線程堵塞
  • 按照 Mesa 思路,紅色線程會繼續執行,藍色線程會重新與綠色線程競爭與監控器關聯的鎖

Java 中的監控器

在 Java 中,每個對象都是一個監控器(因此具備一個 lock 與 cv),調用對象 o 的 synchronized 方法 m 時,會首先去獲取 o 的鎖,除此之外,還可以調用 o 的 wait/notify/notify 方法進行并發控制

Big Picture

 

Java 并發基礎之內存模型

 

 

Interruptible

通過介紹操作系統支持的同步原語,我們知道了 park/unpark、wait/notify 其實就是利用信號量( pthread_mutex_t)、條件變量(pthread_cond_t)實現的,其實監控器也可以用信號量來實現。在查看 AQS 中,發現有這么一個屬性:

/** 
* The number of nanoseconds 
for which it is faster to spin 
* rather than to use timed park. A rough estimate suffices 
* to improve responsiveness with very short timeouts. 
*/static final long spinForTimeoutThreshold = 1000L;

也就是說,在小于 1000 納秒時,await 條件變量 P 時,會使用一個循環來代替條件變量的堵塞與喚醒,這是由于堵塞與喚醒本身的操作開銷可能就遠大于 await 的 timeout。相關代碼:

// AQS 的 doAcquireNanos 方法節選
for (;;) { 
   final Node p = node.predecessor();    if (p == head && tryAcquire(arg)) {    
    setHead(node);        p.next = null; 
// help GC     
   failed = false;   
     return true;  
  }    nanosTimeout = deadline - System.nanoTime();    if (nanosTimeout <= 0L)      
  return false; 
   if (shouldParkAfterFailedAcquire(p, node) &&        
nanosTimeout > spinForTimeoutThreshold)        LockSupport.parkNanos(this, nanosTimeout);  
  if (Thread.interrupted())  
      throw new InterruptedException();
}

在 JUC 提供的高級同步類中,acquire 對應 park,release 對應 unpark,interrupt 其實就是個布爾的 flag 位,在 unpark 被喚醒時,檢查該 flag ,如果為 true,則會拋出我們熟悉的 InterruptedException。

Selector.select() 響應中斷異常的邏輯有些特別,因為對于這類堵塞 IO 操作來說,沒有條件變量的堵塞喚醒機制,我們可以再看下 Thread.interrupt 的實現

public void interrupt() {  
  if (this != Thread.currentThread())
        checkAccess();    synchronized (blockerLock) {  
      Interruptible b = blocker;        if (b != null) {       
     interrupt0();           // Just to set the interrupt flag  
          b.interrupt(this);         
   return;       
 }  
  }   
 interrupt0();
}

OpenJDK 使用了這么一個技巧來實現堵塞 IO 的中斷喚醒:在一個線程被堵塞時,會關聯一個 Interruptible 對象。對于 Selector 來說,在開始時,會關聯這么一個Interruptible 對象:

protected final void begin() {  
  if (interruptor == null) {     
   interruptor = new Interruptible() {          
      public void interrupt(Thread target) {       
             synchronized (closeLock) {           
             if (closed)         
                   return;            
            closed = true;           
             interrupted = target;                        try {                
            AbstractInterruptibleChannel.this.implCloseChannel();      
                  } catch (IOException x) { 
}                    }                }};    }    blockedOn(interruptor);    Thread me = Thread.currentThread();    if (me.isInterrupted())  
      interruptor.interrupt(me);}

當調用 interrupt 方式時,會關閉該 channel,這樣就會關閉掉這個堵塞線程,可見為了實現這個功能,代價也是比較大的。LockSupport.park 中采用了類似技巧。

總結

也許基于多線程的并發編程不是最好的(可能是最復雜的,Clojure 大法好 :-),但卻是最悠久的。即便我們自己不去寫往往也需要閱讀別人的多線程代碼,而且能夠寫出“正確”(who knows?)的多線程程序往往也是區分 senior 與 junior 程序

分享到:
標簽:并發 Java
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定