時間輪
基于排序鏈表的定時器(Linux定時器 - 定時的方法)存在一個問題:添加定時器的效率偏低。下面我們要討論的時間輪解決了這個問題。下圖是一種簡單的時間輪:
上圖時間輪內,(實現)指針指向輪子上的一個槽(slot)。它以恒定的速度順時針轉動,每轉動一步就指向下一個槽(虛線指針指向的槽),每次轉動稱為一個滴答(tick)。一個滴答的時間稱為時間輪的槽間隔si(slot interval),它實際上就是心跳時間。該時間輪共有N個槽,因此它運轉一周的時間是N*si。每個槽指向一條定時器鏈表,每條鏈表上的定時器具有相同的特征:它們的定時時間相差N*si的整數倍。時間輪正是利用這個關系將定時器散列到不同的鏈表中。假如現在指針指向槽cs,我們要添加一個定時時間為ti的定時器,則該定時器被插入的槽ts(time slot)對應的鏈表中:
ts = (cs + ti/si) % N
基于排序鏈表的定時器使用唯一的一條鏈表來管理所有定時器,所以插入操作的效率隨著定時器數目的增多而降低。而時間輪使用哈希表的思想,將定時器散列到不同的鏈表上。這樣每條鏈表上的定時器數目都將明顯少于原來的排序鏈表上的定時器數目,插入操作的效率基本不受定時器數目的影響。
很顯然,對時間輪而言,要提高定時精度,就要使si值足夠小;要提高執行效率,則要求N值足夠大。
上圖描述的是一種簡單的時間輪,因為它只有一個輪子。而復雜的時間輪可能有多個輪子,不同的輪子擁有不同的粒度。相鄰的兩個輪子,精度高的轉一圈,精度低的僅往前移動一槽,就像水表一樣。
下面的示例代碼展示了一個較為簡單的時間輪實現代碼:
#ifndef TIME_WHEEL_TIMER_H
#define TIME_WHEEL_TIMER_H
#include <time.h>
#include <netinet/in.h>
#include <stdio.h>
#define BUFFER_SIZE 64
class tw_timer;
/*解綁socket和定時器*/
struct client_data
{
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
tw_timer* timer;
};
/*定時器類*/
class tw_timer
{
public:
tw_timer(int rot, int ts)
:next(NULL), prev(NULL), rotation(rot), time_slot(ts){}
public:
int rotation; /*記錄定時器在時間輪轉多少圈生效*/
int time_slot; /*記錄定時器屬于時間輪上哪個槽(對應的鏈表)*/
void (*cb_func)(client_data*); /*定時器回調函數*/
client_data* user_data; /*客戶數據*/
tw_timer* next;
tw_timer* prev;
};
class time_wheel
{
public:
time_wheel():cur_slot(0)
{
for(int i = 0; i < N; ++i){
slots[i] = NULL;
}
}
~time_wheel()
{
for(int i = 0; i < N; ++i)
{
tw_timer* tmp = slots[i];
while(tmp){
slots[i] = tmp->next;
delete tmp;
tmp = slots[i];
}
}
}
tw_timer* add_timer(int timeout)
{
if(timeout < 0){
return NULL;
}
int ticks = 0;
/*根據插入定時器的超時時間計算它將在時間輪轉動多少個滴答后被觸發
*并將該滴答數存于變量ticks。
*如果待輸入定時器的超時值小于時間輪的槽間隔SI,則將ticks向上折為1,
*否則就將ticks向下折為timeout/SI*/
if (timeout < SI){
ticks = 1;
}
else{
ticks = timeout / SI;
}
/*計算待插入的定時器在時間輪轉動多少圈后被觸發*/
int rotation = ticks/ N;
/*計算待插入的定時器應該被插入哪個槽中*/
int ts = (cur_slot + ticks%N) % N;
/*創建新的定時器,它在時間論轉動rotation圈之后觸發,且位于ts個槽上*/
tw_timer* timer = new tw_timer(rotation, ts);
/*如果第ts個槽上尚無任何定時器,則把新建的定時器插入其中,
*并將該定時器設置為該槽的頭節點*/
if(!slots[ts]){
printf("add timer, rotation is %d, ts is %d, cur_slot is %dn",
rotation, ts, cur_slot);
slots[ts] = timer;
}
else{
timer->next = slots[ts];
slots[ts]->prev = timer;
slots[ts] = timer;
}
return timer;
}
void del_timer(tw_timer* timer)
{
if(!timer){
return;
}
int ts = timer->time_slot;
if(timer == slots[ts]){
slots[ts] = slots[ts]->next;
if(slots[ts]){
slots[ts]->prev = NULL;
}
delete timer;
}
else{
timer->prev->next = timer->next;
if(timer->next){
timer->next->prev = timer->prev;
}
delete timer;
}
}
void tick()
{
tw_timer* tmp = slots[cur_slot]; /*取得時間輪上當前槽點的頭節點*/
printf("current slot is %dn", cur_slot);
while(tmp){
printf("tick the timer oncen");
/*如果定時器rotation值大于0, 則它在這一輪不起作用*/
if(tmp->rotation > 0){
tmp->rotation--;
tmp = tmp->next;
}
/*否則定時器已經到期于是執行任務,然后刪除該定時器*/
else{
tmp->cb_func(tmp->user_data);
if(tmp == slots[cur_slot]){
printf("delete header in cur_slot");
slots[cur_slot] = tmp->next;
delete tmp;
if(slots[cur_slot]){
slots[cur_slot]->prev = NULL;
}
tmp = slots[cur_slot];
}
else{
tmp->prev->next = tmp->next;
if(tmp->next){
tmp->next->prev = tmp->prev;
}
tw_timer* tmp2 = tmp->next;
delete tmp;
tmp = tmp2;
}
}
}
cur_slot = ++cur_slot % N;
}
private:
/*時間輪上槽的數目*/
static const int N = 60;
/*每1秒時間輪轉動一次,即槽間隔1s*/
static const int SI = 1;
/*時間輪的槽,其中每個元素只想一個定時器鏈表,鏈表無序*/
tw_timer *slots[N];
int cur_slot; /*時間輪的當前槽*/
};
#endif
時間堆
前面討論的定時方案都是以固定的頻率調用心跳函數tick,并在其中依次檢測到期的定時器,然后執行到期定時器上的回調函數。
設計定時器的另外一種思路是:將所有定時器中超時時間最小的一個定時器的超時值作為心跳間隔。這樣一旦心跳函數tick被調用,超時時間最小的定時器必然到期,我們就可以在tick函數中處理該定時器。然后,再次從剩余的定時器中找出較小的一個,并將這段最小時間設置為下一次心跳間隔,如此反復,就實現了較為精確的定時。
最小堆很適合處理這種方案。最小堆是值每個節點的值都小于或等于其字節點的值的完全二叉樹。
下圖給了一個具有6個元素的最小堆。
樹的基本操作是插入節點和刪除節點。對最小堆而言,它們都很簡單。為了將一個元素X插入最小堆,我們可以在樹的下一個空閑位置創建一個空穴。如果X可以放在空穴而不破壞堆序,則完成插入。都則就執行上濾操作,即交換空穴和它的父節點上的元素。不斷執行上述過程,直到X可以被放入空穴,則插入操作完成。
比如我們要在最小堆中插入值為14的元素,則按如下步驟操作:
最小堆的刪除操作指的是刪除其根節點上的元素,并且不破壞堆序性質。執行刪除操作時,我們需要先在根節點處創建一個空穴。由于堆現在少了一個元素,因此我們可以把堆的最后一個元素X移動到該堆的某個地方。如果X可以可以放入空穴,則刪除操作完成,否則就執行下濾操作,即交換空穴和它的兩個兒子節點中的較小者,不斷進行上述過程,直到X可以放入空穴,則刪除操作完成。
下圖展示了對最小堆執行刪除操作的步驟:
由于最小堆是一種完全二叉樹,所以我們可以用數組來組織其中的元素,最先的最小堆圖可以如下圖所示:
對于數組中的任意位置i上的元素,其左兒子節點在位置2i+1上,其右兒子節點則在位置2i+2上,其父節點在[(i-2)/2](i>0)上。與用鏈表來表示堆相比,用數組表示堆不僅節省空間,而且更實現堆的插入、刪除等操作。
假設我們已經有一個包含N個元素的數組,現在要把它初始化為一個最小堆。那么最簡單的方法是:初始化一個空堆,然后將數組中的每個元素插入該堆中。不過這樣做的效率偏低。實際上,我們只需要對數組中第[(N-1)/2]~0個元素執行下濾操作,即可確保該數組構成一個最小堆。這是因為對包含N個元素的完全二叉樹而言,它具有[(N-1)/2]個非葉子節點。這些非葉子節點正式完全二叉樹的第0~[(N-1)/2]和節點。我們只需要這些非葉子節點構成的子樹都具有堆序性質,整個樹就具有堆序性質。
下面是示例代碼:
#ifndef MIN_HEAP
#define MIN_HEAP
#include <IOStream>
#include <netinet/in.h>
#include <time.h>
using std::exception;
#define BUFFER_SIZE 64
class heap_timer;
struct client_data
{
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
heap_timer* timer;
};
class heap_timer
{
public:
heap_timer(int delay)
{
expire = time(NULL) + delay;
}
public:
time_t expire; /*定時器生效的絕對時間*/
void (*cb_func)(client_data*); /*定時器的回調函數*/
client_data* user_data; /*用戶數據*/
};
class time_heap
{
public:
/*構造函數之一, 初始化一個大小為cap的空棧*/
time_heap(int cap) throw(std::exception): capacity(cap), cur_size(0)
{
array = new heap_timer*[capacity];
if(!array){
throw std::exception();
}
for(int i = 0; i < capacity ; ++i){
array[i] = NULL;
}
}
/*構造函數之二,用已有數組來初始化堆*/
time_heap(heap_timer** init_array, int size, int capacity) throw
(std::exception): cur_size(size), capacity(capacity)
{
if(capacity < size){
throw std::exception();
}
array = new heap_timer*[capacity];
if(!array){
throw std::exception();
}
for(int i = 0; i < capacity; ++i){
array[i] = NULL;
}
if(size != 0){
for(int i = 0; i < size; ++i){
array[i] = init_array[i];
}
for(int i = (cur_size - 1)/2; i >= 0; --i){
/*對數組的第[(cur_size -1)/2] ~ 0和元素執行下濾操作*/
percolate_down(i);
}
}
}
/*銷毀時間堆*/
~time_heap()
{
for(int i = 0; i < cur_size; ++i){
delete array[i];
}
delete [] array;
}
void add_timer(heap_timer* timer) throw(std::exception)
{
if(!timer){
return;
}
if(cur_size >= capacity){
resize();
}
int hole = cur_size++;
int parent = 0;
/*對從空穴到根節點的路徑上的所有節點執行上濾操作*/
for(; hole >0; hole = parent){
parent = (hole-1)/2;
if(array[parent]->expire <= timer->expire){
break;
}
array[hole] = array[parent];
}
array[hole] = timer;
}
void del_timer(heap_timer* timer)
{
if(!timer){
return;
}
/*僅僅將目標定時器的回調函數設置為空,即所謂的延遲銷毀。
*這將節省真正刪除該定時器造成的開銷,但這樣做容易市堆數組膨脹*/
timer->cb_func = NULL;
}
heap_timer* top() const
{
if(empty()){
return NULL;
}
return array[0];
}
void pop_timer()
{
if(empty()){
return;
}
if(array[0]){
delete array[0];
array[0] = array[--cur_size];
percolate_down(0);
}
}
bool empty() const{
return cur_size == 0;
}
void tick(){
heap_timer* tmp = array[0];
time_t cur = time(NULL);
while(!empty()){
if(!tmp){
break;
}
if(tmp->expire > cur){
break;
}
if(array[0]->cb_func){
array[0]->cb_func(array[0]->user_data);
}
pop_timer();
tmp = array[0];
}
}
public:
private:
/*最小堆下率操作,確保堆數組中以第hole個節點為根的子樹擁有最小堆性質*/
void percolate_down(int hole)
{
heap_timer *temp = array[hole];
int child = 0;
for( ; (hole*2 + 1) <= cur_size-1 ; hole = child){
child = hole*2+1;
if(child < (cur_size -1) && array[child+1]->expire < array[child]->expire){
++child;
}
if(array[child]->expire < temp->expire){
array[hole] = array[child];
}
else{
break;
}
}
array[hole] = temp;
}
/*將堆數組容量擴大一倍*/
void resize() throw (std::exception)
{
heap_timer ** temp = new heap_timer* [2*capacity];
for(int i = 0; i < 2*capacity; ++i){
temp[i] = NULL;
}
if(!temp){
throw std::exception();
}
capacity = 2 * capacity;
for(int i = 0; i < cur_size; ++i){
temp[i] = array[i];
}
delete [] array;
array = temp;
}
heap_timer** array; /*堆數組*/
int capacity; /*堆數組的容量*/
int cur_size; /*堆數組當前包含元素的個數*/
};
#endif