日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

眾所周知,redis的所有數據都存儲在內存中,但是內存是一種有限的資源,所以為了防止Redis無限制的使用內存,在啟動Redis時可以通過配置項 maxmemory 來指定其最大能使用的內存容量。例如可以通過以下配置來設置Redis最大能使用 1G 內存:

maxmemory 1G

當Redis使用的內存超過配置的 maxmemory 時,便會觸發數據淘汰策略。Redis提供了多種數據淘汰的策略,如下:

  • volatile-lru: 最近最少使用算法,從設置了過期時間的鍵中選擇空轉時間最長的鍵值對清除掉
  • volatile-lfu: 最近最不經常使用算法,從設置了過期時間的鍵中選擇某段時間之內使用頻次最小的鍵值對清除掉
  • volatile-ttl: 從設置了過期時間的鍵中選擇過期時間最早的鍵值對清除
  • volatile-random: 從設置了過期時間的鍵中,隨機選擇鍵進行清除
  • allkeys-lru: 最近最少使用算法,從所有的鍵中選擇空轉時間最長的鍵值對清除
  • allkeys-lfu: 最近最不經常使用算法,從所有的鍵中選擇某段時間之內使用頻次最少的鍵值對清除
  • allkeys-random: 所有的鍵中,隨機選擇鍵進行刪除
  • noeviction: 不做任何的清理工作,在redis的內存超過限制之后,所有的寫入操作都會返回錯誤;但是讀操作都能正常的進行

可以在啟動Redis時,通過配置項 maxmemory_policy 來指定要使用的數據淘汰策略。例如要使用 volatile-lru 策略可以通過以下配置來指定:

maxmemory_policy volatile-lru

LRU算法

LRU是 Least Recently Used 的縮寫,即最近最少使用,很多緩存系統都使用此算法作為淘汰策略。

最簡單的實現方式就是把所有緩存通過一個鏈表連接起來,新創建的緩存添加到鏈表的頭部,如果有緩存被訪問了,就把緩存移動到鏈表的頭部。由于被訪問的緩存會移動到鏈表的頭部,所以沒有被訪問的緩存會隨著時間的推移移動的鏈表的尾部,淘汰數據時只需要從鏈表的尾部開始即可。下圖展示了這個過程:

Redis數據淘汰算法

 

Redis的LRU算法

Redis使用了結構體 robj 來存儲緩存對象,而 robj 結構有個名為 lru 的字段,用于記錄緩存對象最后被訪問的時間,Redis就是以 lru 字段的值作為淘汰依據。robj 結構如下:

typedef struct redisObject {
 ...
 unsigned lru:24;
 ...
} robj;

當緩存對象被訪問時,便會更新此字段的值。代碼如下:

robj *lookupKey(redisDb *db, robj *key, int flags) {
 dictEntry *de = dictFind(db->dict,key->ptr);
 if (de) {
 robj *val = dictGetVal(de);
 /* Update the access time for the ageing algorithm.
 * Don't do it if we have a saving child, as this will trigger
 * a copy on write madness. */
 if (server.rdb_child_pid == -1 &&
 server.aof_child_pid == -1 &&
 !(flags & LOOKUP_NOTOUCH))
 {
 if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
 updateLFU(val);
 } else {
 val->lru = LRU_CLOCK(); // 更新lru字段的值
 }
 }
 return val;
 } else {
 return NULL;
 }
}

lookupKey() 函數用于查找key對應的緩存對象,所以當緩存對象被訪問時便會調用此函數。

Redis數據淘汰

接下來我們分析一下當Redis內存使用超過配置的最大內存使用限制時的處理方式。

Redis在處理每一個命令時都會檢查內存的使用是否超過了限制的最大值,處理命令是通過 processCommand() 函數進行的,檢查內存使用情況的代碼如下:

int processCommand(client *c) {
 ...
 if (server.maxmemory && !server.lua_timedout) {
 int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
 if (server.current_client == NULL) return C_ERR;
 if (out_of_memory &&
 (c->cmd->flags & CMD_DENYOOM ||
 (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand))) {
 flagTransaction(c);
 addReply(c, shared.oomerr);
 return C_OK;
 }
 }
 ...
}

檢查內存的使用情況主要通過 freeMemoryIfNeededAndSafe() 函數進行,而 freeMemoryIfNeededAndSafe() 函數最終會調用 freeMemoryIfNeeded() 函數進行處理,由于 freeMemoryIfNeeded() 函數比較龐大,所以我們分段來進行分析:

int freeMemoryIfNeeded(void) {
 ...
 size_t mem_reported, mem_tofree, mem_freed;
 mstime_t latency, eviction_latency;
 long long delta;
 int slaves = listLength(server.slaves);
 ...
 if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
 return C_OK;
 mem_freed = 0;
 if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
 goto cant_free;

freeMemoryIfNeeded() 函數首先會調用 getMaxmemoryState() 函數來獲取Redis的內存使用情況,如果 getMaxmemoryState() 函數返回 C_OK,表示內存使用總量還沒有超出限制,直接返回 C_OK 就可以了。如果 getMaxmemoryState() 函數不是返回 C_OK,表示內存使用總量已經超出限制,需要進行數據淘汰,需要淘汰數據的大小通過 mem_tofree 參數返回。

當然,如果配置的淘汰策略為 noeviction,表示不能進行數據淘汰,所以需要返回 C_ERR 表示有錯誤。

接著分析剩余的代碼片段:

 latencyStartMonitor(latency);
 while (mem_freed < mem_tofree) {
 int j, k, i, keys_freed = 0;
 static unsigned int next_db = 0;
 sds bestkey = NULL;
 int bestdbid;
 redisDb *db;
 dict *dict;
 dictEntry *de;
 if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
 server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
 {
 struct evictionPoolEntry *pool = EvictionPoolLRU;
 while(bestkey == NULL) {
 unsigned long total_keys = 0, keys;
 for (i = 0; i < server.dbnum; i++) {
 db = server.db+i;
 dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
 db->dict : db->expires;
 if ((keys = dictSize(dict)) != 0) {
 evictionPoolPopulate(i, dict, db->dict, pool);
 total_keys += keys;
 }
 }
 if (!total_keys) break; /* No keys to evict. */
 for (k = EVPOOL_SIZE-1; k >= 0; k--) {
 if (pool[k].key == NULL) continue;
 bestdbid = pool[k].dbid;
 if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
 de = dictFind(server.db[pool[k].dbid].dict,
 pool[k].key);
 } else {
 de = dictFind(server.db[pool[k].dbid].expires,
 pool[k].key);
 }
 if (pool[k].key != pool[k].cached)
 sdsfree(pool[k].key);
 pool[k].key = NULL;
 pool[k].idle = 0;
 if (de) {
 bestkey = dictGetKey(de);
 break;
 } else {
 /* Ghost... Iterate again. */
 }
 }
 }
 }

如果內存使用總量超出限制,并且配置了淘汰策略,那么就開始數據淘汰過程。在上面的代碼中,mem_tofree 變量表示要淘汰的數據總量,而 mem_freed 變量表示已經淘汰的數據總量。所以在 while 循環中的條件是 mem_freed < mem_tofree,表示淘汰的數據總量一定要達到 mem_tofree 為止。

前面介紹過,Redis的淘汰策略有很多中,所以進行數據淘汰時需要根據配置的策略進行。如果配置的淘汰策略是 LRU/LFU/TTL 的話,那么就進入 if 代碼塊。在 if 代碼塊里,首先調用 evictionPoolPopulate() 函數選擇一些緩存對象樣本放置到 EvictionPoolLRU 數組中。evictionPoolPopulate() 函數后面會進行分析,現在只需要知道 evictionPoolPopulate() 函數是選取一些緩存對象樣本就可以了。

獲取到緩存對象樣本后,還需要從樣本中獲取最合適的緩存對象進行淘汰,因為在選擇樣本時會把最合適的緩存對象放置在 EvictionPoolLRU 數組的尾部,所以只需要從 EvictionPoolLRU 數組的尾部開始查找一個不為空的緩存對象即可。

 else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
 {
 for (i = 0; i < server.dbnum; i++) {
 j = (++next_db) % server.dbnum;
 db = server.db+j;
 dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
 db->dict : db->expires;
 if (dictSize(dict) != 0) {
 de = dictGetRandomKey(dict);
 bestkey = dictGetKey(de);
 bestdbid = j;
 break;
 }
 }
 }

如果使用隨機淘汰策略,那么就進入 else if 代碼塊,這部分代碼的邏輯很簡單,如果配置的淘汰策略是 volatile-random,那么就從有過期時間的緩存對象中隨機獲取,否則就從所有的緩存對象中隨機獲取。

 if (bestkey) {
 db = server.db+bestdbid;
 robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
 propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
 delta = (long long) zmalloc_used_memory();
 latencyStartMonitor(eviction_latency);
 // 刪除緩存對象
 if (server.lazyfree_lazy_eviction)
 dbAsyncDelete(db,keyobj);
 else
 dbSyncDelete(db,keyobj);
 latencyEndMonitor(eviction_latency);
 latencyAddSampleIfNeeded("eviction-del",eviction_latency);
 latencyRemoveNestedEvent(latency,eviction_latency);
 delta -= (long long) zmalloc_used_memory();
 mem_freed += delta;
 server.stat_evictedkeys++;
 notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
 keyobj, db->id);
 decrRefCount(keyobj);
 keys_freed++;
 if (slaves) flushSlavesOutputBuffers();
 if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) {
 if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
 mem_freed = mem_tofree;
 }
 }
 }

如果找到要淘汰的緩存對象,那么就開始釋放緩存對象所占用的內存空間。除了需要釋放緩存對象占用的內存空間外,還需要進行一些其他的操作,比如把淘汰的緩存對象同步到從服務器和把淘汰的緩存對象追加到 AOF文件 中等。

當條件 mem_freed < mem_tofree 為假時便會退出 while 循環,說明Redis的內存使用總量已經小于最大的內存使用限制,freeMemoryIfNeeded() 函數便會返回 C_OK 表示成功執行。

淘汰數據樣本采集

前面說了,當使用非隨機淘汰策略時需要進行數據采樣(volatile-lru/volatile-lfu/volatile-ttl/allkeys-lru/allkeys-lfu),數據采樣通過 evictionPoolPopulate() 函數進行,由于此函數比較龐大,所以對代碼分段分析:

void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
 int j, k, count;
 dictEntry *samples[server.maxmemory_samples];
 count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);

evictionPoolPopulate() 函數首先調用 dictGetSomeKeys() 函數從緩存對象集合中獲取一些樣本,并保存在 samples 數組中。

 for (j = 0; j < count; j++) {
 unsigned long long idle;
 sds key;
 robj *o;
 dictEntry *de;
 de = samples[j];
 key = dictGetKey(de);
 if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
 if (sampledict != keydict) de = dictFind(keydict, key);
 o = dictGetVal(de);
 }
 if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
 idle = estimateObjectIdleTime(o);
 } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
 idle = 255-LFUDecrAndReturn(o);
 } else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
 idle = ULLONG_MAX - (long)dictGetVal(de);
 } else {
 serverPanic("Unknown eviction policy in evictionPoolPopulate()");
 }

上面的代碼主要是獲取樣本緩存對象的排序權值 idel,如果使用 LRU淘汰算法,那么就調用 estimateObjectIdleTime() 函數獲取排序權值,estimateObjectIdleTime() 函數用于獲取緩存對象有多長時間沒有被訪問。排序按照 idle 的值升序排序,就是說 idle 的值越大,就排到越后。

 k = 0;
 while (k < EVPOOL_SIZE &&
 pool[k].key &&
 pool[k].idle < idle) k++;
 if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
 continue;
 } else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
 } else {
 if (pool[EVPOOL_SIZE-1].key == NULL) {
 sds cached = pool[EVPOOL_SIZE-1].cached;
 memmove(pool+k+1,pool+k,
 sizeof(pool[0])*(EVPOOL_SIZE-k-1));
 pool[k].cached = cached;
 } else {
 k--;
 sds cached = pool[0].cached;
 if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
 memmove(pool,pool+1,sizeof(pool[0])*k);
 pool[k].cached = cached;
 }
 }
 int klen = sdslen(key);
 if (klen > EVPOOL_CACHED_SDS_SIZE) {
 pool[k].key = sdsdup(key);
 } else {
 memcpy(pool[k].cached,key,klen+1);
 sdssetlen(pool[k].cached,klen);
 pool[k].key = pool[k].cached;
 }
 pool[k].idle = idle;
 pool[k].dbid = dbid;
 }
}

上面這段代碼的作用是:根據 idle 的值找到當前緩存對象所在 EvictionPoolLRU 數組的位置,然后把緩存對象保存到 EvictionPoolLRU 數組中。以下插圖解釋了數據采樣的過程:

Redis數據淘汰算法

 

所以 EvictionPoolLRU 數組的最后一個元素便是最優的淘汰緩存對象。

從上面的分析可知,淘汰數據時只是從樣本中找到最優的淘汰緩存對象,并不是從所有緩存對象集合中查找。由于前面介紹的 LRU算法 需要維護一個LRU鏈表,而維護一個LRU鏈表的成本比較大,所以Redis才出此下策。

分享到:
標簽:算法 Redis
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定