前提概要
之前的文章中會涉及到了相關AQS的原理和相關源碼的分析,所謂實踐是檢驗真理的唯一標準!接下來就讓我們活化一下AQS技術,主要針對于自己動手實現(xiàn)一個AQS同步器。
定義MyLock實現(xiàn)Lock
Doug Lea大神在JDK1.5編寫了一個Lock接口,里面定義了實現(xiàn)一個鎖的基本方法,我們只需編寫一個MyLock類實現(xiàn)這個接口就好。
class MyLock implements Lock {
/**
* 加鎖。如果不成功則進入等待隊列
*/
@Override
public void lock() {}
/**
* 加鎖(可被interrupt)
*/
@Override
public void lockInterruptibly() throws InterruptedException {}
/**
* 嘗試加鎖
*/
@Override
public boolean tryLock() {}
/**
* 加鎖 帶超時的
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {}
/**
* 釋放鎖
*/
@Override
public void unlock() {}
/**
* 返回一個條件變量(不在本案例談論)
*/
@Override
public Condition newCondition() {}
}
復制代碼
定義好MyLock后,接下來就是實現(xiàn)各個方法的邏輯,達到真正的用于線程間sync互斥的需求。
自定義一個MySync繼承自AQS
接下來我們需要自定義一個繼承自AQS的MySync。實現(xiàn)自定義的MySync前,先了解AQS內(nèi)部的一些基本概念。在AQS中主要的一些成員屬性如下:
- state:用于標記資源狀態(tài),如果為0表示資源沒有被占用,可以加鎖成功。如果大于0表示資源已經(jīng)被占用,然后根據(jù)自己的定義去實現(xiàn)是否允許對共享資源進行操作。 比如:ReentrantLock的實現(xiàn)方式是當state大于0,那么表示已經(jīng)有線程獲得鎖了,我們都知道ReentrantLock是可重入的,其原理就是當有線程次進入同一個lock標記的臨界區(qū)時。先判斷這個線程是否是獲得鎖的那個線程,如果是,state會+1,此時state會等于2。 當unlock時,會一層一層地減1,直到state等于0則表示完全釋放鎖成功。
- head、tail:用于存放獲得鎖失敗的線程。在AQS中,每一個線程會被封裝成一個Node節(jié)點,這些節(jié)點如果獲得鎖資源失敗會鏈在head、tail中,成為一個雙向鏈表結(jié)構。
- exclusiveOwnerThread:用于存放當前獲得鎖的線程,正如在state說明的那樣。ReentrantLock判斷可重入的條件就是用這個exclusiveOwnerThread線程跟申請獲得鎖的線程做比較,如果是同一個線程,則state+1,并重入加鎖成功。
知道這些概念后我們就可以自定義一個AQS:
public final class MySync extends AbstractQueuedSynchronizer {
/**
* 嘗試加鎖
*/
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
// 修改state狀態(tài)成功后設置當前線程為占有鎖資源線程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* 釋放鎖
*/
@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
// state有volatile修飾,為了保證解鎖后其他的一些變量對其他線程可見,把setExclusiveOwnerThread(null)放到上面 hAppens-before中定義的 volatile規(guī)則
setState(0);
return true;
}
/**
* 判斷是否是獨占鎖
*/
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
復制代碼
將MySync組合進MyLock
最后一步就是將第一步中的所有方法邏輯完成
class MyLock implements Lock {
// 組合自定義sync器
private MySync sync = new MySync();
/**
* 加鎖。如果不成功則進入等待隊列
*/
public void lock() {
sync.acquire(1);
}
/**
* 加鎖(可被interrupt)
*/
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
/**
* 嘗試加鎖
*/
public boolean tryLock() {
return sync.tryAcquire(1);
}
/**
* 加鎖 帶超時的
*/
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toMillis(time));
}
/**
* 釋放鎖
*/
public void unlock() {
sync.release(0);
}
/**
* 返回一個條件變量(不在本案例談論)
*/
@Override
public Condition newCondition() {
return null;
}
}
復制代碼
完成整個MyLock的邏輯后,發(fā)現(xiàn)在lock()、unlock()中調(diào)用的自定義sync的方法tryAcquire()和tryRelease()方法。我們就以在lock()方法中調(diào)用acquire()方法說明模板設計模式在AQS中的應用。
點進.acquire()方法后,發(fā)現(xiàn)該該方法是來自
AbstractQueuedSynchronizer中:
- 在這里面可以看到tryAcquire方法,繼續(xù)點進去看看tryAcquire(),發(fā)現(xiàn)該方法是一個必須被重寫的方法,否則拋出一個運行時異常。
- 模板方法設計模式在這里得以體現(xiàn),再回到我們第二部中自定義的MySync中,就是重寫了AQS中的tryAcquire()方法。
因此整個自定義加鎖的流程如下:
- 調(diào)用MyLock的lock(),lock()方法調(diào)用AQS的acquire()方法
- 在acquire()方法中調(diào)用了tryAcquire()方法進行加鎖
- 而tryAcquire()方法在AQS中是一個必須讓子類自定義重寫的方法,否則會拋出一個異常
- 因此調(diào)用tryAcquire()時實際上是調(diào)用了我們自定義的MySync類中tryAcquire()方法
總結(jié)
AQS作為JAVA并發(fā)體系下的關鍵類,在各種并發(fā)工具中都有它的身影,如ReentrantLock、Semaphore等。這些并發(fā)工具用于控制sync互斥的手段都是采用AQS,外加Cas機制。AQS采用了模板方法設計模式讓子類們自定義sync互斥的條件,比如本案例中MySync類重寫了tryAcquire方法。
下面實現(xiàn)一個自定義的sync:
public class SelfSynchronizer {
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public boolean unLock() {
return sync.release(1);
}
static class Sync extends AbstractQueuedSynchronizer {
//是否處于占用狀態(tài)
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
/**
* 獲取sync資源
* @param acquires
* @return
*/
@Override
public boolean tryAcquire(int acquires) {
if(compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
//這里沒有考慮可重入鎖
/*else if (Thread.currentThread() == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}*/
return false;
}
/**
* 釋放sync資源
* @param releases
* @return
*/
@Override
protected boolean tryRelease(int releases) {
int c = getState() - releases;
boolean free = false;
if (c == 0) {
free = true;
}
setState(c);
return free;
}
}
}
復制代碼
ReentrantLock源碼和上面自定義的sync很相似,測試下該sync,i++在多線程下執(zhí)行情況:
public class TestSelfSynchronizer {
private static int a = 0;
private static int b = 0;
private static SelfSynchronizer selfSynchronizer = new SelfSynchronizer();
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 50, 1, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
private static ExecutorService ec = Executors.newFixedThreadPool(20);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 20 ; i++) {
executor.submit(new Task());
}
for (int j = 0; j < 20 ; j++) {
ec.submit(new TaskSync());
}
Thread.sleep(10000);
System.out.println("a的值:"+ a);
System.out.println("b的值" + b);
executor.shutdown();
ec.shutdown();
}
static class Task implements Runnable {
@Override
public void run() {
for(int i=0;i<10000;i++) {
a++;
}
}
}
static class TaskSync implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
//使用sync器加鎖
selfSynchronizer.lock();
b++;
selfSynchronizer.unLock();
}
}
}
}
復制代碼
開啟兩個線程池,對int型變量自增10000次,如果不加sync器,最后值小于200000,使用了自定義sync器則最后值正常等于200000,這是因為每次自增操作加鎖
作者:李浩宇Alex
鏈接:
https://juejin.cn/post/6989937347429302280
來源:掘金
著作權歸作者所有。商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權,非商業(yè)轉(zhuǎn)載請注明出處。