你是否因為寫出死鎖導致半夜加班,扣績效?你是否為小白程序員,還沒有接觸過并發編程不知道什么死鎖,你是否希望通過并發編程這塊突破自己的瓶頸,在新的一年挑戰高薪?那么JAVA并發編程中的死鎖是你避不開的。
在通過redis或者zookeeper實現分布式鎖時也可能出現死鎖,本篇文章從Java線程入手,解密以下幾點:
- 什么是死鎖,死鎖如何產生
- 通過有趣的案例實現死鎖,并分析原因
- 分析死鎖產生的四個必要條件,并且解決死鎖
- 通過Java自帶工具檢測和定位死鎖位置
- 通過銀行家算法,規避死鎖問題
什么是死鎖
死鎖是進程死鎖的簡稱,是由Dijkstra于1965年研究銀行家算法時首先提出來的。它是計算機操作系統乃至并發程序設計中最難處理的問題之一。實際上,死鎖問題不僅在計算機系統中存在,在我們日常生活中它也廣泛存在。
我們來看一個死鎖例子:
公司需要有工作經驗的員工,而剛畢業的小伙伴需要工作來獲得工作經驗,這樣企業和應屆生之間就產生了死鎖現象
這樣的例子還有很多,比如:兩輛車過橋
電影中的經典情節:我要的貨呢,你帶錢沒有,最后一手交錢一手交貨
所謂的死鎖其實是一種現象,就是兩個或兩個以上線程的多線程情況下,多個線程同時被阻塞,它們中的一個或全部都在等待某一鎖資源的釋放,由于線程被無期限的阻塞,因此程序不會繼續執行,表現為卡住不動。
如:線程1和線程2的運行都需要A資源和B資源,此時線程1獲取了A資源,線程2獲取到了B鎖,此時線程1獲取不到B鎖和線程2獲取不到A鎖,導致兩個線程彼此僵持!
多把鎖場景
之前文章中的案例都是使用一把鎖,死鎖是線程需要多把鎖才會出現,那么什么場景下需要多把鎖呢?
案例
家中住著張三和翠花夫妻二人,家庭條件一般,只有一個廚房,希望實現翠花做飯和張三洗菜互不相干
一把鎖解決
分析:
- 定義一個廚房類,兩個功能,分別為洗菜和做飯【煮粥不是炒菜】
- 定義一把鎖,直接將廚房鎖上
- 假設洗菜需要1秒,做飯需要2秒
- 洗菜和做飯時使用唯一的一把廚房鎖,將整個廚房鎖上,實現互不打擾
廚房類:
package com.tianzhen.thread;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class KitchenRoom {
// 鎖對象
public Object lock = new Object();
// 洗菜
public void washing() {
// 鎖住房間
synchronized (lock) {
// 輸出開始時間 + 操作
System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss SSS")) + ":洗菜");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
// 做飯【煮粥】
public void cook() {
// 鎖上房間
synchronized (lock) {
// 輸出開始時間 + 操作
System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss SSS")) + ":做飯");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
運行結果:
使用一把鎖的時候性能較低,因為鎖的范圍太大了,直接將廚房鎖住,只需將房間內的每一個功能單獨鎖起來即可,比如:單獨將洗菜,做飯,使用冰箱鎖住,不能多人同時使用,應該將鎖細化,這樣同一個房間就可以同時做很多工作,廚房的利用率就會上來,洗菜和做飯可以同步進行,這樣是不就可以早點吃上美味了呢!
廚房改造:
package com.tianzhen.thread;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class KitchenRoom {
// 洗菜鎖
public Object washLock = new Object();
// 做飯鎖
public Object cookLock = new Object();
// 洗菜
public void washing() {
// 使用洗菜鎖
synchronized (washLock) {
System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss SSS")) + ":洗菜");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
// 做飯
public void cook() {
// 使用做飯鎖
synchronized (cookLock) {
System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss SSS")) + ":做飯");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
運行結果:
發現做飯【煮粥】和洗菜是同時開始的,通過細化鎖,可以提升程序性能,必須要保障兩個操作沒有關聯性,比如煮粥不需要等菜洗好,如果是炒菜,就需要等待菜洗好才可以進行。
鎖細粒度化的好處是:提高程序等性能,弊端在于:如果一個線程同時需要多把鎖,就可能產生死鎖
死鎖現象
以上邊的企業和面試者為例演示死鎖,企業招工需要有工作經驗的程序員,但是添甄剛畢業,沒有工作經驗,需要有工作才能獲取工作經驗,這樣就導致企業招不到人,面試者找不到工作的尷尬境地!
分析
- 這里有兩個條件,一個是工作經驗,一個是工作
- 企業先驗證面試者是否有工作經驗,才會給工作機會
- 面試者需要先獲取工作,才能還有工作經驗
- 兩者如果僵持不下,就會產生死鎖
代碼實現
package com.tianzhen.thread;
public class Deadhread {
// 工作鎖
private static Object work = new Object();
// 工作經驗鎖
private static Object workExperience = new Object();
public static void main(String[] args) {
// 企業線程
new Thread(() -> {
// 先鎖定工作經驗
synchronized (workExperience) {
System.out.println(Thread.currentThread().getName() + "我們需要有工作經驗的");
// 給工作機會
synchronized (work) {
System.out.println(Thread.currentThread().getName() + "恭喜你通過面試加入我們");
}
}
},"企業線程:").start();
// 員工線程
new Thread(() -> {
// 先獲取工作機會
synchronized (work) {
System.out.println(Thread.currentThread().getName() + "我需要工作才能有工作經驗");
// 有工作經驗
synchronized (workExperience) {
System.out.println(Thread.currentThread().getName() + "通過面試獲得了工作經驗");
}
}
},"面試者線程:").start();
}
}
運行結果:發現程序再企業和面試者各輸出一句之后卡死不動
原因
圖解
簡單的說就是:我需要的東西你占著,你需要的東西我占著,而且我們都不會腦筋急轉彎,就傻傻的等著對方讓步,拿到自己需要的東西之后繼續玩,但是大家都這么想那就誰也別玩了。
死鎖產生的四個必要條件
- 互斥使用:即當資源被一個線程使用(占有)時,別的線程不能使用
- 不可搶占:資源請求者不能強制從資源占有者手中奪取資源,資源只能由資源占有者主動釋放
- 請求和保持:即當資源請求者在請求其他的資源的同時保持對原有資源的占有
- 循環等待:即存在一個等待隊列:企業線程占有workExperience鎖資源,面試者線程占有 work鎖 資源,面試者線程需要workExperience鎖資源,企業線程需要work鎖資源,彼此等待對方釋放資源。這樣就形成了一個等待環路
當上述四個條件都成立的時候,便形成死鎖。當然,死鎖的情況下如果打破上述任何一個條件,便可讓死鎖消失。
死鎖檢測
死鎖檢測其實非常簡單,這里介紹兩種方式監測死鎖,如果你有更好的辦法或工具記得在評論區分享哦!
方式1:命令檢測
- 死鎖就會導致程序卡死不動,它的特點就是占用內存比較多,首先找到占用內存多的Java進程
- 其次通過jps命令找到對應的java進程號
- 通過jstack 進程號得到進程信息
- 通過進程信息查看是否是死鎖,發生在什么地方
1、window下通過任務管理器查看進程內存占用情況,linux下通過 top 命令查看,這里以window為例
2、通過jps命令獲取該進程的進程號也就是PID
3、通過 jstack PID 查看進程信息
接下來的信息:在jstack輸出的信息中最后出現了死鎖提示,提示顯示在DeadThread.java文件的第39行和23行,那你去排查代碼就可以啦
方式2:通過jconsole工具
這個工具在查看JVM內存時也是可以使用的,它是JDK中攜帶官方提供的工具,無需下載第三方插件即可使用
1、打開 jconsole 工具,在命令行輸入jconsole即可開啟,箭頭右側就是該工具啟動頁
2、選擇對應的Java進程查看信息,雙擊選中PID為128的Java進程
3、選中線程,點擊下方檢查死鎖按鈕
4、死鎖檢測結果,也會將死鎖的信息展示出開【右側信息需要雙擊左側線程名才會展示出來】
如何避免死鎖
這里說的避免死鎖,其實是在生產環境中也就是項目上線運行不要出現死鎖,不然又要被喊過去加班了,上邊說了死鎖產生的四個條件,只要我們將這四個條件中的任意一個破壞就不會產生死鎖。
- 禁止一個線程同時持有多把鎖
- 具備相同的加鎖順序
- 設置鎖超時
- 死鎖檢測
方案1:具備相同加鎖順序
比如,企業和面試者的案例,調換兩者加鎖順序一致即可解決死鎖問題
package com.tianzhen.thread;
public class DeadThread {
// 工作鎖
private static Object work = new Object();
// 工作經驗鎖
private static Object workExperience = new Object();
public static void main(String[] args) {
// 企業線程
new Thread(() -> {
// 工作
synchronized (work) {
System.out.println(Thread.currentThread().getName() + "來吧!加入我們,有無經驗都可");
// 工作經驗
synchronized (workExperience) {
System.out.println(Thread.currentThread().getName() + "感謝你的加入,為我們注入新鮮血液");
}
}
},"企業線程:").start();
// 員工線程
new Thread(() -> {
// 先獲取工作機會
synchronized (work) {
System.out.println(Thread.currentThread().getName() + "我沒有工作經驗");
// 有工作經驗
synchronized (workExperience) {
System.out.println(Thread.currentThread().getName() + "通過工作獲得了工作經驗");
}
}
},"面試者線程:").start();
}
}
運行結果:
此時就不會出現死鎖,當企業線程運行占用work鎖,這是如果發生線程切換,面試者也是要獲取work鎖,此時發現獲取不到,就會進入阻塞,CPU放棄執行轉而執行企業線程,此時企業線程獲取workExperience鎖,因為加鎖順序相同,此鎖必然沒有被比別的線程占用可以獲得,繼續執行,但是此時就無法實現交替執行,如果需要交替執行則需要使用線程通信實現,后邊會安排此部分內容
方案2:設置超時
因為 synchronized 不會自動釋放,無法設置超時時間,此方案需要通過Lock接口實現,改接口在Java并發編程合集的《Java線程安全問題和解決方案》一文中有詳細介紹
- 通過tryLock嘗試獲取鎖,如果獲取不到就立即失敗,不進入阻塞,你也可以調用tryLock(long time, TimeUnit unit)方法,設置獲取所得超時時間,如果指定的時間沒有獲取到則繼續運行
- 在finally中記得通過unlock方法釋放鎖,如果不釋放鎖,就會一直持有,陷入死鎖
package com.tianzhen.thread;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class DeadThread {
// 工作鎖
private static Lock work = new ReentrantLock();
// 工作經驗鎖
private static Lock workExperience = new ReentrantLock();
public static void main(String[] args) {
// 企業線程
new Thread(() -> {
// 工作
if (workExperience.tryLock()) {
try {
System.out.println(Thread.currentThread().getName() + "沒有工作經驗,立即失敗!");
// 工作鎖
if (work.tryLock()) {
try {
System.out.println(Thread.currentThread().getName() + "技術不行,立即失敗!");
} finally {
work.unlock();
}
}
System.out.println(Thread.currentThread().getName() + "有工作經驗,通過面試,歡迎加入我們!");
} finally {
workExperience.unlock();
}
}
}, "企業線程:").start();
// 員工線程
new Thread(() -> {
if (work.tryLock()) {
try {
System.out.println(Thread.currentThread().getName() + "我需要工作,才能有工作經驗!");
// 工作鎖
if (workExperience.tryLock()) {
try {
System.out.println(Thread.currentThread().getName() + "沒有工作經驗,告辭告辭!");
} finally {
workExperience.unlock();
}
}
System.out.println(Thread.currentThread().getName() + "有工作經驗,獲取工作!");
} finally {
work.unlock();
}
}
}, "面試者線程:").start();
}
}
此方法一定要記得調用unlock釋放鎖,同樣可以解決死鎖問題,因為不會像 synchronized 一樣無腦等待,而是非常機智,如果拿不到就不要了,就好比追一個女孩子,追了三年還不行就放棄吧,而synchronized就是永不言棄,等到天荒地老,非常癡情。傾我半世陽光,許你天荒地老
你是不有更好的解決方案,趕緊掏出來嘮嘮吧!
銀行家算法中避免死鎖思維
銀行家算法是一種最有代表性的避免死鎖的算法。又被稱為資源分配拒絕法。 在避免死鎖方法中允許進程動態地申請資源,但系統在進行資源分配之前,應先計算此次分配資源的安全性,若此次分配不會導致系統進入不安全狀態,則將資源分配給線程,否則進程等待
銀行家算法中的數據結構
1、可利用資源向量Available 是個含有m個元素的數組,其中的每一個元素代表一類可利用的資源數目。如果Available[j]=K,則表示系統中現有Rj類資源K個。
2、最大需求矩陣Max 這是一個n×m的矩陣,它定義了系統中n個進程中的每一個進程對m類資源的最大需求。如果Max[i,j]=K,則表示進程i需要Rj類資源的最大數目為K。
3、分配矩陣Allocation 這也是一個n×m的矩陣,它定義了系統中每一類資源當前已分配給每一進程的資源數。如果Allocation[i,j]=K,則表示進程i當前已分得Rj類資源的數目為K。
4、需求矩陣Need 這也是一個n×m的矩陣,用以表示每一個進程尚需的各類資源數。如果Need[i,j]=K,則表示進程i還需要Rj類資源K個,方能完成其任務。 Need[i,j]=Max[i,j]-Allocation[i,j]
操作系統的兩種狀態
安全序列:是指一個進程序列{P1,…,Pn}是安全的,即對于每一個進程Pi(1≤i≤n),它以后尚需要的資源量不超過系統當前剩余資源量與所有進程Pj (j < i )當前占有資源量之和。
1、安全狀態:如果存在一個由系統中所有進程構成的安全序列P1,…,Pn,則系統處于安全狀態。安全狀態一定是沒有死鎖發生。
2、不安全狀態:不存在一個安全序列。不安全狀態不一定導致死鎖。
示例
首先判斷一下當前的安全序列: 當前狀態,可利用資源向量有 1 6 2 2
1、P0: 已分配 0 0 3 2, 還需要 0 0 1 2,當前可利用資源 1 6 2 2足夠分配給P0; Process Allocation Need Available(Available-Need) Available+Allocation P0 0 0 4 4 0 0 0 0 1 6 1 0 1 6 5 4 P0分配成功:進入安全序列,分配完成后,將資源還給可利用資源
2、P1:已分配1 0 0 0, 還需要 1 7 5 0,當前可利用資源 1 6 5 4不夠分配給P1; P1分配失敗
3、P2: 已分配1 3 5 4, 還需要 2 3 5 6,當前可利用資源 1 6 5 4不夠分配給P2; P2分配失敗
4、P3: 已分配 0 3 3 2, 還需要 0 6 5 2,當前可利用資源 1 6 5 4足夠分配給P3; Process Allocation Need Available(Available-Need) Available+Allocation P3 0 9 8 4 0 0 0 0 1 0 0 2 1 9 8 6 P3分配成功,進入安全序列,分配完成后,將資源還給可利用資源
5、P4: 已分配 0 0 1 4, 還需要 0 6 5 6,當前可利用資源 1 9 8 6足夠分配給P4; Process Allocation Need Available(Available-Need) Available+Allocation P4 0 6 6 10 0 0 0 0 1 3 3 0 1 9 9 10 P4分配成功,進入安全序列,分配完成后,將資源還給可利用資源
6、P1: 已分配 1 0 0 0, 還需要 1 7 5 0,當前可利用資源 1 9 9 10足夠分配給P1; Process Allocation Need Available(Available-Need) Available+Allocation P1 2 7 5 0 0 0 0 0 0 2 4 10 2 9 9 10 P1分配成功,進入安全序列,分配完成后,將資源還給可利用資源
7、P2: 已分配 1 3 5 4, 還需要 2 3 5 6,當前可利用資源 2 9 9 10足夠分配給P2; Process Allocation Need Available(Available-Need) Available+Allocation P2 3 6 10 10 0 0 0 0 0 6 4 4 3 12 14 14 P4分配成功,進入安全序列,分配完成后,將資源還給可利用資源
所以:當前的安全序列為: p0-p3-p4-p1-p2
如果在未分配的時候:p2請求 1 2 2 2 ,從資源池里給他分配,請問可以分配嗎?
答: 如果滿足了P2的請求1 2 2 2 的話,要從可利用資源Available 1 6 2 2 中減去1 2 2 2,此時可利用資源為0 4 0 0 , 縱觀全局,如果滿足了P2的請求,那么別的進程的需求都不能滿足,導致資源不夠分配,所以P2的請求不可以分配
Java代碼實現銀行家算法
import java.util.Scanner;
public class Banker {
int available[] = new int[]{3,3,2};//可利用的資源
int max[][] = new int[][]{{7,5,3},{3,2,2},{9,0,2},{2,2,2},{4,3,3}};//每個進程最大資源數
int allocation[][] = new int[][]{{0,1,0},{2,0,0},{3,0,2},{2,1,1},{0,0,2}};//每個進程目前擁有的資源數
int need[][] = new int[][]{{7,4,3},{1,2,2},{6,0,0},{0,1,1},{4,3,1}};//每個進程需要的資源數
void showData() {
//展示數據輸出每個進程的相關數
System.out.println("進程號 Max All Need ");
System.out.println(" A B C A B C A B C");
for(int i = 0;i<5;i++){
System.out.print(i+" ");
for(int m = 0;m<3;m++) System.out.print(max[i][m]+" ");
for(int m = 0;m<3;m++) System.out.print(allocation[i][m]+" ");
for(int m = 0;m<3;m++) System.out.print(need[i][m]+" ");
System.out.println();
}
}
boolean change(int inRequestNum,int inRequest[])//分配數據
{
int requestNum = inRequestNum;
int request[] = inRequest;
// for(int i=0;i<3;i++)System.out.println("修改前available"+available[i]);
if(!(request[0]<=need[requestNum][0]&&request[1]<=need[requestNum][1]&&request[2]<=need[requestNum][2]))
{
//request[0]<=need[requestNum][0]
//request[1]<=need[requestNum][1]
//request[2]<=need[requestNum][2]
//每一類請求資源小于當前線程need的資源數
System.out.println("請求的資源數超過了所需要的最大值,分配錯誤");
return false;
}
if((request[0]<=available[0]&&request[1]<=available[1]&&request[2]<=available[2])==false)
{
//當前線程的每一類請求資源小于等于資源池對應資源的數量
System.out.println("尚無足夠資源分配,必須等待");
return false;
}
for(int i = 0;i<3;i++)//試分配數據給請求的線程
{
available[i] = available[i]-request[i];
//資源池的每類資源減去每類請求資源數量
allocation[requestNum][i] = allocation[requestNum][i] + request[i];
//當前線程allocation中每類資源加上每類資源請求數量
need[requestNum][i] = need[requestNum][i] - request[i];
//當前線程need中每類資源數量減去每類資源的請求數量
}
// for(int i=0;i<3;i++)System.out.println("修改后available"+available[i]);
boolean flag = checkSafe(available[0],available[1],available[2]);//進行安全性檢查并返回是否安全
// System.out.println("安全性檢查后"+flag);
if(flag==true)
{
System.out.println("能夠安全分配");
return true;
}
else//不能通過安全性檢查 恢復到未分配前的數據
{
System.out.println("不能夠安全分配");
for(int i = 0;i<3;i++)
{
available[i] = available[i]+request[i];
allocation[requestNum][i] = allocation[requestNum][i] - request[i];
need[requestNum][i] = need[requestNum][i] + request[i];
}
return false;
}
}
boolean checkSafe(int a,int b,int c)//安全性檢查
{
int work[] = new int[3];
work[0] = a;
work[1] = b;
work[2] = c;
int i=0;
boolean finish[] = new boolean[5];
while(i<5)//尋找一個能夠滿足的認為完成后才去執行下一進程
{
if(finish[i]==false&&need[i][0]<=work[0]&&need[i][1]<=work[1]&&need[i][2]<=work[2])
{//找到滿足的修改work值,然后i=0,重新從開始的為分配的中尋找
System.out.println("分配成功的是"+i);
for(int m = 0;m<3;m++)
work[m] =work[m] + allocation[i][m];
finish[i] = true;
i=0;
}
else//如果沒有找到直接i++
i++;
}
for(i=0;i<5;i++)//通過finish數組判斷是否都可以分配
{
if(finish[i]==false)
return false;
}
return true;
}
public static void main(String[] args)
{
Banker bank = new Banker();
bank.showData();
//請求線程資源存放的數組
int request[] =new int[3];
int requestNum;
String source[] = new String[]{"A","B","C"};
Scanner s = new Scanner(System.in);
String choice = new String();
while(true)//循環進行分配
{
System.out.println("請輸入要請求的進程號(0--4):");
requestNum = s.nextInt();
System.out.print("請輸入請求的資源數目");
for(int i = 0;i<3;i++)
{
System.out.println(source[i]+"資源的數目:");
request[i] = s.nextInt();
}
bank.change(requestNum, request);
System.out.println("是否再請求分配(y/n)");
choice = s.next();
if(choice.equals("n"))
break;
}
}
}
運行結果:
總結
- 掌握死鎖是什么,怎么產生
- 可以寫出死鎖代碼證明對死鎖的理解
- 可以通過工具檢測死鎖和解決死鎖問題
- 掌握死鎖思維,在編程時避免死鎖