延遲消息一般用于:提前發送消息,延遲一段時間后才需要被處理的場景。比如:下單半小時后還未支付,則取消訂單 釋放庫存 等。
RocketMQ的延遲消息使用上非常便捷,但是不支持任意時間的延遲,這一點對于有強迫癥的朋友來說就比較難受,但是搞明白為什么這么設計后,就自然釋懷了。
為什么RocketMQ不支持任意時間的延時?為什么延遲時間只能是從1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h這些時間段里選?如果讓你來設計RocketMQ的延遲消息,你會怎么設計?本文從以上幾個問題聊聊RocketMQ的延遲消息。
一、使用延遲消息
RocketMQ不支持任意時間的延遲,只有18個等級的延遲時間,默認是:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。從頭到尾共18個等級時間,s、m、h、d,分別表示秒、分、時、天。
默認的18個等級對應的時間可以修改,在broker.conf中增加如下配置,根據自身需求修改時間,然后重啟broker。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
RocketMQ發送延遲消息只需要給消息設置延遲時間級別setDelayTimeLevel。
DefaultMQProducer producer = new DefaultMQProducer("TestProducerGroup");
Message rocketMsg = new Message(topic, tags, payloads);
// delayLevel=0時,無需延遲
if (delayLevel > 0) {
rocketMsg.setDelayTimeLevel(delayLevel);
}
SendResult sendResult = producer.send(rocketMsg, timeout);
二、延遲消息的原理
1.你會怎么設計
如果讓你來設計RocketMQ的延遲消息,你會怎么設計?筆者會這樣設計:
- 延遲消息也是個消息,只是多了延遲時間,既然是消息,不管是不是要立刻處理,先找個臨時Topic存儲起來。
- Topic里面實際上是一個個隊列,那所有的延遲消息要存在一個隊列里嗎?不要放在同一個隊列里,因為消息各自都有不同的延遲時間,如果放在一個隊列里,會牽扯到其余問題:比如排序、比如記錄消費位置等。所以是按延遲時間分開存。
- 消息已經存起來了,那怎么處理呢?既然涉及到了延遲時間,那自然啟動線程去定時獲取消息,判斷消息的延遲時間是否已經到達,到達之后則取出來投放到目的Topic。
2.粗略架構圖
講到這里,延遲消息的架構圖基本浮現出來了:
3.RocketMQ的設計
實際上RocketMQ在設計延遲消息時,跟上面的思路基本類似,不在贅述,額外補充幾點:
- 消息進入Broker后,會被存儲在TopicSCHEDULE_TOPIC_XXXX中,只是在Dashboard中看不到。
- TopicSCHEDULE_TOPIC_XXXX中有18個消息隊列,分別存儲18個延遲等級對應的消息。
- RocketMQ 在啟動時,會從broker.conf中獲取18個等級對應的時間,延遲等級和時間的關系會存在放到DelayLevelTable中。
- RocketMQ會開啟18個定時任務每隔100ms,從TopicSCHEDULE_TOPIC_XXXX判斷18個隊列里的第一個消息是否可以被投放,如果可以投放,則在投放到原本的目的Topic。判斷邏輯:存入時間+延遲時間 > 當前時間。
三、為什么不支持自定義延時時間
說到這里,估計你也能猜到,為什么不支持自定義延遲時間了,核心原因還是性能問題。
試想一下,如果設計成任意時間,那么就不可能使用18個隊列了,更不可能使用無限個隊列了,只可能使用單個隊列。
但是如果使用單個隊列,按照先進先出的存放的話,那出現需要后進先出的消息怎么辦?那只能對整個隊列進行排序,如果消息量很大,每次有消息進來都需要排序,那CPU肯定會被玩爆。
而且隊列里的消息被消費后,都會記錄偏移量,如果每次有消息進來都要排序,那偏移量則失去意義,增加了消息丟失的風險。
所以,RocketMQ的這種18個延遲時間等級的設計,雖然在延遲時間的自由度上作出了妥協,但是基本滿足業務,性能也很優秀。
四、總結
本文聊了RocketMQ延遲消息的使用、原理、解答了部分疑問。核心概念:臨時Topic、目的Topic、定時任務、18個延遲等級、18個消息隊列。RocketMQ延遲消息的設計方式,是一種兼顧了性能和業務的優秀設計。