日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

作者:君哥聊技術

來源:
https://mp.weixin.qq.com/s/n9QlZ73SQyCGIyPLvHMy0A

大家好,我是君哥。今天聊一聊 RocketMQ 的順序消息實現機制。

在有些場景下,使用 MQ 需要保證消息的順序性,比如在電商系統中,用戶提交訂單、支付訂單、訂單出庫這 3 個消息應該保證順序性,如下圖:

5張圖帶你理解 RocketMQ 順序消息實現機制

 

對于 RocketMQ 來說,主要是通過 Producer 和 Consumer 來保證消息順序的。

1 Producer

下面代碼是 Producer 發送順序消息的官方示例:

public static void main(String[] args) throws UnsupportedEncodingException {
 try {
  DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  producer.start();

  String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
  for (int i = 0; i < 100; i++) {
   int orderId = i % 10;
   Message msg =
    new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
     ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
   SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
     Integer id = (Integer) arg;
     int index = id % mqs.size();
     return mqs.get(index);
    }
   }, orderId);

   System.out.printf("%s%n", sendResult);
  }

  producer.shutdown();
 } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
  e.printStackTrace();
 }
}

跟發送并發消息不一樣的是,發送消息時傳入了 MessageQueueSelector,這里可以指定消息發送到固定的 MessageQueue。

注意:上面的代碼把 orderId 相同的消息都會發送到同一個 MessageQueue,這樣同一個 orderId 的消息是有序的,這也叫做局部有序。對應的另一種是全局有序,這需要把所有的消息都發到同一個 MessageQueue。

下面再來看一下發送的代碼:

private SendResult sendSelectImpl(
 Message msg,
 MessageQueueSelector selector,
 Object arg,
 final CommunicationMode communicationMode,
 final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
 //省略部分邏輯
 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
 if (topicPublishInfo != null && topicPublishInfo.ok()) {
  MessageQueue mq = null;
  try {
   List<MessageQueue> messageQueueList =
    mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
   Message userMessage = MessageAccessor.cloneMessage(msg);
   String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
   userMessage.setTopic(userTopic);

   mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
  } catch (Throwable e) {
   throw new MQClientException("select message queue threw exception.", e);
  }

  //省略部分邏輯
  if (mq != null) {
   return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
  } else {
   throw new MQClientException("select message queue return null.", null);
  }
 }
    //省略部分邏輯
}

可以看到,在發送的時候,使用 MessageQueueSelector 選擇一個 MessageQueue,然后發送消息到這個 MessageQueue。對于并發消息,這里不傳 MessageQueueSelector,如果發送方法沒有指定 MessageQueue,就會按照默認的策略選擇一個。

2 Consumer

以 RocketMQ 推模式為例,消費者會注冊一個監聽器,進行消息的拉取和消費處理,下面的 UML 類圖顯示了調用關系:

5張圖帶你理解 RocketMQ 順序消息實現機制

 

上圖中包含了對順序消息和對并發消息的處理。其中 MessageListenerOrderly 和 ConsumeMessageOrderlyService 對順序消息進行處理。跟并發消息不一樣的是,順序消息定義了一個 MessageQueueLock 類,這個類保存了每個 MessageQueue 對應的鎖,代碼如下:

private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>();

下面代碼是順序消費的官方示例:

public static void main(String[] args) throws MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");

 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

 consumer.subscribe("TopicTest", "TagA || TagC || TagD");

 consumer.registerMessageListener(new MessageListenerOrderly() {
  AtomicLong consumeTimes = new AtomicLong(0);

  @Override
  public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
   context.setAutoCommit(true);
   System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
   this.consumeTimes.incrementAndGet();
   if ((this.consumeTimes.get() % 2) == 0) {
    return ConsumeOrderlyStatus.SUCCESS;
   } else if ((this.consumeTimes.get() % 5) == 0) {
    context.setSuspendCurrentQueueTimeMillis(3000);
    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
   }

   return ConsumeOrderlyStatus.SUCCESS;
  }
 });

 consumer.start();
 System.out.printf("Consumer Started.%n");
}

下面看一下順序消息的消費端處理邏輯。

2.1 注冊監聽

上面的代碼定義了順序消息監聽器 MessageListenerOrderly,并且注冊到 DefaultMQPushConsumer,這個注冊同時也注冊到了 DefaultMQPushConsumerImpl。

2.2 PushConsumer 初始化

在 DefaultMQPushConsumerImpl 類初始化的時候,會判斷注冊的 MessageListener 是不是 MessageListenerOrderly,如果是,就把 consumeOrderly 變量設置為 true,以此來標記是順序消息拉取還是并發消息拉取。然后把 ConsumeMessageService 初始化為 ConsumeMessageOrderlyService。

2.3 鎖定 mq

要保證消息的順序性,就需要保證同一個 MessageQueue 只能被同一個 Consumer 消費。

ConsumeMessageOrderlyService 初始化的時候,會啟動一個定時任務,周期性(默認 20s)地向 Broker 發送鎖定消息(請求類型 LOCK_BATCH_MQ),Broker 收到后,就會把 MessageQueue、group 和 clientId 進行綁定,這樣其他客戶端就不能從這個 MessageQueue 拉取消息。

注意:Broker 鎖定是有過期時間的,默認 60s,可以配置,鎖定過期后,有可能被其他 Consumer 進行消費。

Broker 端鎖結構如下圖:

5張圖帶你理解 RocketMQ 順序消息實現機制

 

2.4 拉取消息

消費者啟動時,啟動消費拉取線程 PullMessageService,里面死循環不停地從 Broker 拉取消息。這里調用了 DefaultMQPushConsumerImpl 類的 pullMessage 方法。這里拉取消息的邏輯跟并發消息的邏輯是一樣的。

拉取到消息后,調用 PullCallback 的 onSuccess 方法處理結果,這里調用了 ConsumeMessageOrderlyService 的 submitConsumeRequest 方法,里面用線程池提交了 ConsumeRequest 線程。

PullCallback pullCallback = new PullCallback() {
 @Override
 public void onSuccess(PullResult pullResult) {
  if (pullResult != null) {
   pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrApper.processPullResult(pullRequest.getMessageQueue(), pullResult,
    subscriptionData);

   switch (pullResult.getPullStatus()) {
    case FOUND:
     //省略
     if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
      DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
     } else {
      //省略
      boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
      DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
       pullResult.getMsgFoundList(),
       processQueue,
       pullRequest.getMessageQueue(),
       dispatchToConsume);
                        //省略
     }
     //省略
     break;
    //省略
   }
  }
 }
    //省略
};

上面拉取到消息后,先把消息放到了 ProcessQueue,然后調用了 submitConsumeRequest 方法。跟并發消息處理方式不同的是,submitConsumeRequest 方法并沒有處理拉取到的消息,而真正處理的時候是從 ProcessQueue 獲取。

2.5 處理消息

處理消息的邏輯在 ConsumeMessageOrderlyService 的內部類 ConsumeRequest,這是一個線程類,run 方法如下:

public void run() {
 //省略部分邏輯
 //1.獲取到 MessageQueueLock 對應的鎖
 final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
 synchronized (objLock) {
  if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
   || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
   final long beginTime = System.currentTimeMillis();
   for (boolean continueConsume = true; continueConsume; ) {
    //省略延后執行的邏輯
    final int consumeBatchSize =
     ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                //2.從 processQueue 拉取消息
    List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
    if (!msgs.isEmpty()) {
     final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

     ConsumeOrderlyStatus status = null;
                    //省略部分邏輯
     boolean hasException = false;
     try {
         //3.獲取處理鎖
      this.processQueue.getConsumeLock().lock();
      //4.執行消費處理邏輯
      status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
     } catch (Throwable e) {
      log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
       RemotingHelper.exceptionSimpleDesc(e),
       ConsumeMessageOrderlyService.this.consumerGroup,
       msgs,
       messageQueue), e);
      hasException = true;
     } finally {
         //5.釋放處理鎖
      this.processQueue.getConsumeLock().unlock();
     }
     //省略部分邏輯
     continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
    } else {
     continueConsume = false;
    }
   }
  } else {
   //省略部分邏輯
   ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
  }
 }
}

上面的代碼總結一下,Consumer 消費消息的邏輯如下:

  1. 對 MessageQueueLock 進行加鎖,這樣就保證只有一個線程在處理當前 MessageQueue;
  2. 從 ProcessQueue 拉取一批消息;
  3. 獲取 ProcessQueue 鎖,這樣保證了只有當前線程可以進行消息處理,同時也可以防止 Rebalance 線程把當前處理的 MessageQueue 移除掉;
  4. 執行消費處理邏輯;
  5. 釋放 ProcessQueue 處理鎖;6.processConsumeResult 方法更新消息偏移量。

注意:ProcessQueue 中的鎖是 ReentrantLock。

3 重試

跟并發消息不一樣的是,順序消息消費失敗后并不會把消息發送到 Broker,而是直接在 Consumer 端進行重試,如果重試次數超過了最大重試次數(16 次),則發送到 Broker,Broker 則將消息推入死信隊列。如下圖:

5張圖帶你理解 RocketMQ 順序消息實現機制

 

4 總結

RocketMQ 順序消息的原理是在 Producer 端把一批需要保證順序的消息發送到同一個 MessageQueue,Consumer 端則通過加鎖的機制來保證消息消費的順序性,Broker 端通過對 MessageQueue 進行加鎖,保證同一個 MessageQueue 只能被同一個 Consumer 進行消費。

根據實現原理可以看到,RocketMQ 的順序消息可能存在兩個坑:

  1. 有順序性的消息需要發送到同一個 MessageQueue,可能導致單個 MessageQueue 消息量很大,而 Consumer 端消費的時候只能單線程消費,很可能導致當前 MessageQueue 消息積壓;
  2. 如果順序消息 MessageQueue 所在的 broker 掛了,這時 Producer 只能把消息發送到其他 Broker 的 MessageQueue 上,而如果新的 MessageQueue 被其他 Consumer 消費,這樣兩個 Consumer 消費的消息就不能保證順序性了。如下圖:
5張圖帶你理解 RocketMQ 順序消息實現機制

 

Broker1 發生故障,把訂單出庫的消息發送到了 Broker2,由 Consumer2 來進行消費,消息順序很可能會錯亂。

分享到:
標簽:RocketMQ
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定