What is 限流?
限流顧名思義,限制流量或者說叫流量管制。
很形象的比喻如老式電閘都安裝了保險絲,一旦有人使用超大功率的設備,保險絲就會燒斷以保護各個電器不被強電流給燒壞。
Why use 限流?
理論上一個完整的對外提供服務的系統(tǒng)架構在設計初期,就要基于上游流量,流速,高峰期時間點,峰值 qps,還有自身系統(tǒng)的負載能力,評估系統(tǒng)的吞吐量,并且進行入口流量的管制。
當超出限流閾值時,系統(tǒng)可以采取拒絕服務,排隊或者引流等機制, 保證自身一直在健康的負載下。
如果系統(tǒng)沒有限流策略,對于突發(fā)性的超自身負載的流量,系統(tǒng)只能被動的無奈接受,系統(tǒng)內(nèi)各個子服務逐漸解體,最后服務整體雪崩。
小概念 Review
QPS
Queries Per Second (每秒查詢率),每秒查詢率 QPS 是對一個特定的查詢服務器在規(guī)定時間內(nèi)所處理流量多少的衡量標準,在因特網(wǎng)上,作為域名系統(tǒng)服務器的機器的性能經(jīng)常用每秒查詢率來衡量。
RPS
Requests Per Second (每秒發(fā)送請求數(shù) /吞吐率),指客戶端每秒發(fā)出的請求數(shù)。阿里云 PTS 對于這個詞的解釋為 RPS 有些地方也叫做 QPS,在不單獨討論“事務”的情況下可以近似對應到 Loadrunner/jmeter 的 TPS ( Transaction Per Second, 每秒事務數(shù))。
TPS
Transactions Per Second (每秒傳輸?shù)氖挛锾幚韨€數(shù)),即服務器每秒處理的事務數(shù)。TPS 一般包括一條消息入和一條消息出,加上一次用戶數(shù)據(jù)庫訪問。(業(yè)務 TPS = CAPS × 每個呼叫平均 TPS )
限流粒度
- 集群限流
- 單機限流
集群限流
集群限流方式可以歸納為兩種
- 網(wǎng)關層
- 應用層
網(wǎng)關層
網(wǎng)關層常見設計,基于 Nginx lua module 實現(xiàn)整體管控。下面是簡單 lua demo 。
local locks = require "resty.lock"
local function limiter()
-- ngx dict
local limiter = ngx.shared.limiter
-- limiter lock
local lock = locks:new("limiter_lock")
local key = gx.var.host..ngx.var.uri
-- add lock
local elapsed, err =lock:lock("ngx_limiter:"..key)
if not elapsed then
return fail("failed to acquire the lock: ", err)
end
-- limit max value
local limit = 5
-- current value
local current =limiter:get(key)
-- 限流
if current ~= nil and current + 1> limit then
lock:unlock()
return 0
end
if current == nil then
limiter:set(key, 1, 1) -- 初始化
else
limiter:incr(key, 1) -- +1
end
lock:unlock()
return 1
end
ngx.print(limiter())
了解 lua-resty-lock:
https://github.com/openresty/lua-resty-lock
Nginx.conf
http {
……
lua_shared_dict limiter_lock 10m;
lua_shared_dict limiter 10m;
}
應用層
應用層常見通過業(yè)務代碼實現(xiàn),基于 redis 計數(shù), 通過 lua script 保證 redis 執(zhí)行原子性
local key = "limiter:" .. KEYS[1]
local limit = tonumber(ARGV[1])
local expire_time = tonumber(ARGV[2])
local is_exists = redis.call("EXISTS", key)
if is_exists == 1 then
if redis.call("INCR", key) > limit then
return 0
else
return 1
end
else
redis.call("SET", key, 1)
redis.call("EXPIRE", key, expire_time)
return 1
end
單機限流
單兵作戰(zhàn),自生自滅,我不倒集群不倒。不依賴存儲中間件,基于 local cache 就可以實現(xiàn)簡單的本地計數(shù)限流,宏觀角度觀察,只要網(wǎng)關層負載均衡服務高可用,每個節(jié)點流量差別不大,只需要關心單個節(jié)點的流量管控就可以。
以上是限流粒度分類,下面說說具體的限流算法模型。
限流模型
以上 Demo 都是基于簡單的固定時間窗口模型實現(xiàn)限流,但是當出現(xiàn)臨界點瞬間大流量沖擊,。
常用的模型分類有兩種:
- 時間模型
- 桶模型
時間模型
時間模型分兩種:
- 固定窗口模型
- 滑動窗口模型
固定時間模型
上面聊到的各粒度限流模式的 code demo 都是這種方式。
如圖(圖片來源網(wǎng)絡),拉長 timeline,以 QPS 為例,限流 1000QPS,我們會講 timeline 按照固定間隔分窗口,每個窗口有一個獨立計數(shù)器,每個計數(shù)器統(tǒng)計窗口內(nèi)的 qps,如果達到閾值則拒絕服務,這是一種最簡單的限流模型,但是缺點比較明顯,當在臨界點出現(xiàn)大流量沖擊,就無法滿足流量控制。
如圖(圖片來源網(wǎng)絡),在 900ms 和 1100ms 都出現(xiàn) 1000QPS 并發(fā),雖然單個窗口內(nèi)是符合限流要求,但是實際上臨界點處的 QPS 已經(jīng)打到 2000,服務過載。
滑動時間模型
如圖(圖片來源網(wǎng)絡),為了規(guī)避臨界點大流量沖擊,滑動時間模型會將每個窗口切分成 N 個子窗口,每個子窗口獨立計數(shù)。這樣用w1+w2計數(shù)之和來做限流閾值校驗,就可以解決此問題。
桶模型
桶模型也分兩種:
- 令牌桶
- 漏桶
令牌桶模型
令牌桶算法的原理是系統(tǒng)會以一個恒定的速度往桶里放入令牌,而如果請求需要被處理,則需要先從桶里獲取一個令牌,當桶里沒有令牌可取時,則拒絕服務。不過令牌桶還是允許一定程度的突發(fā)傳輸,這樣解決了在實際上的互聯(lián)網(wǎng)應用中,流量經(jīng)常是突發(fā)性的問題。
Ref: https://en.wikipedia.org/wiki/Token_bucket
如圖:
算法實現(xiàn)方式有兩種:
- Ticker
定義一個 Ticker,持續(xù)生成令牌并導入桶中。這樣問題是會極大的消耗系統(tǒng)資源。如果基于某一維度進行限流,會創(chuàng)建多桶,對應多 Ticker,資源消耗很可怕。 - Inert Fill
惰性填充,定義一個 inert fill 函數(shù)。該函數(shù)會在每次獲取令牌之前調(diào)用,其實現(xiàn)思路為,若當前時間晚于 lastAccessTime,則計算該段時間內(nèi)可以生成多少令牌,將生成的令牌加入令牌桶中并更新數(shù)據(jù)。這樣一來,只需要在獲取令牌時計算一次即可。
桶內(nèi)令牌數(shù)計數(shù)方式
桶內(nèi)令牌數(shù) = 剩余的令牌數(shù) + (本次取令牌的時刻-上一次取令牌的時刻)/放置令牌的時間間隔 * 每次放置的令牌數(shù)
常用令牌桶如: github.com/juju/ratelimit 2K Star
多種填充令牌方式:
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket
默認令牌桶,fillInterval 每過多?時間向桶?放?個令牌,capacity 是桶的容量,超過桶容量的部分會被直接丟棄。
func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket
和默認方式一樣,唯一不同是每次填充的令牌數(shù)是 quantum,而不是 1 個。
func NewBucketWithRate(rate float64, capacity int64) *Bucket
按照使用方定義的?例,每秒鐘填充令牌數(shù)。比如 capacity 是 100,? rate 是 0.1,那么每秒會填充 10 個令牌。
多種領取令牌方式:
func (tb *Bucket) Take(count int64) time.Duration {}
func (tb *Bucket) TakeAvailable(count int64) int64 {}
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (
time.Duration, bool,
) {}
func (tb *Bucket) Wait(count int64) {}
func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool {}
如下,我簡單實現(xiàn)了一個極簡的令牌桶, 速率默認為 QPS 。
TokenBucket Demo
Struct 結構
// object
type TbConfig struct {
QPS int64 // 限制 qps e.g 200
MaxCap int64 // 桶最大容量 e.g:1000
}
type TokenBucket struct {
*TbConfig
m sync.Mutex // 讀寫鎖
available int64 // 可用令牌
lastTime time.Time // 最后一次獲取令牌時間
}
Inert Fill
func (tb *TokenBucket) fill() error {
n := time.Now()
timeUnit := n.Sub(tb.latestTime).Seconds()
fillCnt := int64(timeUnit) * tb.QPS // 見文下描述
if fillCnt <= 0 {
return nil
}
tb.available += fillCnt
// 防止過大溢出
if tb.MaxCap > 0 && tb.available > tb.MaxCap {
tb.available = tb.MaxCap
}
tb.latestTime = n
return nil
}
桶內(nèi)令牌數(shù) = 剩余的令牌數(shù)**tb.available** + (本次取令牌的時刻**n** - 上一次取令牌的時刻**tb.latestTime) / 放置令牌的時間間隔速率為 qps,所以此處是1** * 每次放置的令牌數(shù)**tb.QPS**
漏桶模型
漏桶算法思路很簡單,如下圖(圖片來源網(wǎng)絡),水(請求)先進入到漏桶里,漏桶以一定的速度出水,當水流入速度過大會直接溢出,可以看出漏桶算法能強行限制數(shù)據(jù)的傳輸速率。
簡單的說: 調(diào)用方只能嚴格按照預定的間隔順序進行消費調(diào)用。
Ref: https://en.wikipedia.org/wiki/Leaky_bucket
常用漏桶:
https://github.com/uber-go/ratelimit 2.4k Star
對于很多應用場景來說,除了要求能夠限制流量的平均傳輸速率外,還要求允許某種程度的突發(fā)傳輸。
傳統(tǒng)的 Leaky Bucket,關鍵點在于漏桶始終按照固定的速率運行,但是它并不能很好的處理有大量突發(fā)請求的場景。
對于這種情況,uber-go 對 Leaky Bucket 做了一些改良,引入了最大松弛量 (maxSlack) 的概念。
當請求間隔時間小于固定的速率時,可以把間隔比較長的請求多余出來的時間 buffer,勻給后面的使用,保證每秒請求數(shù)。如果間隔時間遠遠超出固定速率,那會給后續(xù)請求增加超大的 buffer,以至于即使后面大量請求瞬時到達,也無法抵消完這個時間,那這樣就失去了限流的意義。所以 maxSlack 會限制這個 buffer 上限。
LeakyBucket Demo
如下,實現(xiàn)了一個極簡的非阻塞漏桶。
Struct 結構
// object
type LbConfig struct {
Rate float64 // 速率 e.g 200: 每秒 200 次請求
MaxSlack int64 // 最大松弛量,可以理解 buffer 時間內(nèi)最大放行的 qps 。默認為 0 表示不開啟松弛量 e.g 10: 如果松弛量大于 10,則松弛量強制為 10
}
type LeakyBucket struct {
*LbConfig
m sync.Mutex // 讀寫鎖
perRequest time.Duration // 速率
bufferTime time.Duration // 多余時間
slackTime time.Duration // 最大松弛時間
lastTime time.Time // 最后一次獲取令牌時間
}
無松弛量實現(xiàn)
即嚴格按照預定時間間隔獲取令牌。
func (lb *LeakyBucket) withoutSlack() error {
n := time.Now()
lb.bufferTime = lb.perRequest - n.Sub(lb.lastTime)
// 多余時間如果為正數(shù): 證明前后時間間隔超過預期速率,需要拒絕服務
if lb.bufferTime > 0 {
return ErrNoTEnoughToken
} else {
lb.lastTime = n
}
return nil
}
有松弛量實現(xiàn)
即多余時間勻給后面獲取令牌使用。
func (lb *LeakyBucket) withSlack() error{
n := time.Now()
// 此處為+= 表示要累計多余時間
lb.bufferTime += lb.perRequest - n.Sub(lb.lastTime)
// 多余時間如果為正數(shù): 證明前后時間間隔超過預期速率,需要拒絕服務
if lb.bufferTime > 0 {
return ErrNoTEnoughToken
} else {
lb.lastTime = n
}
// 允許抵消的最長時間
if lb.bufferTime < lb.slackTime {
lb.bufferTime = lb.slackTime
}
return nil
}
Demo 源碼
源碼可見
github.com/xiaoxuz/limiter
感謝閱讀,三連是最大的支持!