日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

時(shí)間輪是一個(gè)高性能、低消耗的數(shù)據(jù)結(jié)構(gòu),它適合用非準(zhǔn)實(shí)時(shí),延遲的短平快任務(wù),例如心跳檢測。?.NETty、Kafka、Zookeeper中都有使用。

時(shí)間輪可通過時(shí)間與任務(wù)存儲(chǔ)分離的形式,輕松實(shí)現(xiàn)百億級(jí)海量任務(wù)調(diào)度。

Netty中的時(shí)間輪

Netty動(dòng)輒管理100w+的連接,每一個(gè)連接都會(huì)有很多超時(shí)任務(wù)。比如發(fā)送超時(shí)、心跳檢測間隔等,如果每一個(gè)定時(shí)任務(wù)都啟動(dòng)一個(gè)Timer,不僅低效,而且會(huì)消耗大量的資源。

時(shí)間輪

時(shí)間輪的格子

格子里的任務(wù)

時(shí)間輪運(yùn)轉(zhuǎn)線程

HashedWheelTimer

HashedWheelBucket

HashedWheelTimeout

Worker

其他一些屬性:

時(shí)間輪零點(diǎn)時(shí)間:startTime
當(dāng)前指針?biāo)父褡樱簍ick
格子長度(持續(xù)時(shí)間):tickDuration
時(shí)間輪運(yùn)轉(zhuǎn)輪次、回合:remainingRounds
任務(wù)截止時(shí)間、觸發(fā)時(shí)間(相對時(shí)間輪的startTime):deadline

概括時(shí)間輪工作流程

(閱讀Netty3.10.6)

1、時(shí)間輪的啟動(dòng)并不是在構(gòu)造函數(shù)中,而是在第一次提交任務(wù)的時(shí)候newTimeout()
2、啟動(dòng)時(shí)間輪第一件事就是初始化時(shí)間輪的零點(diǎn)時(shí)間startTime,以后時(shí)間輪上的任務(wù)、格子觸發(fā)時(shí)間計(jì)算都相對這個(gè)時(shí)間
3、隨著時(shí)間的推移第一個(gè)格子(tick)觸發(fā),在觸發(fā)每個(gè)格子之前都是處于阻塞狀態(tài),并不是直接去處理這個(gè)格子的所有任務(wù),而是先從任務(wù)隊(duì)列timeouts中拉取最多100000個(gè)任務(wù),根據(jù)每個(gè)任務(wù)的觸發(fā)時(shí)間deadline放在不同的格子里(注意,Netty中會(huì)對時(shí)間輪上的每一個(gè)格子進(jìn)行處理,即使這個(gè)格子沒有任務(wù))
4、時(shí)間輪運(yùn)轉(zhuǎn)過程中維護(hù)著一個(gè)指針tick,根據(jù)當(dāng)前指針獲取對應(yīng)的格子里的所有任務(wù)進(jìn)行處理
5、任務(wù)自身維護(hù)了一個(gè)剩余回合(remainingRounds),代表任務(wù)在哪一輪執(zhí)行處理,只有該值為0時(shí)才進(jìn)行處理

代碼做了刪減,只體現(xiàn)重點(diǎn)

時(shí)間輪構(gòu)造器:

初始化了時(shí)間輪大小、每個(gè)格子大小、時(shí)間輪運(yùn)轉(zhuǎn)線程

public HashedWheelTimer(
    ThreadFactory threadFactory,
    ThreadNameDeterminer determiner,
    long tickDuration, TimeUnit unit, int ticksPerWheel) {
 

    // TODO : 創(chuàng)建時(shí)間輪底層存儲(chǔ)任務(wù)的數(shù)據(jù)結(jié)構(gòu)
    wheel = createWheel(ticksPerWheel);
    // TODO : 求某一個(gè)任務(wù)落到哪個(gè)格子時(shí)需要用到的編碼
    mask = wheel.length - 1;

    // TODO : 每個(gè)格子的時(shí)間
    this.tickDuration = unit.toNanos(tickDuration);

    // TODO : 時(shí)間輪處理任務(wù)的線程
    workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
        worker, "Hashed wheel timer #" + id.incrementAndGet(),
        determiner));
}
// TODO : 時(shí)間輪真正存儲(chǔ)數(shù)據(jù)的容器
private final HashedWheelBucket[] wheel;
// TODO : 存放任務(wù)的隊(duì)列
private final Queue<HashedWheelTimeout> timeouts = new ConcurrentLinkedQueue<HashedWheelTimeout>();

外界提交任務(wù)的時(shí)候,代碼如下

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
 
    // TODO : 啟動(dòng)時(shí)間輪運(yùn)轉(zhuǎn)線程
    start();

    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    // TODO : 任務(wù)放入到隊(duì)列中,并沒有一開始就放到時(shí)間輪上
    timeouts.add(timeout);
    return timeout;
}

時(shí)間輪運(yùn)轉(zhuǎn)執(zhí)行任務(wù),代碼如下

public void run() {
 
    // TODO : 初始化時(shí)間輪的
    startTime = System.nanoTime();

    do {
 
        // TODO : 這個(gè)方法會(huì)阻塞,隨著時(shí)間的推動(dòng)會(huì)觸發(fā)新的任務(wù)(tick),返回當(dāng)前時(shí)間
        final long deadline = waitForNextTick();
        if (deadline > 0) {
 
            // TODO : 將隊(duì)列中的任務(wù)最多取100000放到時(shí)間輪上
            transferTimeoutsToBuckets();
            // TODO : 獲取當(dāng)前格子
            HashedWheelBucket bucket = wheel[(int) (tick & mask)];
            // TODO : 執(zhí)行時(shí)間輪上當(dāng)前格子上的任務(wù)
            bucket.expireTimeouts(deadline);
            // TODO : 指針走動(dòng)
            tick++;
        }
    } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
}

run內(nèi)部方法解析

waitForNextTick等待下一個(gè)格子觸發(fā),代碼如下

private long waitForNextTick() {
 
    // TODO : 截止時(shí)間、觸發(fā)時(shí)間
    // TODO : 獲取當(dāng)前格子的觸發(fā)時(shí)間,因?yàn)闀r(shí)間輪底層是使用數(shù)組存儲(chǔ)任務(wù)數(shù)據(jù),所以tick需要+1
    long deadline = tickDuration * (tick + 1);
    /**
             * tick : 時(shí)間輪上的格子
             * tickDuration : 每個(gè)格子的長度,持續(xù)時(shí)間
             * deadline : 這里表示下一個(gè)格子的觸發(fā)時(shí)間(觸發(fā)一個(gè)格子的任務(wù))相對時(shí)間輪起點(diǎn)時(shí)間(startTime)的時(shí)長
             */
    
    for (;;) {
 
        // TODO : 相對時(shí)間輪起點(diǎn)的當(dāng)前時(shí)間
        final long currentTime = System.nanoTime() - startTime;
        // TODO : 當(dāng)當(dāng)前時(shí)間大于等于deadline的時(shí)候,就會(huì)跳出循環(huán)
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

        if (sleepTimeMs <= 0) {
 
            if (currentTime == Long.MIN_VALUE) {
 
                return -Long.MAX_VALUE;
            } else {
 
                return currentTime;
            }
        }
        try {
 
            // TODO : 并不是一直循環(huán)
            Thread.sleep(sleepTimeMs);
        } catch (InterruptedException e) {
 
            if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
 
                return Long.MIN_VALUE;
            }
        }
    }
}

transferTimeoutsToBuckets將隊(duì)列中任務(wù)存儲(chǔ)到時(shí)間輪上,代碼如下

private void transferTimeoutsToBuckets() {
 
    for (int i = 0; i < 100000; i++) {
 
        // TODO : 從隊(duì)列中取出任務(wù)
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
 
            // all processed 已全部處理
            break;
        }
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED
            || !timeout.compareAndSetState(HashedWheelTimeout.ST_INIT, HashedWheelTimeout.ST_IN_BUCKET)) {
 
            // 期間被取消。所以只需從隊(duì)列中刪除它并繼續(xù)下一個(gè) HashedWheelTimeout
            timeout.remove();
            continue;
        }
        // TODO : 計(jì)算這個(gè)任務(wù)要走多少個(gè)格子
        long calculated = timeout.deadline / tickDuration;
        // TODO : 計(jì)算觸發(fā)當(dāng)前這個(gè)任務(wù)還要走多少輪,剩余回合!
        /**
                 * calculated:觸發(fā)該任務(wù)一共要走的格子數(shù)
                 * tick:當(dāng)前已經(jīng)走的格子數(shù)
                 * wheel.length:時(shí)間輪的長度
                 */
        long remainingRounds = (calculated - tick) / wheel.length;
        // TODO : 任務(wù)自身攜帶了觸發(fā)自己的輪次
        timeout.remainingRounds = remainingRounds;
        final long ticks = Math.max(calculated, tick); 
        // TODO : mask = wheel.length - 1
        int stopIndex = (int) (ticks & mask);

        // TODO : 將任務(wù)放到時(shí)間輪的對應(yīng)格子中
        HashedWheelBucket bucket = wheel[stopIndex];
        bucket.addTimeout(timeout);
    }
}

expireTimeouts執(zhí)行處理任務(wù),代碼如下

public void expireTimeouts(long deadline) {
 
    HashedWheelTimeout timeout = head;

    while (timeout != null) {
 
        boolean remove = false;
        // TODO : 根據(jù)剩余回合判斷是否要處理該任務(wù),如果大于0說明還沒輪到該任務(wù)
        if (timeout.remainingRounds <= 0) {
 
            // TODO : 如果時(shí)間已經(jīng)到了,則執(zhí)行任務(wù)
            /**
                     * deadline 是相對時(shí)間輪startTime的當(dāng)前時(shí)間,也是當(dāng)前格子的觸發(fā)時(shí)間
                     * timeout.deadline 是任務(wù)的觸發(fā)時(shí)間
                     */
            if (timeout.deadline <= deadline) {
 
                // TODO :
                timeout.expire();
            } else {
 
                // The timeout was placed into a wrong slot. This should never hAppen.
                throw new IllegalStateException(String.format(
                    "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
            remove = true;
        } else if (timeout.isCancelled()) {
 
            remove = true;
        } else {
 
            timeout.remainingRounds --;
        }
        // store reference to next as we may null out timeout.next in the remove block.
        HashedWheelTimeout next = timeout.next;
        if (remove) {
 
            remove(timeout);
        }
        timeout = next;
    }
}

Kafka中的時(shí)間輪

Produce 時(shí)等待 ISR 副本復(fù)制成功、延遲刪除主題、會(huì)話超時(shí)檢查、延遲創(chuàng)建主題或分區(qū)等,會(huì)被封裝成不同的 DelayOperation 進(jìn)行延遲處理操作,防止阻塞 Kafka請求處理線程。

名稱

時(shí)間輪

時(shí)間輪的格子(桶)

格子(桶)里的任務(wù)

時(shí)間輪運(yùn)轉(zhuǎn)線程

處理過期任務(wù)線程

類名

TimingWheel

TimerTaskList

TimerTaskEntry

ShutdownableThread

ExecutorService

屬性名

timingWheel

bucket

rootheadtail

expirationReaper

taskExecutor

其他一些屬性:

時(shí)間輪零點(diǎn)時(shí)間:startMs
當(dāng)前時(shí)間:currentTime
格子長度(持續(xù)時(shí)間):tickMs
時(shí)間輪大小:wheelSize
時(shí)間輪的當(dāng)前層時(shí)間跨度:interval = tickMs * wheelSize
到期時(shí)間:expiration
溢出輪、升層的時(shí)間輪:overflowWheel: TimingWheel

概括時(shí)間輪工作流程

(閱讀Kafka-3.1.0)

Kafka 中的時(shí)間輪(TimingWheel)是一個(gè)存儲(chǔ)定時(shí)任務(wù)的環(huán)形隊(duì)列,底層采用數(shù)組實(shí)現(xiàn),數(shù)組中的每個(gè)元素可以存放一個(gè)定時(shí)任務(wù)列表(TimerTaskList)。TimerTaskList是一個(gè)環(huán)形的雙向鏈表,鏈表中的每一項(xiàng)表示的都是定時(shí)任務(wù)項(xiàng)(TimerTaskEntry),其中封裝了真正的定時(shí)任務(wù)(TimerTask)。

1、Kafka啟動(dòng)的時(shí)候就啟動(dòng)了時(shí)間輪
2、ExpiredOperationReaper.doWork() 循環(huán)執(zhí)行,首先從全局的delayQueue中獲取一個(gè)bucket,如果不為空則上鎖處理
3、根據(jù)bucket的到期時(shí)間嘗試推進(jìn),然后會(huì)刷一次bucket中的所有任務(wù),這些任務(wù)要么是需要立即執(zhí)行的(即到期時(shí)間在 currentTime 和 currentTime + tickMs 之間),要么是需要換桶的,往前移位(即到期時(shí)間大于等于 currentTime + tickMs);立即計(jì)算的直接提交給專門的線程處理
4、最后拉取delayQueue中下一個(gè)bucket處理,一直循環(huán)下去
5、添加一個(gè)任務(wù),首先是根據(jù)任務(wù)的到期時(shí)間expiration來判斷自己會(huì)落到哪一個(gè)bucket,如果expiration不小于currentTime + tickMs,則可能是當(dāng)前時(shí)間輪的任一個(gè)bucket,也可能是溢出輪中的任一個(gè)bucket
6、當(dāng)任務(wù)添加到某一個(gè)bucket后會(huì)判斷是否跟新了桶的到期時(shí)間,如果更新了則需要入隊(duì)處理delayQueue.offer

代碼做了刪減,只體現(xiàn)重點(diǎn)

1、Kafka中自己封裝了一個(gè)可關(guān)閉的線程類 Shutdown’able’Thread ,也就是實(shí)現(xiàn)了該類的 ExpiredOperationReaper 內(nèi)部實(shí)現(xiàn)了 doWork() 方法,維護(hù)著時(shí)間輪的運(yùn)轉(zhuǎn)

private class ExpiredOperationReaper extends ShutdownableThread(
    "ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
    false) {
 

    override def doWork(): Unit = {
 
        advanceClock(200L)
    }
}

2、推進(jìn)時(shí)鐘的內(nèi)部實(shí)現(xiàn)

def advanceClock(timeoutMs: Long): Boolean = {
 
    // TODO : 阻塞 timeoutMs = 200 毫秒,拉取一個(gè)桶:有直接返回,沒有則阻塞200毫秒
    var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
        if (bucket != null) {
 
            writeLock.lock()
                try {
 
                    while (bucket != null) {
 
                        // TODO : 傳入當(dāng)前桶的過期時(shí)間,嘗試推進(jìn)時(shí)間
                        timingWheel.advanceClock(bucket.getExpiration)
                        // TODO : 無論推進(jìn)時(shí)間是否成功,當(dāng)前桶的這些任務(wù)要么是需要立即執(zhí)行的(即到期時(shí)間在 currentTime 和 currentTime + tickMs 之間),
                        //  要么是需要換桶的,往前移位(即到期時(shí)間大于等于 currentTime + tickMs);立即計(jì)算的直接提交給專門的線程處理
                        bucket.flush(addTimerTaskEntry)
                        // TODO : 進(jìn)行下一個(gè)桶處理
                        bucket = delayQueue.poll()
                    }
                } finally {
 
                    writeLock.unlock()
                }
            true
        } else {
 
            false
        }
}

3、嘗試推進(jìn)時(shí)鐘

def advanceClock(timeMs: Long): Unit = {
 
    /**
     * currentTime + tickMs :當(dāng)前桶過期時(shí)間的截止時(shí)間
     * timeMs :下一個(gè)桶的過期時(shí)間
     */
    if (timeMs >= currentTime + tickMs) {
 
      // currentTime 是 tickMs 的整數(shù)倍
      currentTime = timeMs - (timeMs % tickMs)
      // TODO : 嘗試推進(jìn)溢出輪的時(shí)間
      if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
    }
  }

4、bucket.flush(addTimerTaskEntry) 傳入的是一個(gè)方法之后桶內(nèi)的每一個(gè)任務(wù)都會(huì)走一次該方法

// TODO : 添加或處理任務(wù)
  private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
 
    // TODO : 只有到期時(shí)間在 currentTime 和 currentTime + tickMs 之間的任務(wù)才會(huì)被直接處理
    if (!timingWheel.add(timerTaskEntry)) {
 
      // Already expired or cancelled
      if (!timerTaskEntry.cancelled) {
 
        // TODO : 只處理過期時(shí)間到達(dá)且不是被取消的任務(wù)
        taskExecutor.submit(timerTaskEntry.timerTask)
      }
    }
  }

5、添加任務(wù)到時(shí)間輪的入口也是地4步的方法,其中timingWheel.add(timerTaskEntry) 方法中會(huì)判斷每一個(gè)任務(wù)是立即處理還是入隊(duì)

/**
   * 添加一個(gè)任務(wù)
   * 添加任務(wù)的過程比較復(fù)雜,首先是根據(jù)任務(wù)的到期時(shí)間來判斷自己會(huì)落到哪一個(gè)bucket,可能是當(dāng)前時(shí)間輪任一個(gè)bucket,也可能是溢出輪中的任一個(gè)bucket
   */
  def add(timerTaskEntry: TimerTaskEntry): Boolean = {
 
    // TODO : 任務(wù)到期時(shí)間
    val expiration = timerTaskEntry.expirationMs
    if (timerTaskEntry.cancelled) {
 
      false
    } else if (expiration < currentTime + tickMs) {
 
      // TODO : 距離該任務(wù)到期僅剩最多 tickMs 毫秒了
      // TODO : currentTime當(dāng)前指向的時(shí)間格也屬于到期部分,表示剛好到期
      false
    } else if (expiration < currentTime + interval) {
 
      // TODO : 距離該任務(wù)到期小于一整輪的時(shí)間,大于一個(gè)格子的時(shí)間,說明它就在當(dāng)前層,不需要升層
      val virtualId = expiration / tickMs
      val bucket = buckets((virtualId % wheelSize.toLong).toInt)
      bucket.add(timerTaskEntry)
      // TODO : 如果該任務(wù)的到來改變了他所進(jìn)入的桶的過期時(shí)間,即輪子已經(jīng)前進(jìn)并且之前的桶被重用了
      // TODO : 桶是同一個(gè)桶,但是數(shù)據(jù)可能不是同一輪的,這時(shí)需要重新入隊(duì) DelayQueue
      if (bucket.setExpiration(virtualId * tickMs)) {
 
        queue.offer(bucket)
      }
      true
    } else {
 
      // TODO : 需要升層 過期時(shí)間超過了 interval
      if (overflowWheel == null) addOverflowWheel()
      overflowWheel.add(timerTaskEntry)
    }
  }

需要升層的情況:其實(shí)每一個(gè)時(shí)間輪對象內(nèi)都有一個(gè)溢出輪的指針 overflowWheel ,他會(huì)指向父級(jí)時(shí)間輪。

Kafka 使用時(shí)間輪來實(shí)現(xiàn)延時(shí)隊(duì)列,因?yàn)槠涞讓邮侨蝿?wù)的添加和刪除是基于鏈表實(shí)現(xiàn)的,是 O(1) 的時(shí)間復(fù)雜度,滿足高性能的要求;

對于時(shí)間跨度大的延時(shí)任務(wù),Kafka 引入了層級(jí)時(shí)間輪,能更好控制時(shí)間粒度,可以應(yīng)對更加復(fù)雜的定時(shí)任務(wù)處理場景;

對于如何實(shí)現(xiàn)時(shí)間輪的推進(jìn)和避免空推進(jìn)影響性能,Kafka 采用空間換時(shí)間的思想,通過 DelayQueue 來推進(jìn)時(shí)間輪,算是一個(gè)經(jīng)典的 trade off。

分享到:
標(biāo)簽:Netty
用戶無頭像

網(wǎng)友整理

注冊時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績評(píng)定2018-06-03

通用課目體育訓(xùn)練成績評(píng)定