Version:1.0 Starthtml:000000206 EndHTML:000175716 StartFragment:000004583 EndFragment:000175611 StartSelection:000004679 EndSelection:000175592 SourceURL:https://zhuanlan.zhihu.com/p/210471606/edit
現如今,服務器性能日益增長,并發(concurrency)編程已經“深入人心”,但由于馮諾依式計算機“指令存儲,順序執行”的特性,使得編寫跨越時間維度的并發程序異常困難,所以現代編程語言都對并發編程提供了一定程度的支持,像 Golang 里面的 Goroutines、Clojure 里面的 STM(Software Transactional Memory)、Erlang 里面的 Actor。
JAVA 對于并發編程的解決方案是多線程(Multi-threaded programming),而且 Java 中的線程 與 native 線程一一對應,多線程也是早期操作系統支持并發的方案之一(其他方案:多進程、IO多路復用)。
本文著重介紹 Java 中線程同步的原理、實現機制,更側重操作系統層面,部分原理參考 openjdk 源碼。閱讀本文需要對 CyclicBarrier、CountDownLatch 有基本的使用經驗。
JUC
在 Java 1.5 版本中,引入 JUC 并發編程輔助包,很大程度上降低了并發編程的門檻,JUC 里面主要包括:
- 線程調度的 Executors
- 緩沖任務的 Queues
- 超時相關的 TimeUnit
- 并發集合(如 ConcurrentHashMap)
- 線程同步類(Synchronizers,如 CountDownLatch )
個人認為其中最重要也是最核心的是線程同步這一塊,因為并發編程的難點就在于如何保證「共享區域(專業術語:臨界區,Critical Section)的訪問時序問題」。
AbstractQueuedSynchronizer
JUC 提供的同步類主要有如下幾種:
- Semaphore is a classic concurrency tool.
- CountDownLatch is a very simple yet very common utility for blocking until a given number of signals, events, or conditions hold.
- A CyclicBarrier is a resettable multiway synchronization point useful in some styles of parallel programming.
- A Phaser provides a more flexible form of barrier that may be used to control phased computation among multiple threads.
- An Exchanger allows two threads to exchange objects at a rendezvous(約會) point, and is useful in several pipeline designs.
通過閱讀其源碼可以發現,其實現都基于 AbstractQueuedSynchronizer 這個抽象類(一般簡寫 AQS),正如其 javadoc 開頭所說:
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state.
也就是說,AQS 通過維護內部的 FIFO 隊列和具備原子更新的整型 state 這兩個屬性來實現各種鎖機制,包括:是否公平,是否可重入,是否共享,是否可中斷(interrupt),并在這基礎上,提供了更方便實用的同步類,也就是一開始提及的 Latch、Barrier 等。
這里暫時不去介紹 AQS 實現細節與如何基于 AQS 實現各種同步類(挖個坑),感興趣的可以移步美團的一篇文章《不可不說的Java“鎖”事》 第六部分“獨享鎖 VS 共享鎖”。
在學習 Java 線程同步這一塊時,對我來說困擾最大的是「線程喚醒」,試想一個已經 wait/sleep/block 的線程,是如何響應 interrupt 的呢?當調用 Object.wait() 或 lock.lock() 時,JVM 究竟做了什么事情能夠在調用 Object.notify 或 lock.unlock 時重新激活相應線程?
帶著上面的問題,我們從源碼中尋找答案。
Java 如何實現堵塞、通知
wait/notify
public final native void wait(long timeout) throws
InterruptedException;public final native void notify();
在 JDK 源碼中,上述兩個方法均用 native 實現(即 cpp 代碼),追蹤相關代碼
// java.base/share/native/libjava/Object.cstatic
JNINativeMethod methods[] = { {"hashCode", "()I",
(void *)&JVM_IHashCode}, {"wait", "(J)V",
(void *)&JVM_MonitorWait}, {"notify", "()V", (void *)&JVM_MonitorNotify},
{"notifyAll", "()V", (void *)&JVM_MonitorNotifyAll},
{"clone", "()Ljava/lang/Object;", (void *)&JVM_Clone},
};
通過上面的 cpp 代碼,我們大概能猜出 JVM 是使用 monitor 來實現的 wait/notify 機制,至于這里的 monitor 是何種機制,這里暫時跳過,接著看 lock 相關實現
lock/unlock
LockSupport 是用來實現堵塞語義模型的基礎輔助類,主要有兩個方法:park 與 unpark。(在英文中,park 除了“公園”含義外,還有“停車”的意思)
// LockSupport.java
public static void unpark(Thread thread) { if (thread != null)
UNSAFE.unpark(thread); } public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L); setBlocker(t, null); }// Unsafe.java /** * Unblocks the given thread blocked on {@code park}, or, if it is
* not blocked, causes the subsequent call to {@code park} not to
* block. Note: this operation is "unsafe" solely because the
* caller must somehow ensure that the thread has not been
* destroyed. Nothing special is usually required to ensure this * when called from Java (in which there will ordinarily be a live
* reference to the thread) but this is not nearly-automatically
* so when calling from native code.
* * @param thread the thread to unpark. */ @HotSpotIntrinsicCandidate public native void unpark(Object thread);
/**
* Blocks current thread, returning when a balancing
* {@code unpark} occurs, or a balancing {@code unpark} has
* already occurred, or the thread is interrupted, or, if not
* absolute and time is not zero, the given time nanoseconds have
* elapsed, or if absolute, the given deadline in milliseconds
* since Epoch has passed, or spuriously (i.e., returning for no
* "reason"). Note: This operation is in the Unsafe class only
* because {@code unpark} is, so it would be strange to place it * elsewhere. */
@HotSpotIntrinsicCandidate public native void park(boolean isAbsolute, long time);
// hotspot/share/prims/unsafe.cppUNSAFE_ENTRY(void, Unsafe_Park(JNIEnv
*env, jobject unsafe, jboolean isAbsolute, jlong time)) {
HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time);
EventThreadPark event; JavaThreadParkedState jtps(thread, time != 0);
thread->parker()->park(isAbsolute != 0, time);
if (event.should_commit()) {
post_thread_park_event(&event, thread->current_park_blocker(), time);
} HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker());} UNSAFE_END
通過上述 unsafe.cpp 可以看到每個 thread 都會有一個 Parker 對象,所以我們需要查看 parker 對象的定義
// hotspot/share/runtime/park.hppclass Parker :
public os::PlatformParker...public:
// For simplicity of interface with Java, all forms of park (indefinite,
// relative, and absolute)
are multiplexed into one call. void park(bool isAbsolute, jlong time);
void unpark();
// hotspot/os/posix/os_posix.hppclass PlatformParker :
public CHeapObj<mtInternal> { protected: enum {
REL_INDEX = 0, ABS_INDEX = 1
}; int _cur_index; // which cond is in use: -1, 0, 1 pthread_mutex_t _mutex[1];
pthread_cond_t _cond[2];
// one for relative times and one for absolute
...};
看到這里大概就能知道 park 是使用 pthread_mutex_t 與 pthread_cond_t 實現。好了,到目前為止,就引出了 Java 中與堵塞相關的實現,不難想象,都是依賴底層操作系統的功能。
OS 支持的同步原語
Semaphore
并發編程領域的先鋒人物 Edsger Dijkstra(也是最短路徑算法的作者)在 1965 年首次提出了信號量( Semaphores) 這一概念來解決線程同步的問題。信號量是一種特殊的變量類型,為非負整數,只有兩個特殊操作PV:
- P(s) 如果 s!=0,將 s-1;否則將當前線程掛起,直到 s 變為非零
- V(s) 將 s+1,如果有線程堵塞在 P 操作等待 s 變成非零,那么 V 操作會重啟這些線程中的任意一個
注:Dijkstra 為荷蘭人,名字 P 和 V 來源于荷蘭單詞 Proberen(測試)和Verhogen(增加),為方便理解,后文會用 Wait 與 Signal 來表示。
struct semaphore {
int val;
thread_list waiting; // List of threads waiting for semaphore
}wait(semaphore Sem):
// Wait until > 0 then decrement
// 這里用的是 while 而不是 if
// 這是因為在 wait 過程中,其他線程還可能繼續調用 wait
while (Sem.val <= 0) {
add this thread to Sem.waiting; block(this thread); } Sem.val = Sem.val - 1;return;
signal(semaphore Sem):// Increment value and wake up next thread
Sem.val = Sem.val + 1;
if (Sem.waiting is nonempty) {
remove a thread T from Sem.waiting; wakeup(T); }
有兩點注意事項:
- wait 中的「測試和減 1 操作」,signal 中的「加 1 操作」需要保證原子性。一般來說是使用硬件支持的 read-modify-write 原語,比如 test-and-set/fetch-and-add/compare-and-swap,除了硬件支持外,還可以用 busy wait 的軟件方式來模擬。
- signal 中沒有定義重新啟動的線程順序,也即多個線程在等待同一信號量時,無法預測重啟哪一個線程
使用場景
信號量為控制并發程序的執行提供了強有力工具,這里列舉兩個場景:
互斥
信號量提供了了一種很方便的方法來保證對共享變量的互斥訪問,基本思想是
將每個共享變量(或一組相關的共享變量)與一個信號量 s (初始化為1)聯系起來,然后用 wait/signal 操作將相應的臨界區包圍起來。
二元信號量也被稱為互斥鎖(mutex,mutual exclusve, 也稱為 binary semaphore),wait 操作相當于加鎖,signal 相當于解鎖。一個被用作一組可用資源的計數器的信號量稱為計數信號量(counting semaphore)
調度共享資源
除了互斥外,信號量的另一個重要作用是調度對共享資源的訪問,比較經典的案例是生產者消費者,偽代碼如下:
emptySem = NfullSem = 0
// Producerwhile(whatever) { locally generate item wait(emptySem) fill empty buffer with item signal(fullSem)
}// Consumerwhile(whatever) { wait(fullSem)
get item from full buffer signal(emptySem) use item
}
POSIX 實現
POSIX 標準中有定義信號量相關的邏輯,在 semaphore.h 中,為 sem_t 類型,相關 API:
// Intialize: sem_init(&theSem, 0, initialVal);
// Wait: sem_wait(&theSem);
// Signal: sem_post(&theSem);
// Get the current value of the semaphore:
sem_getvalue(&theSem, &result);
信號量主要有兩個缺點:
- Lack of structure,在設計大型系統時,很難保證 wait/signal 能以正確的順序成對出現,順序與成對缺一不可,否則就會出現死鎖!
- Global visiblity,一旦程序出現死鎖,整個程序都需要去檢查
解決上述兩個缺點的新方案是監控器(monitor)。
Monitors
C. A. R. Hoare(也是 Quicksort 的作者) 在 1974 年的論文 Monitors: an operating system structuring concept 首次提出了「監控器」概念,它提供了對信號量互斥和調度能力的更高級別的抽象,使用起來更加方便,一般形式如下:
monitor1 . . . monitorM
process1 . . . processN
我們可以認為監控器是這么一個對象:
- 所有訪問同一監控器的線程通過條件變量(condition variables)間接通信
- 某一個時刻,只能有一個線程訪問監控器
Condition variables
上面提到監控器通過條件變量(簡寫 cv)來協調線程間的通信,那么條件變量是什么呢?它其實是一個 FIFO 的隊列,用來保存那些因等待某些條件成立而被堵塞的線程,對于一個條件變量 c 來說,會關聯一個斷言(assertion) P。線程在等待 P 成立的過程中,該線程不會鎖住該監控器,這樣其他線程就能夠進入監控器,修改監控器狀態;在 P 成立時,其他線程會通知堵塞的線程,因此條件變量上主要有三個操作:
- wait(cv, m) 等待 cv 成立,m 表示與監控器關聯的一 mutex 鎖
- signal(cv) 也稱為 notify(cv) 用來通知 cv 成立,這時會喚醒等待的線程中的一個執行。根據喚醒策略,監控器分為兩類:Hoare vs. Mesa,后面會介紹
- broadcast(cv) 也稱為 notifyAll(cv) 喚醒所有等待 cv 成立的線程
POSIX 實現
在 pthreads 中,條件變量的類型是 pthread_cond_t,主要有如下幾個方法:
// initializepthread_cond_init()
pthread_cond_wait(&theCV, &someLock);
pthread_cond_signal(&theCV);
pthread_cond_broadcast(&theCV);
使用方式
在 pthreads 中,所有使用條件變量的地方都必須用一個 mutex 鎖起來,這是為什么呢?看下面一個例子:
pthread_mutex_t myLock;pthread_cond_t myCV;int count = 0;
// Thread Apthread_mutex_lock(&myLock);while(count < 0) {
pthread_cond_wait(&myCV, &myLock);}pthread_mutex_unlock(&myLock);
// Thread B
pthread_mutex_lock(&myLock);
count ++;
while(count == 10) {
pthread_cond_signal(&myCV);
}
pthread_mutex_unlock(&myLock);
如果沒有鎖,那么
- 線程 A 可能會在其他線程將 count 賦值為10后繼續等待
- 線程 B 無法保證加一操作與測試 count 是否為零 的原子性
這里的關鍵點是,在進行條件變量的 wait 時,會釋放該鎖,以保證其他線程能夠將之喚醒。不過需要注意的是,在線程 B 通知(signal) myCV 時,線程 A 無法立刻恢復執行,這是因為 myLock 這個鎖還被線程 B 持有,只有在線程 B unlock(&myLock) 后,線程 A 才可恢復。總結一下:
- wait 時會釋放鎖
- signal 會喚醒等待同一 cv 的線程
- 被喚醒的線程需要重新獲取鎖,然后才能從 wait 中返回
Hoare vs. Mesa 監控器語義
在上面條件變量中,我們提到 signal 在調用時,會去喚醒等待同一 cv 的線程,根據喚醒策略的不同,監控器也分為兩類:
- Hoare 監控器(1974),最早的監控器實現,在調用 signal 后,會立刻運行等待的線程,這時調用 signal 的線程會被堵塞(因為鎖被等待線程占有了)
- Mesa 監控器(Xerox PARC, 1980),signal 會把等待的線程重新放回到監控的 ready 隊列中,同時調用 signal 的線程繼續執行。這種方式是現如今 pthreads/Java/C# 采用的
這兩類監控器的關鍵區別在于等待線程被喚醒時,需要重新檢查 P 是否成立。
上圖表示藍色的線程在調用監控器的 get 方式時,數據為空,因此開始等待 emptyFull 條件;緊接著,紅色線程調用監控器的 set 方法改變 emptyFull 條件,這時
- 按照 Hoare 思路,藍色線程會立刻執行,并且紅色線程堵塞
- 按照 Mesa 思路,紅色線程會繼續執行,藍色線程會重新與綠色線程競爭與監控器關聯的鎖
Java 中的監控器
在 Java 中,每個對象都是一個監控器(因此具備一個 lock 與 cv),調用對象 o 的 synchronized 方法 m 時,會首先去獲取 o 的鎖,除此之外,還可以調用 o 的 wait/notify/notify 方法進行并發控制
Big Picture
Interruptible
通過介紹操作系統支持的同步原語,我們知道了 park/unpark、wait/notify 其實就是利用信號量( pthread_mutex_t)、條件變量(pthread_cond_t)實現的,其實監控器也可以用信號量來實現。在查看 AQS 中,發現有這么一個屬性:
/**
* The number of nanoseconds
for which it is faster to spin
* rather than to use timed park. A rough estimate suffices
* to improve responsiveness with very short timeouts.
*/static final long spinForTimeoutThreshold = 1000L;
也就是說,在小于 1000 納秒時,await 條件變量 P 時,會使用一個循環來代替條件變量的堵塞與喚醒,這是由于堵塞與喚醒本身的操作開銷可能就遠大于 await 的 timeout。相關代碼:
// AQS 的 doAcquireNanos 方法節選
for (;;) {
final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {
setHead(node); p.next = null;
// help GC
failed = false;
return true;
} nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
在 JUC 提供的高級同步類中,acquire 對應 park,release 對應 unpark,interrupt 其實就是個布爾的 flag 位,在 unpark 被喚醒時,檢查該 flag ,如果為 true,則會拋出我們熟悉的 InterruptedException。
Selector.select() 響應中斷異常的邏輯有些特別,因為對于這類堵塞 IO 操作來說,沒有條件變量的堵塞喚醒機制,我們可以再看下 Thread.interrupt 的實現
public void interrupt() {
if (this != Thread.currentThread())
checkAccess(); synchronized (blockerLock) {
Interruptible b = blocker; if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}
OpenJDK 使用了這么一個技巧來實現堵塞 IO 的中斷喚醒:在一個線程被堵塞時,會關聯一個 Interruptible 對象。對于 Selector 來說,在開始時,會關聯這么一個Interruptible 對象:
protected final void begin() {
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread target) {
synchronized (closeLock) {
if (closed)
return;
closed = true;
interrupted = target; try {
AbstractInterruptibleChannel.this.implCloseChannel();
} catch (IOException x) {
} } }}; } blockedOn(interruptor); Thread me = Thread.currentThread(); if (me.isInterrupted())
interruptor.interrupt(me);}
當調用 interrupt 方式時,會關閉該 channel,這樣就會關閉掉這個堵塞線程,可見為了實現這個功能,代價也是比較大的。LockSupport.park 中采用了類似技巧。
總結
也許基于多線程的并發編程不是最好的(可能是最復雜的,Clojure 大法好 :-),但卻是最悠久的。即便我們自己不去寫往往也需要閱讀別人的多線程代碼,而且能夠寫出“正確”(who knows?)的多線程程序往往也是區分 senior 與 junior 程序