1.池化背景
在面向對象編程中,創建和銷毀對象是很費時間的,因為創建一個對象要獲取內存資源或者其它更多資源。在JAVA中更是如此,虛擬機將試圖跟蹤每一個對象,以便能夠在對象銷毀后進行垃圾回收。所以提高服務程序效率的一個手段就是盡可能減少創建和銷毀對象的次數,特別是一些很耗資源的對象創建和銷毀。如何利用已有對象來服務就是一個需要解決的關鍵問題,其實這就是一些"池化資源"技術產生的原因 。
2.java線程池的優勢
(1):降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。
(2):提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。
(3):提高線程的可管理性。線程是稀缺資源,如果無限制地創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一分配、調優和監控。但是,要做到合理利用線程池,必須對其實現原理了如指掌
3.線程池實現原理
當向線程池提交一個任務之后,線程池是如何處理這個任務的呢?本節來看一下線程池的主要處理流程,處理流程圖如圖3-1所示:
從圖中可以看出,當提交一個新任務到線程池時,線程池的處理流程如下。 1)線程池判斷核心線程池里的線程是否都在執行任務。如果不是,則創建一個新的工作線程來執行任務。如果核心線程池里的線程都在執行任務,則進入下個流程。 2)線程池判斷工作隊列是否已經滿。如果工作隊列沒有滿,則將新提交的任務存儲在這個工作隊列里。如果工作隊列滿了,則進入下個流程。 3)線程池判斷線程池的線程是否都處于工作狀態。如果沒有,則創建一個新的工作線程來執行任務。如果已經滿了,則交給飽和策略來處理這個任務。 ThreadPoolExecutor執行execute()方法的示意圖,如圖3-2所示:
ThreadPoolExecutor執行execute方法分下面4種情況。 1)如果當前運行的線程少于corePoolSize,則創建新線程來執行任務(注意,執行這一步驟需要獲取全局鎖)。 2)如果運行的線程等于或多于corePoolSize,則將任務加入BlockingQueue。 3)如果無法將任務加入BlockingQueue(隊列已滿),則創建新的線程來處理任務(注意,執行這一步驟需要獲取全局鎖)。 4)如果創建新線程將使當前運行的線程超出maximumPoolSize,任務將被拒絕,并調用
RejectedExecutionHandler.rejectedExecution()方法。 ThreadPoolExecutor采取上述步驟的總體設計思路,是為了在執行execute()方法時,盡可能地避免獲取全局鎖(那將會是一個嚴重的可伸縮瓶頸)。在ThreadPoolExecutor完成預熱之后(當前運行的線程數大于等于corePoolSize),幾乎所有的execute()方法調用都是執行步驟2,而步驟2不需要獲取全局鎖。 源碼分析:上面的流程分析讓我們很直觀地了解了線程池的工作原理,讓我們再通過源代碼來看看是如何實現的,線程池執行任務的方法如下。
scss復制代碼public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//獲取clt,clt記錄著線程池狀態和運行線程數。
int c = ctl.get();
//運行線程數小于核心線程數時,創建線程放入線程池中,并且運行當前任務。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
//創建線程失敗,重新獲取clt。
c = ctl.get();
}
//線程池是運行狀態并且運行線程大于核心線程數時,把任務放入隊列中。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//重新檢查線程池不是運行狀態時,
//把任務移除隊列,并通過拒絕策略對該任務進行處理。
if (! isRunning(recheck) && remove(command))
reject(command);
//當前運行線程數為0時,創建線程加入線程池中。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//運行線程大于核心線程數時并且隊列已滿時,
//創建線程放入線程池中,并且運行當前任務。
else if (!addWorker(command, false))
//運行線程大于最大線程數時,失敗則拒絕該任務
reject(command);
}
在execute()方法中多次調用addWorker方法。其源碼如下
ini復制代碼private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//獲取clt,clt記錄著線程池狀態和運行線程數。
int c = ctl.get();
//獲取線程池的運行狀態。
int rs = runStateOf(c);
//線程池處于關閉狀態,或者當前任務為null
//或者隊列不為空,則直接返回失敗。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//獲取線程池中的線程數
int wc = workerCountOf(c);
//線程數超過CAPACITY,則返回false;
//這里的core是addWorker方法的第二個參數,
//如果為true則根據核心線程數進行比較,
//如果為false則根據最大線程數進行比較。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//嘗試增加線程數,如果成功,則跳出第一個for循環
if (compareAndIncrementWorkerCount(c))
break retry;
//如果增加線程數失敗,則重新獲取ctl
c = ctl.get();
//如果當前的運行狀態不等于rs,說明狀態已被改變,
//返回第一個for循環繼續執行
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//根據當前任務來創建Worker對象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mAInLock = this.mainLock;
mainLock.lock();
try {
//獲得鎖以后,重新檢查線程池狀態
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
//把剛剛創建的線程加入到線程池中
workers.add(w);
int s = workers.size();
//記錄線程池中出現過的最大線程數量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//啟動線程,開始運行任務
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
4.線程池的創建
ini復制代碼 public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
創建一個線程池時需要輸入幾個參數,如下。 1)corePoolSize(線程池的基本大?。?必需) :當提交一個任務到線程池時,線程池會創建一個線程來執行任務,即使其他空閑的基本線程能夠執行新任務也會創建線程,等到需要執行的任務數大于線程池基本大小時就不再創建。如果調用了線程池的prestartAllCoreThreads()方法,線程池會提前創建并啟動所有基本線程。
2)maximumPoolSize(線程池最大數量)(必需) :線程池允許創建的最大線程數。如果隊列滿了,并 且已創建的線程數小于最大線程數,則線程池會再創建新的線程執行任務。值得注意的是,如 果使用了無界的任務隊列這個參數就沒什么效果。
3)keepAliveTime(線程活動保持時間)(必需) :線程池的工作線程空閑后,保持存活的時間。所以, 如果任務很多,并且每個任務執行的時間比較短,可以調大時間,提高線程的利用率。
4)TimeUnit(線程活動保持時間的單位)(必需) :可選的單位有天(DAYS)、小時(HOURS)、分鐘 (MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒 (NANOSECONDS,千分之一微秒)。
5)workQueue(任務隊列)(必需) :用于保存等待執行的任務的阻塞隊列??梢赃x擇以下幾個阻塞隊列。
- ArrayBlockingQueue:是一個基于數組結構的有界阻塞隊列,此隊列按FIFO(先進先出)原 則對元素進行排序。
- LinkedBlockingQueue:一個基于鏈表結構的阻塞隊列,此隊列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊
- SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀態,吞吐量通常要高于Linked-BlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。
- PriorityBlockingQueue:一個具有優先級的無限阻塞隊列。
6)ThreadFactory (可選) :用于設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字。使用開源框架guava提供的ThreadFactoryBuilder可以快速給線程池里的線程設置有意義的名字,代碼如下: new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
- RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處于飽和狀態,那么必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。在JDK 1.5中Java線程池框架提供了以下4種策略。
- AbortPolicy:直接拋出異常。
- CallerRunsPolicy:只用調用者所在線程來運行任務。
- DiscardOldestPolicy:丟棄隊列里最近的一個任務,并執行當前任務。
- DiscardPolicy:不處理,丟棄掉。
當然,也可以根據應用場景需要來實現RejectedExecutionHandler接口自定義策略。如記錄日志或持久化存儲不能處理的任
示例代碼:
less復制代碼private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("atlas-pool-%d").build();
private ExecutorService fixedThreadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() + 1, Runtime.getRuntime().availableProcessors() * 40,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors() * 20),
namedThreadFactory);
//多檢查使用
public List<AtlasElementDataDTO> getData(@RequestBody AtlasElementDataVO atlasElementDataVO, HttpServletRequest request) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(atlasElementDataVO.getAtlasElementDataParamList().size());
UserForm currentUser = RequestUserHolder.getCurrentUser();
String url = IasStringUtils.getFileNameByUrl(request.getHeader("Referer"));
List<AtlasElementDataDTO> list = new ArrayList<>();
log.info("data query start total start" + IasDateUtils.dateTimeToStringTime(new Date()));
atlasElementDataVO.getAtlasElementDataParamList().forEach(atlasElementDataParamDTO -> {
fixedThreadPool.submit(() -> {
try {
log.info("data query start" + IasDateUtils.dateTimeToStringTime(new Date()));
AtlasElementDataDTO atlasElementDataDTO = new AtlasElementDataDTO();
atlasElementDataDTO.setId(atlasElementDataParamDTO.getId());
atlasElementDataDTO.setRecord(atlasService.getData(atlasElementDataParamDTO.getId(), atlasElementDataParamDTO.getParam(), currentUser != null ? currentUser.getUserId() : null, url));
atlasElementDataDTO.set_xAxis(atlasElementDataParamDTO.get_xAxis());
list.add(atlasElementDataDTO);
log.info("data query end" + IasDateUtils.dateTimeToStringTime(new Date()));
} catch (Exception e) {
throw new BusinessException(ErrorCodeEnum.QUERY_ERROR.getCode(), ErrorCodeEnum.QUERY_ERROR.getMessage());
}finally {
countDownLatch.countDown();
}
});
});
countDownLatch.await();
log.info("data query start total end :" + IasDateUtils.dateTimeToStringTime(new Date()));
return list;
}
5.線程池執行任務
可以使用兩個方法向線程池提交任務,分別為execute()和submit()方法。execute()方法用于提交不需要返回值的任務,所以無法判斷任務是否被線程池執行成功。通過以下代碼可知execute()方法輸入的任務是一個Runnable類的實例。
less復制代碼threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});
submit()方法用于提交需要返回值的任務。線程池會返回一個future類型的對象,通過這個future對象可以判斷任務是否執行成功,并且可以通過future的get()方法來獲取返回值,get()方法會阻塞當前線程直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間后立即返回,這時候有可能任務沒有執行完。
dart復制代碼Future<Object> future = executor.submit(harReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 處理中斷異常
} catch (ExecutionException e) {
// 處理無法執行任務異常
} finally {
// 關閉線程池
executor.shutdown();
}
6.關閉線程池
可以通過調用線程池的shutdown或shutdownNow方法來關閉線程池。它們的原理是遍歷線程池中的工作線程,然后逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。但是它們存在一定的區別,shutdownNow首先將線程池的狀態設置成STOP,然后嘗試停止所有的正在執行或暫停任務的線程,并返回等待執行任務的列表,而shutdown只是將線程池的狀態設置成SHUTDOWN狀態,然后中斷所有沒有正在執行任務的線程。只要調用了這兩個關閉方法中的任意一個,isShutdown方法就會返回true。當所有的任務都已關閉后,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。至于應該調用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調用shutdown方法來關閉線程池,如果任務不一定要執行完,則可以調用shutdownNow方法。
7.合理配置線程池
要想合理地配置線程池,就必須首先分析任務特性,可以從以下幾個角度來分析。 任務的性質:CPU密集型任務、IO密集型任務和混合型任務。
- 任務的優先級:高、中和低。
- 任務的執行時間:長、中和短。
- 任務的依賴性:是否依賴其他系統資源,如數據庫連接。
性質不同的任務可以用不同規模的線程池分開處理。CPU密集型任務應配置盡可能小的線程,如配置Ncpu+1個線程的線程池。由于IO密集型任務線程并不是一直在執行任務,則應配置盡可能多的線程,如2*Ncpu?;旌闲偷娜蝿眨绻梢圆鸱?,將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那么分解后執行的吞吐量將高于串行執行的吞吐量。如果這兩個任務執行時間相差太大,則沒必要進行分解??梢酝ㄟ^Runtime.getRuntime().availableProcessors()方法獲得當前設備的CPU個數。優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理。它可以讓優先級高的任務先執行。 注意 如果一直有優先級高的任務提交到隊列里,那么優先級低的任務可能永遠不能執行。 執行時間不同的任務可以交給不同規模的線程池來處理,或者可以使用優先級隊列,讓執行時間短的任務先執行。 依賴數據庫連接池的任務,因為線程提交SQL后需要等待數據庫返回結果,等待的時間越長,則CPU空閑時間就越長,那么線程數應該設置得越大,這樣才能更好地利用CPU。 建議使用有界隊列。有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點兒,比如幾千。有一次,我們系統里后臺任務線程池的隊列和線程池全滿了,不斷拋出拋棄任務的異常,通過排查發現是數據庫出現了問題,導致執行SQL變得非常緩慢,因為后臺任務線程池里的任務全是需要向數據庫查詢和插入數據的,所以導致線程池里的工作線程全部阻塞,任務積壓在線程池里。如果當時我們設置成無界隊列,那么線程池的隊列就會越來越多, 有可能會撐滿內存,導致整個系統不可用,而不只是后臺任務出現問題。當然,我們的系統所有的任務是用單獨的服務器部署的,我們使用不同規模的線程池完成不同類型的任務,但是出現這樣問題時也會影響到其他任務。
一般來說池中總線程數是核心池線程數量兩倍,只要確保當核心池有線程停止時,核心池外能有線程進入核心池即可。 線程中的任務最終是交給CPU的線程去處理的,而CPU可同時處理線程數量大部分是CPU核數的兩倍,運行環境中CPU的核數我們可以通過Runtime.getRuntime().availableProcessors()這個方法而獲取。理論上來說核心池線程數量應該為Runtime.getRuntime().availableProcessors()*2,那么結果是否符合我們的預期呢,事實上大部分的任務都是I/O密集型的,即大部分任務消耗集中在的輸入輸出。而CPU密集型任務主要消耗CPU資源進行計算,當任務為CPU密集型時,核心池線程數設置為CPU核數+1即可)
8.線程池監控
如果在系統中大量使用線程池,則有必要對線程池進行監控,方便在出現問題時,可以根據線程池的使用狀況快速定位問題。可以通過線程池提供的參數進行監控,在監控線程池的時候可以使用以下屬性。 1)taskCount:線程池需要執行的任務數量。 2)completedTaskCount:線程池在運行過程中已完成的任務數量,小于或等于taskCount。 3)largestPoolSize:線程池里曾經創建過的最大線程數量。通過這個數據可以知道線程池是否曾經滿過。如該數值等于線程池的最大大小,則表示線程池曾經滿過。 4)getPoolSize:線程池的線程數量。如果線程池不銷毀的話,線程池里的線程不會自動銷毀,所以這個大小只增不減。 5)getActiveCount:獲取活動的線程數。 通過擴展線程池進行監控。可以通過繼承線程池來自定義線程池,重寫線程池的beforeExecute、afterExecute和terminated方法,也可以在任務執行前、執行后和線程池關閉前執行一些代碼來進行監控。例如,監控任務的平均執行時間、最大執行時間和最小執行時間等。這幾個方法在線程池里是空方法。