作者公眾號:一角錢技術(org_yijiaoqian)
前言
線程池的具體實現有兩種,分別是ThreadPoolExecutor 默認線程池和ScheduledThreadPoolExecutor 定時線程池,上一篇已經分析過ThreadPoolExecutor原理與使用了,本篇我們來重點分析下ScheduledThreadPoolExecutor的原理與使用。
- 《并發編程之Executor線程池原理與源碼解讀》
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor 與 ThreadPoolExecutor 線程池的概念有些區別,它是一個支持任務周期性調度的線程池。
ScheduledThreadPoolExecutor 繼承 ThreadPoolExecutor,同時通過實現 ScheduledExecutorSerivce 來擴展基礎線程池的功能,使其擁有了調度能力。其整個調度的核心在于內部類 DelayedWorkQueue ,一個有序的延時隊列。
定時線程池類的類結構圖如下:
ScheduledThreadPoolExecutor 的出現,很好的彌補了傳統 Timer 的不足,具體對比看下表:
TimerScheduledThreadPoolExecutor線程單線程多線程多任務任務之間相互影響任務之間不影響調度時間絕對時間相對時間異常單任務異常,后續任務受影響無影響
工作原理
它用來處理延時任務或定時任務
它接收SchduledFutureTask類型的任務,是線程池調度任務的最小單位,有三種提交任務的方式:
- schedule,特定時間延時后執行一次任務
- scheduledAtFixedRate,固定周期執行任務(與任務執行時間無關,周期是固定的)
- scheduledWithFixedDelay,固定延時執行任務(與任務執行時間有關,延時從上一次任務完成后開始)
它采用 DelayedWorkQueue 存儲等待的任務
- DelayedWorkQueue 內部封裝了一個 PriorityQueue ,它會根據 time 的先后時間排序,若 time 相同則根據 sequenceNumber 排序;
- DelayedWorkQueue 也是一個無界隊列;
因為前面講阻塞隊列實現的時候,已經對DelayedWorkQueue進行了說明,更多內容請查看《阻塞隊列 — DelayedWorkQueue源碼分析》
工作線程的執行過程:
- 工作線程會從DelayedWorkerQueue取已經到期的任務去執行;
- 執行結束后重新設置任務的到期時間,再次放回DelayedWorkerQueue。
take方法是什么時候調用的呢? 在ThreadPoolExecutor中,getTask方法,工作線程會循環地從workQueue中取任務。但定時任務卻不同,因為如果一旦getTask方法取出了任務就開始執行了,而這時可能還沒有到執行的時間,所以在take方法中,要保證只有在到指定的執行時間的時候任務才可以被取走。
PS:對于以上原理的理解,可以通過下面的源碼分析加深印象。
源碼分析
構造方法
ScheduledThreadPoolExecutor有四個構造形式:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
當然我們也可以使用工具類Executors的newScheduledThreadPool的方法,快速創建。注意這里使用的DelayedWorkQueue。
ScheduledThreadPoolExecutor沒有提供帶有最大線程數的構造函數的,默認是Integer.MAX_VALUE,說明其可以無限制的開啟任意線程執行任務,在大量任務系統,應注意這一點,避免內存溢出。
核心方法
核心方法主要介紹ScheduledThreadPoolExecutor的調度方法,其他方法與 ThreadPoolExecutor 一致。調度方法均由 ScheduledExecutorService 接口定義:
public interface ScheduledExecutorService extends ExecutorService {
// 特定時間延時后執行一次Runnable
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
// 特定時間延時后執行一次Callable
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
// 固定周期執行任務(與任務執行時間無關,周期是固定的)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
// 固定延時執行任務(與任務執行時間有關,延時從上一次任務完成后開始)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
我們再來看一下接口的實現,具體是怎么來實現線程池任務的提交。因為最終都回調用 delayedExecute 提交任務。所以,我們這里只分析schedule方法,該方法是指任務在指定延遲時間到達后觸發,只會執行一次。源代碼如下:
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
//參數校驗
if (command == null || unit == null)
throw new NullPointerException();
//這里是一個嵌套結構,首先把用戶提交的任務包裝成ScheduledFutureTask
//然后在調用decorateTask進行包裝,該方法是留給用戶去擴展的,默認是個空方法
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
//包裝好任務以后,就進行提交了
delayedExecute(t);
return t;
}
delayedExecute 任務提交方法:
private void delayedExecute(RunnableScheduledFuture<?> task) {
//如果線程池已經關閉,則使用拒絕策略把提交任務拒絕掉
if (isShutdown())
reject(task);
else {
//與ThreadPoolExecutor不同,這里直接把任務加入延遲隊列
super.getQueue().add(task);//使用用的DelayedWorkQueue
//如果當前狀態無法執行任務,則取消
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//這里是增加一個worker線程,避免提交的任務沒有worker去執行
//原因就是該類沒有像ThreadPoolExecutor一樣,woker滿了才放入隊列
ensurePrestart();
}
}
我們可以看到提交到線程池的任務都包裝成了 ScheduledFutureTask,繼續往下我們再來研究下。
ScheduledFutureTask
從ScheduledFutureTask類的定義可以看出,ScheduledFutureTask類是ScheduledThreadPoolExecutor類的私有內部類,繼承了FutureTask類,并實現了RunnableScheduledFuture接口。也就是說,ScheduledFutureTask具有FutureTask類的所有功能,并實現了RunnableScheduledFuture接口的所有方法。ScheduledFutureTask類的定義如下所示:
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V>
ScheduledFutureTask類繼承圖如下:
成員變量
SchduledFutureTask接收的參數(成員變量):
// 任務開始的時間
private long time;
// 任務添加到ScheduledThreadPoolExecutor中被分配的唯一序列號
private final long sequenceNumber;
// 任務執行的時間間隔
private final long period;
//ScheduledFutureTask對象,實際指向當前對象本身
RunnableScheduledFuture<V> outerTask = this;
//當前任務在延遲隊列中的索引,能夠更加方便的取消當前任務
int heapIndex;
解析:
- sequenceNumber:任務添加到ScheduledThreadPoolExecutor中被分配的唯一序列號,可以根據這個序列號確定唯一的一個任務,如果在定時任務中,如果一個任務是周期性執行的,但是它們的sequenceNumber的值相同,則被視為是同一個任務。
- time:下一次執行任務的時間。
- period:任務的執行周期。
- outerTask:ScheduledFutureTask對象,實際指向當前對象本身。此對象的引用會被傳入到周期性執行任務的ScheduledThreadPoolExecutor類的reExecutePeriodic方法中。
- heapIndex:當前任務在延遲隊列中的索引,這個索引能夠更加方便的取消當前任務。
構造方法
ScheduledFutureTask類繼承了FutureTask類,并實現了RunnableScheduledFuture接口。在ScheduledFutureTask類中提供了如下構造方法。
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
FutureTask的構造方法如下:
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
通過源碼可以看到,在ScheduledFutureTask類的構造方法中,首先會調用FutureTask類的構造方法為FutureTask類的callable和state成員變量賦值,接下來為ScheduledFutureTask類的time、period和sequenceNumber成員變量賦值。理解起來比較簡單。
getDelay方法
我們先來看getDelay方法的源碼,如下所示:
//獲取下次執行任務的時間距離當前時間的納秒數
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
getDelay方法比較簡單,主要用來獲取下次執行任務的時間距離當前系統時間的納秒數。
compareTo方法
ScheduledFutureTask類在類的結構上實現了Comparable接口,compareTo方法主要是對Comparable接口定義的compareTo方法的實現。源碼如下所示:
public int compareTo(Delayed other) {
if (other == this)
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
這段代碼看上去好像是對各種數值類型數據的比較,本質上是對延遲隊列中的任務進行排序。排序規則為:
- 首先比較延遲隊列中每個任務下次執行的時間,下次執行時間距離當前時間短的任務會排在前面;
- 如果下次執行任務的時間相同,則會比較任務的sequenceNumber值,sequenceNumber值小的任務會排在前面。
isPeriodic方法
isPeriodic方法的源代碼如下所示:
//判斷是否是周期性任務
public boolean isPeriodic() {
return period != 0;
}
這個方法主要是用來判斷當前任務是否是周期性任務。這里只要判斷運行任務的執行周期不等于0就能確定為周期性任務了。因為無論period的值是大于0還是小于0,當前任務都是周期性任務。
setNextRunTime方法
setNextRunTime方法的作用主要是設置當前任務下次執行的時間,源碼如下所示:
private void setNextRunTime() {
long p = period;
//固定頻率,上次執行任務的時間加上任務的執行周期
if (p > 0)
time += p;
//相對固定的延遲執行,當前系統時間加上任務的執行周期
else
time = triggerTime(-p);
}
這里再一次證明了使用isPeriodic方法判斷當前任務是否為周期性任務時,只要判斷period的值是否不等于0就可以了。
- 因為如果當前任務時固定頻率執行的周期性任務,會將周期period當作正數來處理;
- 如果是相對固定的延遲執行當前任務,則會將周期period當作負數來處理。
這里我們看到在setNextRunTime方法中,調用了ScheduledThreadPoolExecutor類的triggerTime方法。接下來我們看下triggerTime方法的源碼。
ScheduledThreadPoolExecutor類的triggerTime方法
triggerTime方法用于獲取延遲隊列中的任務下一次執行的具體時間。源碼如下所示。
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
這兩個triggerTime方法的代碼比較簡單,就是獲取下一次執行任務的具體時間。有一點需要注意的是:delay < (Long.MAX_VALUE >> 1判斷delay的值是否小于Long.MAX_VALUE的一半,如果小于Long.MAX_VALUE值的一半,則直接返回delay,否則需要處理溢出的情況。
我們看到在triggerTime方法中處理防止溢出的邏輯使用了ScheduledThreadPoolExecutor類的overflowFree方法,接下來,我們就看看ScheduledThreadPoolExecutor類的overflowFree方法的實現。
ScheduledThreadPoolExecutor類的overflowFree方法
overflowFree方法的源代碼如下所示:
private long overflowFree(long delay) {
//獲取隊列中的節點
Delayed head = (Delayed) super.getQueue().peek();
//獲取的節點不為空,則進行后續處理
if (head != null) {
//從隊列節點中獲取延遲時間
long headDelay = head.getDelay(NANOSECONDS);
//如果從隊列中獲取的延遲時間小于0,并且傳遞的delay
//值減去從隊列節點中獲取延遲時間小于0
if (headDelay < 0 && (delay - headDelay < 0))
//將delay的值設置為Long.MAX_VALUE + headDelay
delay = Long.MAX_VALUE + headDelay;
}
//返回延遲時間
return delay;
}
通過對overflowFree方法的源碼分析,可以看出overflowFree方法本質上就是為了限制隊列中的所有節點的延遲時間在Long.MAX_VALUE值之內,防止在compareTo方法中溢出。
cancel方法
cancel方法的作用主要是取消當前任務的執行,源碼如下所示:
public boolean cancel(boolean mayInterruptIfRunning) {
//取消任務,返回任務是否取消的標識
boolean cancelled = super.cancel(mayInterruptIfRunning);
//如果任務已經取消
//并且需要將任務從延遲隊列中刪除
//并且任務在延遲隊列中的索引大于或者等于0
if (cancelled && removeOnCancel && heapIndex >= 0)
//將當前任務從延遲隊列中刪除
remove(this);
//返回是否成功取消任務的標識
return cancelled;
}
這段代碼理解起來相對比較簡單,首先調用取消任務的方法,并返回任務是否已經取消的標識。如果任務已經取消,并且需要移除任務,同時,任務在延遲隊列中的索引大于或者等于0,則將當前任務從延遲隊列中移除。最后返回任務是否成功取消的標識。
run方法
run方法可以說是ScheduledFutureTask類的核心方法,是對Runnable接口的實現,源碼如下所示:
public void run() {
//當前任務是否是周期性任務
boolean periodic = isPeriodic();
//線程池當前運行狀態下不能執行周期性任務
if (!canRunInCurrentRunState(periodic))
//取消任務的執行
cancel(false);
//如果不是周期性任務
else if (!periodic)
//則直接調用FutureTask類的run方法執行任務
ScheduledFutureTask.super.run();
//如果是周期性任務,則調用FutureTask類的runAndReset方法執行任務
//如果任務執行成功
else if (ScheduledFutureTask.super.runAndReset()) {
//設置下次執行任務的時間
setNextRunTime();
//重復執行任務
reExecutePeriodic(outerTask);
}
}
整理一下方法的邏輯:
- 首先判斷當前任務是否是周期性任務。如果線程池當前運行狀態下不能執行周期性任務,則取消任務的執行,否則執行步驟2;
- 如果當前任務不是周期性任務,則直接調用FutureTask類的run方法執行任務,會設置執行結果,然后直接返回,否則執行步驟3;
- 如果當前任務是周期性任務,則調用FutureTask類的runAndReset方法執行任務,不會設置執行結果,然后直接返回,否則執行步驟4;
- 如果任務執行成功,則設置下次執行任務的時間,同時,將任務設置為重復執行。
這里,調用了FutureTask類的run方法和runAndReset方法,并且調用了ScheduledThreadPoolExecutor類的reExecutePeriodic方法。接下來,我們分別看下這些方法的實現。
FutureTask類的run方法
FutureTask類的run方法源碼如下所示:
public void run() {
//狀態如果不是NEW,說明任務或者已經執行過,或者已經被取消,直接返回
//狀態如果是NEW,則嘗試把當前執行線程保存在runner字段中
//如果賦值失敗則直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//執行任務
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//任務異常
setException(ex);
}
if (ran)
//任務正常執行完畢
set(result);
}
} finally {
runner = null;
int s = state;
//如果任務被中斷,執行中斷處理
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
代碼的整體邏輯為:
- 判斷當前任務的state是否等于NEW,如果不為NEW則說明任務或者已經執行過,或者已經被取消,直接返回;
- 如果狀態為NEW則接著會通過unsafe類把任務執行線程引用CAS的保存在runner字段中,如果保存失敗,則直接返回;
- 執行任務;如果任務執行發生異常,則調用setException()方法保存異常信息。
FutureTask類的runAndReset方法
方法的源碼如下所示:
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
FutureTask類的runAndReset方法與run方法的邏輯基本相同,只是runAndReset方法會重置當前任務的執行狀態。
ScheduledThreadPoolExecutor類的reExecutePeriodic方法
reExecutePeriodic重復執行任務方法,源代碼如下所示:
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
//線程池當前狀態下能夠執行任務
if (canRunInCurrentRunState(true)) {
//與ThreadPoolExecutor不同,這里直接把任務加入延遲隊列
super.getQueue().add(task);//使用用的DelayedWorkQueue
//線程池當前狀態下不能執行任務,并且成功移除任務
if (!canRunInCurrentRunState(true) && remove(task))
//取消任務
task.cancel(false);
else
//這里是增加一個worker線程,避免提交的任務沒有worker去執行
//原因就是該類沒有像ThreadPoolExecutor一樣,woker滿了才放入隊列
ensurePrestart();
}
}
總體來說reExecutePeriodic方法的邏輯比較簡單,需要注意的是:調用reExecutePeriodic方法的時候已經執行過一次任務,所以,并不會觸發線程池的拒絕策略;傳入reExecutePeriodic方法的任務一定是周期性的任務。
DelayedWorkQueue
ScheduledThreadPoolExecutor之所以要自己實現阻塞的工作隊列,是因為 ScheduleThreadPoolExecutor 要求的工作隊列有些特殊。
DelayedWorkQueue是一個基于堆的數據結構,類似于DelayQueue和PriorityQueue。在執行定時任務的時候,每個任務的執行時間都不同,所以DelayedWorkQueue的工作就是按照執行時間的升序來排列,執行時間距離當前時間越近的任務在隊列的前面(注意:這里的順序并不是絕對的,堆中的排序只保證了子節點的下次執行時間要比父節點的下次執行時間要大,而葉子節點之間并不一定是順序的)。
堆結構如下圖:
可見,DelayedWorkQueue是一個基于最小堆結構的隊列。堆結構可以使用數組表示,可以轉換成如下的數組:
在這種結構中,可以發現有如下特性: 假設“第一個元素” 在數組中的索引為 0 的話,則父結點和子結點的位置關系如下:
- 索引為 的左孩子的索引是 (2∗i+1);
- 索引為 的右孩子的索引是 (2∗i+2);
- 索引為 的父結點的索引是 floor((i−1)/2);
為什么要使用DelayedWorkQueue呢?
- 定時任務執行時需要取出最近要執行的任務,所以任務在隊列中每次出隊時一定要是當前隊列中執行時間最靠前的,所以自然要使用優先級隊列。
- DelayedWorkQueue是一個優先級隊列,它可以保證每次出隊的任務都是當前隊列中執行時間最靠前的,由于它是基于堆結構的隊列,堆結構在執行插入和刪除操作時的最壞時間復雜度是 O(logN)。
因為前面講阻塞隊列實現的時候,已經對DelayedWorkQueue進行了說明,更多內容請查看《阻塞隊列 — DelayedWorkQueue源碼分析》
總結
- 與Timer執行定時任務比較,相比Timer,ScheduledThreadPoolExecutor有說明優點?(文章前面分析過)
- ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,所以它也是一個線程池,也有 coorPoolSize 和 workQueue,但是 ScheduledThreadPoolExecutor特殊的地方在于,自己實現了優先工作隊列 DelayedWorkQueue ;
- ScheduledThreadPoolExecutor 實現了 ScheduledExecutorService,所以就有了任務調度的方法,如 schedule 、 scheduleAtFixedRate 、 scheduleWithFixedDelay ,同時注意他們之間的區別;
- 內部類 ScheduledFutureTask 繼承者FutureTask,實現了任務的異步執行并且可以獲取返回結果。同時實現了Delayed接口,可以通過getDelay方法獲取將要執行的時間間隔;
- 周期任務的執行其實是調用了FutureTask的 runAndReset 方法,每次執行完不設置結果和狀態。
- DelayedWorkQueue的數據結構,它是一個基于最小堆結構的優先隊列,并且每次出隊時能夠保證取出的任務是當前隊列中下次執行時間最小的任務。同時注意一下優先隊列中堆的順序,堆中的順序并不是絕對的,但要保證子節點的值要比父節點的值要大,這樣就不會影響出隊的順序。
總體來說,ScheduedThreadPoolExecutor的重點是要理解下次執行時間的計算,以及優先隊列的出隊、入隊和刪除的過程,這兩個是理解ScheduedThreadPoolExecutor的關鍵。
PS:以上代碼提交在 Github :
https://github.com/Niuh-Study/niuh-juc-final.git
文章持續更新,可以公眾號搜一搜「 一角錢技術 」第一時間閱讀, 本文 GitHub org_hejianhui/JAVAStudy 已經收錄,歡迎 Star。