JAVA并發(fā)工具類主要有CyclicBarrier、CountDownLatch、Semaphore和Exchanger,日常開發(fā)中經(jīng)常使用的是CountDownLatch和Semaphore。下面就簡單分析下這幾個并發(fā)工具類:
CyclicBarrier 內(nèi)存屏障
CyclicBarrier底層借助于一個count計數(shù)器和Lock/Condition實現(xiàn)內(nèi)存內(nèi)存屏障功能,在對count--時必須先獲取到lock,如果count不為0,則調(diào)用condition.wait進(jìn)行阻塞操作;直到當(dāng)count為0時,執(zhí)行barrierCommand(如果配置的話,執(zhí)行barrierCommand的線程是剛好將count減到0的那個線程),然后調(diào)用condition.signalAll喚醒所有等待的線程。
CyclicBarrier可用于多線程同步、多線程計算最后合并計算結(jié)果的場景,比如分片計算最后使用CyclicBarrier統(tǒng)計最后的結(jié)果等。
CyclicBarrier使用示例如下:
public static void main(String[] args) throws Exception {
CyclicBarrier barrier = new CyclicBarrier(2,
() -> System.out.println(Thread.currentThread().getName() + ": all is ok"));
Runnable task = () -> {
try {
System.out.println(Thread.currentThread().getName() + ": start wait");
barrier.await();
System.out.println(Thread.currentThread().getName() + ": start ok");
} catch (Exception e) {
e.printStackTrace();
}
};
Thread t1 = new Thread(task, "thread1");
Thread t2 = new Thread(task, "thread2");
t2.start();
t1.start();
t1.join();
t2.join();
}
CountDownLatch 計數(shù)器
CountDownLatch允許一個或多個線程等待其他線程完成操作。CountDownLatch底層借助于AQS來實現(xiàn)功能,初始化一個CountDownLatch(n)時,相當(dāng)于創(chuàng)建了一個state為n的AQS,當(dāng)調(diào)用countDown()時會對AQS進(jìn)行減一操作,如果state為0,則會對阻塞隊列中所有線程進(jìn)行喚醒操作。
CountDownLatch計數(shù)器必須大于等于0,等于0的時候調(diào)用await方法時不會阻塞當(dāng)前線程,注意CountDownLatch不可能重新初始化或者修改CountDownLatch對象的內(nèi)部計數(shù)的值。一個線程調(diào)用coundDown方法hAppen-before,另一個線程調(diào)用await方法。
public static void main(String[] args) throws Exception {
CountDownLatch downLatch = new CountDownLatch(2);
Runnable task = () -> {
try {
System.out.println(Thread.currentThread().getName() + ": start countDown");
downLatch.countDown();
System.out.println(Thread.currentThread().getName() + ": start ok");
} catch (Exception e) {
e.printStackTrace();
}
};
Thread t1 = new Thread(task, "thread1");
Thread t2 = new Thread(task, "thread2");
t1.start();
t2.start();
downLatch.await();
System.out.println("main wait ok");
t1.join();
t2.join();
}
Semaphore信號量
Semaphore用來控制同時訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個線程,保證合理的使用公共資源。Semaphore可用作流量控制,特別是公共資源有限的應(yīng)用場景,比如數(shù)據(jù)庫連接。
Semaphore底層也是基于AQS,初始化Semaphore(n)相當(dāng)于初始化一個state為n的AQS,調(diào)用acquire()時會對進(jìn)行state - 1操作,如果結(jié)果大于0則CAS設(shè)置state為state-1,相當(dāng)于獲取到了信號量,否則進(jìn)行阻塞操作(調(diào)用tryAcquire則不會阻塞線程)。調(diào)用release會對state進(jìn)行++操作。
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
ExecutorService executor = Executors.newFixedThreadPool(10);
Runnable task = () -> {
try {
System.out.println(Thread.currentThread().getName() + " acquire before");
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " acquire ok");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
executor.execute(task);
executor.execute(task);
executor.execute(task);
executor.execute(task);
}
Exchanger 線程間交換數(shù)據(jù)
Exchanger是一個用戶線程間交換數(shù)據(jù)的工具類,它提供了一個同步點,在這個同步點上,兩個線程可以交換彼此的數(shù)據(jù)。這兩個線程通過exchange方法交換數(shù)據(jù),如果第一個線程先執(zhí)行exchange方法,他會一直等待第二個線程也執(zhí)行exchange方法,當(dāng)兩個線程都達(dá)到同步點時,這兩個線程交換數(shù)據(jù),將本線程產(chǎn)生的數(shù)據(jù)傳遞給對方。
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
Runnable task = () -> {
try {
String result = exchanger.exchange(Thread.currentThread().getName());
System.out.println(Thread.currentThread().getName() + ": " + result);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(task);
executor.execute(task);
}
Exchanger實現(xiàn)分析
Exchanger算法的核心是通過一個可交換數(shù)據(jù)的slot,以及一個可以帶有數(shù)據(jù)item的參與者,slot是Node類型,Node定義如下:
@sun.misc.Contended static final class Node {
int index; // Arena index
int bound; // Last recorded value of Exchanger.bound
int collides; // Number of CAS failures at current bound
int hash; // Pseudo-random for spins
Object item; // This thread's current item
volatile Object match; // Item provided by releasing thread
volatile Thread parked; // Set to this thread when parked, else null
}
static final class Participant extends ThreadLocal<Node> {
public Node initialValue() { return new Node(); }
}
每一個參與者都帶有一個Participant,當(dāng)調(diào)用exchange時,如果slot為空,則將自己攜帶的數(shù)據(jù)CAS設(shè)置到slot上,然后park自己;如果slot不為空,則表示已經(jīng)有線程在slot里設(shè)置了數(shù)據(jù),則讀取Node.item字段,并將自己攜帶的數(shù)據(jù)設(shè)置到Node.match字段,然后喚醒之前設(shè)置數(shù)據(jù)的線程(之前阻塞的線程在喚醒后讀取Node.match字段返回),然后返回數(shù)據(jù)即可。
小結(jié)
了解了這些Java并發(fā)工具類,小伙伴們在日常開發(fā)中,都用到哪種或者哪幾種呢?
歡迎分享給其他小伙伴進(jìn)行交流,或者評論區(qū)留言一起討論哈 : )