時(shí)間輪是一個(gè)高性能、低消耗的數(shù)據(jù)結(jié)構(gòu),它適合用非準(zhǔn)實(shí)時(shí),延遲的短平快任務(wù),例如心跳檢測。?.NETty、Kafka、Zookeeper中都有使用。
時(shí)間輪可通過時(shí)間與任務(wù)存儲(chǔ)分離的形式,輕松實(shí)現(xiàn)百億級(jí)海量任務(wù)調(diào)度。
Netty中的時(shí)間輪
Netty動(dòng)輒管理100w+的連接,每一個(gè)連接都會(huì)有很多超時(shí)任務(wù)。比如發(fā)送超時(shí)、心跳檢測間隔等,如果每一個(gè)定時(shí)任務(wù)都啟動(dòng)一個(gè)Timer,不僅低效,而且會(huì)消耗大量的資源。
時(shí)間輪 |
時(shí)間輪的格子 |
格子里的任務(wù) |
時(shí)間輪運(yùn)轉(zhuǎn)線程 |
HashedWheelTimer |
HashedWheelBucket |
HashedWheelTimeout |
Worker |
其他一些屬性:
時(shí)間輪零點(diǎn)時(shí)間:startTime
當(dāng)前指針?biāo)父褡樱簍ick
格子長度(持續(xù)時(shí)間):tickDuration
時(shí)間輪運(yùn)轉(zhuǎn)輪次、回合:remainingRounds
任務(wù)截止時(shí)間、觸發(fā)時(shí)間(相對時(shí)間輪的startTime):deadline
概括時(shí)間輪工作流程
(閱讀Netty3.10.6)
1、時(shí)間輪的啟動(dòng)并不是在構(gòu)造函數(shù)中,而是在第一次提交任務(wù)的時(shí)候newTimeout()
2、啟動(dòng)時(shí)間輪第一件事就是初始化時(shí)間輪的零點(diǎn)時(shí)間startTime,以后時(shí)間輪上的任務(wù)、格子觸發(fā)時(shí)間計(jì)算都相對這個(gè)時(shí)間
3、隨著時(shí)間的推移第一個(gè)格子(tick)觸發(fā),在觸發(fā)每個(gè)格子之前都是處于阻塞狀態(tài),并不是直接去處理這個(gè)格子的所有任務(wù),而是先從任務(wù)隊(duì)列timeouts中拉取最多100000個(gè)任務(wù),根據(jù)每個(gè)任務(wù)的觸發(fā)時(shí)間deadline放在不同的格子里(注意,Netty中會(huì)對時(shí)間輪上的每一個(gè)格子進(jìn)行處理,即使這個(gè)格子沒有任務(wù))
4、時(shí)間輪運(yùn)轉(zhuǎn)過程中維護(hù)著一個(gè)指針tick,根據(jù)當(dāng)前指針獲取對應(yīng)的格子里的所有任務(wù)進(jìn)行處理
5、任務(wù)自身維護(hù)了一個(gè)剩余回合(remainingRounds),代表任務(wù)在哪一輪執(zhí)行處理,只有該值為0時(shí)才進(jìn)行處理
代碼做了刪減,只體現(xiàn)重點(diǎn)
時(shí)間輪構(gòu)造器:
初始化了時(shí)間輪大小、每個(gè)格子大小、時(shí)間輪運(yùn)轉(zhuǎn)線程
public HashedWheelTimer(
ThreadFactory threadFactory,
ThreadNameDeterminer determiner,
long tickDuration, TimeUnit unit, int ticksPerWheel) {
// TODO : 創(chuàng)建時(shí)間輪底層存儲(chǔ)任務(wù)的數(shù)據(jù)結(jié)構(gòu)
wheel = createWheel(ticksPerWheel);
// TODO : 求某一個(gè)任務(wù)落到哪個(gè)格子時(shí)需要用到的編碼
mask = wheel.length - 1;
// TODO : 每個(gè)格子的時(shí)間
this.tickDuration = unit.toNanos(tickDuration);
// TODO : 時(shí)間輪處理任務(wù)的線程
workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
worker, "Hashed wheel timer #" + id.incrementAndGet(),
determiner));
}
// TODO : 時(shí)間輪真正存儲(chǔ)數(shù)據(jù)的容器
private final HashedWheelBucket[] wheel;
// TODO : 存放任務(wù)的隊(duì)列
private final Queue<HashedWheelTimeout> timeouts = new ConcurrentLinkedQueue<HashedWheelTimeout>();
外界提交任務(wù)的時(shí)候,代碼如下
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
// TODO : 啟動(dòng)時(shí)間輪運(yùn)轉(zhuǎn)線程
start();
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
// TODO : 任務(wù)放入到隊(duì)列中,并沒有一開始就放到時(shí)間輪上
timeouts.add(timeout);
return timeout;
}
時(shí)間輪運(yùn)轉(zhuǎn)執(zhí)行任務(wù),代碼如下
public void run() {
// TODO : 初始化時(shí)間輪的
startTime = System.nanoTime();
do {
// TODO : 這個(gè)方法會(huì)阻塞,隨著時(shí)間的推動(dòng)會(huì)觸發(fā)新的任務(wù)(tick),返回當(dāng)前時(shí)間
final long deadline = waitForNextTick();
if (deadline > 0) {
// TODO : 將隊(duì)列中的任務(wù)最多取100000放到時(shí)間輪上
transferTimeoutsToBuckets();
// TODO : 獲取當(dāng)前格子
HashedWheelBucket bucket = wheel[(int) (tick & mask)];
// TODO : 執(zhí)行時(shí)間輪上當(dāng)前格子上的任務(wù)
bucket.expireTimeouts(deadline);
// TODO : 指針走動(dòng)
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
}
run內(nèi)部方法解析
waitForNextTick等待下一個(gè)格子觸發(fā),代碼如下
private long waitForNextTick() {
// TODO : 截止時(shí)間、觸發(fā)時(shí)間
// TODO : 獲取當(dāng)前格子的觸發(fā)時(shí)間,因?yàn)闀r(shí)間輪底層是使用數(shù)組存儲(chǔ)任務(wù)數(shù)據(jù),所以tick需要+1
long deadline = tickDuration * (tick + 1);
/**
* tick : 時(shí)間輪上的格子
* tickDuration : 每個(gè)格子的長度,持續(xù)時(shí)間
* deadline : 這里表示下一個(gè)格子的觸發(fā)時(shí)間(觸發(fā)一個(gè)格子的任務(wù))相對時(shí)間輪起點(diǎn)時(shí)間(startTime)的時(shí)長
*/
for (;;) {
// TODO : 相對時(shí)間輪起點(diǎn)的當(dāng)前時(shí)間
final long currentTime = System.nanoTime() - startTime;
// TODO : 當(dāng)當(dāng)前時(shí)間大于等于deadline的時(shí)候,就會(huì)跳出循環(huán)
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
try {
// TODO : 并不是一直循環(huán)
Thread.sleep(sleepTimeMs);
} catch (InterruptedException e) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
transferTimeoutsToBuckets將隊(duì)列中任務(wù)存儲(chǔ)到時(shí)間輪上,代碼如下
private void transferTimeoutsToBuckets() {
for (int i = 0; i < 100000; i++) {
// TODO : 從隊(duì)列中取出任務(wù)
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// all processed 已全部處理
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED
|| !timeout.compareAndSetState(HashedWheelTimeout.ST_INIT, HashedWheelTimeout.ST_IN_BUCKET)) {
// 期間被取消。所以只需從隊(duì)列中刪除它并繼續(xù)下一個(gè) HashedWheelTimeout
timeout.remove();
continue;
}
// TODO : 計(jì)算這個(gè)任務(wù)要走多少個(gè)格子
long calculated = timeout.deadline / tickDuration;
// TODO : 計(jì)算觸發(fā)當(dāng)前這個(gè)任務(wù)還要走多少輪,剩余回合!
/**
* calculated:觸發(fā)該任務(wù)一共要走的格子數(shù)
* tick:當(dāng)前已經(jīng)走的格子數(shù)
* wheel.length:時(shí)間輪的長度
*/
long remainingRounds = (calculated - tick) / wheel.length;
// TODO : 任務(wù)自身攜帶了觸發(fā)自己的輪次
timeout.remainingRounds = remainingRounds;
final long ticks = Math.max(calculated, tick);
// TODO : mask = wheel.length - 1
int stopIndex = (int) (ticks & mask);
// TODO : 將任務(wù)放到時(shí)間輪的對應(yīng)格子中
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}
expireTimeouts執(zhí)行處理任務(wù),代碼如下
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
while (timeout != null) {
boolean remove = false;
// TODO : 根據(jù)剩余回合判斷是否要處理該任務(wù),如果大于0說明還沒輪到該任務(wù)
if (timeout.remainingRounds <= 0) {
// TODO : 如果時(shí)間已經(jīng)到了,則執(zhí)行任務(wù)
/**
* deadline 是相對時(shí)間輪startTime的當(dāng)前時(shí)間,也是當(dāng)前格子的觸發(fā)時(shí)間
* timeout.deadline 是任務(wù)的觸發(fā)時(shí)間
*/
if (timeout.deadline <= deadline) {
// TODO :
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never hAppen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
remove = true;
} else if (timeout.isCancelled()) {
remove = true;
} else {
timeout.remainingRounds --;
}
// store reference to next as we may null out timeout.next in the remove block.
HashedWheelTimeout next = timeout.next;
if (remove) {
remove(timeout);
}
timeout = next;
}
}
Kafka中的時(shí)間輪
Produce 時(shí)等待 ISR 副本復(fù)制成功、延遲刪除主題、會(huì)話超時(shí)檢查、延遲創(chuàng)建主題或分區(qū)等,會(huì)被封裝成不同的 DelayOperation 進(jìn)行延遲處理操作,防止阻塞 Kafka請求處理線程。
名稱 |
時(shí)間輪 |
時(shí)間輪的格子(桶) |
格子(桶)里的任務(wù) |
時(shí)間輪運(yùn)轉(zhuǎn)線程 |
處理過期任務(wù)線程 |
類名 |
TimingWheel |
TimerTaskList |
TimerTaskEntry |
ShutdownableThread |
ExecutorService |
屬性名 |
timingWheel |
bucket |
rootheadtail |
expirationReaper |
taskExecutor |
其他一些屬性:
時(shí)間輪零點(diǎn)時(shí)間:startMs
當(dāng)前時(shí)間:currentTime
格子長度(持續(xù)時(shí)間):tickMs
時(shí)間輪大小:wheelSize
時(shí)間輪的當(dāng)前層時(shí)間跨度:interval = tickMs * wheelSize
到期時(shí)間:expiration
溢出輪、升層的時(shí)間輪:overflowWheel: TimingWheel
概括時(shí)間輪工作流程
(閱讀Kafka-3.1.0)
Kafka 中的時(shí)間輪(TimingWheel)是一個(gè)存儲(chǔ)定時(shí)任務(wù)的環(huán)形隊(duì)列,底層采用數(shù)組實(shí)現(xiàn),數(shù)組中的每個(gè)元素可以存放一個(gè)定時(shí)任務(wù)列表(TimerTaskList)。TimerTaskList是一個(gè)環(huán)形的雙向鏈表,鏈表中的每一項(xiàng)表示的都是定時(shí)任務(wù)項(xiàng)(TimerTaskEntry),其中封裝了真正的定時(shí)任務(wù)(TimerTask)。
1、Kafka啟動(dòng)的時(shí)候就啟動(dòng)了時(shí)間輪
2、ExpiredOperationReaper.doWork() 循環(huán)執(zhí)行,首先從全局的delayQueue中獲取一個(gè)bucket,如果不為空則上鎖處理
3、根據(jù)bucket的到期時(shí)間嘗試推進(jìn),然后會(huì)刷一次bucket中的所有任務(wù),這些任務(wù)要么是需要立即執(zhí)行的(即到期時(shí)間在 currentTime 和 currentTime + tickMs 之間),要么是需要換桶的,往前移位(即到期時(shí)間大于等于 currentTime + tickMs);立即計(jì)算的直接提交給專門的線程處理
4、最后拉取delayQueue中下一個(gè)bucket處理,一直循環(huán)下去
5、添加一個(gè)任務(wù),首先是根據(jù)任務(wù)的到期時(shí)間expiration來判斷自己會(huì)落到哪一個(gè)bucket,如果expiration不小于currentTime + tickMs,則可能是當(dāng)前時(shí)間輪的任一個(gè)bucket,也可能是溢出輪中的任一個(gè)bucket
6、當(dāng)任務(wù)添加到某一個(gè)bucket后會(huì)判斷是否跟新了桶的到期時(shí)間,如果更新了則需要入隊(duì)處理delayQueue.offer
代碼做了刪減,只體現(xiàn)重點(diǎn)
1、Kafka中自己封裝了一個(gè)可關(guān)閉的線程類 Shutdown’able’Thread ,也就是實(shí)現(xiàn)了該類的 ExpiredOperationReaper 內(nèi)部實(shí)現(xiàn)了 doWork() 方法,維護(hù)著時(shí)間輪的運(yùn)轉(zhuǎn)
private class ExpiredOperationReaper extends ShutdownableThread(
"ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
false) {
override def doWork(): Unit = {
advanceClock(200L)
}
}
2、推進(jìn)時(shí)鐘的內(nèi)部實(shí)現(xiàn)
def advanceClock(timeoutMs: Long): Boolean = {
// TODO : 阻塞 timeoutMs = 200 毫秒,拉取一個(gè)桶:有直接返回,沒有則阻塞200毫秒
var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
if (bucket != null) {
writeLock.lock()
try {
while (bucket != null) {
// TODO : 傳入當(dāng)前桶的過期時(shí)間,嘗試推進(jìn)時(shí)間
timingWheel.advanceClock(bucket.getExpiration)
// TODO : 無論推進(jìn)時(shí)間是否成功,當(dāng)前桶的這些任務(wù)要么是需要立即執(zhí)行的(即到期時(shí)間在 currentTime 和 currentTime + tickMs 之間),
// 要么是需要換桶的,往前移位(即到期時(shí)間大于等于 currentTime + tickMs);立即計(jì)算的直接提交給專門的線程處理
bucket.flush(addTimerTaskEntry)
// TODO : 進(jìn)行下一個(gè)桶處理
bucket = delayQueue.poll()
}
} finally {
writeLock.unlock()
}
true
} else {
false
}
}
3、嘗試推進(jìn)時(shí)鐘
def advanceClock(timeMs: Long): Unit = {
/**
* currentTime + tickMs :當(dāng)前桶過期時(shí)間的截止時(shí)間
* timeMs :下一個(gè)桶的過期時(shí)間
*/
if (timeMs >= currentTime + tickMs) {
// currentTime 是 tickMs 的整數(shù)倍
currentTime = timeMs - (timeMs % tickMs)
// TODO : 嘗試推進(jìn)溢出輪的時(shí)間
if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
}
}
4、bucket.flush(addTimerTaskEntry) 傳入的是一個(gè)方法之后桶內(nèi)的每一個(gè)任務(wù)都會(huì)走一次該方法
// TODO : 添加或處理任務(wù)
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
// TODO : 只有到期時(shí)間在 currentTime 和 currentTime + tickMs 之間的任務(wù)才會(huì)被直接處理
if (!timingWheel.add(timerTaskEntry)) {
// Already expired or cancelled
if (!timerTaskEntry.cancelled) {
// TODO : 只處理過期時(shí)間到達(dá)且不是被取消的任務(wù)
taskExecutor.submit(timerTaskEntry.timerTask)
}
}
}
5、添加任務(wù)到時(shí)間輪的入口也是地4步的方法,其中timingWheel.add(timerTaskEntry) 方法中會(huì)判斷每一個(gè)任務(wù)是立即處理還是入隊(duì)
/**
* 添加一個(gè)任務(wù)
* 添加任務(wù)的過程比較復(fù)雜,首先是根據(jù)任務(wù)的到期時(shí)間來判斷自己會(huì)落到哪一個(gè)bucket,可能是當(dāng)前時(shí)間輪任一個(gè)bucket,也可能是溢出輪中的任一個(gè)bucket
*/
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
// TODO : 任務(wù)到期時(shí)間
val expiration = timerTaskEntry.expirationMs
if (timerTaskEntry.cancelled) {
false
} else if (expiration < currentTime + tickMs) {
// TODO : 距離該任務(wù)到期僅剩最多 tickMs 毫秒了
// TODO : currentTime當(dāng)前指向的時(shí)間格也屬于到期部分,表示剛好到期
false
} else if (expiration < currentTime + interval) {
// TODO : 距離該任務(wù)到期小于一整輪的時(shí)間,大于一個(gè)格子的時(shí)間,說明它就在當(dāng)前層,不需要升層
val virtualId = expiration / tickMs
val bucket = buckets((virtualId % wheelSize.toLong).toInt)
bucket.add(timerTaskEntry)
// TODO : 如果該任務(wù)的到來改變了他所進(jìn)入的桶的過期時(shí)間,即輪子已經(jīng)前進(jìn)并且之前的桶被重用了
// TODO : 桶是同一個(gè)桶,但是數(shù)據(jù)可能不是同一輪的,這時(shí)需要重新入隊(duì) DelayQueue
if (bucket.setExpiration(virtualId * tickMs)) {
queue.offer(bucket)
}
true
} else {
// TODO : 需要升層 過期時(shí)間超過了 interval
if (overflowWheel == null) addOverflowWheel()
overflowWheel.add(timerTaskEntry)
}
}
需要升層的情況:其實(shí)每一個(gè)時(shí)間輪對象內(nèi)都有一個(gè)溢出輪的指針 overflowWheel ,他會(huì)指向父級(jí)時(shí)間輪。
Kafka 使用時(shí)間輪來實(shí)現(xiàn)延時(shí)隊(duì)列,因?yàn)槠涞讓邮侨蝿?wù)的添加和刪除是基于鏈表實(shí)現(xiàn)的,是 O(1) 的時(shí)間復(fù)雜度,滿足高性能的要求;
對于時(shí)間跨度大的延時(shí)任務(wù),Kafka 引入了層級(jí)時(shí)間輪,能更好控制時(shí)間粒度,可以應(yīng)對更加復(fù)雜的定時(shí)任務(wù)處理場景;
對于如何實(shí)現(xiàn)時(shí)間輪的推進(jìn)和避免空推進(jìn)影響性能,Kafka 采用空間換時(shí)間的思想,通過 DelayQueue 來推進(jìn)時(shí)間輪,算是一個(gè)經(jīng)典的 trade off。