限流算法
- 計數器限流固定窗口滑動窗口
- 桶限流令牌桶漏桶
計數器
計數器限流可以分為:
- 固定窗口
- 滑動窗口
固定窗口
固定窗口計數器限流簡單明了,就是限制單位之間內的請求數,比如設置QPS為10,那么從一開始的請求進入就計數,每次計數前判斷是否到10,到達就拒絕請求,并保證這個計數周期是1秒,1秒后計數器清零。
以下是利用redis實現計數器分布式限流的實現,曾經在線上實踐過的lua腳本:
local key = KEYS[1] local limit = tonumber(ARGV[1]) local refreshInterval = tonumber(ARGV[2]) local currentLimit = tonumber(redis.call('get', key) or '0') if currentLimit + 1 > limit then return -1; else redis.call('INCRBY', key, 1) redis.call('EXPIRE', key, refreshInterval) return limit - currentLimit - 1 end
一個明顯的弊端就是固定窗口計數器算法無法處理突刺流量,比如10QPS,1ms中來了10個請求,后續的999ms的所有請求都會被拒絕。
滑動窗口
為了解決固定窗口的問題,滑動窗口將窗口細化,用更小的窗口來限制流量。比如 1 分鐘的固定窗口切分為 60 個 1 秒的滑動窗口。然后統計的時間范圍隨著時間的推移同步后移。
即便滑動時間窗口限流算法可以保證任意時間窗口內接口請求次數都不會超過最大限流值,但是仍然不能防止在細時間粒度上面訪問過于集中的問題。
為了應對上面的問題,對于時間窗口限流算法,還有很多改進版本,比如:
多層次限流,我們可以對同一個接口設置多條限流規則,除了 1 秒不超過 100 次之外,我們還可以設置 100ms 不超過 20 次 (代碼中實現寫了平均的兩倍),兩條規則同時限制,流量會更加平滑。
簡單實現的代碼如下:
public class SlidingWindowRateLimiter { // 小窗口鏈表 LinkedList<Room> linkedList = null; long stepInterval = 0; long subWindowCount = 10; long stepLimitCount = 0; int countLimit = 0; int count = 0; public SlidingWindowRateLimiter(int countLimit, int interval){ // 每個小窗口的時間間距 this.stepInterval = interval * 1000/ subWindowCount; // 請求總數限制 this.countLimit = countLimit; // 每個小窗口的請求量限制數 設置為平均的2倍 this.stepLimitCount = countLimit / subWindowCount * 2; // 時間窗口開始時間 long start = System.currentTimeMillis(); // 初始化連續的小窗口鏈表 initWindow(start); } Room getAndRefreshwindows(long requestTime){ Room firstRoom = linkedList.getFirst(); Room lastRoom = linkedList.getLast(); // 發起請求時間在主窗口內 if(firstRoom.getStartTime() < requestTime && requestTime < lastRoom.getEndTime()){ long distanceFromFirst = requestTime - firstRoom.getStartTime(); int num = (int) (distanceFromFirst/stepInterval); return linkedList.get(num); }else{ long distanceFromLast = requestTime - lastRoom.getEndTime(); int num = (int)(distanceFromLast/stepInterval); // 請求時間超出主窗口一個窗口以上的身位 if(num >= subWindowCount){ initWindow(requestTime); return linkedList.getFirst(); }else{ moveWindow(num+1); return linkedList.getLast(); } } } public boolean acquire(){ synchronized (mutex()) { Room room = getAndRefreshWindows(System.currentTimeMillis()); int subCount = room.getCount(); if(subCount + 1 <= stepLimitCount && count + 1 <= countLimit){ room.increase(); count ++; return true; } return false; } } /** * 初始化窗口 * @param start */ private void initWindow(long start){ linkedList = new LinkedList<Room>(); for (int i = 0; i < subWindowCount; i++) { linkedList.add(new Room(start, start += stepInterval)); } // 總記數清零 count = 0; } /** * 移動窗口 * @param stepNum */ private void moveWindow(int stepNum){ for (int i = 0; i < stepNum; i++) { Room removeRoom = linkedList.removeFirst(); count = count - removeRoom.count; } Room lastRoom = linkedList.getLast(); long start = lastRoom.endTime; for (int i = 0; i < stepNum; i++) { linkedList.add(new Room(start, start += stepInterval)); } } public static void main(String[] args) throws InterruptedException { SlidingWindowRateLimiter slidingWindowRateLimiter = new SlidingWindowRateLimiter(20, 5); for (int i = 0; i < 26; i++) { System.out.println(slidingWindowRateLimiter.acquire()); Thread.sleep(300); } } class Room{ Room(long startTime, long endTime) { this.startTime = startTime; this.endTime = endTime; this.count = 0; } private long startTime; private long endTime; private int count; long getStartTime() { return startTime; } long getEndTime() { return endTime; } int getCount() { return count; } int increase(){ this.count++; return this.count; } } private volatile Object mutexDoNotUseDirectly; private Object mutex() { Object mutex = mutexDoNotUseDirectly; if (mutex == null) { synchronized (this) { mutex = mutexDoNotUseDirectly; if (mutex == null) { mutexDoNotUseDirectly = mutex = new Object(); } } } return mutex; } }
以上實現使用了鏈表的特性,在一定窗口時是將前段刪除后段加上的方式移動的。移動的操作是請求進入時操作的,沒有使用單獨線程移動窗口。并且按照前面講的,兩個維度控制流量一個時窗口的總請求數,一個是每個單獨窗口的請求數。
桶
令牌桶
原理如圖:
目前使用令牌桶實現的限流有以下幾個:
- Spring Cloud Gateway RateLimiter
- Guava RateLimiter
- Bucket4j
Spring Cloud Gateway RateLimiter
zuul2跳票后Spring Cloud 自己開發的網關,內部也實現了限流器
Spring Cloud Gateway RedisRateLimiter實現原理
因為是微服務架構,多服務部署是必然場景,所以默認提供了redis為存儲載體的實現,所以直接看rua腳本是怎么樣的就可以知道它的算法是怎么實現的了:
local tokens_key = KEYS[1] local timestamp_key = KEYS[2] --redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key) // 速率 local rate = tonumber(ARGV[1]) // 桶容量 local capacity = tonumber(ARGV[2]) // 發起請求時間 local now = tonumber(ARGV[3]) // 請求令牌數量 現在固定是1 local requested = tonumber(ARGV[4]) // 存滿桶耗時 local fill_time = capacity/rate // 過期時間 local ttl = math.floor(fill_time*2) --redis.log(redis.LOG_WARNING, "rate " .. ARGV[1]) --redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2]) --redis.log(redis.LOG_WARNING, "now " .. ARGV[3]) --redis.log(redis.LOG_WARNING, "requested " .. ARGV[4]) --redis.log(redis.LOG_WARNING, "filltime " .. fill_time) --redis.log(redis.LOG_WARNING, "ttl " .. ttl) // 上次請求的信息 存在redis local last_tokens = tonumber(redis.call("get", tokens_key)) // 可任務初始化桶是滿的 if last_tokens == nil then last_tokens = capacity end --redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens) // 上次請求的時間 local last_refreshed = tonumber(redis.call("get", timestamp_key)) if last_refreshed == nil then last_refreshed = 0 end --redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed) // 現在距離上次請求時間的差值 也就是距離上次請求過去了多久 local delta = math.max(0, now-last_refreshed) // 這個桶此時能提供的令牌是上次剩余的令牌數加上這次時間間隔按速率能產生的令牌數 最多不能超過片桶大小 local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) // 能提供的令牌和要求的令牌比較 local allowed = filled_tokens >= requested local new_tokens = filled_tokens local allowed_num = 0 if allowed then // 消耗調令牌 new_tokens = filled_tokens - requested allowed_num = 1 end --redis.log(redis.LOG_WARNING, "delta " .. delta) --redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens) --redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num) --redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens) // 存儲剩余的令牌 redis.call("setex", tokens_key, ttl, new_tokens) redis.call("setex", timestamp_key, ttl, now) return { allowed_num, new_tokens }
Spring Cloud Gateway RedisRateLimiter總結
1,在請求令牌時計算上次請求和此刻的時間間隔能產生的令牌,來刷新桶中的令牌數,然后將令牌提供出去。
2,如果令牌不足沒有等待,直接返回。
3,實現的方式是將每個請求的時間間隔為最小單位,只要小于這個單位的請求就會拒絕,比如100qps的配置,1允許10ms一個,如果兩個請求比較靠近小于10ms,后一個會被拒絕。
Guava RateLimiter
一個Guava實現的令牌桶限流器。
Guava RateLimiter類關系
Guava RateLimiter使用
RateLimiter就是一個根據配置的rate分發permits的工具。每一次調用acquire()都會阻塞到獲得permit為止,一個permit只能用一次。RateLimiter是并發安全的,它將限制全部線程的全部請求速率,但是RateLimiter并不保證是公平的。
限速器經常被使用在限制訪問物理或邏輯資源的速率,和JAVA.util.concurrent.Semaphore對比,Semaphore用于限制同時訪問的數量,而它限制的是訪問的速率(兩者有一定關聯:就是little's law(律特法則))。
RateLimiter主要由發放許可證的速度來定義。如果沒有其他配置,許可證將以固定的速率分發,以每秒許可證的形式定義。許可證將平穩分發,各許可證之間的延遲將被調整到配置的速率。(SmoothBursty)
也可以將RateLimiter配置為有一個預熱期,在此期間,每秒鐘發出的許可穩步增加,直到達到穩定的速率。(SmoothWarmingUp)
一個例子:我們一個任務列表需要執行,但是每秒提交的任務個數不能超過2個。
final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second" void submitTasks(List<Runnable> tasks, Executor executor) { for (Runnable task : tasks) { rateLimiter.acquire(); // may wait executor.execute(task); } }
另一個例子:我們生產一個數據流想以5kb每秒的速度傳輸,這個可以通過將一個permit對應一個byte,并定義5000/s的速率。
final RateLimiter rateLimiter = RateLimiter.create(5000.0); // rate = 5000 permits per second void submitPacket(byte[] packet) { rateLimiter.acquire(packet.length); networkService.send(packet); }
有一點需要注意,請求令牌的數量不會影響請求自身的節流,但是會影響下一次請求,如果一個需要大量令牌的任務到達時,將馬上授予,但是下一次請求將會為上次昂貴任務付出代價。
Guava RateLimiter是如何設計的,為什么?
對于RateLimiter來說最關鍵的特性是:在一般情況下允許的最大速率-“恒定速率”。這需要對傳入的請求強制執行即計算“節流”,也需要在適當的節流時間,讓請求線程等待。
維護QPS的速率最簡單的方式就是保留最后準許請求的時間戳,確保能計算從那是到現在流逝的時間和QPS。舉個例子,一個QPS為5(5個token每秒)的速率,如果我們確保請求被準許時比最后的請求早200ms,我們就達到了想要的速率。如果最后準許的請求只過去100ms,那么我們需要等待100ms。在這個速率下,提供新的15個許可證(即調用acquire(15))自然需要3s。
RateLimiter只記住最后一次請求的時間戳,只是對過去非常淺的記憶,意識到這點很重要。假如很長一段時間沒有使用RateLimiter,然后來了一個請求,是馬上準許嗎?這樣的RateLimiter將馬上忘記剛剛過去的利用不足的時間。最終,因為現實請求速率的不規則導致要么利用不足或溢出的結果。過去利用不足意味著存在過??捎觅Y源,所以RateLimiter想要利用這些資源,就需要提速一會兒。在計算機網絡中應用的速率(帶寬限制),那些過去利用不足的一般會轉換成“幾乎為空的緩沖區”,可以馬上用于后續流量。
另一方面,過去利用不足也意味著“服務器對未來請求的處理的準備變得越來越少”,也就是說,緩存變過期,請求將更有可能觸發昂貴的操作(一個更極端的例子,當服務啟動后,它通常忙于提高自己的速度)。
為了處理這類場景,我們增加了一個額外的維度,將過去利用不足建模到storedPermits字段。沒有利用不足情況時這個字段的值是0,隨著逐漸到達充分利用不足,這個字段的值會增大到一個最大值maxStoredPermits。
所以,當執行acquire(permits)方法來請求許可證從兩方面獲取:
- 1,存儲的許可證(如果有可用的),
- 2,新的許可證(對于任何剩余的許可)。
下面用一個例子來解釋工作原理:假設RateLimiter每秒生成一個token,每一秒過去而RateLimiter沒有被使用的話,就會把storedPermits加1。我們不使用RateLimiter10秒(即,我們期望在一個X時間有一個請求,可是實際在X+10秒后請求才到達;這個和最后一個段落的結論有關),storedPermits會變成10(假定maxStoredPermits>=10)。在那時一個調用acquire(3)的請求到達。我們使用storedPermits將它降到7來響應這個請求(how this is translated to throttling time is discussed later),之后acquire(10)的請求馬上到達。我們使用storedPermits中全部剩余的7個permits,剩下的3個使用RateLimiter生產的新permits。我們已經知道,獲得三個fresh permits需要多少時間:如果速率是“1秒一個token”,那么需要3秒時間。但是提供7個存儲的permits意味著什么?前面的解釋中沒有唯一的答案。如果主要關注處理利用不足的情況,我們存儲permits為了給出比fresh permits快,因為利用不足=空閑資源。如果主要關注overflow的場景,存儲permits可以比fresh permits慢給出。所以,我們需要一個方法將storedPermits轉換成節流時間(throttling time)。
而storedPermitsToWaitTime(double storedPermits, double permitsToTake)這個方法就扮演了這個轉換的角色。它的基礎模型是一個連續函數映射存儲令牌(從0到maxStoredPermits)在1/Rate的積分。我們利用空閑時間來存儲令牌,所以storedPermits本質上是度量了空閑時間(unused time)。Rate=permits/time,1/Rate=time/permits,所以,1/Rate和permits相乘就可以算出時間。即處理一定量的令牌請求時,對這個函數進行積分就是對應于后續請求的最小間隔。
關于storedPermitsToWaitTime的一個例子:如果storedPermits==10,我們先需要3個從storedPermits中拿,storedPermits降到7,使用storedPermitsToWaitTime(storedPermits=10, permitsToToken=3)計算節流時間,它將求出函數從7到10的積分值。
使用積分保證了一個單獨的acquire(3)和拆分成{acquire(1),acquire(1),acquire(1)}或{acquire(2),acquire(1)}都是一樣的,另外,不管函數是怎么樣的,對于函數[7,10]的求積分是等于[7,8],[8,9],[9,10]的總和的。這就保證了我們可以正確處理不同權重(permits不同)的請求,不管函數是什么,因此我們可以自由調整函數的算法(很顯然,只有一個要求:可以被求積分)。注意,如果這個函數畫的是個數值剛好是1/QPS的水平線,那么這個函數就失去作用了,因為它表示存儲令牌的速率和生產新令牌的速率是一致的,后續中會用到這個技巧。如果這個函數的值在水平線的下面,也就是f(x)<1/Rate,表示我們減少了積分的面積,也就是相同存儲的令牌數映射的節流時間相對于正常速率產生的時間變少了,也代表RateLimiter在一段時間空閑后變快了。相反的,如果函數的值在水平線的上面,表示增加了積分的面積,獲得存儲令牌的消耗要大于新生產令牌,那就意味著RateLimiter在一段時間空閑后變慢了。
最后但也重要,如果RateLimiter采用QPS=1的速度,那么開銷較大的acquire(100)到達時,那是沒有必要等到100s才開始實際任務,為什么不在等待的時候做點什么呢?更好的辦法是我們可以馬上先開始執行任務(就像它是acquire(1)一個個請求的樣子),需要把未來的請求推后。
在這個版本,我們允許馬上執行任務,并把未來的請求推后100s,所以,我們允許在這段時間里完成工作,而不是空等。
這就有了一個很重要的結論:RateLimiter并需要不記住最后請求的時間,而只需要記住期望下一個請求到來的時間。因為我們一直堅持這一點,如此我們就可以馬上識別出一個請求(tryAcquire(timeout))超時時間是否滿足下一個預期請求到達的時間點。通過這個概念我們可以重新定義“一個空閑的RateLimiter”:當我們觀察到“期望下一個請求達到時間”實際已經過去,并且差值(now-past)也就是大量時間被轉換成storedPermits。(我們把在空閑時間生產的令牌增加storedPermits)。所以,如果Rate=1 permit/s,且到達時間恰好比前一次請求晚一秒,那么storedPermits將永遠不會增加—我們只會在到達時間晚于一秒時增加它。
SmoothWarmingUp實現原理:顧名思義,這里想要實現的是一個有預熱能力的RateLimiter,作者在注釋中還畫了這幅圖:
在進入詳細實現前,讓我們先記住幾個基本原則:
- Y軸表示RateLimiter(storedPermits數量)的狀態,不同的storedPermits量對應不同的節流時間。
- 隨著RateLimiter空閑時間的推移,storedPermits不斷向右移動,直到maxPermits。
- 如果在有storedPermits的情況下,我們優先使用它,所以隨著RateLimiter被使用,就會向左移動,直到0。
- 當始空閑時,我們以恒定的速度前進!我們向右移動的速率被選擇為maxPermits/預熱周期。這確保從0到maxpermit所花費的時間等于預熱時間。(由coolDownIntervalMicros方法控制)
- 當使用時,正如在前面類注釋中解釋的那樣,假設我們想使用K個storedPermits,它花費的時間等于函數在X許可證和X-K許可證之間的積分。
總之,向左移動K個permits所需要花費的節流時間等于長度為K的函數的面積。假設RateLimiter的storedPermits飽和狀態,從maxPermits到thresholdPermits就等于warmupPeriod。從thresholdPermits到0的時間是warmupPeriod/2(原因是為了維護以前的實現,其中coldFactor硬編碼為3)
計算thresholdPermits和maxPermits的公式:
- 從thresholdPermits到0花費的節流時間等于函數從0到thresholdPermits的積分,也就是thresholdPermits*stableIntervals。也等于warmupPeriod/2。
thresholdPermits=0.5*warmupPeriod/stableInterval - 從maxPermits到thresholdPermits就是函數從thresholdPermits到maxPermits的積分,也就是梯型部分的面積,它等于0.5(stableInterval+coldInterval)(maxPermits - thresholdPermits),也就是warmupPeriod
maxPermits = thresholdPermits + 2*warmupPeriod/(stableInterval+coldInterval)
Guava RateLimiter源碼
這里源碼先從SmoothBursty入手,首先是RateLimiter類里的創建:
public static RateLimiter create(double permitsPerSecond) { return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer()); } @VisibleForTesting static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) { // maxBurstSeconds代表這個桶能存儲的令牌數換算的秒數,通過這個秒數就可以知道能存儲的令牌數,也就表示這個桶的大小。 RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */); rateLimiter.setRate(permitsPerSecond); return rateLimiter; }
我們看到外界無法自定義SmoothBursty的桶大小,所以我們無論是創建什么速率的RateLimiter,桶的大小就必然是rate*1的大小,那么就有人通過反射的方式在滿足自己想要修改桶大小的需求:https://github.com/vipshop/vjtools/commit/9eacb861960df0c41b2323ce14da037a9fdc0629
setRate方法:
public final void setRate(double permitsPerSecond) { checkArgument( permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive"); // 需要全局同步 synchronized (mutex()) { doSetRate(permitsPerSecond, stopwatch.readMicros()); } } private volatile Object mutexDoNotUseDirectly; // 產生一個同步使用的對象,適應雙重檢查鎖保證這個對象是個單例 private Object mutex() { Object mutex = mutexDoNotUseDirectly; if (mutex == null) { synchronized (this) { mutex = mutexDoNotUseDirectly; if (mutex == null) { mutexDoNotUseDirectly = mutex = new Object(); } } } return mutex; }
互斥鎖來保證線程的安全,具體實現代碼中使用volatile修飾的mutexDoNotUseDirectly字段和雙重校驗同步鎖來保證生成單例,從而保證每次調用mutex()都是鎖同一個對象。這在后續的獲取令牌中也需要用到。
doSetRate方法是子類SmoothRateLimiter實現:
// 當前存儲的令牌 double storedPermits; // 最大能夠存儲的令牌 double maxPermits; // 兩次獲取令牌的固定間隔時間 double stableIntervalMicros; // 下一個請求能被授予的時間,一次請求后這個值就會推后,一個大請求推后的時間比一個小請求推后的時間要多 // 這和預消費有關系,上一次消費的令牌 private long nextFreeTicketMicros = 0L; final void doSetRate(double permitsPerSecond, long nowMicros) { resync(nowMicros); // 計算出請求授予的間隔時間 double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; this.stableIntervalMicros = stableIntervalMicros; doSetRate(permitsPerSecond, stableIntervalMicros); } // 更新storedPermits和nextFreeTicketMicros void resync(long nowMicros) { // 當前時間大于下一個請求時間 說明這次請求不用等待 如果不是就不會出現增加令牌的操作 if (nowMicros > nextFreeTicketMicros) { // 根據上次請求和這次請求間隔計算能夠增加的令牌 double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); // 存儲的令牌不能超過maxPermits storedPermits = min(maxPermits, storedPermits + newPermits); // 修改nextFreeTicketMicros為當前時間 nextFreeTicketMicros = nowMicros; } } // 子類實現 abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros); // 返回冷卻期間需要等待獲得新許可證的微秒數。子類實現 abstract double coolDownIntervalMicros();
SmoothBursty的實現
static final class SmoothBursty extends SmoothRateLimiter { /** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */ final double maxBurstSeconds; SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) { super(stopwatch); this.maxBurstSeconds = maxBurstSeconds; } @Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = this.maxPermits; // 計算出桶的最大值 maxPermits = maxBurstSeconds * permitsPerSecond; if (oldMaxPermits == Double.POSITIVE_INFINITY) { // if we don't special-case this, we would get storedPermits == NaN, below storedPermits = maxPermits; } else { // 初始化為0 后續重新設置時按新maxPermits和老maxPermits的比例計算storedPermits storedPermits = (oldMaxPermits == 0.0) ? 0.0 // initial state : storedPermits * maxPermits / oldMaxPermits; } } @Override long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { return 0L; } // 對于SmoothBursty 沒有什么冷卻時期,所以始終返回的是stableIntervalMicros @Override double coolDownIntervalMicros() { return stableIntervalMicros; } }
然后看下獲取令牌:
public double acquire() { return acquire(1); } public double acquire(int permits) { long microsToWait = reserve(permits); // 等待microsToWait時間 控制速率 stopwatch.sleepMicrosUninterruptibly(microsToWait); return 1.0 * microsToWait / SECONDS.toMicros(1L); } final long reserve(int permits) { checkPermits(permits); synchronized (mutex()) { return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } } // 獲得需要等待的時間 final long reserveAndGetWaitLength(int permits, long nowMicros) { long momentAvailable = reserveEarliestAvailable(permits, nowMicros); return max(momentAvailable - nowMicros, 0); } // 子類SmoothRateLimiter實現 abstract long reserveEarliestAvailable(int permits, long nowMicros);
reserveEarliestAvailable方法,刷新下一個請求能被授予的時間
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { // 每次acquire都會觸發resync resync(nowMicros); // 返回值就是下一個請求能被授予的時間 long returnValue = nextFreeTicketMicros; // 選出請求令牌數和存儲令牌數較小的一個 也就是從存儲的令牌中需要消耗的數量 double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 計算本次請求中需要行創建的令牌 double freshPermits = requiredPermits - storedPermitsToSpend; // 計算需要等待的時間 就是存儲令牌消耗的時間+新令牌產生需要的時間 long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); // 刷新下一個請求能被授予的時間 是將這次等待的時間加上原先的值 就是把這次請求需要產生的等待時間延遲給下一次請求 這就是一個大請求會馬上授予 但后續的請求會被等待長時間 所以這里的思路核心就是再每次請求時都是在預測下一次請求到來的時間 this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 刷新存儲令牌 this.storedPermits -= storedPermitsToSpend; return returnValue; } // 這個是用來計算存儲令牌在消耗時的節流時間 也就是通過這個方法子類可以控存儲令牌的速率 我們看到的SmoothBursty的實現是始終返回0 表示消耗存儲的令牌不需要額外的等待時間 我們在預熱的實現中可以看到不一樣的實現 abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);
再來看一下請求令牌方法帶超時時間的方法:
public boolean tryAcquire(long timeout, TimeUnit unit) { return tryAcquire(1, timeout, unit); } public boolean tryAcquire(int permits) { return tryAcquire(permits, 0, MICROSECONDS); } public boolean tryAcquire() { return tryAcquire(1, 0, MICROSECONDS); } public boolean tryAcquire(int permits, long timeout, TimeUnit unit) { long timeoutMicros = max(unit.toMicros(timeout), 0); checkPermits(permits); long microsToWait; synchronized (mutex()) { long nowMicros = stopwatch.readMicros(); // 判定設置的超時時間是否足夠等待下一個令牌的給予,等不了,就直接失敗 if (!canAcquire(nowMicros, timeoutMicros)) { return false; } else { // 獲得需要等待的時間 microsToWait = reserveAndGetWaitLength(permits, nowMicros); } } // 等待 stopwatch.sleepMicrosUninterruptibly(microsToWait); return true; } private boolean canAcquire(long nowMicros, long timeoutMicros) { return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros; }
以上可以基本理解一個普通的限流器的實現方式,可以看到實現中可以通過doSetRate,storedPermitsToWaitTime,coolDownIntervalMicros方法進行定制自己的限流策略。
那么這里的SmoothBursty的策略是:
- 桶大小通過固定的maxBurstSeconds控制 maxPermits = maxBurstSeconds * permitsPerSecond;
- 消耗累積令牌不計入時間到等待時間中
- 累積令牌時的速率和令牌消耗速率保持一致
我們繼續看稍微復雜點的SmoothWarmingUp,畢竟為了說明它人家作者都用注釋畫了示意圖。
static final class SmoothWarmingUp extends SmoothRateLimiter { // 預熱累計消耗時間 private final long warmupPeriodMicros; /** * The slope of the line from the stable interval (when permits == 0), to the cold interval * (when permits == maxPermits) */ private double slope; private double thresholdPermits; // 冷卻因子 固定是3 意思是通過這個因子可以計算在令牌桶滿的時候,消耗令牌需要的最大時間 private double coldFactor; SmoothWarmingUp( SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) { super(stopwatch); this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod); this.coldFactor = coldFactor; } @Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = maxPermits; // 通過coldFactor就可以算出coldInterval的最高點 即stableIntervalMicros的3倍 也就是說圖中的梯形最高點是固定的了 double coldIntervalMicros = stableIntervalMicros * coldFactor; // 根據warmupPeriodMicros*2=thresholdPermits*stableIntervalMicros換算thresholdPermits 因為我們看到梯形最高點是固定的 那么通過設置warmupPeriod是可以控制thresholdPermits,從而控制maxPermits的值 thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros; // 也是根據上面提到的公式計算maxPermits maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros); // 傾斜角度 slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits); if (oldMaxPermits == Double.POSITIVE_INFINITY) { // if we don't special-case this, we would get storedPermits == NaN, below storedPermits = 0.0; } else { storedPermits = (oldMaxPermits == 0.0) ? maxPermits // 這里的初始值是maxPermits : storedPermits * maxPermits / oldMaxPermits; } } @Override long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { // 超過thresholdPermits的存儲令牌 double availablePermitsAboveThreshold = storedPermits - thresholdPermits; long micros = 0; // measuring the integral on the right part of the function (the climbing line) if (availablePermitsAboveThreshold > 0.0) { double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake); // 這里就開始算這個梯形的面積了 梯形面積=(上底+下底)*高/2 double length = permitsToTime(availablePermitsAboveThreshold) + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake); micros = (long) (permitsAboveThresholdToTake * length / 2.0); permitsToTake -= permitsAboveThresholdToTake; } // measuring the integral on the left part of the function (the horizontal line) micros += (long) (stableIntervalMicros * permitsToTake); return micros; } private double permitsToTime(double permits) { return stableIntervalMicros + permits * slope; } // 為了確保從0到maxpermit所花費的時間等于預熱時間 可以看下resync方法中對coolDownIntervalMicros方法的使用 @Override double coolDownIntervalMicros() { return warmupPeriodMicros / maxPermits; } }
Guava RateLimiter總結
1,Guava在對RateLimiter設計中采用的是令牌桶的算法,提供了普通和預熱兩種模型,在存儲令牌增加和消耗的設計思路是計算函數積分的方式。
2,對于第一個新來的請求,無論請求多少令牌,不阻塞。
3,內部使用線程sleep方式達到線程等待,沒超時時間會一直等到有令牌
4,令牌存儲發生在請求令牌時,不需要單獨線程實現不斷產生令牌的算法
5,內部設計的類一些參數并不開放外部配置
漏桶
原理如圖:
我們有一個固定的桶,這個桶的出水速率是固定的,流量不斷往桶中放水,進水速度比出水速度快的時候,可以在桶中有一個緩沖,可是到達一定量超出桶的容量,就會溢出桶,無法接受新的請求。
這個思路不就是阻塞隊列嘛,只要在消費的放保持固定速率即可。
實現類似如下代碼(示意使用):
public class LeakyBucketLimiter { BlockingQueue<String> leakyBucket; long rate; public LeakyBucketLimiter(int capacity, long rate) { this.rate = rate; this.leakyBucket = new ArrayBlockingQueue<String>(capacity); } public boolean offer(){ return leakyBucket.offer(""); } class Consumer implements Runnable{ public void run() { try { while (true){ Thread.sleep(1000/rate); leakyBucket.take(); } } catch (InterruptedException e) { } } } }
和令牌桶允許一定突發請求時的高速率,以及空閑后降低速率不同的是,漏桶算法是必然保證速率不變的。
最后
- 限流必然帶來性能損失,如何避免?
一個思路是監控系統的狀態,比如cpu,內存,io等,依據這些信息來開關限流器。
- 實際場景中是單機限流還是分布式限流?
在分布式系統中,如果想用分布式限流,就需要公用存儲的載體,比如redis,zk等。還需要考量分布式存儲載體失效,不能影響正常業務功能。
- 拓展
本文并不是限流的全部,關于限流這里只聊到了相關的一些常規的算法,可以說是冰山一角,還有很多知識等待我們去探索,前路漫漫。
另外,后續會參考開源限流方案 Sentinel 和 Bucket4j 進一步研究實踐限流的落地方案。