在 JDK1.5 后,推出了幾個并發的工具類,位于 JUC(JAVA.util.concurrent)包下。
CountDownLatch
CountDownLatch 類是使一個線程等待其他線程各自執行完畢后再執行。
類似于現實中某個活動需要等到全部人齊了才可以開始。
實現原理:
- 基于 AQS 的共享模式。
從ReentrantLock的實現看AQS的原理及應用
- 這個類是一個同步計數器,主要用于線程間的控制。
- 當 CountDownLatch 的 count 計數 > 0 時,本線程的 await() 會造成阻塞,直到 count 變為 0,開始執行本線程。
package test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test1 {
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(2); // 計數器初始化為 2,要等兩個線程執行完畢
System.out.println("主線程開始執行");
ExecutorService es1 = Executors.newSingleThreadExecutor();
es1.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
System.out.println("子線程:" + Thread.currentThread().getName() + "執行");
}catch (InterruptedException e){
e.printStackTrace();
}
latch.countDown(); // 使計數器減一
}
});
ExecutorService es2 = Executors.newSingleThreadExecutor();
es2.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println("子線程:" + Thread.currentThread().getName() + "執行");
latch.countDown();
}
});
System.out.println("等待兩個線程執行完畢");
try {
latch.await(); // 主線程掛起,等待兩個線程執行完
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println("兩個子線程都執行完畢,繼續執行主線程");
}
}
主線程開始執行
等待兩個線程執行完畢
子線程:pool-2-thread-1執行
子線程:pool-1-thread-1執行
兩個子線程都執行完畢,繼續執行主線程
CyclicBarrier
與 CountDownLatch 功能一樣,不過它可以重復循環,而 CountDownLatch 只能執行一次。
實現原理:
- 基于 ReentrantLock 和 Condition
//同步操作鎖
private final ReentrantLock lock = new ReentrantLock();
//線程攔截器
private final Condition trip = lock.newCondition();
//每次攔截的線程數
private final int parties;
//換代前執行的任務
private final Runnable barrierCommand;
//表示柵欄的當前代
private Generation generation = new Generation();
//計數器
private int count;
//靜態內部類Generation
private static class Generation {
boolean broken = false;
- 上面貼出了 CyclicBarrier 所有的成員變量,可以看到 CyclicBarrier 內部使通過條件 trip 來對線程進行阻塞。
- 并且其內部維護了兩個 int 型變量 parites 和 count,parties 表示每次攔截的線程數,該值在構造時進行賦值。count 是內部計數器,他的初始值和 parties 相同,以后隨著每次 await 方法的調用而減一,直到減為零將喚醒主線程。
- CyclicBarrier 有一個靜態內部類 Generation,該類的對象代表柵欄的當前代,就像玩游戲時代表的本局游戲,利用它可以實現循環等待。
- barrierCommand 表示換代前執行的任務,當 count 減為零時表示本局游戲結束,需要轉到下一局。在轉到下一局游戲之前,利用它可以實現循環等待。
package test;
import java.util.concurrent.CyclicBarrier;
public class Test2 {
static class TaskThread extends Thread{
CyclicBarrier barrier;
public TaskThread(CyclicBarrier barrier){
this.barrier = barrier;
}
@Override
public void run(){
try{
Thread.sleep(100);
System.out.println(getName() + "到達柵欄 A");
barrier.await(); // 等待所有線程都執行到這,才執行主線程
System.out.println(getName() + "沖破柵欄 A"); // 主線程完成后繼續執行
Thread.sleep(2000);
System.out.println(getName() + "到達柵欄 B");
barrier.await();
System.out.println(getName() + "沖破柵欄 B");
}catch (Exception e){
e.printStackTrace();
}
}
}
public static void main(String[] args) {
int threadNum = 5;
CyclicBarrier barrier = new CyclicBarrier(threadNum, new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "完成任務");
}
});
for (int i = 0; i < threadNum; i++) {
new TaskThread(barrier).start();
}
}
}
Semaphore
該類用于控制信號量的個數,可以控制同時訪問資源的線程個數,并提供了同步機制。例如,實現一個文件允許的并發訪問數。
Semaphore 的主要方法:
- acquire():從此信號量中獲取一個許可,若已超過許可量,則阻塞此請求線程。
- release():釋放一個許可,將其返回給信號量。
- availablePermits():返回此信號量中當前可用的許可數。
- hasQueuedThreads():查詢是否有線程正在等待獲取。
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class Test {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final Semaphore sp = new Semaphore(3); // 創建 Semaphore 信號量,初始化許可大小為3
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(100);
}catch (InterruptedException e){
e.printStackTrace();
}
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
sp.acquire(); // 請求獲取許可,如果有可獲取許可,則繼續往下指向,許可數減一。
} catch (InterruptedException e){
e.printStackTrace();
}
System.out.println("線程" + Thread.currentThread().getName() +
"進入,當前已有" + (3 - sp.availablePermits()) + "個并發") ;
try{
Thread.sleep((long)(Math.random() * 10000));
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println("線程" + Thread.currentThread().getName() + "即將離開");
sp.release(); // 釋放許可證,許可數+1
}
};
service.execute(runnable);
}
}
}
Exchanger
這個類用于交換數據,只能用于兩個線程。當一個線程運行到 exchange() 方法時會阻塞,另一個線程運行到 exchange() 時,兩者交換數據,然后執行后面的程序。
package test;
import java.util.concurrent.Exchanger;
public class Test3 {
static class Producer extends Thread{ // 生產者線程
private Exchanger<Integer> exchanger; // 交換標志
private static int data = 0;
Producer(String name, Exchanger<Integer> exchanger){
super("Producer-" + name);
this.exchanger = exchanger;
}
@Override
public void run(){
for (int i = 1; i < 5; i++) {
try {
Thread.sleep(1000);
data = i;
System.out.println(getName() + "交換前:" + data);
data = exchanger.exchange(data); // 將此 data 與 消費者的 data 進行交換
System.out.println(getName() + "交換后:" + data);
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
}
static class Consumer extends Thread{ // 消費者線程
private Exchanger<Integer> exchanger; // 交換標志
private static int data = 0;
Consumer(String name, Exchanger<Integer> exchanger){
super("Consumer-" + name);
this.exchanger = exchanger;
}
@Override
public void run(){
while(true){
data = 0;
System.out.println(getName() + "交換前:" + data);
try{
data = exchanger.exchange(data); // 將此 data 與生產者的 data 進行交換,因為先執行到這,會阻塞知道生產者執行到交換
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println(getName() + "交換后:" + data);
}
}
}
public static void main(String[] args) throws InterruptedException {
Exchanger<Integer> exchanger = new Exchanger<>();
new Producer("", exchanger).start();
new Consumer("", exchanger).start();
Thread.sleep(7000);
}
}