JAVA從版本5開始,在
java.util.concurrent.locks包內給我們提供了除了synchronized關鍵字以外的幾個新的鎖功能的實現,ReentrantLock就是其中的一個。但是這并不意味著我們可以拋棄synchronized關鍵字了。
在這些鎖實現之前,如果我們想實現鎖的功能,只能通過synchronized關鍵字,例子如下:
private static volatile int value;
public static void main() {
Runnable run = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
increaseBySync();
}
}
};
Thread t1 = new Thread(run);
Thread t2 = new Thread(run);
t1.start();
t2.start();
while (Thread.activeCount() > 1) {
Thread.yield();
}
System.out.println(value);
}
private static synchronized int increaseBySync() {
return value++;
}
有了ReentrantLock之后,我們可以這樣寫:
private static volatile int value;
private static Lock lock = new ReentrantLock();
public static void main(String[] args) {
testIncreaseWithLock();
}
public static void main() {
Runnable run = new Runnable() {
@Override
public void run() {
try {
lock.lock();
for (int i = 0; i < 1000; i++) {
value++;
}
} finally {
lock.unlock();
}
}
};
Thread t1 = new Thread(run);
Thread t2 = new Thread(run);
t1.start();
t2.start();
while (Thread.activeCount() > 1) {
Thread.yield();
}
System.out.println(value);
}
以上兩段代碼都可以實現value值同步自增1,并且value都能得到正確的值2000。下面我們就來分析下ReentrantLock有哪些功能以及它底層的原理。
可重入鎖
從名字我們就可以看出ReentrantLock是可重入鎖??芍厝腈i是指當某個線程已經獲得了該鎖時,再次調用lock()方法可以再次立即獲得該鎖。
舉個例子,當某個線程在執行methodA()時,假設已經獲得了鎖,這是當它執行到methodB()時可以立即獲得methodB里面的鎖,因為兩個方法是調用的同一把鎖。
private static Lock lock = new ReentrantLock();
public static void methodA(){
try{
lock.lock();
//dosomething
methodB();
}finally{
lock.unlock();
}
}
public static void methodB(){
try{
lock.lock();
//dosomthing
}finally{
lock.unlock();
}
}
synchronized也是可重入鎖,有興趣的同學可以自己寫個例子測試下。
公平鎖
通過源碼我們可以看到ReentrantLock支持公平鎖,并且默認是非公平鎖。
//ReentrantLock源碼
public ReentrantLock() {
sync = new NonfairSync();
}
//ReentrantLock源碼
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
下面是非公平鎖和公平鎖的lock()方法實現,從兩個lock()方法我們可以看到,它們的不同點是在調用非公平鎖的lock()方法時,當前線程會嘗試去獲取鎖,如果獲取失敗了則調用acquire(1)方法入隊列等待;而調用公平鎖的lock()方法當前線程會直接入隊列等待(acquire方法涉及到AQS下面會講到)。
//ReentrantLock源碼,非公平鎖
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//ReentrantLock源碼,公平鎖
final void lock() {
acquire(1);
}
而synchronized是一個非公平鎖。
超時機制
ReentrantLock還提供了超時機制,當調用tryLock()方法,當前線程如果獲取鎖失敗會立刻返回;而當調用帶參tryLock()方法是,當前線程如果在設置的timeout時間內未獲得鎖,也會立刻返回。而這些功能背后主要是依賴AQS實現的。
//ReentrantLock源碼
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
//ReentrantLock源碼
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
synchronized沒有這個功能。
可被中斷
ReentrantLock有一個lockInterruptibly()方法,它會最終調用AQS的兩個方法:
AQS方法一中如果當前線程被中斷則拋出InterruptedException,否則嘗試去獲取鎖,獲取成功則返回,獲取失敗則調用aqs方法二doAcquireInterruptibly()。
AQS方法二中在for循環線程自旋中也會判斷當前線程是否被標記為中斷,如果是也會拋出InterruptedException。
doAcquireInterruptibly()的細節我們在下面講解AQS的時候會重點介紹,它和doAcquire()方法很類似,唯一區別是會拋出InterruptedException。
//lock方法
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
//aqs方法一
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
//aqs方法二
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
AQS(AbstractQueueSynchronizer)
AQS是一個基于隊列的同步器,它是一個抽象類,主要提供了多線程獲取鎖時候的排隊等待和激活機制,ReentrantLock內部有兩個基于AQS實現的子類,分別針對公平鎖和非公平鎖做了支持。
下面我們以公平鎖為例,講解下ReentrantLock是如何依賴AQS實現其功能的。獲得鎖涉及到的主要源代碼和解釋如下:
//AQS源碼,公平鎖的lock()方法會直接調用該方法
//這里當前如果獲取失敗會調用acquireQueued方法
//addWaiter方法主要是將當前線程加入AQS內部隊列的尾部
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//ReentrantLock中實現公平鎖的AQS子類的方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//c == 0表示當前AQS為初始狀態,可以嘗試獲取鎖, 如獲取成功則返回true
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 只有當前線程是已經獲取了該鎖的線程才能再次獲取鎖(可重入鎖)并返回true
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//返回false獲取失敗
return false;
}
//AQS源碼
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//這里如果當前線程對應的隊列里的Node的前置Node是head,則嘗試獲取鎖,并成功返回
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//shouldParkAfterFailedAcquire方法標記當前線程Node的前置Node的waitStatus為SIGNAL,意思是當你從隊列移除時記得要喚醒我哦
//parkAndCheckInterrupt方法會讓當前線程掛起,停止自旋,免得白白浪費CPU資源
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
假設現在有線程A、B、C同時去獲取同一把鎖,大概的流程是這樣的,這里假設線程A獲的鎖并成功返回,線程B和C則依次調用上面的方法,進入AQS的等待隊列并最終被掛起。
這時線程A做完自己該做的事情了,它在finally塊中調用了unlock方法,這時我們再看下相關的源代碼。
//AQS源碼,當前線程在釋放鎖的同時,會判斷它所在Node的waitStatus有沒有被它的后繼結點標記,如果被標記了,那就喚醒后繼結點對應的線程
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//AQS源碼,主要關注最下面LockSupport.unpark(s.thread),喚醒后繼結點線程
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or Apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
線程A釋放鎖時,會喚醒head結點的后繼結點也就是線程B,此時線程B就可以繼續for循環里面自旋并成功獲得鎖了。
unsafe相關
之前介紹AtomicInteger的時候提到Unsafe對象,AtomicInteger用到了Unsafe對象的CAS功能(底層是cpu提供的功能)。
ReentrantLock除了用到了CAS之外,還用到了Unsafe的pack和unpack兩個方法(LockSupport當中),除了性能更好之外,它可以精確的喚醒某一個線程。