一、寫在最前
轟轟烈烈的雙十二已經(jīng)過去小半個月了,程序猿的我坐在辦公桌上思考,雙十二這么大的訪問量,這群電商是怎么扛住的,接口分分鐘會變得不可用,并引發(fā)連鎖反應(yīng)導(dǎo)致整個系統(tǒng)崩潰。好吃懶做的小編,被可怕的好奇心驅(qū)使著去調(diào)研流量控制算法。好奇心害死貓,才有了這篇文章。
二、流量控制算法簡介
流量控制在計算機(jī)領(lǐng)域稱為過載保護(hù)。何為過載保護(hù)?所謂“過載”,即需求超過了負(fù)載能力;而“保護(hù)”則是指當(dāng)“過載”發(fā)生了,采取必要的措施保護(hù)自己不受“傷害”。在計算機(jī)領(lǐng)域,尤其是分布式系統(tǒng)領(lǐng)域,“過載保護(hù)”是一個重要的概念。一個不具備“過載保護(hù)”功能的系統(tǒng),是非常危險和脆弱的,很可能由于瞬間的壓力激增,引起“雪崩效應(yīng)”,導(dǎo)致系統(tǒng)的各個部分都同時崩潰,停止服務(wù)。這就好像在沒有保險絲的保護(hù)下,電壓突然變高,導(dǎo)致所有的電器都會被損壞一樣,“過載保護(hù)”功能是系統(tǒng)的“保險絲”。
如今互聯(lián)網(wǎng)領(lǐng)域,也借鑒了這一思路扛住雙十二, 控制網(wǎng)絡(luò)數(shù)據(jù)傳輸?shù)乃俾剩沽髁恳员容^均勻的速度向外發(fā)送。最終實(shí)現(xiàn)優(yōu)化性能,減少延遲和提高帶寬等。
三、常用的限流算法
常用的限流算法有兩種:漏桶算法和令牌桶算法。本篇文章將介紹自己造輪子限流算法、漏桶算法和令牌桶算法。
3.1 自己造輪子限流算法
作為一名小白,我是不愿意自己造輪子的,但是真要早輪子,有一個簡單粗暴的思路:1)設(shè)置單位時間T(如10s)內(nèi)的最大訪問量ReqMax,在單位時間T內(nèi)維護(hù)計數(shù)器Count;2)當(dāng)請求到達(dá)時,判斷時間是否進(jìn)入下一個單位時間;3)如果是,則重置計數(shù)器為0;4)如果不是,計數(shù)器Count++,并判斷計數(shù)器Count是否超過最大訪問量ReqMax,如超過,則拒絕訪問。
long timeStamp = getNowTime();
int reqCount = 0;
const int maxReqCount = 10000;//時間周期內(nèi)最大請求數(shù)
const long effectiveDuration = 10;//時間控制周期
public static bool control(){
long now = getNowTime();
if (now < timeStamp + effectiveDuration){//在時間控制范圍內(nèi)
reqCount++;
return reqCount > maxReqCount;//當(dāng)前時間范圍內(nèi)超過最大請求控制數(shù)
}else{
timeStamp = now;//超時后重置
reqCount = 0;
return true;
}
}
public static int getNowTime(){
long time = System.currentTimeMillis();
return (int) (time/1000);
}
該算法實(shí)現(xiàn)看似確實(shí)完美的實(shí)現(xiàn)了“單位時間內(nèi)最大訪問量控制”,但它在兩個單位時間的臨界值上的處理是有缺陷的。如:設(shè)需要控制的最大請求數(shù)為1w, 在第一個單位時間(0-10s)的最后一秒(即第9s)里達(dá)到的請求數(shù)為1w,接下來第二個單位時間(10-20s)的第一秒(即第10s)里達(dá)到請求數(shù)也是1w,由于超時重置發(fā)生在兩個單位時間之間,所以這2w個請求都將通過控制,也就是說在2s里處理2w個請求,與我們設(shè)置的10s里1w個請求的設(shè)想是相違背。
學(xué)術(shù)一點(diǎn)的說法是該算法處理請求不夠平滑,不能很好的滿足限流需求。
3.2 漏桶算法
漏桶算法思路很簡單,請求先進(jìn)入到漏桶里,漏桶以固定的速度出水,也就是處理請求,當(dāng)水加的過快,則會直接溢出,也就是拒絕請求,可以看出漏桶算法能強(qiáng)行限制數(shù)據(jù)的傳輸速率。

漏桶算法
long timeStamp = getNowTime();
int capacity = 10000;// 桶的容量
int rate = 1;//水漏出的速度
int water = 100;//當(dāng)前水量
public static bool control() {
//先執(zhí)行漏水,因?yàn)閞ate是固定的,所以可以認(rèn)為“時間間隔*rate”即為漏出的水量
long now = getNowTime();
water = Math.max(0, water - (now - timeStamp) * rate);
timeStamp = now;
if (water < capacity) { // 水還未滿,加水
water ++;
return true;
} else {
return false;//水滿,拒絕加水
}
}
該算法很好的解決了時間邊界處理不夠平滑的問題,因?yàn)樵诿看握埱筮M(jìn)桶前都將執(zhí)行“漏水”的操作,再無邊界問題。
但是對于很多場景來說,除了要求能夠限制數(shù)據(jù)的平均傳輸速率外,還要求允許某種程度的突發(fā)傳輸。這時候漏桶算法可能就不合適了,令牌桶算法更為適合。
3.3 令牌桶算法
令牌桶算法的原理是系統(tǒng)會以一個恒定的速度往桶里放入令牌,而如果請求需要被處理,則需要先從桶里獲取一個令牌,當(dāng)桶里沒有令牌可取時,則拒絕服務(wù)。

令牌桶算法
3.3.1 原理
令牌桶是網(wǎng)絡(luò)設(shè)備的內(nèi)部存儲池,而令牌則是以給定速率填充令牌桶的虛擬信息包。每個到達(dá)的令牌都會從數(shù)據(jù)隊列領(lǐng)出相應(yīng)的數(shù)據(jù)包進(jìn)行發(fā)送,發(fā)送完數(shù)據(jù)后令牌被刪除。
請求注解(RFC)中定義了兩種令牌桶算法——單速率三色標(biāo)記算法和雙速率三色標(biāo)記算法,其評估結(jié)果都是為報文打上紅、黃、綠三色標(biāo)記。QoS會根據(jù)報文的顏色,設(shè)置報文的丟棄優(yōu)先級,其中單速率三色標(biāo)記比較關(guān)心報文尺寸的突發(fā),而雙速率三色標(biāo)記則關(guān)注速率上的突發(fā),兩種算法都可工作于色盲模式和非色盲模式。以下結(jié)合這兩種工作模式介紹一下RFC中所描述的這兩種算法。
1)單速率三色標(biāo)記算法網(wǎng)絡(luò)工程師任務(wù)小組(IETF)的RFC文件定義了單速率三色標(biāo)記算法,評估依據(jù)以下3個參數(shù):承諾訪問速率(CIR),即向令牌桶中填充令牌的速率;承諾突發(fā)尺寸(CBS),即令牌桶的容量,每次突發(fā)所允許的最大流量尺寸(注:設(shè)置的突發(fā)尺寸必須大于最大報文長度);超額突發(fā)尺寸(EBS)。
一般采用雙桶結(jié)構(gòu):C桶和E桶。Tc表示C桶中的令牌數(shù),Te表示E桶中令牌數(shù),兩桶的總?cè)萘糠謩e為CBS和EBS。初始狀態(tài)時兩桶是滿的,即Tc和Te初始值分別等于CBS和EBS。令牌的產(chǎn)生速率是CIR,通常是先往C桶中添加令牌,等C桶滿了,再往E桶中添加令牌,當(dāng)兩桶都被填滿時,新產(chǎn)生的令牌將會被丟棄。
色盲模式下,假設(shè)到達(dá)的報文長度為B。若報文長度B小于C桶中的令牌數(shù)Tc,則報文被標(biāo)記為綠色,且C桶中的令牌數(shù)減少B;若Tc<B <Te,則標(biāo)記為黃色,E和C桶中的令牌數(shù)均減少B;若B >Te,標(biāo)記為紅色,兩桶總令牌數(shù)都不減少。
在非色盲模式下,若報文已被標(biāo)記為綠色或B <Tc,則報文被標(biāo)記為綠色,Tc減少B;若報文已被標(biāo)記為黃色或Tc<B <Te,則標(biāo)記為黃色,且Te減少B;若報文已被標(biāo)記為紅色或B >Te,則標(biāo)記為紅色,Tc和Te都不減少。
2)雙速率三色標(biāo)記算法IETF的RFC文件定義了雙速率三色算法,主要是根據(jù)4種流量參數(shù)來評估:CIR、CBS、峰值信息速率(PIR),峰值突發(fā)尺寸(PBS)。前兩種參數(shù)與單速率三色算法中的含義相同,PIR這個參數(shù)只在交換機(jī)上才有,路由器沒有這個參數(shù)。該值必須不小于CIR的設(shè)置值,如果大于CIR,則速率限制在CIR于PRI之間的一個值。
與單速率三色標(biāo)記算法不同,雙速率三色標(biāo)記算法的兩個令牌桶C桶和P桶填充令牌的速率不同,C桶填充速率為CIR,P桶為PIR;兩桶的容量分別為CBS和PBS。用Tc和Tp表示兩桶中的令牌數(shù)目,初始狀態(tài)時兩桶是滿的,即Tc和Tp初始值分別等于CBS和PBS。
色盲模式下,如果到達(dá)的報文速率大于PIR,超過Tp+Tc部分無法得到令牌,報文被標(biāo)記為紅色,未超過Tp+Tc而從P桶中獲取令牌的報文標(biāo)記為黃色,從C桶中獲取令牌的報文被標(biāo)記為綠色;當(dāng)報文速率小于PIR,大于CIR時,報文不會得不到令牌,但超過Tp部分報文將從P桶中獲取令牌,被標(biāo)記為黃色報文,從C桶中獲取令牌的報文被標(biāo)記為綠色;當(dāng)報文速率小于CIR時,報文所需令牌數(shù)不會超過Tc,只從C桶中獲取令牌,所以只會被標(biāo)記為綠色報文。
在非色盲模式下,如果報文已被標(biāo)記為紅色或者超過Tp+Tc部分無法得到令牌的報文,被標(biāo)記為紅色;如果標(biāo)記為黃色或者超過Tc未超過Tp部分報文記為黃色;如果報文被標(biāo)記為綠或未超過Tc部分報文,被標(biāo)記為綠色。
3.3.2 算法描述與實(shí)現(xiàn)
- 假如用戶配置的平均發(fā)送速率為r,則每隔1/r秒一個令牌被加入到桶中(每秒會有r個令牌放入桶中);
- 假設(shè)桶中最多可以存放b個令牌。如果令牌到達(dá)時令牌桶已經(jīng)滿了,那么這個令牌會被丟棄;
- 當(dāng)一個n個字節(jié)的數(shù)據(jù)包到達(dá)時,就從令牌桶中刪除n個令牌(不同大小的數(shù)據(jù)包,消耗的令牌數(shù)量不一樣),并且數(shù)據(jù)包被發(fā)送到網(wǎng)絡(luò);
- 如果令牌桶中少于n個令牌,那么不會刪除令牌,并且認(rèn)為這個數(shù)據(jù)包在流量限制之外(n個字節(jié),需要n個令牌。該數(shù)據(jù)包將被緩存或丟棄);
- 算法允許最長b個字節(jié)的突發(fā),但從長期運(yùn)行結(jié)果看,數(shù)據(jù)包的速率被限制成常量r。對于在流量限制外的數(shù)據(jù)包可以以不同的方式處理:1)它們可以被丟棄;2)它們可以排放在隊列中以便當(dāng)令牌桶中累積了足夠多的令牌時再傳輸;3)它們可以繼續(xù)發(fā)送,但需要做特殊標(biāo)記,網(wǎng)絡(luò)過載的時候?qū)⑦@些特殊標(biāo)記的包丟棄。
long timeStamp=getNowTime();
int capacity; // 桶的容量
int rate ;//令牌放入速度
int tokens;//當(dāng)前水量
bool control() {
//先執(zhí)行添加令牌的操作
long now = getNowTime();
tokens = max(capacity, tokens+ (now - timeStamp)*rate);
timeStamp = now; //令牌已用完,拒絕訪問
if(tokens<1){
return false;
}else{//還有令牌,領(lǐng)取令牌
tokens--;
retun true;
}
}
令牌桶算法是網(wǎng)絡(luò)流量整形和速率限制中最常使用的一種算法。典型情況下,令牌桶算法用來控制發(fā)送到網(wǎng)絡(luò)上的數(shù)據(jù)的數(shù)目,并允許突發(fā)數(shù)據(jù)的發(fā)送。
大小固定的令牌桶可自行以恒定的速率源源不斷地產(chǎn)生令牌。如果令牌不被消耗,或者被消耗的速度小于產(chǎn)生的速度,令牌就會不斷地增多,直到把桶填滿。后面再產(chǎn)生的令牌就會從桶中溢出。最后桶中可以保存的最大令牌數(shù)永遠(yuǎn)不會超過桶的大小。
傳送到令牌桶的數(shù)據(jù)包需要消耗令牌。不同大小的數(shù)據(jù)包,消耗的令牌數(shù)量不一樣。令牌桶這種控制機(jī)制基于令牌桶中是否存在令牌來指示什么時候可以發(fā)送流量。令牌桶中的每一個令牌都代表一個字節(jié)。如果令牌桶中存在令牌,則允許發(fā)送流量;而如果令牌桶中不存在令牌,則不允許發(fā)送流量。因此,如果突發(fā)門限被合理地配置并且令牌桶中有足夠的令牌,那么流量就可以以峰值速率發(fā)送。
四、限流工具類RateLimiter
google開源工具包guava提供了限流工具類RateLimiter,該類基于“令牌桶算法”,非常方便使用。RateLimiter經(jīng)常用于限制對一些物理資源或者邏輯資源的訪問速率。它支持兩種獲取permits接口,一種是如果拿不到立刻返回false,一種會阻塞等待一段時間看能不能拿到。
4.1 RateLimiter demo
//多任務(wù)執(zhí)行,但每秒執(zhí)行不超過2個任務(wù)
final RateLimiter rateLimiter = RateLimiter.create(2.0);
void submitTasks(List<Runnable> tasks, Executor executor) {
for (Runnable task : tasks) {
rateLimiter.acquire(); // may wait
executor.execute(task);
}
}
//以每秒5kb內(nèi)的速度發(fā)送
final RateLimiter rateLimiter = RateLimiter.create(5000.0);
void submitPacket(byte[] packet) {
rateLimiter.acquire(packet.length);
networkService.send(packet);
}
//以非阻塞的形式達(dá)到降級
if(limiter.tryAcquire()) { //未請求到limiter則立即返回false
doSomething();
}else{
doSomethingElse();
}
4.2 主要接口
RateLimiter其實(shí)是一個abstract類,但是它提供了幾個static方法用于創(chuàng)建RateLimiter:
/**
* 創(chuàng)建一個穩(wěn)定輸出令牌的RateLimiter,保證了平均每秒不超過permitsPerSecond個請求
* 當(dāng)請求到來的速度超過了permitsPerSecond,保證每秒只處理permitsPerSecond個請求
* 當(dāng)這個RateLimiter使用不足(即請求到來速度小于permitsPerSecond),會囤積最多permitsPerSecond個請求
*/
public static RateLimiter create(double permitsPerSecond);
/**
* 創(chuàng)建一個穩(wěn)定輸出令牌的RateLimiter,保證了平均每秒不超過permitsPerSecond個請求
* 還包含一個熱身期(warmup period),熱身期內(nèi),RateLimiter會平滑的將其釋放令牌的速率加大,直到起達(dá)到最大速率
* 同樣,如果RateLimiter在熱身期沒有足夠的請求(unused),則起速率會逐漸降低到冷卻狀態(tài)
*
* 設(shè)計這個的意圖是為了滿足那種資源提供方需要熱身時間,而不是每次訪問都能提供穩(wěn)定速率的服務(wù)的情況(比如帶緩存服務(wù),需要定期刷新緩存的)
* 參數(shù)warmupPeriod和unit決定了其從冷卻狀態(tài)到達(dá)最大速率的時間
*/
public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit);
提供了兩個獲取令牌的方法,不帶參數(shù)表示獲取一個令牌。如果沒有令牌則一直等待,返回等待的時間(單位為秒),沒有被限流則直接返回0.0:
public double acquire();
public double acquire(int permits);
嘗試獲取令牌,分為待超時時間和不帶超時時間兩種:
public boolean tryAcquire();
//嘗試獲取一個令牌,立即返回
public boolean tryAcquire(int permits);
public boolean tryAcquire(long timeout, TimeUnit unit);
//嘗試獲取permits個令牌,帶超時時間
public boolean tryAcquire(int permits, long timeout, TimeUnit unit);
4.3 RateLimiter的設(shè)計
RateLimiter的主要功能就是提供一個穩(wěn)定的速率,實(shí)現(xiàn)方式就是通過限制請求流入的速度,比如計算請求等待合適的時間閾值。
實(shí)現(xiàn)QPS速率的最簡單的方式就是記住上一次請求的最后授權(quán)時間,然后保證1/QPS秒內(nèi)不允許請求進(jìn)入。比如QPS=5,如果我們保證最后一個被授權(quán)請求之后的200ms的時間內(nèi)沒有請求被授權(quán),那么我們就達(dá)到了預(yù)期的速率。如果一個請求現(xiàn)在過來但是最后一個被授權(quán)請求是在100ms之前,那么我們就要求當(dāng)前這個請求等待100ms。按照這個思路,請求15個新令牌(許可證)就需要3秒。
有一點(diǎn)很重要:上面這個設(shè)計思路的RateLimiter記憶非常的淺,它的腦容量非常的小,只記得上一次被授權(quán)的請求的時間。如果RateLimiter的一個被授權(quán)請求q之前很長一段時間沒有被使用會怎么樣?這個RateLimiter會立馬忘記過去這一段時間的利用不足,而只記得剛剛的請求q。
過去一段時間的利用不足意味著有過剩的資源是可以利用的。這種情況下,RateLimiter應(yīng)該加把勁(speed up for a while)將這些過剩的資源利用起來。比如在向網(wǎng)絡(luò)中發(fā)生數(shù)據(jù)的場景(限流),過去一段時間的利用不足可能意味著網(wǎng)卡緩沖區(qū)是空的,這種場景下,我們是可以加速發(fā)送來將這些過程的資源利用起來。
另一方面,過去一段時間的利用不足可能意味著處理請求的服務(wù)器對即將到來的請求是準(zhǔn)備不足的(less ready for future requests),比如因?yàn)楹荛L一段時間沒有請求當(dāng)前服務(wù)器的cache是陳舊的,進(jìn)而導(dǎo)致即將到來的請求會觸發(fā)一個昂貴的操作(比如重新刷新全量的緩存)。
為了處理這種情況,RateLimiter中增加了一個維度的信息,就是過去一段時間的利用不足(past underutilization),代碼中使用storedPermits變量表示。當(dāng)沒有利用不足這個變量為0,最大能達(dá)到maxStoredPermits(maxStoredPermits表示完全沒有利用)。因此,請求的令牌可能從兩個地方來:
過去剩余的令牌(stored permits, 可能沒有)
現(xiàn)有的令牌(fresh permits,當(dāng)前這段時間還沒用完的令牌)
我們將通過一個例子來解釋它是如何工作的:對一個每秒產(chǎn)生一個令牌的RateLimiter,每有一個沒有使用令牌的一秒,我們就將storedPermits加1,如果RateLimiter在10秒都沒有使用,則storedPermits變成10.0。這個時候,一個請求到來并請求三個令牌(acquire(3)),我們將從storedPermits中的令牌為其服務(wù),storedPermits變?yōu)?.0。這個請求之后立馬又有一個請求到來并請求10個令牌,我們將從storedPermits剩余的7個令牌給這個請求,剩下還需要三個令牌,我們將從RateLimiter新產(chǎn)生的令牌中獲取。我們已經(jīng)知道,RateLimiter每秒新產(chǎn)生1個令牌,就是說上面這個請求還需要的3個請求就要求其等待3秒。
想象一個RateLimiter每秒產(chǎn)生一個令牌,現(xiàn)在完全沒有使用(處于初始狀態(tài)),限制一個昂貴的請求acquire(100)過來。如果我們選擇讓這個請求等待100秒再允許其執(zhí)行,這顯然很荒謬。我們?yōu)槭裁词裁匆膊蛔龆皇巧瞪档牡却?00秒,一個更好的做法是允許這個請求立即執(zhí)行(和acquire(1)沒有區(qū)別),然后將隨后到來的請求推遲到正確的時間點(diǎn)。這種策略,我們允許這個昂貴的任務(wù)立即執(zhí)行,并將隨后到來的請求推遲100秒。這種策略就是讓任務(wù)的執(zhí)行和等待同時進(jìn)行。
一個重要的結(jié)論:RateLimiter不會記最后一個請求,而是即下一個請求允許執(zhí)行的時間。這也可以很直白的告訴我們到達(dá)下一個調(diào)度時間點(diǎn)的時間間隔。然后定一個一段時間未使用的Ratelimiter也很簡單:下一個調(diào)度時間點(diǎn)已經(jīng)過去,這個時間點(diǎn)和現(xiàn)在時間的差就是Ratelimiter多久沒有被使用,我們會將這一段時間翻譯成storedPermits。所有,如果每秒鐘產(chǎn)生一個令牌(rate==1),并且正好每秒來一個請求,那么storedPermits就不會增長。
4.4 主要碼源
分析一下RateLimiter如何實(shí)現(xiàn)限流:
public double acquire() {
return acquire(1);
}
public double acquire(int permits) {
long microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) { //應(yīng)對并發(fā)情況需要同步
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}
下面方法來自RateLimiter的具體實(shí)現(xiàn)類SmoothRateLimiter:
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros); //補(bǔ)充令牌
long returnValue = nextFreeTicketMicros;
//這次請求消耗的令牌數(shù)目
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
private void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
storedPermits = min(maxPermits,
storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
nextFreeTicketMicros = nowMicros;
}
}
另外,對于storedPermits的使用,RateLimiter存在兩種策略,二者區(qū)別主要體現(xiàn)在使用storedPermits時候需要等待的時間。這個邏輯由storedPermitsToWaitTime函數(shù)實(shí)現(xiàn):
/**
* Translates a specified portion of our currently stored permits which we want to
* spend/acquire, into a throttling time. Conceptually, this evaluates the integral
* of the underlying function we use, for the range of
* [(storedPermits - permitsToTake), storedPermits].
*
* <p>This always holds: {@code 0 <= permitsToTake <= storedPermits}
*/
abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);
存在兩種策略就是為了應(yīng)對我們上面講到的,存在資源使用不足大致分為兩種情況:
(1)資源確實(shí)使用不足,這些剩余的資源我們私海可以使用的;
(2)提供資源的服務(wù)過去還沒準(zhǔn)備好,比如服務(wù)剛啟動等。
為此,RateLimiter實(shí)際上由兩種實(shí)現(xiàn)策略,其實(shí)現(xiàn)分別見SmoothBursty和SmoothWarmingUp。二者主要的區(qū)別就是storedPermitsToWaitTime實(shí)現(xiàn)以及maxPermits數(shù)量的計算。
4.4.1 SmoothBursty
SmoothBursty使用storedPermits不需要額外等待時間。并且默認(rèn)maxBurstSeconds未1,因此maxPermits為permitsPerSecond,即最多可以存儲1秒的剩余令牌,比如QPS=5,則maxPermits=5。
下面這個RateLimiter的入口就是用來創(chuàng)建SmoothBursty類型的RateLimiter:
public static RateLimiter create(double permitsPerSecond)
/**
* This implements a "bursty" RateLimiter, where storedPermits are translated to
* zero throttling. The maximum number of permits that can be saved (when the RateLimiter is
* unused) is defined in terms of time, in this sense: if a RateLimiter is 2qps, and this
* time is specified as 10 seconds, we can save up to 2 * 10 = 20 permits.
*/
static final class SmoothBursty extends SmoothRateLimiter {
/** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
final double maxBurstSeconds;
SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
super(stopwatch);
this.maxBurstSeconds = maxBurstSeconds;
}
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond;
System.out.println("maxPermits=" + maxPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
storedPermits = (oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
return 0L;
}
}
一個簡單的使用示意圖及解釋,下面私海一個QPS=4的SmoothBursty:
(1)t=0,這時候storedPermits=0,請求1個令牌,等待時間=0;
(2)t=1,這時候storedPermits=3,請求3個令牌,等待時間=0;
(3)t=2,這時候storedPermits=4,請求10個令牌,等待時間=0,超前使用了2個令牌;
(4)t=3,這時候storedPermits=0,請求1個令牌,等待時間=0.5。
代碼的輸出:
maxPermits=4.0, storedPermits=7.2E-4, stableIntervalMicros=250000.0, nextFreeTicketMicros=1472
acquire(1), sleepSecond=0.0
maxPermits=4.0, storedPermits=3.012212, stableIntervalMicros=250000.0, nextFreeTicketMicros=1004345
acquire(3), sleepSecond=0.0
maxPermits=4.0, storedPermits=4.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=2004668
acquire(10), sleepSecond=0.0
maxPermits=4.0, storedPermits=0.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=3504668
acquire(1), sleepSecond=0.499591

4.4.2 SmoothWarmingUp
static final class SmoothWarmingUp extends SmoothRateLimiter {
private final long warmupPeriodMicros;
/**
* The slope of the line from the stable interval (when permits == 0), to the cold interval
* (when permits == maxPermits)
*/
private double slope;
private double halfPermits;
SmoothWarmingUp(SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit) {
super(stopwatch);
this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
}
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
maxPermits = warmupPeriodMicros / stableIntervalMicros;
halfPermits = maxPermits / 2.0;
// Stable interval is x, cold is 3x, so on average it's 2x. Double the time -> halve the rate
double coldIntervalMicros = stableIntervalMicros * 3.0;
slope = (coldIntervalMicros - stableIntervalMicros) / halfPermits;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = 0.0;
} else {
storedPermits = (oldMaxPermits == 0.0)
? maxPermits // initial state is cold
: storedPermits * maxPermits / oldMaxPermits;
}
}
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
double availablePermitsAboveHalf = storedPermits - halfPermits;
long micros = 0;
// measuring the integral on the right part of the function (the climbing line)
if (availablePermitsAboveHalf > 0.0) {
double permitsAboveHalfToTake = min(availablePermitsAboveHalf, permitsToTake);
micros = (long) (permitsAboveHalfToTake * (permitsToTime(availablePermitsAboveHalf)
+ permitsToTime(availablePermitsAboveHalf - permitsAboveHalfToTake)) / 2.0);
permitsToTake -= permitsAboveHalfToTake;
}
// measuring the integral on the left part of the function (the horizontal line)
micros += (stableIntervalMicros * permitsToTake);
return micros;
}
private double permitsToTime(double permits) {
return stableIntervalMicros + permits * slope;
}
}
maxPermits等于熱身(warmup)期間能產(chǎn)生的令牌數(shù),比如QPS=4,warmup為2秒,則maxPermits=8。halfPermits為maxPermits的一半。
參考注釋中的神圖:
* ^ throttling
* |
* 3*stable + /
* interval | /.
* (cold) | / .
* | / . <-- "warmup period" is the area of the trapezoid between
* 2*stable + / . halfPermits and maxPermits
* interval | / .
* | / .
* | / .
* stable +----------/ WARM . }
* interval | . UP . } <-- this rectangle (from 0 to maxPermits, and
* | . PERIOD. } height == stableInterval) defines the cooldown period,
* | . . } and we want cooldownPeriod == warmupPeriod
* |---------------------------------> storedPermits
* (halfPermits) (maxPermits)
*
下面是我們QPS=4,warmup為2秒時候?qū)?yīng)的圖。

maxPermits=8,halfPermits=4,和SmoothBursty相同的請求序列:
(1)t=0,這時候storedPermits=8,請求1個令牌,使用1個storedPermits消耗時間=1×(0.75+0.625)/2=0.6875秒;
(2)t=1,這時候storedPermits=8,請求3個令牌,使用3個storedPermits消耗時間=3×(0.75+0.375)/2=1.6875秒(注意已經(jīng)超過1秒了,意味著下次產(chǎn)生新Permit時間為2.6875);
(3)t=2,這時候storedPermits=5,請求10個令牌,使用5個storedPermits消耗時間=1×(0.375+0.25)/2+4*0.25=1.3125秒,再加上額外請求的5個新產(chǎn)生的Permit需要消耗=5*0.25=1.25秒,即總共需要耗時2.5625秒,則下一次產(chǎn)生新的Permit時間為2.6875+2.5625=5.25,注意當(dāng)前請求私海2.6875才返回的,之前一直阻塞;
(4)t=3,因?yàn)榍耙粋€請求阻塞到2.6875,實(shí)際這個請求3.6875才到達(dá)RateLimiter,請求1個令牌,storedPermits=0,下一次產(chǎn)生新Permit時間為5.25,因此總共需要等待5.25-3.6875=1.5625秒。
實(shí)際執(zhí)行結(jié)果:
warmupPeriodMicros=2000000
stableIntervalMicros=250000.0, maxPermits=8.0, halfPermits=4.0, coldIntervalMicros=750000.0, slope=125000.0, storedPermits=8.0
maxPermits=8.0, storedPermits=8.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=1524
acquire(1), sleepSecond=0.0
maxPermits=8.0, storedPermits=8.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=1001946
acquire(3), sleepSecond=0.0
maxPermits=8.0, storedPermits=5.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=2689446
acquire(10), sleepSecond=0.687186
maxPermits=8.0, storedPermits=0.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=5251946
acquire(1), sleepSecond=1.559174
五、Guava并發(fā):ListenableFuture與RateLimiter示例
5.1概念
ListenableFuture顧名思義就是可以監(jiān)聽的Future,它是對JAVA原生Future的擴(kuò)展增強(qiáng)。我們知道Future表示一個異步計算任務(wù),當(dāng)任務(wù)完成時可以得到計算結(jié)果。如果我們希望一旦計算完成就拿到結(jié)果展示給用戶或者做另外的計算,就必須使用另一個線程不斷的查詢計算狀態(tài)。這樣做,代碼復(fù)雜,而且效率低下。使用ListenableFuture Guava幫我們檢測Future是否完成了,如果完成就自動調(diào)用回調(diào)函數(shù),這樣可以減少并發(fā)程序的復(fù)雜度。
推薦使用第二種方法,因?yàn)榈诙N方法可以直接得到Future的返回值,或者處理錯誤情況。本質(zhì)上第二種方法是通過調(diào)動第一種方法實(shí)現(xiàn)的,做了進(jìn)一步的封裝。
另外ListenableFuture還有其他幾種內(nèi)置實(shí)現(xiàn):
1)SettableFuture:不需要實(shí)現(xiàn)一個方法來計算返回值,而只需要返回一個固定值來做為返回值,可以通過程序設(shè)置此Future的返回值或者異常信息;2)CheckedFuture:這是一個繼承自ListenableFuture接口,他提供了checkedGet()方法,此方法在Future執(zhí)行發(fā)生異常時,可以拋出指定類型的異常。
RateLimiter類似于JDK的信號量Semphore,他用來限制對資源并發(fā)訪問的線程數(shù)
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.RateLimiter;
public class ListenableFutureDemo {
public static void main(String[] args) {
testRateLimiter();
testListenableFuture();
}
/**
* RateLimiter類似于JDK的信號量Semphore,他用來限制對資源并發(fā)訪問的線程數(shù)
*/
public static void testRateLimiter() {
ListeningExecutorService executorService = MoreExecutors
.listeningDecorator(Executors.newCachedThreadPool());
RateLimiter limiter = RateLimiter.create(5.0); // 每秒不超過4個任務(wù)被提交
for (int i = 0; i < 10; i++) {
limiter.acquire(); // 請求RateLimiter, 超過permits會被阻塞
final ListenableFuture<Integer> listenableFuture = executorService
.submit(new Task("is "+ i));
}
}
public static void testListenableFuture() {
ListeningExecutorService executorService = MoreExecutors
.listeningDecorator(Executors.newCachedThreadPool());
final ListenableFuture<Integer> listenableFuture = executorService
.submit(new Task("testListenableFuture"));
//同步獲取調(diào)用結(jié)果
try {
System.out.println(listenableFuture.get());
} catch (InterruptedException e1) {
e1.printStackTrace();
} catch (ExecutionException e1) {
e1.printStackTrace();
}
//第一種方式
listenableFuture.addListener(new Runnable() {
@Override
public void run() {
try {
System.out.println("get listenable future's result "
+ listenableFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}, executorService);
//第二種方式
Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {
@Override
public void onSuccess(Integer result) {
System.out
.println("get listenable future's result with callback "
+ result);
}
@Override
public void onFailure(Throwable t) {
t.printStackTrace();
}
});
}
}
class Task implements Callable<Integer> {
String str;
public Task(String str){
this.str = str;
}
@Override
public Integer call() throws Exception {
System.out.println("call execute.." + str);
TimeUnit.SECONDS.sleep(1);
return 7;
}
}
pom.xml依賴
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>14.0.1</version>
</dependency>