日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

消息隊列中的消息消費時并不能保證總是成功的,那失敗的消息該怎么進行消息補償呢?這就用到今天的主角消息重試和死信隊列了。

生產者消息重試

有時因為網路等原因生產者也可能發送消息失敗,也會進行消息重試,生產者消息重試比較簡單,在springboot中只要在配置文件中配置一下就可以了。

# 異步消息發送失敗重試次數,默認為2
rocketmq.producer.retry-times-when-send-async-failed=2
# 消息發送失敗重試次數,默認為2
rocketmq.producer.retry-times-when-send-failed=2

也可以通過下面這種方式配置

DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
defaultMQProducer.setRetryTimesWhenSendFailed(2);
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(2);

消費者消息重試

Apache RocketMQ 有兩種消費模式:集群消費模式和廣播消費模式。消息重試只針對集群消費模式生效;廣播消費模式不提供失敗重試特性,即消費失敗后,失敗消息不再重試,繼續消費新的消息。

同時RocketMq Push消費提供了兩種消費方式:并發消費和順序消費。

并發消費

在并發消費中,可能會有多個線程同時消費一個隊列的消息,因此即使發送端通過發送順序消息保證消息在同一個隊列中按照FIFO的順序,也無法保證消息實際被順序消費,所有并發消費也可以稱之為無序消費。

順序消費

順序消費是消息生產者發送過來的消息會遵循FIFO隊列的思想,先進先出有順序的消費消息。 對于順序消息,當消費者消費消息失敗后,消息隊列 RocketMQ 會自動不斷進行消息重試(每次間隔時間為 1 秒),這時,應用會出現消息消費被阻塞的情況。因此,在使用順序消息時,務必保證應用能夠及時監控并處理消費失敗的情況,避免阻塞現象的發生。

并發消費和順序消費區別

順序消費和并發消費的重試機制并不相同,順序消費消費失敗后會先在客戶端本地重試直到最大重試次數,這樣可以避免消費失敗的消息被跳過,消費下一條消息而打亂順序消費的順序,而并發消費消費失敗后會將消費失敗的消息重新投遞回服務端,再等待服務端重新投遞回來,在這期間會正常消費隊列后面的消息。

并發消費失敗后并不是投遞回原Topic,而是投遞到一個特殊Topic,其命名為%RETRY%ConsumerGroupName,集群模式下并發消費每一個ConsumerGroup會對應一個特殊Topic,并會訂閱該Topic。

兩者參數差別如下

消費類型

重試間隔

最大重試次數

順序消費

間隔時間可通過自定義設置,
SuspendCurrentQueueTimeMillis

最大重試次數可通過自定義參數MaxReconsumeTimes取值進行配置。該參數取值無最大限制。若未設置參數值,默認最大重試次數為Integer.MAX

并發消費

間隔時間根據重試次數階梯變化,取值范圍:1秒~2小時。不支持自定義配置

最大重試次數可通過自定義參數MaxReconsumeTimes取值進行配置。默認值為16次,該參數取值無最大限制,建議使用默認值

并發消費重試間隔如下:

第幾次重試

與上次重試的間隔時間

第幾次重試

與上次重試的間隔時間

1

10s

9

7min

2

30s

10

8min

3

1min

11

9min

4

2min

12

10min

5

3min

13

20min

6

4min

14

30min

7

5min

15

1h

8

6min

16

2h

死信隊列

當一條消息初次消費失敗,RocketMQ會自動進行消息重試,達到最大重試次數后,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息。此時,該消息不會立刻被丟棄,而是將其發送到該消費者對應的特殊隊列中,這類消息稱為死信消息(Dead-Letter Message),存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue),死信隊列是死信Topic下分區數唯一的單獨隊列。如果產生了死信消息,那對應的ConsumerGroup的死信Topic名稱為%DLQ%ConsumerGroupName,死信隊列的消息將不會再被消費。可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查詢到對應死信消息的信息。

實踐出真知

Talk is cheap,show you the code.

公共部分創建

1.配置文件

rocketmq.name-server=localhost:9876
# 消費者組
rocketmq.producer.group=producer_group

rocketmq.consumer.topic=consumer_topic
rocketmq.consumer.group=consumer_group

2.創建消費者RetryConsumerDemo

@Component
public class RetryConsumerDemo {

    @Value("${rocketmq.name-server}")
    private String namesrvAddr;

    @Value("${rocketmq.consumer.topic}")
    private String topic;

    @Value("${rocketmq.consumer.group}")
    private String consumerGroup;

    private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");

    @PostConstruct
    public void start() {
        try {
            consumer.setNamesrvAddr(namesrvAddr);
            //設置集群消費模式
            consumer.setMessageModel(MessageModel.CLUSTERING);

            //設置消費超時時間(分鐘)
            consumer.setConsumeTimeout(1);

            //訂閱主題
            consumer.subscribe(topic , "*");

            //注冊消息監聽器
            consumer.registerMessageListener(new MessageListenerConcurrentlyImpl());

            //最大重試次數
            consumer.setMaxReconsumeTimes(2);

            //啟動消費端
            consumer.start();
            System.out.println("Retry Consumer Start...");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
}

測試并發消費

1.創建并發消費監聽類 并發消費監聽類要實現
MessageListenerConcurrently類

public class MessageListenerConcurrentlyImpl implements MessageListenerConcurrently {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

        if (CollectionUtils.isEmpty(msgs)) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

        MessageExt message = msgs.get(0);
        try {
            final LocalDateTime now = LocalDateTime.now();
            //逐條消費
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("當前時間:"+now+", messageId: " + message.getMsgId() + ",topic: " +
                    message.getTopic()  + ",messageBody: " + messageBody);

            //模擬消費失敗
            if ("Concurrently_test".equals(messageBody)) {
                int a = 1 / 0;
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}

2.注冊監聽類 在消費者類RetryConsumerDemo中注冊監聽類

//注冊消息監聽器
consumer.registerMessageListener(new MessageListenerConcurrentlyImpl());

3.測試

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RocketmqApplication.class)
class RocketmqApplicationTests {

    @Value("${rocketmq.consumer.topic}")
    private String topic;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void testProducer(){
        String msg = "Concurrently_test";
        rocketMQTemplate.convertAndSend(topic , msg);
    }
}

測試結果:

 

后面重試時間太長就不做測試了,可以看到并發消費的消息時間都是按照上面那張時間間隔表來。

然后通過RocketMq Dashboard Topic一欄可以看到有一個重試消費者組%RETRY%consumer_group,這個消費者組內存放的就是consumer_group消費者組消費失敗重試的消息。

 

并發消費的重試次數是可以修改的,重試次數對應參數DefaultMQPushConsumer類的maxReconsumeTimes屬性,maxReconsumeTimes默認是-1,也就是默認會重試16次;0代表不重試,只要失敗就會放入死信隊列;1-16重試次數對應著上面時間間隔表中對應次數。配置的最大重試次數超過16就按16處理。

并發消費狀態

并發消費有兩個狀態CONSUME_SUCCESS和RECONSUME_LATER。返回CONSUME_SUCCESS代表著消費成功,返回RECONSUME_LATER代表進行消息重試。

public enum ConsumeConcurrentlyStatus {
    /**
     * Success consumption
     */
    CONSUME_SUCCESS,
    /**
     * Failure consumption,later try to consume
     */
    RECONSUME_LATER;
}


MessageListenerConcurrently接口的consumeMessage方法返回ConsumeConcurrentlyStatus#RECONSUME_LATER、null或者方法拋異常了,都會進行消息重試。當然還是推薦返回ConsumeConcurrentlyStatus#RECONSUME_LATER。

測試順序消費

順序消費和并行消費其實都差不多的,只不過順序消費實現的是MessageListenerOrderly 接口

1.創建順序消費監聽類

public class MessageListenerOrderlyImpl implements MessageListenerOrderly {

    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        if (CollectionUtils.isEmpty(msgs)) {
            return ConsumeOrderlyStatus.SUCCESS;
        }

        MessageExt message = msgs.get(0);
        try {
            final LocalDateTime now = LocalDateTime.now();
            //逐條消費
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("當前時間:"+now+", messageId: " + message.getMsgId() + ",topic: " +
                    message.getTopic()  + ",messageBody: " + messageBody);

            //模擬消費失敗
            if ("Orderly_test".equals(messageBody)) {
                int a = 1 / 0;
            }

            return ConsumeOrderlyStatus.SUCCESS;
        } catch (Exception e) {
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
}

2.注冊監聽類

//最大重試次數
consumer.setMaxReconsumeTimes(2);
//順序消費 重試時間間隔
consumer.setSuspendCurrentQueueTimeMillis(2000);


SuspendCurrentQueueTimeMillis表示重試的時間間隔,默認是1s,這里修改成2s

3.測試

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RocketmqApplication.class)
class RocketmqApplicationTests {

    @Value("${rocketmq.consumer.topic}")
    private String topic;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void testProducer(){
        String msg = "Orderly_test";
        rocketMQTemplate.convertAndSend(topic , msg);
    }
}

測試結果:

 

可以看到三條結果,第一條是第一次消費的,其余兩條是隔了2s重試的。重試2次之后這條數據就進入了死信隊列。

順序消費狀態

順序消費目前也是兩個狀態:SUCCESS和
SUSPEND_CURRENT_QUEUE_A_MOMENT。SUSPEND_CURRENT_QUEUE_A_MOMENT意思是先暫停消費一下,過SuspendCurrentQueueTimeMillis時間間隔后再重試一下,而不是放到重試隊列里。

public enum ConsumeOrderlyStatus {
    /**
     * Success consumption
     */
    SUCCESS,
    /**
     * Rollback consumption(only for binlog consumption)
     */
    @Deprecated
    ROLLBACK,
    /**
     * Commit offset(only for binlog consumption)
     */
    @Deprecated
    COMMIT,
    /**
     * Suspend current queue a moment
     */
    SUSPEND_CURRENT_QUEUE_A_MOMENT;
}

測試死信隊列

并發消費和順序消費達到了最大重試次數之后就會放到死信隊列。死信隊列在一開始是不會被創建的,只有需要的時候才會被創建。就拿上面測試結果來看,進入到的死信隊列就是%DLQ%consumer_group,進入死信隊列的消息要收到處理。

 

死信隊列特性

  • 不會再被消費者正常消費。
  • 一個死信隊列對應一個分組, 而不是對應單個消費者實例。
  • 如果一個消費者組未產生死信消息,消息隊列 RocketMQ 不會為其創建相應的死信隊列。
  • 一個死信隊列包含了對應 分組產生的所有死信消息,不論該消息屬于哪個 Topic。
  • 有效期與正常消息相同,均為 3 天,3 天后會被自動刪除。因此,請在死信消息產生后的 3 天內及時處理

作者:索碼理
鏈接:
https://juejin.cn/post/7151571345199874084
來源:稀土掘金

分享到:
標簽:RocketMq
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定