原始自旋鎖
最原始的自旋鎖就是多個線程不斷自旋,大家都不斷嘗試獲取鎖。看下面例子,主要看lock和unlock兩個方法,Unsafe僅僅是為操作提供了硬件級別的原子CAS操作。對于lock方法,假如有若干線程競爭,能成功通過CAS將value值修改為newV的線程即是成功獲取鎖的線程。成功獲取鎖的線程將順利通過,而其它線程則不斷在循環檢測value值是否改回0,將value改為0的操作就是獲取鎖的線程釋放鎖的操作。對于unlock方法,用于釋放鎖,釋放后其它線程又繼續對該鎖競爭。如此一來,沒獲得鎖的線程也不會被掛起或阻塞,而是不斷循環檢查狀態。
1. public class SpinLock {
2. private static Unsafe unsafe = null;
3. private static final long valueOffset;
4. private volatile int value = 0;
5. static {
6. try {
7. unsafe = getUnsafeInstance();
8. valueOffset = unsafe.objectFieldOffset(SpinLock.class.getDeclaredField("value"));
9. } catch (Exception ex) {
10. throw new Error(ex);
11. }
12. }
13.
14. private static Unsafe getUnsafeInstance() throws Exception {
15. Field theUnsafeInstance = Unsafe.class.getDeclaredField("theUnsafe");
16. theUnsafeInstance.setAccessible(true);
17. return (Unsafe) theUnsafeInstance.get(Unsafe.class);
18. }
19.
20. public void lock() {
21. for (;;) {
22. int newV = value + 1;
23. if (newV == 1)
24. if (unsafe.compareAndSwapInt(this, valueOffset, 0, newV)) {
25. return;
26. }
27. }
28. }
29.
30. public void unlock() {
31. unsafe.compareAndSwapInt(this, valueOffset, 1, 0);
32. }
33. }
排隊自旋鎖
鑒于原始自旋鎖存在公平性問題,于是引入一種排隊機制來解決它,這就是排隊自旋鎖。所有線程在嘗試獲取鎖之前得先拿到一個排隊號,然后再不斷輪詢當前是不是已經輪到自己了,判斷的依據就是當前處理號是否等于自己的排隊號。如果兩者相等,則表示已經輪到自己了,于是得到鎖并往下執行。
看下面例子,主要看lock和unlock兩個方法,Unsafe僅僅是為操作提供了硬件級別的原子CAS操作。對于lock方法,首先通過不斷循環去嘗試拿到一個排隊號,一旦成功拿到排隊號后就開始通過while(processingNum != nowNum)輪詢看自己是否已經輪到了。而unlock方法則是直接修改當前處理號,直接加1,表示自己已經不需要鎖了,可以讓給下一位了。
1. public class TicketLock {
2. private static Unsafe unsafe = null;
3. private static final long ticketNumOffset;
4. private static final long processingNumOffset;
5. private volatile int ticketNum = 0;
6. private volatile int processingNum = 0;
7. static {
8. try {
9. unsafe = getUnsafeInstance();
10. ticketNumOffset = unsafe
11. .objectFieldOffset(TicketLock.class.getDeclaredField("ticketNum"));
12. processingNumOffset = unsafe
13. .objectFieldOffset(TicketLock.class.getDeclaredField("processingNum"));
14. } catch (Exception ex) {
15. throw new Error(ex);
16. }
17. }
18.
19. private static Unsafe getUnsafeInstance() throws Exception {
20. Field theUnsafeInstance = Unsafe.class.getDeclaredField("theUnsafe");
21. theUnsafeInstance.setAccessible(true);
22. return (Unsafe) theUnsafeInstance.get(Unsafe.class);
23. }
24.
25. public int lock() {
26. int nowNum;
27. for (;;) {
28. nowNum = ticketNum;
29. if (unsafe.compareAndSwapInt(this, ticketNumOffset, ticketNum, ticketNum + 1)) {
30. break;
31. }
32. }
33. while (processingNum != nowNum) {
34. }
35.
36. return nowNum;
37. }
38.
39. public void unlock(int ticket) {
40. int next = ticket + 1;
41. unsafe.compareAndSwapInt(this, processingNumOffset, ticket, next);
42. }
43.
44. }
CLH鎖
為了優化同步帶來的花銷,Craig、Landin、Hagersten三個人發明了CLH鎖。其核心思想是:通過一定手段將所有線程對某一共享變量的輪詢競爭轉化為一個線程隊列,且隊列中的線程各自輪詢自己的本地變量。
這個轉化過程有兩個要點:一是應該構建怎樣的隊列以及如何構建隊列?為了保證公平性,我們構建的將是一個FIFO隊列。構建的時候主要通過移動尾部節點tail來實現隊列的排隊,每個想獲取鎖的線程創建一個新節點并通過CAS原子操作將新節點賦給tail,然后讓當前線程輪詢前一節點的某個狀態位。如圖可以清晰看到隊列結構及自旋操作,這樣就成功構建了線程排隊隊列。二是如何釋放隊列?執行完線程后只需將當前線程對應的節點狀態位置為解鎖狀態即可,由于下一節點一直在輪詢,所以可獲取到鎖。
所以,CLH鎖的核心思想是將眾多線程長時間對某資源的競爭,通過有序化這些線程將其轉化為只需對本地變量檢測。而唯一存在競爭的地方就是在入隊列之前對尾節點tail的競爭,但此時競爭的線程數量已經少了很多了。比起所有線程直接對某資源競爭的輪詢次數也減少了很多,這也大大節省了CPU緩存同步的消耗,從而大大提升系統性能。
下面我們提供一個簡單的CLH鎖實現代碼,以便更好理解CLH鎖的原理。其中lock與unlock兩方法提供加鎖和解鎖操作,每次加鎖解鎖必須將一個CLHNode對象作為參數傳入。lock方法的for循環是通過CAS操作將新節點插入隊列,而while循環則是檢測前驅節點的鎖狀態位。一旦前驅節點鎖狀態位允許則結束檢測,讓線程往下執行。解鎖操作先判斷當前節點是否為尾節點,如是則直接將尾節點設置為空,此時說明僅僅只有一條線程在執行,否則將當前節點的鎖狀態位設置為解鎖狀態。
1. public class CLHLock {
2. private static Unsafe unsafe = null;
3. private static final long valueOffset;
4. private volatile CLHNode tail;
5.
6. public class CLHNode {
7. private volatile boolean isLocked = true;
8. }
9.
10. static {
11. try {
12. unsafe = getUnsafeInstance();
13. valueOffset = unsafe.objectFieldOffset(CLHLock.class.getDeclaredField("tail"));
14. } catch (Exception ex) {
15. throw new Error(ex);
16. }
17. }
18.
19. public void lock(CLHNode currentThreadNode) {
20. CLHNode preNode = null;
21. for (;;) {
22. preNode = tail;
23. if (unsafe.compareAndSwapObject(this, valueOffset, preNode, currentThreadNode))
24. break;
25. }
26. if (preNode != null)
27. while (preNode.isLocked) {
28. }
29. }
30.
31. public void unlock(CLHNode currentThreadNode) {
32. if (!unsafe.compareAndSwapObject(this, valueOffset, currentThreadNode, null))
33. currentThreadNode.isLocked = false;
34. }
35.
36. private static Unsafe getUnsafeInstance() throws Exception {
37. Field theUnsafeInstance = Unsafe.class.getDeclaredField("theUnsafe");
38. theUnsafeInstance.setAccessible(true);
39. return (Unsafe) theUnsafeInstance.get(Unsafe.class);
40. }
41. }
MCS鎖
MCS鎖由John Mellor-Crummey和Michael Scott兩人發明,它的出現旨在解決CLH鎖存在的問題。它也是基于FIFO隊列,與CLH鎖相似,不同的地方在于輪詢的對象不同。MCS鎖中線程只對本地變量自旋,而前驅節點則負責通知其結束自旋操作。這樣的話就減少了CPU緩存與主存儲之間的不必要的同步操作,減少了同步帶來的性能損耗。
如下圖,每個線程對應著隊列中的一個節點。節點內有一個spin變量,表示是否需要旋轉。一旦前驅節點使用完鎖后便修改后繼節點的spin變量,通知其不必繼續做自旋操作,已成功獲取鎖。
下面我們提供一個簡單的MCS鎖實現代碼,以便更好理解MCS鎖的原理。其中lock與unlock兩方法提供加鎖和解鎖操作,每次加鎖解鎖必須將一個MCSNode對象作為參數傳入。lock方法的for循環是通過CAS操作將新節點賦給隊列尾部節點tail,如果存在前驅節點的話則自己要開始自旋操作,等待前驅節點解鎖時通知自己。一旦前驅節點執行解鎖后,則會將本節點的spin變量修改為false,本節點則獲取到鎖并停止自旋,讓線程往下執行。解鎖操作先判斷當前節點是否為尾節點,如果是則什么都不用處理。此時說明僅僅只有一條線程在執行,否則將后繼節點的spin變量設置為false。此時要考慮特殊情況,如果不存在后繼節點則將尾部節點tail設為null。在此期間可能又有線程進來,這時tail的CAS修改會失敗,所以就只能自旋等后繼節點不為空再往下執行。
1. public class MCSLock {
2. private static Unsafe unsafe = null;
3. volatile MCSNode tail;
4. private static final long valueOffset;
5.
6. public static class MCSNode {
7. MCSNode next;
8. volatile boolean spin = true;
9. }
10.
11. static {
12. try {
13. unsafe = getUnsafeInstance();
14. valueOffset = unsafe.objectFieldOffset(MCSLock.class.getDeclaredField("tail"));
15. } catch (Exception ex) {
16. throw new Error(ex);
17. }
18. }
19.
20. private static Unsafe getUnsafeInstance() throws Exception {
21. Field theUnsafeInstance = Unsafe.class.getDeclaredField("theUnsafe");
22. theUnsafeInstance.setAccessible(true);
23. return (Unsafe) theUnsafeInstance.get(Unsafe.class);
24. }
25.
26. public void lock(MCSNode currentThreadMcsNode) {
27. MCSNode predecessor = null;
28. for (;;) {
29. predecessor = tail;
30. if (unsafe.compareAndSwapObject(this, valueOffset, tail, currentThreadMcsNode))
31. break;
32. }
33. if (predecessor != null) {
34. predecessor.next = currentThreadMcsNode;
35. while (currentThreadMcsNode.spin) {
36. }
37. }
38. }
39.
40. public void unlock(MCSNode currentThreadMcsNode) {
41. if (tail != currentThreadMcsNode) {
42. if (currentThreadMcsNode.next == null) {
43. if (unsafe.compareAndSwapObject(this, valueOffset, currentThreadMcsNode, null)) {
44. return;
45. } else {
46. while (currentThreadMcsNode.next == null) {
47. }
48. }
49. }
50. currentThreadMcsNode.next.spin = false;
51. }
52. }
}
更多JAVA并發原理剖析請關注下面的專欄。
作者簡介:筆名seaboat,擅長人工智能、計算機科學、數學原理、基礎算法。出版書籍:《圖解數據結構與算法》、《Tomcat內核設計剖析》、《人工智能原理科普》。