日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

今天,就講講解決消息隊列的數據積壓的三個方案。

 

1 概述

最近生產環境的消息通知隊列發生了大量的數據積壓問題,從而影響到整個平臺商戶的交易無法正常進行,最后只能通過臨時關閉交易量較大的商戶來緩解消息隊列積壓的問題,經線上數據分析,我們的消息隊列在面對交易突發洪峰的情況下無法快速的消費并處理隊列中的數據,考慮到后續還會出現各種交易量突發狀況,以下為針對消息隊列(ActiveMQ)的優化過程。

 

2 消息隊列通信圖

三招!解決消息隊列的數據積壓問題

 

3 問題定位與分析

3.1 消息通知數據為什么會被積壓?分析:平臺中每個交易的發生可能會產生一到多條的消息通知數據,這些通知數據會通過消息隊列(ActiveMQ)來中轉消費并處理,那么在交易量突發洪峰的情況下會產生大量的消息通知數據,如果消息隊列(ActiveMQ)的消費能力被阻塞的話會嚴重影響到數據的吞吐量,從而積壓大量數據無法被快速處理!

3.2 配置了多個ActiveMQ的消費者為什么數據積壓還是無法緩解?分析:經過分析消息隊列的數據消費處理模塊的代碼,消息的消費處理是通過監聽器SessionAwareMessageListener異步回調onMessage方法而接收消息的,但是在回調的方法onMessage上加了synchronized同步鎖,問題就在這里,由于整個onMessage方法被鎖,導致程序只能通過串行(一次只能消費一條數據)處理數據,而無法通過多線程并發處理數據,從而影響了整個隊列的數據消費能力。

public synchronized void onMessage(Message message, Session session)

3.3 去掉synchronized同步鎖會產生多線程并發的安全性問題嗎?分析:首先多個消費者并發處理的數據是不同的,而且多個消費者線程并發回調onMessage方法的時候并未使用到共享的變量,全部在各自線程的方法棧中,所以理論上不會出現多線程并發產生的安全性問題。

3.4 消息會被重復多次消費嗎?

分析:

(1)通過分析ActiveMQ的消費者消息接收處理的源代碼發現,一條消息是否已經消費是通過ack確認機制來保證的,如果是通過異步回調的方式接收消息的話,在onMessage回調函數返回之后會立即進行ack確認提交,那么只要保證onMessage函數內部不拋出異常,及需要內部捕獲異常,那么消息就不會被重復消息。

(2)因為我們的系統在接收到消息后會首先存入db中進行持久化,而且每條消息在存入數據庫的時候都做了唯一性約束,那么即使有重復的消息也不會被正常處理。

 

4 階段一優化方案

4.1 準備測試數據啟動多個線程分別往MQ消息隊列中發送數據,共發送15000個消息,然后啟動消費者模塊消費消息,設定每個消息處理耗時為10ms,配置ActiveMQ的消費者數量為concurrency = 5-100

4.2 優化前性能測試

測試次數 是否并發處理 消息數量

queuePrefetch

consumers 耗時

1 否 15000 1000 15 151s
2 否 15000 1000 16 151s
3 否 15000 1000 15 151s

優化前通過測試數據發現,雖然配置了concurrency = 5-100 (消費者動態伸縮),但是只有15個消費者在忙碌,而且消息都是串行化執行的,15000條消息共需要151s的時間,效率非常差,ps:哈哈,不知道是哪位開發的大神加的同步鎖!

注:queuePrefetch 為MQ的消費者一次從Queue中拉取的數量,默認為1000,consumers為處理消息的消費者數量

4.3 優化后性能測試

4.3.1 取消同步鎖取消在監聽器的回調方法onMessage上的synchronized同步鎖

4.3.2 取消同步鎖后的性能測試

測試次數 是否并發處理 消息數量

queuePrefetch

consumers 耗時

1 是 15000 1000 14 13s
2 是 15000 1000 15 13s
3 是 15000 1000 15 13s

通過以上數據發現取消同步鎖,15000條消息只需要13s就可以處理完,相比之前快了近12倍,雖然速度提升了不少,但是發現配置了5-100的消費者,確只有15個消費者在忙碌,其他消費者都沒有消息可處理,及造成了數據傾斜,那么接下來就要通過優化queuePrefetch 參數了。

4.3.3 優化ActiveMQ的queuePrefetch 參數預獲取消息數量是MQ中重要的調優參數之一,為了提高網絡的傳輸效率,ActiveMQ默認給Consumer批量push 1000條消息,可以從ActiveMQ源碼中的ActiveMQPrefetchPolicy類的DEFAULT_QUEUE_PREFETCH字段得知,考慮到我們的通知消息的消費處理中涉及到數據庫的操作,以及綜合網絡傳輸效率,這里將queuePrefetch的值設置為100,具體需配置到ActiveMQ的連接地址后,如:

tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=100

4.3.4 優化queuePrefetch參數后的性能測試

測試次數 是否并發處理 消息數量

queuePrefetch

consumers 耗時

1 是 15000 100 40 7s
2 是 15000 100 47 5s
3 是 15000 100 41 6s

將ActiveMQ的queuePrefetch參數修改為100,那么發現有近一半的消費者在處理數據,最后15000條消息需要6s中就可以處理完成。

4.3.5 結論通過以上兩步的優化后的測試結果可以得出,取消同步鎖之后隊列的消費能力提升了近11倍,在取消同步鎖的基礎上再優化ActiveMQ批處理參數后性能又提升了近1倍,綜合以上兩步的優化處理,隊列整體的消費能力提高了30多倍。

三招!解決消息隊列的數據積壓問題

 

5 階段二優化方案

階段二的優化方案是在階段一的基礎上進行的優化處理

5.1 單隊列處理

三招!解決消息隊列的數據積壓問題

由于我們的消息通知業務屬于冪等性操作,會按照設定的通知次數來反復通知處理,直到通知成功為止,我們系統現在的做法是將接收到MQ的消息暫存于延時隊列(DelayQueue)中,然后通過多線程輪訓取出,然后通過HTTP通知到其他模塊處理,如果通知失敗,則重新放入同一個延時隊列等待下次執行,如上圖:消息1通知失敗后會重新放入延時隊列。

注:單隊列處理的不足

由于使用了單隊列處理,使得可以一次通知成功的消息與通知多次失敗的消息混合在了一起,這樣在隊列中失敗通知的消息就會阻塞到后續可以正常通知的消息,最終導致消息整體的一個吞吐量下降

5.2 雙隊列處理

三招!解決消息隊列的數據積壓問題

針對5.1單隊列的不足,我們可以重新設計,將單隊列設計為雙隊列處理,雙隊列的核心思想為如果隊列1中的消息通知失敗,則不再重新放入隊列1,而是放入隊列2去通知,這樣可以起到消息數據分離的作用,及失敗通知的數據不再會影響到后續可以成功通知的消息,從而提高隊列消息通知的整體性能!

 

6 階段三優化方案

6.1 MQ組件重選型

ActiveMQ是一個老牌的消息隊列組件,吞吐量方面表現不是很理想,適合在業務量不大的場景中使用,現在有非常多比較成熟及高性能高吞吐的消息隊列組件可供我們選擇,如:RabbitMQ、RocketMQ、Kafka,后續可根據實際情況考慮替換掉ActiveMQ組件。

 

7 總結

針對消息隊列的數據積壓問題,我們主要做了三個方面的優化處理,取消同步鎖、ActiveMQ參數優化、本地雙隊列優化,通過這三個方面的優化基本解決了隊列數據積壓的問題。

文章來源:http://JAVAjgs.com/archives/5572

分享到:
標簽:隊列 消息
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定