目錄
- 一、前情提示
- 二、unack消息的積壓問題
- 三、如何解決unack消息的積壓問題
- 四、高并發場景下的內存溢出問題
- 五、低吞吐量問題
- 六、合理設置prefetch count
- 七、階段性總結
一、前情提示
上一篇文章:《RocketMQ消息中間件用起來真的可靠嗎?》,我們分析了ack機制的底層實現原理(delivery tag機制),還有消除處理失敗時的nack機制如何觸發消息重發。
通過這個,已經讓大家進一步對消費端保證數據不丟失的方案的理解更進一層了。
這篇文章,我們將會對ack底層的delivery tag機制進行更加深入的分析,讓大家理解的更加透徹一些。
面試時,如果被問到消息中間件數據不丟失問題的時候,可以更深入到底層,給面試官進行分析。
二、unack消息的積壓問題
首先,我們要給大家介紹一下RabbitMQ的prefetch count這個概念。
大家看過上篇文章之后應該都知道了,對每個channel(其實對應了一個消費者服務實例,你大體可以這么來認為),RabbitMQ投遞消息的時候,都是會帶上本次消息投遞的一個delivery tag的,唯一標識一次消息投遞。
然后,我們進行ack時,也會帶上這個delivery tag,基于同一個channel進行ack,ack消息里會帶上delivery tag讓RabbitMQ知道是對哪一次消息投遞進行了ack,此時就可以對那條消息進行刪除了。
大家先來看一張圖,幫助大家回憶一下這個delivery tag的概念。
所以大家可以考慮一下,對于每個channel而言(你就認為是針對每個消費者服務實例吧,比如一個倉儲服務實例),其實都有一些處于unack狀態的消息。
比如RabbitMQ正在投遞一條消息到channel,此時消息肯定是unack狀態吧?
然后倉儲服務接收到一條消息以后,要處理這條消息需要耗費時間,此時消息肯定是unack狀態吧?
同時,即使你執行了ack之后,你要知道這個ack他默認是異步執行的,尤其如果你開啟了批量ack的話,更是有一個延遲時間才會ack的,此時消息也是unack吧?
那么大家考慮一下,RabbitMQ他能夠無限制的不停給你的消費者服務實例推送消息嗎?
明顯是不能的,如果RabbitMQ給你的消費者服務實例推送的消息過多過快,比如都有幾千條消息積壓在某個消費者服務實例的內存中。
那么此時這幾千條消息都是unack的狀態,一直積壓著,是不是有可能會導致消費者服務實例的內存溢出?內存消耗過大?甚至內存泄露之類的問題產生?
所以說,RabbitMQ是必須要考慮一下消費者服務的處理能力的。
大家看看下面的圖,感受一下如果消費者服務實例的內存中積壓消息過多,都是unack的狀態,此時會怎么樣。
三、如何解決unack消息的積壓問題
正是因為這個原因,RabbitMQ基于一個prefetch count來控制這個unack message的數量。
你可以通過 “channel.basicQos(10)” 這個方法來設置當前channel的prefetch count。
舉個例子,比如你要是設置為10的話,那么意味著當前這個channel里,unack message的數量不能超過10個,以此來避免消費者服務實例積壓unack message過多。
這樣的話,就意味著RabbitMQ正在投遞到channel過程中的unack message,以及消費者服務在處理中的unack message,以及異步ack之后還沒完成ack的unack message,所有這些message加起來,一個channel也不能超過10個。
如果你要簡單粗淺的理解的話,也大致可以理解為這個prefetch count就代表了一個消費者服務同時最多可以獲取多少個message來處理。所以這里也點出了prefetch這個單詞的意思。
prefetch就是預抓取的意思,就意味著你的消費者服務實例預抓取多少條message過來處理,但是最多只能同時處理這么多消息。
如果一個channel里的unack message超過了prefetch count指定的數量,此時RabbitMQ就會停止給這個channel投遞消息了,必須要等待已經投遞過去的消息被ack了,此時才能繼續投遞下一個消息。
老規矩,給大家上一張圖,我們一起來看看這個東西是啥意思。
四、高并發場景下的內存溢出問題
好!現在大家對ack機制底層的另外一個核心機制:prefetch機制也有了一個深刻的理解了。
此時,咱們就應該來考慮一個問題了。就是如何來設置這個prefetch count呢?這個東西設置的過大或者過小有什么影響呢?
其實大家理解了上面的圖就很好理解這個問題了。
假如說我們把prefetch count設置的很大,比如說3000,5000,甚至100000,就這樣特別大的值,那么此時會如何呢?
這個時候,在高并發大流量的場景下,可能就會導致消費者服務的內存被快速的消耗掉。
因為假如說現在MQ接收到的流量特別的大,每秒都上千條消息,而且此時你的消費者服務的prefetch count還設置的特別大,就會導致可能一瞬間你的消費者服務接收到了達到prefetch count指定數量的消息。
打個比方,比如一下子你的消費者服務內存里積壓了10萬條消息,都是unack的狀態,反正你的prefetch count設置的是10萬。
那么對一個channel,RabbitMQ就會最多容忍10萬個unack狀態的消息,在高并發下也就最多可能積壓10萬條消息在消費者服務的內存里。
那么此時導致的結果,就是消費者服務直接被擊垮了,內存溢出,OOM,服務宕機,然后大量unack的消息會被重新投遞給其他的消費者服務,此時其他消費者服務一樣的情況,直接宕機,最后造成雪崩效應。
所有的消費者服務因為扛不住這么大的數據量,全部宕機。
大家來看看下面的圖,自己感受一下現場的氛圍。
五、低吞吐量問題
那么如果反過來呢,我們要是把prefetch count設置的很小會如何呢?
比如說我們把prefetch count設置為1?此時就必然會導致消費者服務的吞吐量極低。因為你即使處理完一條消息,執行ack了也是異步的。
給你舉個例子,假如說你的prefetch count = 1,RabbitMQ最多投遞給你1條消息處于unack狀態。
此時比如你剛處理完這條消息,然后執行了ack的那行代碼,結果不幸的是,ack需要異步執行,也就是需要100ms之后才會讓RabbitMQ感知到。
那么100ms之后RabbitMQ感知到消息被ack了,此時才會投遞給你下一條消息!
這就尷尬了,在這100ms期間,你的消費者服務是不是啥都沒干啊?
這不就直接導致了你的消費者服務處理消息的吞吐量可能下降10倍,甚至百倍,千倍,都有這種可能!
大家看看下面的圖,感受一下低吞吐量的現場。
六、合理的設置prefetch count
所以鑒于上面兩種極端情況,RabbitMQ官方給出的建議是prefetch count一般設置在100~300之間。
也就是一個消費者服務最多接收到100~300個message來處理,允許處于unack狀態。
這個狀態下可以兼顧吞吐量也很高,同時也不容易造成內存溢出的問題。
但是其實在我們的實踐中,這個prefetch count大家完全是可以自己去壓測一下的。
比如說慢慢調節這個值,不斷加大,觀察高并發大流量之下,吞吐量是否越來越大,而且觀察消費者服務的內存消耗,會不會OOM、頻繁FullGC等問題。
七、階段性總結
其實通過最近幾篇文章,基本上已經把消息中間件的消費端如何保證數據不丟失這個問題剖析的較為深入和透徹了。
如果你是基于RabbitMQ來做消息中間件的話,消費端的代碼里,必須考慮三個問題:手動ack、處理失敗的nack、prefetch count的合理設置
這三個問題背后涉及到了各種機制:
- 自動ack機制
- delivery tag機制
- ack批量與異步提交機制
- 消息重發機制
- 手動nack觸發消息重發機制
- prefetch count過大導致內存溢出問題
- prefetch count過小導致吞吐量過低
這些底層機制和問題,咱們都一步步分析清楚了。
所以到現在,單論消費端這塊的數據不丟失技術方案,相信大家在面試的時候就可以有一整套自己的理解和方案可以闡述了。