Preface
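In my earlier article "2. SOFAJRaft Source Code Analysis: How Does JRaft's Scheduled Task Scheduler Work?" I explained how the time wheel algorithm is used in JRaft. I don't think I explained it clearly, though: after reading that article you were about where you started. So this time I am going to explain the time wheel algorithm thoroughly.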
Time wheels are not unique to JRaft. They have many other applications, and you can find them in Netty, Akka, Quartz, ZooKeeper, Kafka, and other components.
The explanation below uses JRaft's time wheel implementation as the example. Because JRaft's code for this part is modeled on Netty's, you can also look up the corresponding source in Netty.
What problem does the time wheel solve?
If a system has a large number of scheduled tasks and each of them uses its own scheduler to manage its lifecycle, CPU resources are wasted and the whole approach is inefficient.
A time wheel is a scheduling model that uses thread resources efficiently to schedule tasks in batches. All of the scheduled tasks are bound to one scheduler, and that single scheduler handles the management, triggering, and execution of every task. It can efficiently manage delayed tasks, periodic tasks, notification tasks, and so on.
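However, a time wheel scheduler is not very precise, so it may not suit tasks with very strict timing requirements. Its precision is bounded by the granularity of one tick of the "pointer": for example, if the wheel advances one slot per second, tasks that need sub-second precision cannot be scheduled accurately by it.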
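To make the precision limit concrete, here is a small sketch based on the worker loop analyzed later in this article: a task whose deadline (relative to startTime) is d lands in the slot for tick d / tickDuration (integer division), and that tick is processed only once tickDuration * (tick + 1) has elapsed. The class and method names are hypothetical, units are arbitrary (the real code uses nanoseconds), and sleep jitter and queueing delay are ignored.

```java
// Hypothetical helper, not part of JRaft or Netty: models when a HashedWheelTimer-style
// wheel runs a task, ignoring thread-sleep jitter and time spent in the timeouts queue.
final class TickPrecision {
    private TickPrecision() {}

    // The task sits in the slot for tick floor(deadline / tickDuration); that tick is
    // processed once the elapsed time reaches tickDuration * (tick + 1).
    static long firingTime(long deadline, long tickDuration) {
        long tick = deadline / tickDuration;
        return tickDuration * (tick + 1);
    }

    // How late the task runs compared with its requested deadline: in (0, tickDuration].
    static long lateness(long deadline, long tickDuration) {
        return firingTime(deadline, tickDuration) - deadline;
    }
}
```

With a 1000 ms tick, a task due at 1500 ms runs at about 2000 ms, i.e. up to one full tick late, which is exactly why sub-tick precision is not achievable.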
Time wheel structure
As shown in the figure, the time wheel in JRaft (HashedWheelTimer) is a circular queue that stores scheduled tasks and is backed by an array. Each array element holds a list of scheduled tasks (HashedWheelBucket). A HashedWheelBucket is a doubly linked list with head and tail pointers (not a circular one), and each node in it is a scheduled-task entry (HashedWheelTimeout) that wraps the actual scheduled task (TimerTask).
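The structure can be sketched as an array of buckets, where each bucket is a doubly linked list of entries. This is a simplified, hypothetical model: Entry and Bucket stand in for HashedWheelTimeout and HashedWheelBucket, the task is reduced to a name, and nothing here is copied from JRaft.

```java
// Simplified, hypothetical model of the time wheel's data structure.
final class WheelModel {
    // One scheduled task entry (plays the role of HashedWheelTimeout).
    static final class Entry {
        final String name;   // stands in for the wrapped TimerTask
        final long deadline; // relative to startTime
        long remainingRounds;
        Entry prev;
        Entry next;

        Entry(String name, long deadline) {
            this.name = name;
            this.deadline = deadline;
        }
    }

    // One slot of the wheel (plays the role of HashedWheelBucket): a doubly linked list.
    static final class Bucket {
        Entry head;
        Entry tail;

        // Append at the tail.
        void add(Entry e) {
            if (head == null) {
                head = tail = e;
            } else {
                tail.next = e;
                e.prev = tail;
                tail = e;
            }
        }

        // Unlink e in O(1) and return the entry that followed it.
        Entry remove(Entry e) {
            Entry next = e.next;
            if (e.prev != null) {
                e.prev.next = next;
            }
            if (next != null) {
                next.prev = e.prev;
            }
            if (e == head) {
                head = next;
            }
            if (e == tail) {
                tail = e.prev;
            }
            e.prev = null;
            e.next = null;
            return next;
        }

        int size() {
            int n = 0;
            for (Entry e = head; e != null; e = e.next) {
                n++;
            }
            return n;
        }
    }

    final Bucket[] wheel; // the "circular queue": index i is slot i

    WheelModel(int slots) {
        wheel = new Bucket[slots];
        for (int i = 0; i < slots; i++) {
            wheel[i] = new Bucket();
        }
    }
}
```

The prev pointers are what make removal from the middle of a bucket O(1), which matters when tasks are cancelled or expire out of order.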
The time wheel is made up of multiple slots, each of which represents the wheel's basic time span (tickDuration). The number of slots is fixed and is given by wheel.length.
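The time wheel also has a dial pointer (tick) that counts how many times the pointer has advanced. tickDuration * (tick + 1) is the time, relative to the wheel's start, at which the next tick is due; when it arrives, all tasks in the HashedWheelBucket of the corresponding slot have to be processed.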
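The two pieces of time arithmetic above are easy to check by hand. In this hypothetical helper (names are illustrative, values in nanoseconds), a 1 ms tick on an 8-slot wheel gives a next deadline of 5 ms after tick 4 and one full revolution every 8 ms.

```java
// Illustrative helper with hypothetical names: time arithmetic of the dial pointer.
final class TickMath {
    private TickMath() {}

    // Time (relative to startTime) at which tick number `tick` is due to be processed.
    static long nextTickDeadline(long tickDuration, long tick) {
        return tickDuration * (tick + 1);
    }

    // Time the pointer needs for one full revolution over wheelLength slots.
    static long revolution(long tickDuration, int wheelLength) {
        return tickDuration * wheelLength;
    }
}
```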
How the time wheel runs
When the time wheel starts, it records the current time in startTime. When a task is added, the wheel first computes its deadline: for a task with a 24ms delay, deadline = currentTime + 24ms - startTime, so the deadline is measured relative to the wheel's start. The task is then wrapped in a HashedWheelTimeout and added to the timeouts queue, which acts as a buffer.
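A minimal sketch of this "compute the deadline, then buffer" step, assuming a concurrent queue like the timeouts queue described here. PendingBuffer and Pending are hypothetical names, and the caller passes currentTime explicitly (instead of calling System.nanoTime()) so the arithmetic is deterministic.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; the field names mirror the article (startTime, timeouts),
// but this is not JRaft's code.
final class PendingBuffer {
    static final class Pending {
        final Runnable task;
        final long deadline; // nanos relative to startTime

        Pending(Runnable task, long deadline) {
            this.task = task;
            this.deadline = deadline;
        }
    }

    private final long startTime;
    // Producers only append here; the worker thread later moves entries into buckets.
    final Queue<Pending> timeouts = new ConcurrentLinkedQueue<>();

    PendingBuffer(long startTime) {
        this.startTime = startTime;
    }

    // deadline = currentTime + delay - startTime
    Pending add(Runnable task, long delay, TimeUnit unit, long currentTime) {
        long deadline = currentTime + unit.toNanos(delay) - startTime;
        Pending p = new Pending(task, deadline);
        timeouts.add(p);
        return p;
    }
}
```

For example, if the wheel started at t = 1,000,000,000 ns and a 24 ms task is added 5 ms later, its deadline is 29,000,000 ns after startTime.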
On each tick, the running time wheel takes up to 100,000 buffered HashedWheelTimeout tasks out of timeouts and iterates over them.
For each task it then computes the following values:
- Total number of ticks for the HashedWheelTimeout: dividing the task's deadline by tickDuration gives the total number of times the tick pointer has to advance;
- Number of rounds: (total ticks - current tick) / number of slots (wheel.length). For example, with a tickDuration of 1ms and 20 slots (a simplified example; the real implementation would round 20 up to 32), one revolution of the wheel takes 20ms. If a task with a 24ms delay is added while the current tick is 0, the computed round count is 1. Each time the pointer passes the task's slot while rounds remain, remainingRounds is decremented by one, so the task runs only on the second revolution, once its rounds have dropped to 0;
- Compute the slot of the wheel where the task belongs and append the task to the end of that slot's linked list.
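The three computations in the list can be reproduced with a few lines of integer arithmetic. This hypothetical helper uses % rather than & mask so that the 20-slot example from the list works as written; units are milliseconds here, while the real code works in nanoseconds.

```java
// Hypothetical helper reproducing the per-task arithmetic described in the list above.
final class RoundMath {
    private RoundMath() {}

    // Total number of ticks the pointer must make before the task is due.
    static long totalTicks(long deadline, long tickDuration) {
        return deadline / tickDuration;
    }

    // Full revolutions the task must wait once it is in its slot.
    static long remainingRounds(long deadline, long tickDuration, long currentTick, int wheelLength) {
        return (totalTicks(deadline, tickDuration) - currentTick) / wheelLength;
    }

    // Slot index the task is appended to.
    static int slot(long deadline, long tickDuration, int wheelLength) {
        return (int) (totalTicks(deadline, tickDuration) % wheelLength);
    }
}
```

For the 24 ms task on a 20-slot, 1 ms wheel at tick 0, this gives 24 total ticks, 1 remaining round, and slot 4: the pointer reaches slot 4 at tick 4 (round count drops to 0) and again at tick 24, when the task runs.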
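After the tasks in timeouts have been placed into the wheel, the worker computes the slot the pointer currently points to, takes the linked list in that slot, and, by comparing each task's deadline with the current time, runs the tasks that have expired (those whose remaining rounds have reached 0).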
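The whole run logic can be condensed into a single-threaded simulation. This is a hypothetical sketch, not JRaft code: a virtual clock that always lands exactly on the tick replaces System.nanoTime() and waitForNextTick, ArrayList buckets replace HashedWheelBucket, and cancellation and the 100,000-per-tick limit are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical single-threaded simulation of the time wheel's run logic.
final class WheelSimulation {
    static final class Task {
        final String name;
        final long deadline; // relative to startTime (time 0)
        long remainingRounds;

        Task(String name, long deadline) {
            this.name = name;
            this.deadline = deadline;
        }
    }

    private final long tickDuration;
    private final int mask;
    private final List<List<Task>> wheel = new ArrayList<>();
    private final ArrayDeque<Task> timeouts = new ArrayDeque<>();
    private long tick;
    final List<String> fired = new ArrayList<>(); // "name@time" in firing order

    WheelSimulation(long tickDuration, int wheelLength) {
        // wheelLength is assumed to already be a power of two.
        this.tickDuration = tickDuration;
        this.mask = wheelLength - 1;
        for (int i = 0; i < wheelLength; i++) {
            wheel.add(new ArrayList<>());
        }
    }

    // Like newTimeout: only buffer the task.
    void schedule(String name, long deadline) {
        timeouts.add(new Task(name, deadline));
    }

    // One pass of the worker loop.
    void runOneTick() {
        long now = tickDuration * (tick + 1); // what waitForNextTick would return

        // 1. Move buffered tasks into their slots.
        Task t;
        while ((t = timeouts.poll()) != null) {
            long calculated = t.deadline / tickDuration;
            t.remainingRounds = (calculated - tick) / wheel.size();
            long ticks = Math.max(calculated, tick); // overdue tasks go to the current slot
            wheel.get((int) (ticks & mask)).add(t);
        }

        // 2. Walk the current slot: run tasks whose rounds are used up, count down the rest.
        Iterator<Task> it = wheel.get((int) (tick & mask)).iterator();
        while (it.hasNext()) {
            Task task = it.next();
            if (task.remainingRounds <= 0) {
                it.remove();
                fired.add(task.name + "@" + now);
            } else {
                task.remainingRounds--;
            }
        }
        tick++;
    }
}
```

With a tick of 1 and 8 slots, scheduling a (deadline 3), b (deadline 10) and c (deadline 0) and running 12 ticks fires c at 1, a at 4 and b at 11; b is found in slot 2 at tick 2 with one round left and only fires on the pointer's next pass.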
Source code analysis
Constructor
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
                        long maxPendingTimeouts) {
    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }
    // unit = MILLISECONDS
    if (unit == null) {
        throw new NullPointerException("unit");
    }
    if (tickDuration <= 0) {
        throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
    }
    if (ticksPerWheel <= 0) {
        throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
    }

    // Normalize ticksPerWheel to power of two and initialize the wheel.
    // Create the HashedWheelBucket array, the wheel's basic data structure.
    // Its length is the smallest power of two that is not less than ticksPerWheel.
    wheel = createWheel(ticksPerWheel);
    // mask is used to quickly compute the slot a task belongs in.
    // Given a task's tick count, its slot is ticks % wheel.length, but % is relatively expensive,
    // so a bitwise operation is used instead: because wheel.length is 2^n, mask = 2^n - 1 has all
    // low bits set to 1, and for non-negative x, x & mask == x % wheel.length.
    // Java's HashMap uses the same trick to map a hash to a bucket index.
    mask = wheel.length - 1;

    // Convert tickDuration to nanos.
    // If tickDuration is 1 (millisecond), it becomes 1000000 here.
    this.tickDuration = unit.toNanos(tickDuration);

    // Prevent overflow.
    // The tick interval must not be so long that tickDuration * wheel.length > Long.MAX_VALUE.
    if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
        throw new IllegalArgumentException(String.format(
            "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
            tickDuration, Long.MAX_VALUE / wheel.length));
    }
    // Wrap the worker in a thread.
    workerThread = threadFactory.newThread(worker);

    // maxPendingTimeouts = -1
    this.maxPendingTimeouts = maxPendingTimeouts;

    // If there are too many HashedWheelTimer instances, log an error.
    if (instanceCounter.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
        warnedTooManyInstances.compareAndSet(false, true)) {
        reportTooManyInstances();
    }
}
A few details in this constructor are worth noting:
- The wheel array created by createWheel always has a power-of-two length; for example, if ticksPerWheel is 6, the wheel is initialized with length 8. This allows the slot to be computed as tick & mask.
- tickDuration is stored in nanoseconds.
- The constructor does not start the time wheel right away; the wheel starts only when the first task is added. The constructor merely wraps the worker in workerThread.
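The power-of-two rounding and the mask trick from the notes above can be sketched as follows. The method names are hypothetical and the code is written for illustration rather than copied from createWheel; it assumes a simple doubling loop and caps the input at 2^30 so the loop cannot overflow.

```java
// Hypothetical sketch of the sizing rule behind createWheel and the mask trick.
final class WheelSizing {
    private WheelSizing() {}

    // Smallest power of two that is >= ticksPerWheel (e.g. 6 -> 8, 20 -> 32).
    static int normalizeTicksPerWheel(int ticksPerWheel) {
        if (ticksPerWheel <= 0 || ticksPerWheel > (1 << 30)) {
            throw new IllegalArgumentException("ticksPerWheel out of range: " + ticksPerWheel);
        }
        int normalized = 1;
        while (normalized < ticksPerWheel) {
            normalized <<= 1;
        }
        return normalized;
    }

    // For non-negative ticks and a power-of-two length, ticks & mask == ticks % length.
    static int slot(long ticks, int wheelLength) {
        int mask = wheelLength - 1;
        return (int) (ticks & mask);
    }
}
```

For example, normalizeTicksPerWheel(6) returns 8, and slot(13, 8) gives 5, the same as 13 % 8. The nanosecond conversion mentioned above is just TimeUnit.MILLISECONDS.toNanos(1) == 1000000.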
Adding a task to the time wheel
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }

    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
            + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
            + "timeouts (" + maxPendingTimeouts + ")");
    }
    // Start the time wheel if it has not been started yet.
    start();

    // Add the timeout to the timeout queue which will be processed on the next tick.
    // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    // Guard against overflow.
    // When delay is positive, deadline cannot legitimately be negative;
    // a negative value means the sum overflowed Long.MAX_VALUE.
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }
    // The task is not added to its slot directly. It first goes into a queue, and on the next tick
    // the worker takes at most 100000 tasks from the queue and puts them into their slots.
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    // The Worker processes the entries in the timeouts queue.
    timeouts.add(timeout);
    return timeout;
}
- If the time wheel has not been started, start is called to start it; once the worker thread is running, it sets startTime to the current time
- Compute the deadline
- Wrap the task in a HashedWheelTimeout and add it to the timeouts queue, where it is buffered
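Besides starting the wheel, newTimeout has two guards: the pending-task limit and the deadline overflow clamp. Both are isolated in this hypothetical sketch (class and method names are illustrative); the now, delay and startTime values are passed in so the arithmetic can be checked without a clock.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of newTimeout's two guards; not JRaft's code.
final class NewTimeoutGuards {
    private final AtomicLong pendingTimeouts = new AtomicLong();
    private final long maxPendingTimeouts; // <= 0 means "no limit", as with the -1 default

    NewTimeoutGuards(long maxPendingTimeouts) {
        this.maxPendingTimeouts = maxPendingTimeouts;
    }

    // Count the new task, or reject it and undo the increment.
    void reserve() {
        long count = pendingTimeouts.incrementAndGet();
        if (maxPendingTimeouts > 0 && count > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts (" + count
                + ") is greater than maximum allowed pending timeouts (" + maxPendingTimeouts + ")");
        }
    }

    long pending() {
        return pendingTimeouts.get();
    }

    // deadline = now + delay - startTime, clamped to Long.MAX_VALUE if a positive delay overflowed.
    static long deadline(long nowNanos, long delayNanos, long startTime) {
        long deadline = nowNanos + delayNanos - startTime;
        if (delayNanos > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        return deadline;
    }
}
```

Incrementing first and decrementing on rejection keeps the check lock-free: concurrent callers each see their own post-increment count.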
start
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);

public void start() {
    // workerState starts at 0 (WORKER_STATE_INIT) and is later set to 1 (WORKER_STATE_STARTED).
    switch (workerStateUpdater.get(this)) {
        case WORKER_STATE_INIT:
            // Use CAS to win the right to start the scheduler; only the winning thread starts it.
            if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                // workerState was set successfully, so start workerThread.
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }

    // Wait until the startTime is initialized by the worker.
    while (startTime == 0) {
        try {
            // The CountDownLatch ensures the worker thread has actually started.
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.
        }
    }
}
start starts the time wheel according to the current workerState and uses startTimeInitialized to coordinate the threads: until workerThread has started and initialized startTime, newTimeout stays blocked inside start. Without this wait, newTimeout could read startTime before it has been set.
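The same handshake can be reproduced in isolation: a CAS decides which caller starts the worker thread, and a CountDownLatch makes every caller wait until the worker has published startTime. This is a hypothetical, stripped-down sketch; it uses an AtomicInteger where HashedWheelTimer uses an AtomicIntegerFieldUpdater, and its worker does nothing after initializing startTime.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the lazy-start handshake; not JRaft code.
final class LazyStarter {
    static final int INIT = 0;
    static final int STARTED = 1;
    static final int SHUTDOWN = 2;

    private final AtomicInteger workerState = new AtomicInteger(INIT);
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    private volatile long startTime; // 0 means "not initialized yet"
    private final Thread workerThread;

    LazyStarter() {
        workerThread = new Thread(() -> {
            long now = System.nanoTime();
            startTime = (now == 0) ? 1 : now; // 0 is reserved as the "uninitialized" marker
            startTimeInitialized.countDown(); // release callers blocked in start()
            // A real worker would now loop over ticks until shut down.
        });
        workerThread.setDaemon(true);
    }

    // Safe to call from many threads: only the CAS winner starts the worker,
    // but every caller returns only after startTime has been published.
    long start() {
        switch (workerState.get()) {
            case INIT:
                if (workerState.compareAndSet(INIT, STARTED)) {
                    workerThread.start();
                }
                break;
            case STARTED:
                break;
            case SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new IllegalStateException("Invalid workerState");
        }
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Mirrors the original: ignore and re-check; startTime is set almost immediately.
            }
        }
        return startTime;
    }
}
```

If several threads call start() at once, exactly one of them starts the worker, and all of them return the same non-zero startTime.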
Starting the time wheel
The time wheel runs in Worker, an inner class of HashedWheelTimer. Calling workerThread#start makes the thread execute Worker's run method, which starts the wheel.
Let's look at what starting the time wheel involves. The analysis below ignores task cancellation.
Worker#run
public void run() {
    // Initialize the startTime.
    startTime = System.nanoTime();
    if (startTime == 0) {
        // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
        startTime = 1;
    }

    // HashedWheelTimer#start can now continue.
    // Notify the other threads waiting for the initialization at start().
    startTimeInitialized.countDown();

    do {
        // Returns the current nanoTime - startTime,
        // i.e. the time elapsed since the wheel started, once the next tick has arrived.
        final long deadline = waitForNextTick();
        if (deadline > 0) {
            // Compute this tick's slot in the wheel.
            int idx = (int) (tick & mask);
            // Remove the timeouts in cancelledTimeouts from their buckets.
            processCancelledTasks();
            HashedWheelBucket bucket = wheel[idx];
            // Move the tasks that newTimeout() put into the pending queue into their slots.
            transferTimeoutsToBuckets();
            bucket.expireTimeouts(deadline);
            tick++;
        }
        // Keep looping as long as workerState is STARTED.
    } while (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

    // Fill the unprocessedTimeouts so we can return them from stop() method.
    for (HashedWheelBucket bucket : wheel) {
        bucket.clearTimeouts(unprocessedTimeouts);
    }
    for (;;) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            break;
        }
        // Timeouts that were never processed (and not cancelled) go into unprocessedTimeouts.
        if (!timeout.isCancelled()) {
            unprocessedTimeouts.add(timeout);
        }
    }
    // Handle cancelled tasks.
    processCancelledTasks();
}
- When the time wheel starts running, it first records the start time (startTime) and then counts down startTimeInitialized to release the threads waiting in start;
- It enters the do-while loop and calls waitForNextTick, which sleeps until the pointer's next tick and returns the current time minus startTime as deadline;
- Because mask = wheel.length - 1 and wheel.length is a power of two, tick & mask directly gives this tick's slot in wheel;
- processCancelledTasks takes the tasks out of the cancelledTimeouts queue and removes them from the time wheel;
- transferTimeoutsToBuckets takes the tasks buffered in the timeouts queue and adds them to the wheel;
- bucket.expireTimeouts processes the linked list in the slot the pointer currently points to, after which tick is incremented.
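Because tick only ever increases, idx = tick & mask sweeps the slots in order and wraps around, so each bucket is visited exactly once every wheel.length ticks. That is why expireTimeouts can treat each visit as one completed revolution and decrement remainingRounds by one. The helper below is a hypothetical illustration of that sequence.

```java
// Hypothetical helper: the slots the pointer visits as tick keeps increasing.
final class SlotCycle {
    private SlotCycle() {}

    // Slots for ticks [0, count), using idx = tick & mask as Worker#run does.
    static int[] slots(int wheelLength, int count) {
        int mask = wheelLength - 1; // wheelLength assumed to be a power of two
        int[] out = new int[count];
        for (long tick = 0; tick < count; tick++) {
            out[(int) tick] = (int) (tick & mask);
        }
        return out;
    }
}
```

For a 4-slot wheel, the first ten ticks visit slots 0, 1, 2, 3, 0, 1, 2, 3, 0, 1.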
Advancing the time wheel pointer
waitForNextTick
// Sleep until the next tick arrives, then return the time elapsed between that tick and the start time.
private long waitForNextTick() {
    // tickDuration is in nanoseconds (1000000 for a 1ms tick).
    // tick is the total number of ticks so far.
    long deadline = tickDuration * (tick + 1);

    for (;;) {
        final long currentTime = System.nanoTime() - startTime;
        // Compute how long to sleep, in milliseconds, rounding up: adding 999999 before dividing
        // by 1000000 gives the ceiling of the remaining nanoseconds. The worker thread sleeps between
        // ticks, so if the tick interval is very small the thread is constantly put to sleep and
        // woken up, which costs some performance; a slightly larger tick uses resources better.
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

        // sleepTimeMs <= 0 means the next tick has been reached.
        if (sleepTimeMs <= 0) {
            if (currentTime == Long.MIN_VALUE) {
                return -Long.MAX_VALUE;
            } else {
                return currentTime;
            }
        }

        // Check if we run on windows, as if thats the case we will need
        // to round the sleepTime as workaround for a bug that only affect
        // the JVM if it runs on windows.
        //
        // See https://github.com/netty/netty/issues/356
        if (Platform.isWindows()) {
            sleepTimeMs = sleepTimeMs / 10 * 10;
        }

        try {
            Thread.sleep(sleepTimeMs);
        } catch (InterruptedException ignored) {
            if (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                return Long.MIN_VALUE;
            }
        }
    }
}
Think of the second hand of a clock: it has to wait between one second and the next. waitForNextTick does the same: from the current time it computes the interval until the next tick, sleeps for that long, and then returns the time elapsed since the time wheel started.
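The sleep-time arithmetic is worth checking on its own, because it mixes nanoseconds and milliseconds. This hypothetical helper reproduces it with the same constants; the boolean windows parameter stands in for Platform.isWindows(). All inputs are nanoseconds relative to startTime and the result is in milliseconds.

```java
// Hypothetical helper reproducing the sleep-time computation in waitForNextTick.
final class SleepMath {
    private SleepMath() {}

    static long sleepTimeMs(long tickDuration, long tick, long currentTime, boolean windows) {
        long deadline = tickDuration * (tick + 1);
        // Adding 999999 before dividing by 1000000 rounds a positive gap up to whole ms.
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
        if (sleepTimeMs > 0 && windows) {
            // Windows workaround from the original: round down to a multiple of 10 ms.
            sleepTimeMs = sleepTimeMs / 10 * 10;
        }
        return sleepTimeMs; // <= 0 means the tick has already arrived
    }
}
```

With a 10 ms tick and 2.5 ms already elapsed, the remaining 7.5 ms is rounded up to an 8 ms sleep; on Windows the same value would be rounded down to 0, so the loop re-checks the clock almost immediately.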
Transferring tasks into the time wheel
When a task is added through the time wheel's API, it is not put into the wheel directly but buffered in the timeouts queue, so while running, the worker has to move the tasks from the timeouts queue into the linked lists of the wheel.
transferTimeoutsToBuckets
private void transferTimeoutsToBuckets() {
    // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
    // adds new timeouts in a loop.
    // Handle at most 100,000 tasks per tick so the worker thread is not blocked.
    for (int i = 0; i < 100000; i++) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            // all processed
            break;
        }
        // Already cancelled.
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
            // Was cancelled in the meantime.
            continue;
        }

        // calculated = total number of ticks until the deadline
        long calculated = timeout.deadline / tickDuration;
        // Remaining rounds: the task expires only after the timer has completed this many rounds
        // and reached the task's slot.
        timeout.remainingRounds = (calculated - tick) / wheel.length;

        // If the task stayed in the timeouts queue so long that its time has already passed,
        // use the current tick, i.e. put it in the current bucket; it runs right after this method returns.
        final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
        // Compute the wheel slot the task goes into: slotIndex = ticks & mask, mask = wheel.length - 1.
        int stopIndex = (int) (ticks & mask);

        HashedWheelBucket bucket = wheel[stopIndex];
        // Append the timeout to the bucket's linked list.
        bucket.addTimeout(timeout);
    }
}
This transfer method uses a hard-coded loop that moves at most 100,000 tasks per call, that is, per tick.
Then, from each HashedWheelTimeout's deadline, it computes how many ticks the wheel needs before the task can run; if the task's delay is longer than one full revolution of the wheel, it also computes how many rounds the wheel has to make before the task runs.
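Finally, it computes the task's slot in the wheel and adds the task to that slot's linked list. If the task waited in the queue so long that its tick has already passed, Math.max(calculated, tick) puts it into the current slot so that it runs on this tick.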
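The bookkeeping of the transfer step can be isolated as follows. This hypothetical sketch works on plain long deadlines instead of HashedWheelTimeout objects and only counts tasks per slot; it shows the per-call batch limit and the "never schedule into the past" clamp.

```java
import java.util.Queue;

// Hypothetical sketch of transferTimeoutsToBuckets' bookkeeping; not JRaft code.
final class TransferSketch {
    private TransferSketch() {}

    // Returns {slot, remainingRounds} for one deadline.
    static long[] place(long deadline, long tickDuration, long tick, int wheelLength) {
        long calculated = deadline / tickDuration;
        long remainingRounds = (calculated - tick) / wheelLength;
        long ticks = Math.max(calculated, tick); // never schedule into the past
        int slot = (int) (ticks & (wheelLength - 1));
        return new long[] {slot, remainingRounds};
    }

    // Moves at most `limit` deadlines out of the queue and returns how many were moved.
    static int drain(Queue<Long> timeouts, int limit, long tickDuration, long tick, int wheelLength,
                     long[] perSlotCount) {
        int moved = 0;
        for (int i = 0; i < limit; i++) {
            Long deadline = timeouts.poll();
            if (deadline == null) {
                break; // queue is empty
            }
            long[] placement = place(deadline, tickDuration, tick, wheelLength);
            perSlotCount[(int) placement[0]]++;
            moved++;
        }
        return moved;
    }
}
```

On an 8-slot wheel at tick 5, an overdue task with deadline 3 lands in the current slot 5 with 0 rounds, while a task due at tick 40 lands in slot 0 with 4 rounds: it is skipped at ticks 8, 16, 24 and 32 and runs at tick 40.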
Running the tasks in the time wheel
When the pointer reaches a slot, the worker takes that slot's HashedWheelBucket, walks its linked list, and runs the tasks that have expired.
expireTimeouts
// Expire and run the due tasks in this slot; the worker thread calls this when the tick reaches the slot.
// Whether a task is due is decided by its deadline and remainingRounds.
public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;

    // process all timeouts
    // Walk every scheduled task in the slot.
    while (timeout != null) {
        // Save next first, because remove() sets timeout.next to null.
        HashedWheelTimeout next = timeout.next;
        if (timeout.remainingRounds <= 0) {
            // Remove the current timeout from the bucket list and get the next timeout in the list.
            next = remove(timeout);
            // If the timeout's deadline is not later than the current time, call expire to run the task.
            if (timeout.deadline <= deadline) {
                timeout.expire();
            } else {
                // Should be impossible: rounds are already 0 but the deadline is later than this slot's deadline.
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format(
                    "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {
            next = remove(timeout);
        } else {
            // The pointer has passed this slot once more, i.e. completed another revolution,
            // so decrement the remaining rounds.
            timeout.remainingRounds--;
        }
        // Move on to the next timeout.
        timeout = next;
    }
}
HashedWheelBucket is a linked list, so we traverse it from the head node and keep going until we reach the end of the list.
For each timeout node, if remainingRounds is greater than 0, the task can only run on a later revolution, so remainingRounds is decremented by one;
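If remainingRounds is less than or equal to zero, the node is removed from the bucket's linked list; if its deadline is not later than the current deadline, the timeout's expire method is called to run the task, otherwise the task must have been placed in the wrong slot and an IllegalStateException is thrown.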
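The walk described above can be reproduced on a toy bucket. This is a hypothetical, simplified HashedWheelBucket: "running" a task just records its name, and cancellation is left out, as in the rest of this analysis. The key detail is saving next before unlinking, because remove() clears the node's pointers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified bucket illustrating the expireTimeouts walk; not JRaft code.
final class BucketWalk {
    static final class Node {
        final String name;
        final long deadline;
        long remainingRounds;
        Node prev;
        Node next;

        Node(String name, long deadline, long remainingRounds) {
            this.name = name;
            this.deadline = deadline;
            this.remainingRounds = remainingRounds;
        }
    }

    Node head;
    Node tail;
    final List<String> expired = new ArrayList<>(); // names of tasks that "ran"

    void add(Node n) {
        if (head == null) {
            head = tail = n;
        } else {
            tail.next = n;
            n.prev = tail;
            tail = n;
        }
    }

    // Unlink n and return its successor.
    private Node remove(Node n) {
        Node next = n.next;
        if (n.prev != null) {
            n.prev.next = next;
        }
        if (next != null) {
            next.prev = n.prev;
        }
        if (n == head) {
            head = next;
        }
        if (n == tail) {
            tail = n.prev;
        }
        n.prev = null;
        n.next = null;
        return next;
    }

    void expireTimeouts(long deadline) {
        Node node = head;
        while (node != null) {
            Node next = node.next; // saved first: remove() clears node.next
            if (node.remainingRounds <= 0) {
                next = remove(node);
                if (node.deadline <= deadline) {
                    expired.add(node.name); // stands in for timeout.expire()
                } else {
                    throw new IllegalStateException(
                        "deadline " + node.deadline + " > " + deadline + ": wrong slot");
                }
            } else {
                node.remainingRounds--; // one more revolution to go
            }
            node = next;
        }
    }
}
```

A bucket holding a (deadline 5, 0 rounds), b (deadline 13, 1 round) and c (deadline 6, 0 rounds) expires a and c when called with deadline 6 and leaves b with 0 rounds; the next visit, with deadline 14, expires b.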