消息隊列中的消息消費時并不能保證總是成功的,那失敗的消息該怎么進行消息補償呢?這就用到今天的主角消息重試和死信隊列了。
生產者消息重試
有時因為網路等原因生產者也可能發送消息失敗,也會進行消息重試,生產者消息重試比較簡單,在springboot中只要在配置文件中配置一下就可以了。
# 異步消息發送失敗重試次數,默認為2
rocketmq.producer.retry-times-when-send-async-failed=2
# 消息發送失敗重試次數,默認為2
rocketmq.producer.retry-times-when-send-failed=2
也可以通過下面這種方式配置
DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
defaultMQProducer.setRetryTimesWhenSendFailed(2);
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(2);
消費者消息重試
Apache RocketMQ 有兩種消費模式:集群消費模式和廣播消費模式。消息重試只針對集群消費模式生效;廣播消費模式不提供失敗重試特性,即消費失敗后,失敗消息不再重試,繼續消費新的消息。
同時RocketMq Push消費提供了兩種消費方式:并發消費和順序消費。
并發消費
在并發消費中,可能會有多個線程同時消費一個隊列的消息,因此即使發送端通過發送順序消息保證消息在同一個隊列中按照FIFO的順序,也無法保證消息實際被順序消費,所有并發消費也可以稱之為無序消費。
順序消費
順序消費是消息生產者發送過來的消息會遵循FIFO隊列的思想,先進先出有順序的消費消息。 對于順序消息,當消費者消費消息失敗后,消息隊列 RocketMQ 會自動不斷進行消息重試(每次間隔時間為 1 秒),這時,應用會出現消息消費被阻塞的情況。因此,在使用順序消息時,務必保證應用能夠及時監控并處理消費失敗的情況,避免阻塞現象的發生。
并發消費和順序消費區別
順序消費和并發消費的重試機制并不相同,順序消費消費失敗后會先在客戶端本地重試直到最大重試次數,這樣可以避免消費失敗的消息被跳過,消費下一條消息而打亂順序消費的順序,而并發消費消費失敗后會將消費失敗的消息重新投遞回服務端,再等待服務端重新投遞回來,在這期間會正常消費隊列后面的消息。
并發消費失敗后并不是投遞回原Topic,而是投遞到一個特殊Topic,其命名為%RETRY%ConsumerGroupName,集群模式下并發消費每一個ConsumerGroup會對應一個特殊Topic,并會訂閱該Topic。
兩者參數差別如下
消費類型 |
重試間隔 |
最大重試次數 |
順序消費 |
間隔時間可通過自定義設置, |
最大重試次數可通過自定義參數MaxReconsumeTimes取值進行配置。該參數取值無最大限制。若未設置參數值,默認最大重試次數為Integer.MAX |
并發消費 |
間隔時間根據重試次數階梯變化,取值范圍:1秒~2小時。不支持自定義配置 |
最大重試次數可通過自定義參數MaxReconsumeTimes取值進行配置。默認值為16次,該參數取值無最大限制,建議使用默認值 |
并發消費重試間隔如下:
第幾次重試 |
與上次重試的間隔時間 |
第幾次重試 |
與上次重試的間隔時間 |
1 |
10s |
9 |
7min |
2 |
30s |
10 |
8min |
3 |
1min |
11 |
9min |
4 |
2min |
12 |
10min |
5 |
3min |
13 |
20min |
6 |
4min |
14 |
30min |
7 |
5min |
15 |
1h |
8 |
6min |
16 |
2h |
死信隊列
當一條消息初次消費失敗,RocketMQ會自動進行消息重試,達到最大重試次數后,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息。此時,該消息不會立刻被丟棄,而是將其發送到該消費者對應的特殊隊列中,這類消息稱為死信消息(Dead-Letter Message),存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue),死信隊列是死信Topic下分區數唯一的單獨隊列。如果產生了死信消息,那對應的ConsumerGroup的死信Topic名稱為%DLQ%ConsumerGroupName,死信隊列的消息將不會再被消費。可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查詢到對應死信消息的信息。
實踐出真知
Talk is cheap,show you the code.
公共部分創建
1.配置文件
rocketmq.name-server=localhost:9876
# 消費者組
rocketmq.producer.group=producer_group
rocketmq.consumer.topic=consumer_topic
rocketmq.consumer.group=consumer_group
2.創建消費者RetryConsumerDemo
@Component
public class RetryConsumerDemo {
@Value("${rocketmq.name-server}")
private String namesrvAddr;
@Value("${rocketmq.consumer.topic}")
private String topic;
@Value("${rocketmq.consumer.group}")
private String consumerGroup;
private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
@PostConstruct
public void start() {
try {
consumer.setNamesrvAddr(namesrvAddr);
//設置集群消費模式
consumer.setMessageModel(MessageModel.CLUSTERING);
//設置消費超時時間(分鐘)
consumer.setConsumeTimeout(1);
//訂閱主題
consumer.subscribe(topic , "*");
//注冊消息監聽器
consumer.registerMessageListener(new MessageListenerConcurrentlyImpl());
//最大重試次數
consumer.setMaxReconsumeTimes(2);
//啟動消費端
consumer.start();
System.out.println("Retry Consumer Start...");
} catch (MQClientException e) {
e.printStackTrace();
}
}
}
測試并發消費
1.創建并發消費監聽類 并發消費監聽類要實現
MessageListenerConcurrently類
public class MessageListenerConcurrentlyImpl implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (CollectionUtils.isEmpty(msgs)) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
MessageExt message = msgs.get(0);
try {
final LocalDateTime now = LocalDateTime.now();
//逐條消費
String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("當前時間:"+now+", messageId: " + message.getMsgId() + ",topic: " +
message.getTopic() + ",messageBody: " + messageBody);
//模擬消費失敗
if ("Concurrently_test".equals(messageBody)) {
int a = 1 / 0;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
2.注冊監聽類 在消費者類RetryConsumerDemo中注冊監聽類
//注冊消息監聽器
consumer.registerMessageListener(new MessageListenerConcurrentlyImpl());
3.測試
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RocketmqApplication.class)
class RocketmqApplicationTests {
@Value("${rocketmq.consumer.topic}")
private String topic;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void testProducer(){
String msg = "Concurrently_test";
rocketMQTemplate.convertAndSend(topic , msg);
}
}
測試結果:
后面重試時間太長就不做測試了,可以看到并發消費的消息時間都是按照上面那張時間間隔表來。
然后通過RocketMq Dashboard Topic一欄可以看到有一個重試消費者組%RETRY%consumer_group,這個消費者組內存放的就是consumer_group消費者組消費失敗重試的消息。
并發消費的重試次數是可以修改的,重試次數對應參數DefaultMQPushConsumer類的maxReconsumeTimes屬性,maxReconsumeTimes默認是-1,也就是默認會重試16次;0代表不重試,只要失敗就會放入死信隊列;1-16重試次數對應著上面時間間隔表中對應次數。配置的最大重試次數超過16就按16處理。
并發消費狀態
并發消費有兩個狀態CONSUME_SUCCESS和RECONSUME_LATER。返回CONSUME_SUCCESS代表著消費成功,返回RECONSUME_LATER代表進行消息重試。
public enum ConsumeConcurrentlyStatus {
/**
* Success consumption
*/
CONSUME_SUCCESS,
/**
* Failure consumption,later try to consume
*/
RECONSUME_LATER;
}
當
MessageListenerConcurrently接口的consumeMessage方法返回ConsumeConcurrentlyStatus#RECONSUME_LATER、null或者方法拋異常了,都會進行消息重試。當然還是推薦返回ConsumeConcurrentlyStatus#RECONSUME_LATER。
測試順序消費
順序消費和并行消費其實都差不多的,只不過順序消費實現的是MessageListenerOrderly 接口
1.創建順序消費監聽類
public class MessageListenerOrderlyImpl implements MessageListenerOrderly {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
if (CollectionUtils.isEmpty(msgs)) {
return ConsumeOrderlyStatus.SUCCESS;
}
MessageExt message = msgs.get(0);
try {
final LocalDateTime now = LocalDateTime.now();
//逐條消費
String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("當前時間:"+now+", messageId: " + message.getMsgId() + ",topic: " +
message.getTopic() + ",messageBody: " + messageBody);
//模擬消費失敗
if ("Orderly_test".equals(messageBody)) {
int a = 1 / 0;
}
return ConsumeOrderlyStatus.SUCCESS;
} catch (Exception e) {
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
}
2.注冊監聽類
//最大重試次數
consumer.setMaxReconsumeTimes(2);
//順序消費 重試時間間隔
consumer.setSuspendCurrentQueueTimeMillis(2000);
SuspendCurrentQueueTimeMillis表示重試的時間間隔,默認是1s,這里修改成2s
3.測試
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RocketmqApplication.class)
class RocketmqApplicationTests {
@Value("${rocketmq.consumer.topic}")
private String topic;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void testProducer(){
String msg = "Orderly_test";
rocketMQTemplate.convertAndSend(topic , msg);
}
}
測試結果:
可以看到三條結果,第一條是第一次消費的,其余兩條是隔了2s重試的。重試2次之后這條數據就進入了死信隊列。
順序消費狀態
順序消費目前也是兩個狀態:SUCCESS和
SUSPEND_CURRENT_QUEUE_A_MOMENT。SUSPEND_CURRENT_QUEUE_A_MOMENT意思是先暫停消費一下,過SuspendCurrentQueueTimeMillis時間間隔后再重試一下,而不是放到重試隊列里。
public enum ConsumeOrderlyStatus {
/**
* Success consumption
*/
SUCCESS,
/**
* Rollback consumption(only for binlog consumption)
*/
@Deprecated
ROLLBACK,
/**
* Commit offset(only for binlog consumption)
*/
@Deprecated
COMMIT,
/**
* Suspend current queue a moment
*/
SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
測試死信隊列
并發消費和順序消費達到了最大重試次數之后就會放到死信隊列。死信隊列在一開始是不會被創建的,只有需要的時候才會被創建。就拿上面測試結果來看,進入到的死信隊列就是%DLQ%consumer_group,進入死信隊列的消息要收到處理。
死信隊列特性
- 不會再被消費者正常消費。
- 一個死信隊列對應一個分組, 而不是對應單個消費者實例。
- 如果一個消費者組未產生死信消息,消息隊列 RocketMQ 不會為其創建相應的死信隊列。
- 一個死信隊列包含了對應 分組產生的所有死信消息,不論該消息屬于哪個 Topic。
- 有效期與正常消息相同,均為 3 天,3 天后會被自動刪除。因此,請在死信消息產生后的 3 天內及時處理
作者:索碼理
鏈接:
https://juejin.cn/post/7151571345199874084
來源:稀土掘金