前言
在多線程環境下,數據的并發訪問和修改是無法避免的問題。
為了解決這個問題,JAVA 提供了一系列并發容器,這些容器在內部已經處理了并發問題,使得我們可以在多線程環境下安全地訪問和修改數據。
并發容器
1.ConcurrentHashMap 并發版 HashMap
最常見的并發容器之一,可以用作并發場景下的緩存。底層依然是哈希表,但在 JAVA 8 中有了不小的改變,而 JAVA 7 和 JAVA 8 都是用的比較多的版本,因此經常會將這兩個版本的實現方式做一些比較(比如面試中)。
一個比較大的差異就是,JAVA 7 中采用分段鎖來減少鎖的競爭,JAVA 8 中放棄了分段鎖,采用 CAS(一種樂觀鎖),同時為了防止哈希沖突嚴重時退化成鏈表(沖突時會在該位置生成一個鏈表,哈希值相同的對象就鏈在一起),會在鏈表長度達到閾值(8)后轉換成紅黑樹(比起鏈表,樹的查詢效率更穩定)。
示例
import java.util.concurrent.*; public class ConcurrentHashMapExample { public static void mAIn(String[] args) { // Creating a ConcurrentHashMap ConcurrentHashMap<String, String> map = new ConcurrentHashMap<String, String>(); // Adding elements to the ConcurrentHashMap map.put("Key1", "Value1"); map.put("Key2", "Value2"); map.put("Key3", "Value3"); // Printing the ConcurrentHashMap System.out.println("ConcurrentHashMap: " + map); } }
2.CopyOnWriteArrayList 并發版 ArrayList
并發版 ArrayList,底層結構也是數組,和 ArrayList 不同之處在于:當新增和刪除元素時會創建一個新的數組,在新的數組中增加或者排除指定對象,最后用新增數組替換原來的數組。
CopyOnWriteArrayList
的主要特性是,每當列表修改時,例如添加或刪除元素,它都會創建列表的一個新副本。原始列表和新副本都可以進行并發讀取,這樣就可以在不鎖定整個列表的情況下進行并發讀取。這種方法在讀取操作遠多于寫入操作的場景中非常有用。
適用場景:由于讀操作不加鎖,寫(增、刪、改)操作加鎖,因此適用于讀多寫少的場景。
局限:由于讀的時候不會加鎖(讀的效率高,就和普通 ArrayList 一樣),讀取的當前副本,因此可能讀取到臟數據。如果介意,建議不用。
看看源碼感受下:
示例
import java.util.concurrent.*; public class CopyOnWriteArrayListExample { public static void main(String[] args) { // 創建一個 CopyOnWriteArrayList CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<String>(); // 向 CopyOnWriteArrayList 添加元素 list.add("Element1"); list.add("Element2"); list.add("Element3"); // 打印 CopyOnWriteArrayList System.out.println("CopyOnWriteArrayList: " + list); } }
3.CopyOnWriteArraySet 并發 Set
基于 CopyOnWriteArrayList 實現(內含一個 CopyOnWriteArrayList 成員變量),也就是說底層是一個數組,意味著每次 add 都要遍歷整個集合才能知道是否存在,不存在時需要插入(加鎖)。
CopyOnWriteArraySet
的工作原理與 CopyOnWriteArrayList
類似。每當發生修改操作(如添加或刪除元素)時,它都會創建集合的一個新副本。原始集合和新副本都可以進行并發讀取,這樣就可以在不鎖定整個集合的情況下進行并發讀取。這種方法在讀取操作遠多于寫入操作的場景中非常有用。
適用場景:在 CopyOnWriteArrayList 適用場景下加一個,集合別太大(全部遍歷傷不起)。
示例
import java.util.concurrent.*; public class CopyOnWriteArraySetExample { public static void main(String[] args) { // 創建一個 CopyOnWriteArraySet CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<String>(); // 向 CopyOnWriteArraySet 添加元素 set.add("Element1"); set.add("Element2"); set.add("Element3"); // 打印 CopyOnWriteArraySet System.out.println("CopyOnWriteArraySet: " + set); } }
4.ConcurrentLinkedQueue 并發隊列 (基于鏈表)
基于鏈表實現的并發隊列,使用樂觀鎖 (CAS) 保證線程安全。因為數據結構是鏈表,所以理論上是沒有隊列大小限制的,也就是說添加數據一定能成功。
ConcurrentLinkedQueue
是 Java 并發包的一部分,它是基于鏈接節點的無界線程安全隊列。它按照 FIFO(先進先出)的原則對元素進行排序。
ConcurrentLinkedQueue
的主要優點是它允許完全并發的插入,并且使用了一種高效的“wait-free”算法。
示例
import java.util.concurrent.*; public class ConcurrentLinkedQueueExample { public static void main(String[] args) { // 創建一個 ConcurrentLinkedQueue ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>(); // 向 ConcurrentLinkedQueue 添加元素 queue.add("Element1"); queue.add("Element2"); queue.add("Element3"); // 打印 ConcurrentLinkedQueue System.out.println("ConcurrentLinkedQueue: " + queue); } }
5.ConcurrentLinkedDeque 并發隊列 (基于雙向鏈表)
基于雙向鏈表實現的并發隊列,可以分別對頭尾進行操作,因此除了先進先出 (FIFO),也可以先進后出(FILO),當然先進后出的話應該叫它棧了。
ConcurrentLinkedDeque
是 Java 并發包的一部分,它是一個基于鏈接節點的無界并發雙端隊列。在 ConcurrentLinkedDeque
中,添加、刪除等操作可以在隊列的兩端進行,使其具有更高的并發性。
示例
import java.util.concurrent.*; public class ConcurrentLinkedDequeExample { public static void main(String[] args) { // 創建一個 ConcurrentLinkedDeque ConcurrentLinkedDeque<String> deque = new ConcurrentLinkedDeque<String>(); // 向 ConcurrentLinkedDeque 添加元素 deque.add("Element1"); deque.addFirst("Element2"); deque.addLast("Element3"); // 打印 ConcurrentLinkedDeque System.out.println("ConcurrentLinkedDeque: " + deque); } }
6.ConcurrentSkipListMap 基于跳表的并發 Map
ConcurrentSkipListMap
是 Java 并發包的一部分,它是一個線程安全的排序映射表。它使用跳表的數據結構來保證元素的有序性和并發性。
跳表是一種可以進行二分查找的有序鏈表。ConcurrentSkipListMap
提供了預期的平均 log(n) 時間成本來執行 containsKey
,get
,put
和 remove
操作,并且它的并發性通常優于基于樹的算法。
SkipList 即跳表,跳表是一種空間換時間的數據結構,通過冗余數據,將鏈表一層一層索引,達到類似二分查找的效果
示例
import java.util.concurrent.*; public class ConcurrentSkipListMapExample { public static void main(String[] args) { // 創建一個 ConcurrentSkipListMap ConcurrentSkipListMap<String, String> map = new ConcurrentSkipListMap<String, String>(); // 向 ConcurrentSkipListMap 添加元素 map.put("Key1", "Value1"); map.put("Key2", "Value2"); map.put("Key3", "Value3"); // 打印 ConcurrentSkipListMap System.out.println("ConcurrentSkipListMap: " + map); } }
7.ConcurrentSkipListSet 基于跳表的并發 Set
類似 HashSet 和 HashMap 的關系,ConcurrentSkipListSet 里面就是一個 ConcurrentSkipListMap,
ConcurrentSkipListSet
是 Java 并發包的一部分,它是一個線程安全的排序集合。它使用跳表的數據結構來保證元素的有序性和并發性。
跳表是一種可以進行二分查找的有序鏈表。ConcurrentSkipListSet
提供了預期的平均 log(n) 時間成本來執行 contains
,add
和 remove
操作,并且它的并發性通常優于基于樹的算法。
示例
import java.util.concurrent.*; public class ConcurrentSkipListSetExample { public static void main(String[] args) { // 創建一個 ConcurrentSkipListSet ConcurrentSkipListSet<String> set = new ConcurrentSkipListSet<String>(); // 向 ConcurrentSkipListSet 添加元素 set.add("Element1"); set.add("Element2"); set.add("Element3"); // 打印 ConcurrentSkipListSet System.out.println("ConcurrentSkipListSet: " + set); } }
8.ArrayBlockingQueue 阻塞隊列 (基于數組)
ArrayBlockingQueue
是 Java 并發包的一部分,它是一個基于數組的有界阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序。
ArrayBlockingQueue
在嘗試插入元素到已滿隊列或從空隊列中移除元素時,會導致線程阻塞,直到有空間或元素可用。
基于數組實現的可阻塞隊列,構造時必須制定數組大小,往里面放東西時如果數組滿了便會阻塞直到有位置(也支持直接返回和超時等待),通過一個鎖 ReentrantLock 保證線程安全。
乍一看會有點疑惑,讀和寫都是同一個鎖,那要是空的時候正好一個讀線程來了不會一直阻塞嗎?
答案就在 notEmpty、notFull 里,這兩個出自 lock 的小東西讓鎖有了類似 synchronized + wait + notify 的功能。傳送門 → 終于搞懂了 sleep/wait/notify/notifyAll
示例
import java.util.concurrent.*; public class ArrayBlockingQueueExample { public static void main(String[] args) { // 創建一個 ArrayBlockingQueue ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3); // 向 ArrayBlockingQueue 添加元素 try { queue.put("Element1"); queue.put("Element2"); queue.put("Element3"); } catch (InterruptedException e) { e.printStackTrace(); } // 打印 ArrayBlockingQueue System.out.println("ArrayBlockingQueue: " + queue); } }
9.LinkedBlockingQueue 阻塞隊列 (基于鏈表)
LinkedBlockingQueue
是 Java 并發包的一部分,它是一個基于鏈表的可選有界阻塞隊列。此隊列按照 FIFO(先進先出)的原則對元素進行排序。
LinkedBlockingQueue
在嘗試插入元素到已滿隊列或從空隊列中移除元素時,會導致線程阻塞,直到有空間或元素可用。
基于鏈表實現的阻塞隊列,想比與不阻塞的 ConcurrentLinkedQueue,它多了一個容量限制,如果不設置默認為 int 最大值。
示例
import java.util.concurrent.*; public class LinkedBlockingQueueExample { public static void main(String[] args) { // 創建一個 LinkedBlockingQueue LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(3); // 向 LinkedBlockingQueue 添加元素 try { queue.put("Element1"); queue.put("Element2"); queue.put("Element3"); } catch (InterruptedException e) { e.printStackTrace(); } // 打印 LinkedBlockingQueue System.out.println("LinkedBlockingQueue: " + queue); } }
10.LinkedBlockingDeque 阻塞隊列 (基于雙向鏈表)
LinkedBlockingDeque
是 Java 并發包的一部分,它是一個基于鏈表的可選有界阻塞雙端隊列。此隊列按照 FIFO(先進先出)的原則對元素進行排序。
LinkedBlockingDeque
在嘗試插入元素到已滿隊列或從空隊列中移除元素時,會導致線程阻塞,直到有空間或元素可用。雙端隊列的優勢在于可以從兩端插入或移除元素。
類似 LinkedBlockingQueue,但提供了雙向鏈表特有的操作。
示例
import java.util.concurrent.*; public class LinkedBlockingDequeExample { public static void main(String[] args) { // 創建一個 LinkedBlockingDeque LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<String>(3); // 向 LinkedBlockingDeque 添加元素 try { deque.putFirst("Element1"); deque.putLast("Element2"); deque.putFirst("Element3"); } catch (InterruptedException e) { e.printStackTrace(); } // 打印 LinkedBlockingDeque System.out.println("LinkedBlockingDeque: " + deque); } }
11.PriorityBlockingQueue 線程安全的優先隊列
PriorityBlockingQueue
是 Java 并發包的一部分,它是一個無界的并發隊列。它使用了和類 java.util.PriorityQueue
一樣的排序規則,并且能夠確保在并發環境下的線程安全。
PriorityBlockingQueue
中的元素按照自然順序或者由比較器提供的順序進行排序。隊列不允許使用 null
元素。
構造時可以傳入一個比較器,可以看做放進去的元素會被排序,然后讀取的時候按順序消費。某些低優先級的元素可能長期無法被消費,因為不斷有更高優先級的元素進來。
示例
import java.util.concurrent.*; public class PriorityBlockingQueueExample { public static void main(String[] args) { // 創建一個 PriorityBlockingQueue PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<String>(); // 向 PriorityBlockingQueue 添加元素 queue.add("Element1"); queue.add("Element2"); queue.add("Element3"); // 打印 PriorityBlockingQueue System.out.println("PriorityBlockingQueue: " + queue); } }
12.SynchronousQueue 數據同步交換的隊列
SynchronousQueue
是 Java 并發包的一部分,它是一個不存儲元素的阻塞隊列。每一個 put
操作必須等待一個 take
操作,否則不能繼續添加元素,反之亦然。
這種特性使 SynchronousQueue
成為線程之間傳遞數據的好工具。它可以看作是一個傳球手,負責把生產者線程處理的數據直接傳遞給消費者線程。
一個虛假的隊列,因為它實際上沒有真正用于存儲元素的空間,每個插入操作都必須有對應的取出操作,沒取出時無法繼續放入。
示例
import java.util.concurrent.SynchronousQueue; public class Main { public static void main(String[] args) { SynchronousQueue<Integer> queue = new SynchronousQueue<>(); new Thread(()->{ try{ for(int i=0;;i++){ System.out.println("放入:" + i); queue.put(i); } }catch (InterruptedException e){ e.printStackTrace(); } }).start(); new Thread(()->{ try{ while(true){ System.out.println("取出:" + queue.take()); Thread.sleep((long)(Math.random()*2000)); } }catch (InterruptedException e){ e.printStackTrace(); } }).start(); } }
運行結果:
取出:0 放入:0 取出:1 放入:1 放入:2 取出:2 取出:3 放入:3 取出:4 放入:4 ... ...
可以看到,寫入的線程沒有任何 sleep,可以說是全力往隊列放東西,而讀取的線程又很不積極,讀一個又 sleep 一會。輸出的結果卻是讀寫操作成對出現。
JAVA 中一個使用場景就是 Executors.newCachedThreadPool(),創建一個緩存線程池。
13.LinkedTransferQueue 基于鏈表的數據交換隊列
LinkedTransferQueue
是 Java 并發包的一部分,它是一個由鏈表結構組成的無界轉移阻塞隊列。隊列按照 FIFO(先進先出)的原則對元素進行排序。
LinkedTransferQueue
的一個特性是,它可以嘗試將元素直接轉移給消費者,如果沒有等待的消費者,元素就會被添加到隊列的尾部,等待消費者來獲取。
實現了接口 TransferQueue,通過 transfer 方法放入元素時,如果發現有線程在阻塞在取元素,會直接把這個元素給等待線程。如果沒有人等著消費,那么會把這個元素放到隊列尾部,并且此方法阻塞直到有人讀取這個元素。和 SynchronousQueue 有點像,但比它更強大。
示例
import java.util.concurrent.*; public class LinkedTransferQueueExample { public static void main(String[] args) { // 創建一個 LinkedTransferQueue LinkedTransferQueue<String> queue = new LinkedTransferQueue<String>(); // 啟動一個新線程來從 LinkedTransferQueue 取出元素 new Thread(() -> { try { System.out.println("Taken: " + queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); // 向 LinkedTransferQueue 添加一個元素 try { queue.transfer("Element"); } catch (InterruptedException e) { e.printStackTrace(); } } }
14.DelayQueue 延時隊列
DelayQueue
是 Java 并發包的一部分,它是一個無界阻塞隊列,只有在延遲期滿時才能從中提取元素。此隊列的頭部是延遲期滿后保存時間最長的元素。如果延遲都還沒有期滿,則隊列沒有頭部,并且 poll
將返回 null
。
元素在 DelayQueue
中的順序是按照其到期時間的先后順序進行排序的,越早到期的元素越排在隊列前面。延遲隊列常用于實現定時任務功能。
可以使放入隊列的元素在指定的延時后才被消費者取出,元素需要實現 Delayed 接口。
示例
import java.util.concurrent.*; public class DelayQueueExample { public static void main(String[] args) { // 創建一個 DelayQueue DelayQueue<DelayedElement> queue = new DelayQueue<DelayedElement>(); // 向 DelayQueue 添加一個元素,延遲 3 秒 queue.put(new DelayedElement(3000, "Element")); // 從 DelayQueue 獲取元素 try { DelayedElement element = queue.take(); System.out.println("Taken: " + element); } catch (InterruptedException e) { e.printStackTrace(); } } } class DelayedElement implements Delayed { private long delayTime; // 延遲時間 private long expire; // 到期時間 private String element; // 元素數據 public DelayedElement(long delay, String element) { this.delayTime = delay; this.element = element; this.expire = System.currentTimeMillis() + delay; } @Override public long getDelay(TimeUnit unit) { return unit.convert(expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } @Override public String toString() { return element; } }
總結
從上面的介紹總總結有以下幾種容器類
-
ConcurrentHashMap:并發版 HashMap
-
CopyOnWriteArrayList:并發版 ArrayList
-
CopyOnWriteArraySet:并發 Set
-
ConcurrentLinkedQueue:并發隊列 (基于鏈表)
-
ConcurrentLinkedDeque:并發隊列 (基于雙向鏈表)
-
ConcurrentSkipListMap:基于跳表的并發 Map
-
ConcurrentSkipListSet:基于跳表的并發 Set
-
ArrayBlockingQueue:阻塞隊列 (基于數組)
-
LinkedBlockingQueue:阻塞隊列 (基于鏈表)
-
LinkedBlockingDeque:阻塞隊列 (基于雙向鏈表)
-
PriorityBlockingQueue:線程安全的優先隊列
-
SynchronousQueue:讀寫成對的隊列
-
LinkedTransferQueue:基于鏈表的數據交換隊列
-
DelayQueue:延時隊列
Java 并發容器為處理多線程環境下的數據訪問和修改提供了強大的工具。
通過了解和學習這些并發容器,我們可以更好地理解并發編程,更有效地處理并發問題。
無論你是正在學習 Java,還是已經在使用 Java 進行開發,我都強烈建議你深入了解這些并發容器,它們將在你的并發編程之路上起到重要的作用。