線程同步可以說在日常開發中是用的很多, 但對于其內部如何實現的,一般人可能知道的并不多。 本篇文章將從如何實現簡單的鎖開始,介紹linux中的鎖實現futex的優點及原理,最后分析JAVA中同步機制如wait/notify, synchronized, ReentrantLock。
自己實現鎖
首先,如果要你實現操作系統的鎖,該如何實現?先想想這個問題,暫時不考慮性能、可用性等問題,就用最簡單、粗暴的方式。當你心中有個大致的思路后,再接著往下看。
下文中的代碼都是偽代碼。
自旋
最容易想到可能是自旋:
volatile int status=0; void lock(){ while(!compareAndSet(0,1)){ } //get lock } void unlock(){ status=0; } boolean compareAndSet(int except,int newValue){ //cas操作,修改status成功則返回true }
上面的代碼通過自旋和cas來實現一個最簡單的鎖。
這樣實現的鎖顯然有個致命的缺點:耗費cpu資源。沒有競爭到鎖的線程會一直占用cpu資源進行cas操作,假如一個線程獲得鎖后要花費10s處理業務邏輯,那另外一個線程就會白白的花費10s的cpu資源。(假設系統中就只有這兩個線程的情況)。
yield+自旋
要解決自旋鎖的性能問題必須讓競爭鎖失敗的線程不忙等,而是在獲取不到鎖的時候能把cpu資源給讓出來,說到讓cpu資源,你可能想到了yield()方法,看看下面的例子:
volatile int status=0; void lock(){ while(!compareAndSet(0,1)){ yield(); } //get lock } void unlock(){ status=0; }
當線程競爭鎖失敗時,會調用yield方法讓出cpu。需要注意的是該方法只是當前讓出cpu,有可能操作系統下次還是選擇運行該線程。其實現是 將當期線程移動到所在優先調度隊列的末端(操作系統線程調度了解一下?有時間的話,下次寫寫這塊內容)。也就是說,如果該線程處于優先級最高的調度隊列且該隊列只有該線程,那操作系統下次還是運行該線程。
自旋+yield的方式并沒有完全解決問題,當系統只有兩個線程競爭鎖時,yield是有效的。但是如果有100個線程競爭鎖,當線程1獲得鎖后,還有99個線程在反復的自旋+yield,線程2調用yield后,操作系統下次運行的可能是線程3;而線程3CAS失敗后調用yield后,操作系統下次運行的可能是線程4... 假如運行在單核cpu下,在競爭鎖時最差只有1%的cpu利用率,導致獲得鎖的線程1一直被中斷,執行實際業務代碼時間變得更長,從而導致鎖釋放的時間變的更長。
sleep+自旋
你可能從一開始就想到了,當競爭鎖失敗后,可以將用Thread.sleep將線程休眠,從而不占用cpu資源:
volatile int status=0; void lock(){ while(!compareAndSet(0,1)){ sleep(10); } //get lock } void unlock(){ status=0; }
上述方式我們可能見的比較多,通常用于實現上層鎖。該方式不適合用于操作系統級別的鎖,因為作為一個底層鎖,其sleep時間很難設置。sleep的時間取決于同步代碼塊的執行時間,sleep時間如果太短了,會導致線程切換頻繁(極端情況和yield方式一樣);sleep時間如果設置的過長,會導致線程不能及時獲得鎖。因此沒法設置一個通用的sleep值。就算sleep的值由調用者指定也不能完全解決問題:有的時候調用鎖的人也不知道同步塊代碼會執行多久。
park+自旋
那可不可以在獲取不到鎖的時候讓線程釋放cpu資源進行等待,當持有鎖的線程釋放鎖的時候將等待的線程喚起呢?
volatile int status=0; Queue parkQueue; void lock(){ while(!compareAndSet(0,1)){ // lock_wait(); } //get lock } void synchronized unlock(){ lock_notify(); } void lock_wait(){ //將當期線程加入到等待隊列 parkQueue.add(nowThread); //將當期線程釋放cpu releaseCpu(); } void lock_notify(){ //得到要喚醒的線程 Thread t=parkList.poll(); //喚醒等待線程 wakeAThread(t); }
上面是偽代碼,描述這種設計思想,至于釋放cpu資源、喚醒等待線程的的具體實現,后文會再說。這種方案相比于sleep而言,只有在鎖被釋放的時候,競爭鎖的線程才會被喚醒,不會存在過早或過完喚醒的問題。
小結
對于鎖沖突不嚴重的情況,用自旋鎖會更適合,試想每個線程獲得鎖后很短的一段時間內就釋放鎖,競爭鎖的線程只要經歷幾次自旋運算后就能獲得鎖,那就沒必要等待該線程了,因為等待線程意味著需要進入到內核態進行上下文切換,而上下文切換是有成本的并且還不低,如果鎖很快就釋放了,那上下文切換的開銷將超過自旋。
目前操作系統中,一般是用自旋+等待結合的形式實現鎖:在進入鎖時先自旋一定次數,如果還沒獲得鎖再進行等待。
futex
linux底層用futex實現鎖,futex由一個內核層的隊列和一個用戶空間層的atomic integer構成。當獲得鎖時,嘗試cas更改integer,如果integer原始值是0,則修改成功,該線程獲得鎖,否則就將當期線程放入到 wait queue中(即操作系統的等待隊列)。
上述說法有些抽象,如果你沒看明白也沒關系。我們先看一下沒有futex之前,linux是怎么實現鎖的。
futex誕生之前
在futex誕生之前,linux下的同步機制可以歸為兩類:用戶態的同步機制 和內核同步機制。 用戶態的同步機制基本上就是利用原子指令實現的自旋鎖。關于自旋鎖其缺點也說過了,不適用于大的臨界區(即鎖占用時間比較長的情況)。
內核提供的同步機制,如semaphore等,使用的是上文說的自旋+等待的形式。 它對于大小臨界區和都適用。但是因為它是內核層的(釋放cpu資源是內核級調用),所以每次lock與unlock都是一次系統調用,即使沒有鎖沖突,也必須要通過系統調用進入內核之后才能識別。
理想的同步機制應該是沒有鎖沖突時在用戶態利用原子指令就解決問題,而需要掛起等待時再使用內核提供的系統調用進行睡眠與喚醒。換句話說,在用戶態的自旋失敗時,能不能讓進程掛起,由持有鎖的線程釋放鎖時將其喚醒? 如果你沒有較深入地考慮過這個問題,很可能想當然的認為類似于這樣就行了(偽代碼):
void lock(int lockval) { //trylock是用戶級的自旋鎖 while(!trylock(lockval)) { wait();//釋放cpu,并將當期線程加入等待隊列,是系統調用 } } boolean trylock(int lockval){ int i=0; //localval=1代表上鎖成功 while(!compareAndSet(lockval,0,1)){ if(++i>10){ return false; } } return true; } void unlock(int lockval) { compareAndSet(lockval,1,0); notify(); }
上述代碼的問題是trylock和wait兩個調用之間存在一個窗口: 如果一個線程trylock失敗,在調用wait時持有鎖的線程釋放了鎖,當前線程還是會調用wait進行等待,但之后就沒有人再將該線程喚醒了。
futex誕生之后
我們來看看futex的方法定義:
//uaddr指向一個地址,val代表這個地址期待的值,當*uaddr==val時,才會進行wait int futex_wait(int *uaddr, int val); //喚醒n個在uaddr指向的鎖變量上掛起等待的進程 int futex_wake(int *uaddr, int n);
futex_wait真正將進程掛起之前會檢查addr指向的地址的值是否等于val,如果不相等則會立即返回,由用戶態繼續trylock。否則將當期線程插入到一個隊列中去,并掛起。
futex內部維護了一個隊列,在線程掛起前會線程插入到其中,同時對于隊列中的每個節點都有一個標識,代表該線程關聯鎖的uaddr。這樣,當用戶態調用futex_wake時,只需要遍歷這個等待隊列,把帶有相同uaddr的節點所對應的進程喚醒就行了。
作為優化,futex維護的其實是個類似java 中的concurrent hashmap的結構。其持有一個總鏈表,總鏈表中每個元素都是一個帶有自旋鎖的子鏈表。調用futex_wait掛起的進程,通過其uaddr hash到某一個具體的子鏈表上去。這樣一方面能分散對等待隊列的競爭、另一方面減小單個隊列的長度,便于futex_wake時的查找。每個鏈表各自持有一把spinlock,將"*uaddr和val的比較操作"與"把進程加入隊列的操作"保護在一個臨界區中。 另外,futex是支持多進程的,當使用futex在多進程間進行同步時,需要考慮同一個物理內存地址在不同進程中的虛擬地址是不同的。