之前在實現熔斷降級組件時,需要實現一個接口的超時中斷,意思是,業務在使用熔斷降級功能時,在平臺上設置了一個超時時間,如果在請求進入熔斷器開始計時,并且接口在超時時間內沒有響應,則需要提早中斷該請求并返回。
比如正常下游接口的超時時間為800ms,但是因為自身業務的特殊需求,最多只能等200ms,如果200ms之內沒有數據返回,則返回降級數據。這里處理請求的線程可以看成是Tomcat線程池中的一個線程,如果通過線程池返回的Future,可以很輕松的實現超時返回,但是這種情況下,并不能拿到Futrue,需要換一種思路。

思路
中斷一個線程的思路有哪些?
除了已經廢棄的Thread.stop, Thread.suspend, Thread.resume 方法,剩下的貌似只有一種方案了,就是調用當前線程的interrupt(),但是這個方法的作用并不是中斷線程,而是設置一個標識,通知該線程可以被中斷了,到底是繼續執行,還是中斷返回,由線程本身自己決定。
具體來說,當對一個線程調用了interrupt()之后,如果該線程處于被阻塞狀態(比如執行了wait、sleep或join等方法),那么會立即退出阻塞狀態,并拋出一個InterruptedException異常,在代碼中catch這個異常進行后續處理。如果線程一直處于運行狀態,那么只會把該線程的中斷標志設置為 true,僅此而已,所以interrupt()并不能真正的中斷線程,不過在rpc調用的場景中,請求線程一般都處于阻塞狀態,等待數據返回,這時interrupt()方法是可以派上用場的。
那么,要實現指定超時時間內中斷請求線程,還有最后一個問題需要解決:什么時候,由誰去執行interrupt()方法?
必然這個方法只能由其它線程來執行了(自己都阻塞了,執行個鬼),而且是在請求進入熔斷器時,并在超時時間之后執行,有點繞,比如超時時間是200ms,那么請求進入熔斷器之后,再過200ms,就執行interrupt(),但是在200ms之內有數據返回,那么就不執行interrupt()了。
實現
需求已經很明確了,相當于延遲執行一個task,其內部邏輯就是執行請求線程的interrupt(),當然還有其它的邏輯。
Runnable task = new Runnable() {
@Override
public void run() {
try {
thread.interrupt();
// 取消定時器任務
f.cancel();
} catch (Exception e) {
logger.error("Failed while ticking TimerListener", e);
}
}
};
Doug Lea大神提供的ScheduledThreadPoolExecutor可以很好的滿足這個需求,通過scheduleAtFixedRate方法可以很方便的實現在延遲指定時間之后執行提交的任務。
ScheduledFuture<?> f = executor.scheduleAtFixedRate( task, timeout, timeout, TimeUnit.MILLISECONDS);
在請求進入熔斷器時,順便提交一個任務到線程池中等待執行,如果接口在超時時間內沒有返回,那么該任務會被觸發,并執行請求線程的interrupt方法,這樣就實現了請求線程的中斷(因為這時請求線程正在被阻塞,等待數據返回),另外需要清空定時任務,不然這個任務會一直執行。
如果接口正常返回了,也要記得清空定時任務,并且在請求退出熔斷器的時候,記得恢復請求線程的中斷標識,如何恢復?在請求線程中執行下面代碼即可。
Thread.interrupted(); // 內部邏輯 public static boolean interrupted() { return currentThread().isInterrupted(true); } // 參數為true,可以清除中斷標識 private native boolean isInterrupted(boolean ClearInterrupted);
執行當前線程(即請求線程)的isInterrupted方法。
使用這種方式實現請求的超時中斷,在QPS很高的情況下,會有額外的性能損失,因為每次請求都要提交一個任務到線程池中等待執行。