1 池化技術之線程池
什么是池化技術?簡單來說就是優化資源的使用,我準備好了一些資源,有人要用就到我這里拿,用完了就還給我。而一個比較重要的的實現就是線程池。那么線程池用到了池化技術有什么好處呢?
- 降低資源的消耗
- 提高響應的速度
- 方便管理
也就是 線程復用、可以控制最大并發數、管理線程
2 線程池的五種實現方式
其實線程池我更愿意說成四種封裝實現方式,一種原始實現方式。這四種封裝的實現方式都是依賴于最原始的的實現方式。所以這里我們先介紹四種封裝的實現方式
2.1 newSingleThreadExecutor()
這個線程池很有意思,說是線程池,但是池子里面只有一條線程。如果線程因為異常而停止,會自動新建一個線程補充。我們可以測試一下:我們對線程池執行十條打印任務,可以發現它們用的都是同一條線程
public static void test01() {
ExecutorService threadPool = Executors.newSingleThreadExecutor();
try {
//對線程進行執行十條打印任務
for(int i = 1; i <= 10; i++){
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"=>執行完畢!");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//用完線程池一定要記得關閉
threadPool.shutdown();
}
}
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
2.2 newFixedThreadPool(指定線程數量)
這個線程池是可以指定我們的線程池大小的,可以針對我們具體的業務和情況來分配大小。它是創建一個核心線程數跟最大線程數相同的線程池,因此池中的線程數量既不會增加也不會變少,如果有空閑線程任務就會被執行,如果沒有就放入任務隊列,等待空閑線程。我們同樣來測試一下:
public static void test02() {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
try {
//對線程進行執行十條打印任務
for(int i = 1; i <= 10; i++){
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"=>執行完畢!");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//用完線程池一定要記得關閉
threadPool.shutdown();
}
}
我們創建了五條線程的線程池,在打印任務的時候,可以發現線程都有進行工作
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-2=>執行完畢!
pool-1-thread-3=>執行完畢!
pool-1-thread-5=>執行完畢!
pool-1-thread-4=>執行完畢!
2.3 newCachedThreadPool()
這個線程池是創建一個核心線程數為0,最大線程為Inter.MAX_VALUE的線程池,也就是說沒有限制,線程池中的線程數量不確定,但如果有空閑線程可以復用,則優先使用,如果沒有空閑線程,則創建新線程處理任務,處理完放入線程池。我們同樣來測試一下
2.4 newScheduledThreadPool(指定最大線程數量)
創建一個沒有最大線程數限制的可以定時執行線程池在這里,還有創建一個只有單個線程的可以定時執行線程池(Executors.newSingleThreadScheduledExecutor())這些都是上面的線程池擴展開來了,不詳細介紹了。
3 介紹線程池的七大參數
上面我們也說到了線程池有五種實現方式,但是實際上我們就介紹了四種。那么最后一種是什么呢?不急,我們可以點開我們上面線程池實現方式的源碼進行查看,可以發現
- newSingleThreadExecutor()的實現源碼
而點開其他幾個線程池到最后都可以發現,他們實際上用的就是這個ThreadPoolExecutor。我們把源代碼粘過來分析,其實也就是這七大參數
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
毫無懸念,這就是最后一種方式,也是其他實現方式的基礎。而用這種方式也是最容易控制,因為我們可以自由的設置參數。在阿里巴巴開發手冊中也提到了
所以我們更需要去了解這七大參數,在平時用線程池的時候盡量去用ThreadPoolExecutor。而關于這七大參數我們簡單概括就是
- corePoolSize: 線程池核心線程個數
- workQueue: 用于保存等待執行任務的阻塞隊列
- maximunPoolSize: 線程池最大線程數量
- ThreadFactory: 創建線程的工廠
- RejectedExecutionHandler: 隊列滿,并且線程達到最大線程數量的時候,對新任務的處理策略
- keeyAliveTime: 空閑線程存活時間
- TimeUnit: 存活時間單位
3.1 而關于線程池最大線程數量,我們也有兩種設置方式
- CPU密集型
獲得cpu的核數,不同的硬件不一樣,設置核數的的線程數量。
我們可以通過代碼Runtime.getRuntime().availableProcessors();獲取,然后設置。 - IO密集型
IO非常消耗資源,所以我們需要計算大型的IO程序任務有多少個。
一般來說,線程池最大值 > 大型任務的數量即可
一般設置大型任務的數量*2
這里我們用一個例子可以更好理解這些參數在線程池里面的位置和作用。如圖1.0,我們這是一個銀行
我們一共有五個柜臺,可以理解為線程池的最大線程數量,而其中有兩個是在營業中,可以理解為線程池核心線程個數。而下面的等待廳可以理解為用于保存等待執行任務的阻塞隊列。銀行就是創建線程的工廠。而關于空閑線程存活時間,我們可以理解為如圖1.1這種情況,當五個營業中,卻只有兩個人需要被服務,而其他三個人一直處于等待的情況下,等了一個小時了,他們被通知下班了。這一個小時時間就可以說是空閑線程存活時間,而存活時間單位,顧名思義。
到現在我們就剩一個拒絕策略還沒介紹,什么是拒絕策略呢?我們可以假設當銀行五個柜臺都有人在被服務,如圖1.2。而等待廳這個時候也是充滿了人,銀行實在容不下人了。
這個時候對銀行外面那個等待的人的處理策略就是拒絕策略。我們同樣了解之后用代碼來測試一下:
public static void test05(){
ExecutorService threadPool = new ThreadPoolExecutor(
//核心線程數量
2,
//最大線程數量
5,
//空閑線程存活時間
3,
//存活單位
TimeUnit.SECONDS,
//這里我們使用大多數線程池都默認使用的阻塞隊列,并使容量為3
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
//我們使用默認的線程池都默認用的拒絕策略
new ThreadPoolExecutor.AbortPolicy()
);
try {
//對線程進行執行十條打印任務
for(int i = 1; i <= 2; i++){
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"=>執行完畢!");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//用完線程池一定要記得關閉
threadPool.shutdown();
}
}
我們執行打印兩條任務,可以發現線程池只用到了我們的核心兩條線程,相當于只有兩個人需要被服務,所以我們就開了兩個柜臺。
pool-1-thread-1=>執行完畢!
pool-1-thread-2=>執行完畢!
但是在我們將打印任務改到大于5的時候,(我們改成8)我們可以發現線程池的五條線程都在使用了,人太多了,我們的銀行需要都開放了來服務。
for(int i = 1; i <= 8; i++)
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-2=>執行完畢!
pool-1-thread-3=>執行完畢!
pool-1-thread-4=>執行完畢!
pool-1-thread-5=>執行完畢!
在我們改成大于8的時候,可以發現拒絕策略觸發了。銀行實在容納不下了,所以我們把外面那個人用策略打發了。
for(int i = 1; i <= 9; i++)
在這里我們也可以得出一個結論:線程池大小= 最大線程數 + 阻塞隊列大小
在上面我們在使用的阻塞隊列是大多數的線程池都使用的阻塞隊列,所以就引發思考下面這個問題。
3.2 為什么大部分的線程池都用LinkedBlockingQueue?
- LinkedBlockingQueue 使用單向鏈表實現,在聲明LinkedBlockingQueue的時候,可以不指定隊列長度,長度為Integer.MAX_VALUE, 并且新建了一個Node對象,Node對象具有item,next變量,item用于存儲元素,next指向鏈表下一個Node對象,在剛開始的時候鏈表的head,last都指向該Node對象,item、next都為null,新元素放在鏈表的尾部,并從頭部取元素。取元素的時候只是一些指針的變化,LinkedBlockingQueue給put(放入元素),take(取元素)都聲明了一把鎖,放入和取互不影響,效率更高。
- ArrayBlockingQueue 使用數組實現,在聲明的時候必須指定長度,如果長度太大,造成內存浪費,長度太小,并發性能不高,如果數組滿了,就無法放入元素,除非有其他線程取出元素,放入和取出都使用同一把鎖,因此存在競爭,效率比LinkedBlockingQueue低。
4 四種策略
我們在使用ThreadPoolExecutor的時候是可以自己選擇拒絕策略的,而拒絕策略我們所知道的有四種。
- AbortPolicy(被拒絕了拋出異常)
- CallerRunsPolicy(使用調用者所在線程執行,就是哪里來的回哪里去)
- DiscardOldestPolicy(嘗試去競爭第一個,失敗了也不拋異常)
- DiscardPolicy(默默丟棄、不拋異常)
4.1 AbortPolicy
我們在上面使用的就是AbortPolicy拒絕策略,在執行打印任務超出線程池大小的時候,拋出了異常。
4.2 CallerRunsPolicy
我們將拒絕策略修改為CallerRunsPolicy,執行后可以發現,因為第九個打印任務被拒絕了,所以它被調用者所在的線程執行了,也就是我們的main線程。(因為它從main線程來的,現在又回到了main線程。所以我們說它從哪里來回哪里去)
ExecutorService threadPool = new ThreadPoolExecutor(
//核心線程數量
2,
//最大線程數量
5,
//空閑線程存活時間
3,
//存活單位
TimeUnit.SECONDS,
//這里我們使用大多數線程池都默認使用的阻塞隊列,并使容量為3
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
//我們使用默認的線程池都默認用的拒絕策略
new ThreadPoolExecutor.CallerRunsPolicy()
);
pool-1-thread-2=>執行完畢!
main=>執行完畢!
pool-1-thread-2=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-2=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-3=>執行完畢!
pool-1-thread-4=>執行完畢!
pool-1-thread-5=>執行完畢!
4.3 DiscardOldestPolicy
嘗試去競爭第一個任務,但是失敗了。這里就沒顯示了,也不拋出異常。
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-2=>執行完畢!
pool-1-thread-3=>執行完畢!
pool-1-thread-4=>執行完畢!
pool-1-thread-5=>執行完畢!
4.4 DiscardPolicy
多出來的任務,默默拋棄掉,也不拋出異常。
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-1=>執行完畢!
pool-1-thread-2=>執行完畢!
pool-1-thread-3=>執行完畢!
pool-1-thread-4=>執行完畢!
pool-1-thread-5=>執行完畢!
可以看到我們的DiscardOldestPolicy與DiscardPolicy一樣的結果,但是它們其實是不一樣,正如我們最開始總結的那樣,DiscardOldestPolicy在多出的打印任務的時候會嘗試去競爭,而不是直接拋棄掉,但是很顯然競爭失敗不然也不會和DiscardPolicy一樣的執行結果。但是如果在線程比較多的時候就可以很看出來。