在開(kāi)發(fā)分布式高并發(fā)系統(tǒng)時(shí)有三把利器用來(lái)保護(hù)系統(tǒng):緩存、降級(jí)、限流
緩存
緩存的目的是提升系統(tǒng)訪問(wèn)速度和增大系統(tǒng)處理容量
降級(jí)
降級(jí)是當(dāng)服務(wù)出現(xiàn)問(wèn)題或者影響到核心流程時(shí),需要暫時(shí)屏蔽掉,待高峰或者問(wèn)題解決后再打開(kāi)
限流
限流的目的是通過(guò)對(duì)并發(fā)訪問(wèn)/請(qǐng)求進(jìn)行限速,或者對(duì)一個(gè)時(shí)間窗口內(nèi)的請(qǐng)求進(jìn)行限速來(lái)保護(hù)系統(tǒng),一旦達(dá)到限制速率則可以拒絕服務(wù)、排隊(duì)或等待、降級(jí)等處理
問(wèn)題描述
1、某天A君突然發(fā)現(xiàn)自己的接口請(qǐng)求量突然漲到之前的10倍,沒(méi)多久該接口幾乎不可使用,并引發(fā)連鎖反應(yīng)導(dǎo)致整個(gè)系統(tǒng)崩潰。如何應(yīng)對(duì)這種情況呢?生活給了我們答案:比如老式電閘都安裝了保險(xiǎn)絲,一旦有人使用超大功率的設(shè)備,保險(xiǎn)絲就會(huì)燒斷以保護(hù)各個(gè)電器不被強(qiáng)電流給燒壞。同理我們的接口也需要安裝上“保險(xiǎn)絲”,以防止非預(yù)期的請(qǐng)求對(duì)系統(tǒng)壓力過(guò)大而引起的系統(tǒng)癱瘓,當(dāng)流量過(guò)大時(shí),可以采取拒絕或者引流等機(jī)制。整編:微信公眾號(hào),搜云庫(kù)技術(shù)團(tuán)隊(duì),ID:souyunku
2、緩存的目的是提升系統(tǒng)訪問(wèn)速度和增大系統(tǒng)能處理的容量,可謂是抗高并發(fā)流量的銀彈;而降級(jí)是當(dāng)服務(wù)出問(wèn)題或者影響到核心流程的性能則需要暫時(shí)屏蔽掉,待高峰或者問(wèn)題解決后再打開(kāi);而有些場(chǎng)景并不能用緩存和降級(jí)來(lái)解決,比如稀缺資源(秒殺、搶購(gòu))、寫服務(wù)(如評(píng)論、下單)、頻繁的復(fù)雜查詢(評(píng)論的最后幾頁(yè)),因此需有一種手段來(lái)限制這些場(chǎng)景的并發(fā)/請(qǐng)求量,即限流。
3、系統(tǒng)在設(shè)計(jì)之初就會(huì)有一個(gè)預(yù)估容量,長(zhǎng)時(shí)間超過(guò)系統(tǒng)能承受的TPS/QPS閾值,系統(tǒng)可能會(huì)被壓垮,最終導(dǎo)致整個(gè)服務(wù)不夠用。為了避免這種情況,我們就需要對(duì)接口請(qǐng)求進(jìn)行限流。
4、限流的目的是通過(guò)對(duì)并發(fā)訪問(wèn)請(qǐng)求進(jìn)行限速或者一個(gè)時(shí)間窗口內(nèi)的的請(qǐng)求數(shù)量進(jìn)行限速來(lái)保護(hù)系統(tǒng),一旦達(dá)到限制速率則可以拒絕服務(wù)、排隊(duì)或等待。
5、一般開(kāi)發(fā)高并發(fā)系統(tǒng)常見(jiàn)的限流模式有控制并發(fā)和控制速率,一個(gè)是限制并發(fā)的總數(shù)量(比如數(shù)據(jù)庫(kù)連接池、線程池),一個(gè)是限制并發(fā)訪問(wèn)的速率(如Nginx的limitconn模塊,用來(lái)限制瞬時(shí)并發(fā)連接數(shù)),另外還可以限制單位時(shí)間窗口內(nèi)的請(qǐng)求數(shù)量(如Guava的RateLimiter、nginx的limitreq模塊,限制每秒的平均速率)。其他還有如限制遠(yuǎn)程接口調(diào)用速率、限制MQ的消費(fèi)速率。另外還可以根據(jù)網(wǎng)絡(luò)連接數(shù)、網(wǎng)絡(luò)流量、CPU或內(nèi)存負(fù)載等來(lái)限流。
相關(guān)概念:
PV:
page view 頁(yè)面總訪問(wèn)量,每刷新一次記錄一次。
UV:
unique view 客戶端主機(jī)訪問(wèn),指一天內(nèi)相同IP的訪問(wèn)記為1次。
QPS:
query per second,即每秒訪問(wèn)量。qps很大程度上代表了系統(tǒng)的繁忙度,沒(méi)次請(qǐng)求可能存在多次的磁盤io,網(wǎng)絡(luò)請(qǐng)求,多個(gè)cpu時(shí)間片,一旦qps超過(guò)了預(yù)先設(shè)置的閥值,可以考量擴(kuò)容增加服務(wù)器,避免訪問(wèn)量過(guò)大導(dǎo)致的宕機(jī)。整編:微信公眾號(hào),搜云庫(kù)技術(shù)團(tuán)隊(duì),ID:souyunku
RT:
response time,每次請(qǐng)求的響應(yīng)時(shí)間,直接決定用戶體驗(yàn)性。
本文主要介紹應(yīng)用級(jí)限流方法,分布式限流、流量入口限流(接入層如NGINX limitconn和limitreq 模塊)。
應(yīng)用級(jí)限流
一、控制并發(fā)數(shù)量
屬于一種較常見(jiàn)的限流手段,在實(shí)際應(yīng)用中可以通過(guò)信號(hào)量機(jī)制(如JAVA中的Semaphore)來(lái)實(shí)現(xiàn)。操作系統(tǒng)的信號(hào)量是個(gè)很重要的概念,Java 并發(fā)庫(kù) 的Semaphore 可以很輕松完成信號(hào)量控制,Semaphore可以控制某個(gè)資源可被同時(shí)訪問(wèn)的個(gè)數(shù),通過(guò) acquire() 獲取一個(gè)許可,如果沒(méi)有就等待,而 release() 釋放一個(gè)許可。
舉個(gè)例子,我們對(duì)外提供一個(gè)服務(wù)接口,允許最大并發(fā)數(shù)為10,代碼實(shí)現(xiàn)如下:
public class DubboService { private final Semaphore permit = new Semaphore(10, true); public void process(){ try{ permit.acquire(); //業(yè)務(wù)邏輯處理 } catch (InterruptedException e) { e.printStackTrace(); } finally { permit.release(); } }}
在以上代碼中,雖然有30個(gè)線程在執(zhí)行,但是只允許10個(gè)并發(fā)的執(zhí)行。Semaphore的構(gòu)造方法Semaphore(int permits) 接受一個(gè)整型的數(shù)字,表示可用的許可證數(shù)量。Semaphore(10)表示允許10個(gè)線程獲取許可證,也就是最大并發(fā)數(shù)是10。Semaphore的用法也很簡(jiǎn)單,首先線程使用Semaphore的acquire()獲取一個(gè)許可證,使用完之后調(diào)用release()歸還許可證,還可以用tryAcquire()方法嘗試獲取許可證,信號(hào)量的本質(zhì)是控制某個(gè)資源可被同時(shí)訪問(wèn)的個(gè)數(shù),在一定程度上可以控制某資源的訪問(wèn)頻率,但不能精確控制,控制訪問(wèn)頻率的模式見(jiàn)下文描述。
二、控制訪問(wèn)速率
在工程實(shí)踐中,常見(jiàn)的是使用令牌桶算法來(lái)實(shí)現(xiàn)這種模式,常用的限流算法有兩種:漏桶算法和令牌桶算法。
漏桶算法
漏桶算法思路很簡(jiǎn)單,水(請(qǐng)求)先進(jìn)入到漏桶里,漏桶以一定的速度出水,當(dāng)水流入速度過(guò)大會(huì)直接溢出,可以看出漏桶算法能強(qiáng)行限制數(shù)據(jù)的傳輸速率。
對(duì)于很多應(yīng)用場(chǎng)景來(lái)說(shuō),除了要求能夠限制數(shù)據(jù)的平均傳輸速率外,還要求允許某種程度的突發(fā)傳輸。這時(shí)候漏桶算法可能就不合適了,令牌桶算法更為適合。
令牌桶算法
如圖所示,令牌桶算法的原理是系統(tǒng)會(huì)以一個(gè)恒定的速度往桶里放入令牌,而如果請(qǐng)求需要被處理,則需要先從桶里獲取一個(gè)令牌,當(dāng)桶里沒(méi)有令牌可取時(shí),則拒絕服務(wù),令牌桶算法通過(guò)發(fā)放令牌,根據(jù)令牌的rate頻率做請(qǐng)求頻率限制,容量限制等。整編:微信公眾號(hào),搜云庫(kù)技術(shù)團(tuán)隊(duì),ID:souyunku
在Wikipedia上,令牌桶算法是這么描述的:
1、每過(guò)1/r秒桶中增加一個(gè)令牌。
2、桶中最多存放b個(gè)令牌,如果桶滿了,新放入的令牌會(huì)被丟棄。
3、當(dāng)一個(gè)n字節(jié)的數(shù)據(jù)包到達(dá)時(shí),消耗n個(gè)令牌,然后發(fā)送該數(shù)據(jù)包。
4、如果桶中可用令牌小于n,則該數(shù)據(jù)包將被緩存或丟棄。
令牌桶控制的是一個(gè)時(shí)間窗口內(nèi)通過(guò)的數(shù)據(jù)量,在API層面我們常說(shuō)的QPS、TPS,正好是一個(gè)時(shí)間窗口內(nèi)的請(qǐng)求量或者事務(wù)量,只不過(guò)時(shí)間窗口限定在1s罷了。以一個(gè)恒定的速度往桶里放入令牌,而如果請(qǐng)求需要被處理,則需要先從桶里獲取一個(gè)令牌,當(dāng)桶里沒(méi)有令牌可取時(shí),則拒絕服務(wù)。令牌桶的另外一個(gè)好處是可以方便的改變速度,一旦需要提高速率,則按需提高放入桶中的令牌的速率。
在我們的工程實(shí)踐中,通常使用google開(kāi)源工具包Guava提供的限流工具類RateLimiter來(lái)實(shí)現(xiàn)控制速率,該類基于令牌桶算法來(lái)完成限流,非常易于使用,而且非常高效。如我們不希望每秒的任務(wù)提交超過(guò)1個(gè)
public static void main(String[] args) { String start = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); RateLimiter limiter = RateLimiter.create(1.0); // 這里的1表示每秒允許處理的量為1個(gè) for (int i = 1; i <= 10; i++) { double waitTime = limiter.acquire(i); // 請(qǐng)求RateLimiter, 超過(guò)permits會(huì)被阻塞 System.out.println("cutTime=" + System.currentTimeMillis() + " call execute:" + i + " waitTime:" + waitTime); } String end = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); System.out.println("start time:" + start); System.out.println("end time:" + end);}
首先通過(guò)RateLimiter.create(1.0);創(chuàng)建一個(gè)限流器,參數(shù)代表每秒生成的令牌數(shù),通過(guò)limiter.acquire(i);來(lái)以阻塞的方式獲取令牌,令牌桶算法允許一定程度的突發(fā)(允許消費(fèi)未來(lái)的令牌),所以可以一次性消費(fèi)i個(gè)令牌;當(dāng)然也可以通過(guò)tryAcquire(int permits, long timeout, TimeUnit unit)來(lái)設(shè)置等待超時(shí)時(shí)間的方式獲取令牌,如果超timeout為0,則代表非阻塞,獲取不到立即返回,支持阻塞或可超時(shí)的令牌消費(fèi)。
從輸出來(lái)看,RateLimiter支持預(yù)消費(fèi),比如在acquire(5)時(shí),等待時(shí)間是4秒,是上一個(gè)獲取令牌時(shí)預(yù)消費(fèi)了3個(gè)兩排,固需要等待3*1秒,然后又預(yù)消費(fèi)了5個(gè)令牌,以此類推。
RateLimiter通過(guò)限制后面請(qǐng)求的等待時(shí)間,來(lái)支持一定程度的突發(fā)請(qǐng)求(預(yù)消費(fèi)),在使用過(guò)程中需要注意這一點(diǎn),Guava有兩種限流模式,一種為穩(wěn)定模式(SmoothBursty:令牌生成速度恒定,平滑突發(fā)限流),一種為漸進(jìn)模式(SmoothWarmingUp:令牌生成速度緩慢提升直到維持在一個(gè)穩(wěn)定值,平滑預(yù)熱限流) 兩種模式實(shí)現(xiàn)思路類似,主要區(qū)別在等待時(shí)間的計(jì)算上。
SmoothBursty 模式:
RateLimiter limiter = RateLimiter.create(5); RateLimiter.create(5)表示桶容量為5且每秒新增5個(gè)令牌,即每隔200毫秒新增一個(gè)令牌;limiter.acquire()表示消費(fèi)一個(gè)令牌,如果當(dāng)前桶中有足夠令牌則成功(返回值為0),如果桶中沒(méi)有令牌則暫停一段時(shí)間,比如發(fā)令牌間隔是200毫秒,則等待200毫秒后再去消費(fèi)令牌,這種實(shí)現(xiàn)將突發(fā)請(qǐng)求速率平均為了固定請(qǐng)求速率。
SmoothWarmingUp模式:
RateLimiter limiter = RateLimiter.create(5,1000, TimeUnit.MILLISECONDS);
創(chuàng)建方式:
RateLimiter.create(doublepermitsPerSecond, long warmupPeriod, TimeUnit unit),permitsPerSecond表示每秒新增的令牌數(shù),warmupPeriod表示在從冷啟動(dòng)速率過(guò)渡到平均速率的時(shí)間間隔。速率是梯形上升速率的,也就是說(shuō)冷啟動(dòng)時(shí)會(huì)以一個(gè)比較大的速率慢慢到平均速率;然后趨于平均速率(梯形下降到平均速率)??梢酝ㄟ^(guò)調(diào)節(jié)warmupPeriod參數(shù)實(shí)現(xiàn)一開(kāi)始就是平滑固定速率。整編:微信公眾號(hào),搜云庫(kù)技術(shù)團(tuán)隊(duì),ID:souyunku
放在Controller中用Jemter壓測(cè)
注:RateLimiter控制的是速率,Samephore控制的是并發(fā)量。RateLimiter的原理就是令牌桶,它主要由許可發(fā)出的速率來(lái)定義,如果沒(méi)有額外的配置,許可證將按每秒許可證規(guī)定的固定速度分配,許可將被平滑地分發(fā),若請(qǐng)求超過(guò)permitsPerSecond則RateLimiter按照每秒 1/permitsPerSecond 的速率釋放許可。注意:RateLimiter適用于單體應(yīng)用,且RateLimiter不保證公平性訪問(wèn)。
使用上述方式使用RateLimiter的方式不夠優(yōu)雅,自定義注解+AOP的方式實(shí)現(xiàn)(適用于單體應(yīng)用),詳細(xì)見(jiàn)下面代碼:
自定義注解:
import java.lang.annotation.*;/** * 自定義注解可以不包含屬性,成為一個(gè)標(biāo)識(shí)注解 */@Inherited@Documented@Target({ElementType.METHOD, ElementType.FIELD, ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)public @interface RateLimitAspect {}
自定義切面類
import com.google.common.util.concurrent.RateLimiter;import com.test.cn.springbootdemo.util.ResultUtil;import net.sf.json.JSONObject;import org.aspectj.lang.ProceedingJoinPoint;import org.aspectj.lang.annotation.Around;import org.aspectj.lang.annotation.Aspect;import org.aspectj.lang.annotation.Pointcut;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Scope;import org.springframework.stereotype.Component;import javax.servlet.ServletOutputStream;import javax.servlet.http.HttpServletResponse;import java.io.IOException;@Component@Scope@Aspectpublic class RateLimitAop { @Autowired private HttpServletResponse response; private RateLimiter rateLimiter = RateLimiter.create(5.0); //比如說(shuō),我這里設(shè)置"并發(fā)數(shù)"為5 @Pointcut("@annotation(com.test.cn.springbootdemo.aspect.RateLimitAspect)") public void serviceLimit() { } @Around("serviceLimit()") public Object around(ProceedingJoinPoint joinPoint) { Boolean flag = rateLimiter.tryAcquire(); Object obj = null; try { if (flag) { obj = joinPoint.proceed(); }else{ String result = JSONObject.fromObject(ResultUtil.success1(100, "failure")).toString(); output(response, result); } } catch (Throwable e) { e.printStackTrace(); } System.out.println("flag=" + flag + ",obj=" + obj); return obj; } public void output(HttpServletResponse response, String msg) throws IOException { response.setContentType("Application/json;charset=UTF-8"); ServletOutputStream outputStream = null; try { outputStream = response.getOutputStream(); outputStream.write(msg.getBytes("UTF-8")); } catch (IOException e) { e.printStackTrace(); } finally { outputStream.flush(); outputStream.close(); } }}
測(cè)試controller
import com.test.cn.springbootdemo.aspect.RateLimitAspect;import com.test.cn.springbootdemo.util.ResultUtil;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.ResponseBody;@Controllerpublic class TestController { @ResponseBody @RateLimitAspect @RequestMapping("/test") public String test(){ return ResultUtil.success1(1001, "success").toString(); }
壓測(cè)結(jié)果:
三、控制單位時(shí)間窗口內(nèi)請(qǐng)求數(shù)
某些場(chǎng)景下,我們想限制某個(gè)接口或服務(wù) 每秒/每分鐘/每天 的請(qǐng)求次數(shù)或調(diào)用次數(shù)。例如限制服務(wù)每秒的調(diào)用次數(shù)為50,實(shí)現(xiàn)如下:
private LoadingCache < Long, AtomicLong > counter = CacheBuilder.newBuilder().expireAfterWrite(2, TimeUnit.SECONDS).build(new CacheLoader < Long, AtomicLong > () {@ Override public AtomicLong load(Long seconds) throws Exception { return new AtomicLong(0); }});public static long permit = 50;public ResponseEntity getData() throws ExecutionException { //得到當(dāng)前秒 long currentSeconds = System.currentTimeMillis() / 1000; if (counter.get(currentSeconds).incrementAndGet() > permit) { return ResponseEntity.builder().code(404).msg("訪問(wèn)速率過(guò)快").build(); } //業(yè)務(wù)處理}
到此應(yīng)用級(jí)限流的一些方法就介紹完了。假設(shè)將應(yīng)用部署到多臺(tái)機(jī)器,應(yīng)用級(jí)限流方式只是單應(yīng)用內(nèi)的請(qǐng)求限流,不能進(jìn)行全局限流。因此我們需要分布式限流和接入層限流來(lái)解決這個(gè)問(wèn)題。
分布式限流
自定義注解+攔截器+redis實(shí)現(xiàn)限流 (單體和分布式均適用,全局限流)
自定義注解:
@Inherited@Documented@Target({ElementType.FIELD,ElementType.TYPE,ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)public @interface AccessLimit { int limit() default 5; int sec() default 5;}
攔截器:
public class AccessLimitInterceptor implements HandlerInterceptor { @Autowired private RedisTemplate<String, Integer> redisTemplate; //使用RedisTemplate操作redis @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { if (handler instanceof HandlerMethod) { HandlerMethod handlerMethod = (HandlerMethod) handler; Method method = handlerMethod.getMethod(); if (!method.isAnnotationPresent(AccessLimit.class)) { return true; } AccessLimit accessLimit = method.getAnnotation(AccessLimit.class); if (accessLimit == null) { return true; } int limit = accessLimit.limit(); int sec = accessLimit.sec(); String key = IPUtil.getIpAddr(request) + request.getRequestURI(); Integer maxLimit = redisTemplate.opsForValue().get(key); if (maxLimit == null) { redisTemplate.opsForValue().set(key, 1, sec, TimeUnit.SECONDS); //set時(shí)一定要加過(guò)期時(shí)間 } else if (maxLimit < limit) { redisTemplate.opsForValue().set(key, maxLimit + 1, sec, TimeUnit.SECONDS); } else { output(response, "請(qǐng)求太頻繁!"); return false; } } return true; } public void output(HttpServletResponse response, String msg) throws IOException { response.setContentType("application/json;charset=UTF-8"); ServletOutputStream outputStream = null; try { outputStream = response.getOutputStream(); outputStream.write(msg.getBytes("UTF-8")); } catch (IOException e) { e.printStackTrace(); } finally { outputStream.flush(); outputStream.close(); } } @Override public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception { } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { }}
controller:
@Controller@RequestMapping("/activity")public class AopController { @ResponseBody @RequestMapping("/seckill") @AccessLimit(limit = 4,sec = 10) //加上自定義注解即可 public String test (HttpServletRequest request,@RequestParam(value = "username",required = false) String userName){ //TODO somethings…… return "hello world !"; }}
配置文件:
/*springmvc的配置文件中加入自定義攔截器*/<mvc:interceptors> <mvc:interceptor> <mvc:mapping path="/**"/> <bean class="com.pptv.activityapi.controller.pointsmall.AccessLimitInterceptor"/> </mvc:interceptor></mvc:interceptors>
訪問(wèn)效果如下,10s內(nèi)訪問(wèn)接口超過(guò)4次以上就過(guò)濾請(qǐng)求,原理和計(jì)數(shù)器算法類似:
接入層限流
主要介紹nginx 限流,采用漏桶算法。
限制原理:可一句話概括為:“根據(jù)客戶端特征,限制其訪問(wèn)頻率”,客戶端特征主要指IP、UserAgent等。使用IP比UserAgent更可靠,因?yàn)镮P無(wú)法造假,UserAgent可隨意偽造。整編:微信公眾號(hào),搜云庫(kù)技術(shù)團(tuán)隊(duì),ID:souyunku
用limit_req模塊來(lái)限制基于IP請(qǐng)求的訪問(wèn)頻率:
http://nginx.org/en/docs/http/ngxhttplimitreqmodule.html
也可以用tengine中的增強(qiáng)版:
http://tengine.taobao.org/documentcn/httplimitreqcn.html
1、并發(fā)數(shù)和連接數(shù)控制的配置:
nginx http配置: #請(qǐng)求數(shù)量控制,每秒20個(gè) limit_req_zone $binary_remote_addr zone=one:10m rate=20r/s; #并發(fā)限制30個(gè) limit_conn_zone $binary_remote_addr zone=addr:10m; server塊配置 limit_req zone=one burst=5; limit_conn addr 30;
2、ngxhttplimitconnmodule 可以用來(lái)限制單個(gè)IP的連接數(shù):
ngxhttplimitconnmodule模塊可以按照定義的鍵限定每個(gè)鍵值的連接數(shù)??梢栽O(shè)定單一 IP 來(lái)源的連接數(shù)。
并不是所有的連接都會(huì)被模塊計(jì)數(shù);只有那些正在被處理的請(qǐng)求(這些請(qǐng)求的頭信息已被完全讀入)所在的連接才會(huì)被計(jì)數(shù)。
http { limit_conn_zone $binary_remote_addr zone=addr:10m; ... server { ... location /download/ { limit_conn addr 1; }