一、背景
之前在做電商相關業務的時候,有一個常見的需求場景是:用戶下單之后,超過半小時不支付,就取消訂單?,F在我們在淘寶京東買東西,或者通過美團點外賣,下單之后,如果不在指定時間內支付,訂單也會取消。 那么,如何實現這樣的超時取消邏輯呢,通過消息隊列的延時消息,是一個非常穩定的實現方案。
RocketMQ 就提供了這樣的延時消息功能,producer 端在發送消息時,設置延遲級別,從秒級到分鐘小時等等。消息在發送之后,會在消息隊列的服務器進行存儲。等過了設定的延遲時間之后,消息才會被consumer端消費到。
如果我們在下單的時候,發送一條設置延時30分鐘的消息,這條消息會在30分鐘之后被下游系統消費,然后判斷訂單有沒有支付,如果沒有支付,則取消訂單。那么這樣,通過消息隊列就完成了一個延遲取消的邏輯了。
二、原理
設置延時
先來看一下如何設置消息的延時 消息體可以通過setDelayTimeLevel方法來設置延時級別
public void produce() {
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.setDelayTimeLevel(1)
SendResult sendResult = producer.send(msg);
}
public void consume() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("TopicTest", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
其實是將延遲信息存到 Message 的 property 中(property是一個保存meta信息的hashmap)
public void setDelayTimeLevel(int level) {
this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}
void putProperty(final String name, final String value) {
if (null == this.properties) {
this.properties = new HashMap<String, String>();
}
this.properties.put(name, value);
}
之后,broker收到 message之后,會根據 message 中設置的延時級別進行處理 可以看看延時級別的具體情況: 一共分為18個級別(1-18),對應時間從1s到2h
public class MessageStoreConfig {
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}
那么整個系統是怎么做到讓consumer在設定的延時之后,開始消費指定消息的呢?
不得不說,RocketMQ 的設計還是挺巧妙的,我們接著往下看。
消息預存
對于broker收到的延時消息,并不是和普通消息一樣,進入發送端指定的topic中, 而是進入專門的延時topic中,延時topic有18條隊列(queueId 編號0-17),queueId 和 delayLevel 的關系是 queueId + 1 = delayLevel,是一一對應的。所以計算延時消息的待執行時間deliverTimestamp之后,會將消息存入對應延時級別的隊列中。
// 如果是延遲消息
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 重設延遲消息的topic和queueId,topic為指定的RMQ_SYS_SCHEDULE_TOPIC
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
...
// 將實際的指定topic和queueId進行存入property,進行備份
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
之后,會由ScheduleMessageService來進行任務處理。ScheduleMessageService是broker啟動時就開始執行的,用來處理延遲隊列中的消息,處理的邏輯如下所示。
public class ScheduleMessageService extends ConfigManager {
// key: delayLevel | value: delayTimeMillis
private final ConcurrentMap<Integer, Long> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);
public void start() {
// 創建一個Timer,用于執行定時任務
this.timer = new Timer("ScheduleMessageTimerThread", true);
// 這里對每個delayLevel的queue都創建一個DeliverDelayedMessageTimerTask,
// 用來處理對應queue中的消息
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
}
}
ScheduleMessageService啟動之后,會根據延時隊列的數目創建一一對應的
DeliverDelayedMessageTimerTask,然后周期執行。該類繼承自TimerTask,是JDK的工具類,用于執行定時任務,原理可以參考這篇文章 如何實現定時任務- JAVA Timer/TimerTask 源碼原理解析
消息轉投
可以看到
DeliverDelayedMessageTimerTask實現的 run 方法,主要邏輯都在executeOnTimeup方法中,從對應的延遲隊列中取出時間已到的 message,發送到 message 對應原始topic的隊列中。只要隊列沒有發生消費積壓,message 就會馬上被消費了。(這部分的代碼實現比較復雜,感興趣可以去看對應的源碼)
class DeliverDelayedMessageTimerTask extends TimerTask {
private final int delayLevel;
private final long offset;
public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
this.delayLevel = delayLevel;
this.offset = offset;
}
@Override
public void run() {
try {
if (isStarted()) {
this.executeOnTimeup();
}
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
}
}
public void executeOnTimeup() {
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
// 這部分是核心邏輯,實現的是 從延時消息隊列中取出 deliverTimestamp - now <= 0 的消息,
// 將消息從延時queue移到原本指定Topic的queue中,這些消息就馬上會被consumer消費。
}
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
}
總體的原理示意圖,如下所示:
broker 在接收到延時消息的時候,會將延時消息存入到延時TOPIC的隊列中,然后ScheduleMessageService中,每個 queue 對應的定時任務會不停地被執行,檢查 queue 中哪些消息已到設定時間,然后轉發到消息的原始TOPIC,這些消息就會被各自的 producer 消費了。
三、拓展-消費重試
平常在使用RocketMQ的時候,一般會依賴consumer的消費重試功能。 而consumer端的消費重試,其實也是通過這個和延時隊列差不多的原理來實現的。
public void consume() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("TopicTest", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 這里如果返回RECONSUME_LATER,就會重試消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
RocketMQ規定,以下三種情況統一按照消費失敗處理并會發起重試。
- 業務消費方返回ConsumeConcurrentlyStatus.RECONSUME_LATER
- 業務消費方返回null
- 業務消費方主動/被動拋出異常
業務代碼中,一般會利用重試功能去做下游邏輯的重試。而RocketMQ的重試并不是固定時間間隔重復進行,二是采取的退避式重試,重試的時間間隔會不斷變長。 這個時間間隔,和設置delayLevel的延時類似。
Consumer客戶端會通過processConsumeResult方法處理每一條消息的消費結果,如果判斷需要進行重試,則會通過sendMessageBack方法將消息發送到broker,重試消息會帶上已重試次數的信息。
broker收到消息之后,SendMessageProcessor會對重試消息進行處理,設置topic為RETRY_TOPIC,具體邏輯如下:
public class SendMessageProcessor
extends AbstractSendMessageProcessor
implements.NETtyRequestProcessor {
private RemotingCommand asyncConsumerSendMsgBack(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
// 給重試消息設置新的topic
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
// 根據已經發生重試的次數確定delayLevel
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
// 重試次數+1
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
// 存儲消息
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
// ...
}
}
存儲消息的時候,CommitLog.putMessage方法內部判斷如果設置了delayLevel,就會重設topic為SCHEDULE_TOPIC,然后將消息存儲到延時隊列中,后續就是和ScheduleMessageService的邏輯相同。
整個消息重試的邏輯示意圖如下所示:
如圖所示
- Consumer在消費的時候,都會訂閱指定的TOPIC-NORMAL_TOPIC和該ConsumerGroup對應的重試TOPIC-RETRY_GROUP1_TOPIC,同時消費來自這兩個topic中的消息。
- 當發生消費失敗后,Client端會調用sendMessageBack方法將失敗消息發送回broker。
- broker端的SendMessageProcessor會根據當前的重試次數確定延時級別,將消息存入延時隊列-SCHEDULE_TOPIC中。
- ScheduleMessageService會將到期的消息重新發送到重試TOPIC-RETRY_GROUP1_TOPIC中,這個時候消息被Consumer消費,就完成了整個重試過程。
可以對比之前的延時消息流程去看,其實重試消息就是將失敗的消息當作延時消息進行處理,只不過最后投入的是專門的重試消息隊列中。
四、總結
延時消息都是非常日常業務使用中很重要的功能,而RocketMQ通過時間片分級+多隊列+定時任務,就實現了這樣的功能,設計上是很巧妙的。并且消費重試采用退避式的策略,重試時間的梯度剛好與延時消息策略一致,這樣就可以直接利用延時隊列去完成消息重試的功能,從策略上來說非常合理(消息消費重復失敗,在短時間內重試成功的可能性比較低),并且復用了底層代碼,這些是值得去學習和借鑒的。