日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

一、背景

之前在做電商相關業務的時候,有一個常見的需求場景是:用戶下單之后,超過半小時不支付,就取消訂單?,F在我們在淘寶京東買東西,或者通過美團點外賣,下單之后,如果不在指定時間內支付,訂單也會取消。 那么,如何實現這樣的超時取消邏輯呢,通過消息隊列的延時消息,是一個非常穩定的實現方案。

RocketMQ 就提供了這樣的延時消息功能,producer 端在發送消息時,設置延遲級別,從秒級到分鐘小時等等。消息在發送之后,會在消息隊列的服務器進行存儲。等過了設定的延遲時間之后,消息才會被consumer端消費到。

 

如果我們在下單的時候,發送一條設置延時30分鐘的消息,這條消息會在30分鐘之后被下游系統消費,然后判斷訂單有沒有支付,如果沒有支付,則取消訂單。那么這樣,通過消息隊列就完成了一個延遲取消的邏輯了。

二、原理

設置延時

先來看一下如何設置消息的延時 消息體可以通過setDelayTimeLevel方法來設置延時級別

public void produce() {
    Message msg = new Message("TopicTest",
        "TagA",
        "OrderID188",
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    msg.setDelayTimeLevel(1)
    SendResult sendResult = producer.send(msg);
}

public void consume() {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    consumer.subscribe("TopicTest", "TagA");
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
}

其實是將延遲信息存到 Message 的 property 中(property是一個保存meta信息的hashmap)

public void setDelayTimeLevel(int level) {
    this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}

void putProperty(final String name, final String value) {
    if (null == this.properties) {
        this.properties = new HashMap<String, String>();
    }

    this.properties.put(name, value);
}

之后,broker收到 message之后,會根據 message 中設置的延時級別進行處理 可以看看延時級別的具體情況: 一共分為18個級別(1-18),對應時間從1s到2h

public class MessageStoreConfig {
    
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

}

那么整個系統是怎么做到讓consumer在設定的延時之后,開始消費指定消息的呢?

不得不說,RocketMQ 的設計還是挺巧妙的,我們接著往下看。

消息預存

對于broker收到的延時消息,并不是和普通消息一樣,進入發送端指定的topic中, 而是進入專門的延時topic中,延時topic有18條隊列(queueId 編號0-17),queueId 和 delayLevel 的關系是 queueId + 1 = delayLevel,是一一對應的。所以計算延時消息的待執行時間deliverTimestamp之后,會將消息存入對應延時級別的隊列中。

// 如果是延遲消息
if (msg.getDelayTimeLevel() > 0) {
    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
    }
    
    // 重設延遲消息的topic和queueId,topic為指定的RMQ_SYS_SCHEDULE_TOPIC
    topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

    ...
    // 將實際的指定topic和queueId進行存入property,進行備份
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));        
    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

    msg.setTopic(topic);
    msg.setQueueId(queueId);
}

之后,會由ScheduleMessageService來進行任務處理。ScheduleMessageService是broker啟動時就開始執行的,用來處理延遲隊列中的消息,處理的邏輯如下所示。

public class ScheduleMessageService extends ConfigManager {

    // key: delayLevel | value: delayTimeMillis 
    private final ConcurrentMap<Integer, Long> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);

    public void start() {
        // 創建一個Timer,用于執行定時任務
        this.timer = new Timer("ScheduleMessageTimerThread", true);
            
        // 這里對每個delayLevel的queue都創建一個DeliverDelayedMessageTimerTask,
        // 用來處理對應queue中的消息
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
    }
    
}

ScheduleMessageService啟動之后,會根據延時隊列的數目創建一一對應的
DeliverDelayedMessageTimerTask,然后周期執行。該類繼承自TimerTask,是JDK的工具類,用于執行定時任務,原理可以參考這篇文章 如何實現定時任務- JAVA Timer/TimerTask 源碼原理解析

消息轉投

可以看到
DeliverDelayedMessageTimerTask實現的 run 方法,主要邏輯都在executeOnTimeup方法中,從對應的延遲隊列中取出時間已到的 message,發送到 message 對應原始topic的隊列中。只要隊列沒有發生消費積壓,message 就會馬上被消費了。(這部分的代碼實現比較復雜,感興趣可以去看對應的源碼)

class DeliverDelayedMessageTimerTask extends TimerTask {
    private final int delayLevel;
    private final long offset;

    public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
        this.delayLevel = delayLevel;
        this.offset = offset;
    }

    @Override
    public void run() {
        try {
            if (isStarted()) {
                this.executeOnTimeup();
            }
        } catch (Exception e) {
            // XXX: warn and notify me
            log.error("ScheduleMessageService, executeOnTimeup exception", e);
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
        }
    }

    public void executeOnTimeup() {
        ConsumeQueue cq =
            ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                delayLevel2QueueId(delayLevel));

        long failScheduleOffset = offset;

        if (cq != null) {
            
            // 這部分是核心邏輯,實現的是 從延時消息隊列中取出 deliverTimestamp - now <= 0 的消息,
            // 將消息從延時queue移到原本指定Topic的queue中,這些消息就馬上會被consumer消費。
        } 

        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
            failScheduleOffset), DELAY_FOR_A_WHILE);
    }
}

總體的原理示意圖,如下所示:

 

broker 在接收到延時消息的時候,會將延時消息存入到延時TOPIC的隊列中,然后ScheduleMessageService中,每個 queue 對應的定時任務會不停地被執行,檢查 queue 中哪些消息已到設定時間,然后轉發到消息的原始TOPIC,這些消息就會被各自的 producer 消費了。

三、拓展-消費重試

平常在使用RocketMQ的時候,一般會依賴consumer的消費重試功能。 而consumer端的消費重試,其實也是通過這個和延時隊列差不多的原理來實現的。

public void consume() {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    consumer.subscribe("TopicTest", "TagA");
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            // 這里如果返回RECONSUME_LATER,就會重試消費
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
}

RocketMQ規定,以下三種情況統一按照消費失敗處理并會發起重試。

  • 業務消費方返回ConsumeConcurrentlyStatus.RECONSUME_LATER
  • 業務消費方返回null
  • 業務消費方主動/被動拋出異常

業務代碼中,一般會利用重試功能去做下游邏輯的重試。而RocketMQ的重試并不是固定時間間隔重復進行,二是采取的退避式重試,重試的時間間隔會不斷變長。 這個時間間隔,和設置delayLevel的延時類似。

Consumer客戶端會通過processConsumeResult方法處理每一條消息的消費結果,如果判斷需要進行重試,則會通過sendMessageBack方法將消息發送到broker,重試消息會帶上已重試次數的信息。

broker收到消息之后,SendMessageProcessor會對重試消息進行處理,設置topic為RETRY_TOPIC,具體邏輯如下:

public class SendMessageProcessor
    extends AbstractSendMessageProcessor
    implements.NETtyRequestProcessor {

    private RemotingCommand asyncConsumerSendMsgBack(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {

        // 給重試消息設置新的topic
        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

        // 根據已經發生重試的次數確定delayLevel
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        msgExt.setDelayTimeLevel(delayLevel);

        // 重試次數+1
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

        // 存儲消息
        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

        // ...
    }   
}

存儲消息的時候,CommitLog.putMessage方法內部判斷如果設置了delayLevel,就會重設topic為SCHEDULE_TOPIC,然后將消息存儲到延時隊列中,后續就是和ScheduleMessageService的邏輯相同。

整個消息重試的邏輯示意圖如下所示:

 

如圖所示

  1. Consumer在消費的時候,都會訂閱指定的TOPIC-NORMAL_TOPIC和該ConsumerGroup對應的重試TOPIC-RETRY_GROUP1_TOPIC,同時消費來自這兩個topic中的消息。
  2. 當發生消費失敗后,Client端會調用sendMessageBack方法將失敗消息發送回broker。
  3. broker端的SendMessageProcessor會根據當前的重試次數確定延時級別,將消息存入延時隊列-SCHEDULE_TOPIC中。
  4. ScheduleMessageService會將到期的消息重新發送到重試TOPIC-RETRY_GROUP1_TOPIC中,這個時候消息被Consumer消費,就完成了整個重試過程。

可以對比之前的延時消息流程去看,其實重試消息就是將失敗的消息當作延時消息進行處理,只不過最后投入的是專門的重試消息隊列中。

四、總結

延時消息都是非常日常業務使用中很重要的功能,而RocketMQ通過時間片分級+多隊列+定時任務,就實現了這樣的功能,設計上是很巧妙的。并且消費重試采用退避式的策略,重試時間的梯度剛好與延時消息策略一致,這樣就可以直接利用延時隊列去完成消息重試的功能,從策略上來說非常合理(消息消費重復失敗,在短時間內重試成功的可能性比較低),并且復用了底層代碼,這些是值得去學習和借鑒的。

分享到:
標簽:RocketMQ
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定