整體閱讀時(shí)間,在 40 分鐘左右。
常見的消息隊(duì)列很多,主要包括 RabbitMQ、Kafka、RocketMQ 和 ActiveMQ,相關(guān)的選型可以看我之前的系列,這篇文章只講 RabbitMQ,先講原理,后搞實(shí)戰(zhàn)。
文章很長,如果你能一次性看完,“大哥,請(qǐng)收下我的膝蓋”,建議大家先收藏,啥時(shí)需要面試,或者工作中遇到了,可以再慢慢看。
不 BB,直接上思維導(dǎo)圖:
1. 消息隊(duì)列
1.1 消息隊(duì)列模式
消息隊(duì)列目前主要 2 種模式,分別為“點(diǎn)對(duì)點(diǎn)模式”和“發(fā)布/訂閱模式”。
1.1.1 點(diǎn)對(duì)點(diǎn)模式
一個(gè)具體的消息只能由一個(gè)消費(fèi)者消費(fèi),多個(gè)生產(chǎn)者可以向同一個(gè)消息隊(duì)列發(fā)送消息,但是一個(gè)消息在被一個(gè)消息者處理的時(shí)候,這個(gè)消息在隊(duì)列上會(huì)被鎖住或者被移除并且其他消費(fèi)者無法處理該消息。
需要額外注意的是,如果消費(fèi)者處理一個(gè)消息失敗了,消息系統(tǒng)一般會(huì)把這個(gè)消息放回隊(duì)列,這樣其他消費(fèi)者可以繼續(xù)處理。
1.1.2 發(fā)布/訂閱模式
單個(gè)消息可以被多個(gè)訂閱者并發(fā)的獲取和處理。一般來說,訂閱有兩種類型:
- 臨時(shí)(ephemeral)訂閱:這種訂閱只有在消費(fèi)者啟動(dòng)并且運(yùn)行的時(shí)候才存在。一旦消費(fèi)者退出,相應(yīng)的訂閱以及尚未處理的消息就會(huì)丟失。
- 持久(durable)訂閱:這種訂閱會(huì)一直存在,除非主動(dòng)去刪除。消費(fèi)者退出后,消息系統(tǒng)會(huì)繼續(xù)維護(hù)該訂閱,并且后續(xù)消息可以被繼續(xù)處理。
1.2 衡量標(biāo)準(zhǔn)
對(duì)消息隊(duì)列進(jìn)行技術(shù)選型時(shí),需要通過以下指標(biāo)衡量你所選擇的消息隊(duì)列,是否可以滿足你的需求:
- 消息順序:發(fā)送到隊(duì)列的消息,消費(fèi)時(shí)是否可以保證消費(fèi)的順序,比如A先下單,B后下單,應(yīng)該是A先去扣庫存,B再去扣,順序不能反。
- 消息路由:根據(jù)路由規(guī)則,只訂閱匹配路由規(guī)則的消息,比如有A/B兩者規(guī)則的消息,消費(fèi)者可以只訂閱A消息,B消息不會(huì)消費(fèi)。
- 消息可靠性:是否會(huì)存在丟消息的情況,比如有A/B兩個(gè)消息,最后只有B消息能消費(fèi),A消息丟失。
- 消息時(shí)序:主要包括“消息存活時(shí)間”和“延遲/預(yù)定的消息”,“消息存活時(shí)間”表示生產(chǎn)者可以對(duì)消息設(shè)置TTL,如果超過該TTL,消息會(huì)自動(dòng)消失;“延遲/預(yù)定的消息”指的是可以延遲或者預(yù)訂消費(fèi)消息,比如延時(shí)5分鐘,那么消息會(huì)5分鐘后才能讓消費(fèi)者消費(fèi),時(shí)間未到的話,是不能消費(fèi)的。
- 消息留存:消息消費(fèi)成功后,是否還會(huì)繼續(xù)保留在消息隊(duì)列。
- 容錯(cuò)性:當(dāng)一條消息消費(fèi)失敗后,是否有一些機(jī)制,保證這條消息是一種能成功,比如異步第三方退款消息,需要保證這條消息消費(fèi)掉,才能確定給用戶退款成功,所以必須保證這條消息消費(fèi)成功的準(zhǔn)確性。
- 伸縮:當(dāng)消息隊(duì)列性能有問題,比如消費(fèi)太慢,是否可以快速支持庫容;當(dāng)消費(fèi)隊(duì)列過多,浪費(fèi)系統(tǒng)資源,是否可以支持縮容。
- 吞吐量:支持的最高并發(fā)數(shù)。
2. RabbitMQ 原理初探
RabbitMQ 2007 年發(fā)布,是使用 Erlang 語言開發(fā)的開源消息隊(duì)列系統(tǒng),基于 AMQP 協(xié)議來實(shí)現(xiàn)。
2.1 基本概念
提到RabbitMQ,就不得不提AMQP協(xié)議。AMQP協(xié)議是具有現(xiàn)代特征的二進(jìn)制協(xié)議。是一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)。
先了解一下AMQP協(xié)議中間的幾個(gè)重要概念:
- Server:接收客戶端的連接,實(shí)現(xiàn)AMQP實(shí)體服務(wù)。
- Connection:連接,應(yīng)用程序與Server的網(wǎng)絡(luò)連接,TCP連接。
- Channel:信道,消息讀寫等操作在信道中進(jìn)行。客戶端可以建立多個(gè)信道,每個(gè)信道代表一個(gè)會(huì)話任務(wù)。
- Message:消息,應(yīng)用程序和服務(wù)器之間傳送的數(shù)據(jù),消息可以非常簡單,也可以很復(fù)雜。由Properties和Body組成。Properties為外包裝,可以對(duì)消息進(jìn)行修飾,比如消息的優(yōu)先級(jí)、延遲等高級(jí)特性;Body就是消息體內(nèi)容。
- Virtual Host:虛擬主機(jī),用于邏輯隔離。一個(gè)虛擬主機(jī)里面可以有若干個(gè)Exchange和Queue,同一個(gè)虛擬主機(jī)里面不能有相同名稱的Exchange或Queue。
- Exchange:交換器,接收消息,按照路由規(guī)則將消息路由到一個(gè)或者多個(gè)隊(duì)列。如果路由不到,或者返回給生產(chǎn)者,或者直接丟棄。RabbitMQ常用的交換器常用類型有direct、topic、fanout、headers四種,后面詳細(xì)介紹。
- Binding:綁定,交換器和消息隊(duì)列之間的虛擬連接,綁定中可以包含一個(gè)或者多個(gè)RoutingKey。
- RoutingKey:路由鍵,生產(chǎn)者將消息發(fā)送給交換器的時(shí)候,會(huì)發(fā)送一個(gè)RoutingKey,用來指定路由規(guī)則,這樣交換器就知道把消息發(fā)送到哪個(gè)隊(duì)列。路由鍵通常為一個(gè)“.”分割的字符串,例如“com.rabbitmq”。
- Queue:消息隊(duì)列,用來保存消息,供消費(fèi)者消費(fèi)。
2.2 工作原理
AMQP 協(xié)議模型由三部分組成:生產(chǎn)者、消費(fèi)者和服務(wù)端,執(zhí)行流程如下:
- 生產(chǎn)者是連接到 Server,建立一個(gè)連接,開啟一個(gè)信道。
- 生產(chǎn)者聲明交換器和隊(duì)列,設(shè)置相關(guān)屬性,并通過路由鍵將交換器和隊(duì)列進(jìn)行綁定。
- 消費(fèi)者也需要進(jìn)行建立連接,開啟信道等操作,便于接收消息。
- 生產(chǎn)者發(fā)送消息,發(fā)送到服務(wù)端中的虛擬主機(jī)。
- 虛擬主機(jī)中的交換器根據(jù)路由鍵選擇路由規(guī)則,發(fā)送到不同的消息隊(duì)列中。
- 訂閱了消息隊(duì)列的消費(fèi)者就可以獲取到消息,進(jìn)行消費(fèi)。
2.3 常用交換器
RabbitMQ常用的交換器類型有direct、topic、fanout、headers四種:
- Direct Exchange:見文知意,直連交換機(jī)意思是此交換機(jī)需要綁定一個(gè)隊(duì)列,要求該消息與一個(gè)特定的路由鍵完全匹配。簡單點(diǎn)說就是一對(duì)一的,點(diǎn)對(duì)點(diǎn)的發(fā)送。
- Fanout Exchange:這種類型的交換機(jī)需要將隊(duì)列綁定到交換機(jī)上。一個(gè)發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上。很像子網(wǎng)廣播,每臺(tái)子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息。簡單點(diǎn)說就是發(fā)布訂閱。
- Topic Exchange:直接翻譯的話叫做主題交換機(jī),如果從用法上面翻譯可能叫通配符交換機(jī)會(huì)更加貼切。這種交換機(jī)是使用通配符去匹配,路由到對(duì)應(yīng)的隊(duì)列。通配符有兩種:"*" 、 "#"。需要注意的是通配符前面必須要加上"."符號(hào)。
- *符號(hào):有且只匹配一個(gè)詞。比如 a.*可以匹配到"a.b"、"a.c",但是匹配不了"a.b.c"。
- #符號(hào):匹配一個(gè)或多個(gè)詞。比如"rabbit.#"既可以匹配到"rabbit.a.b"、"rabbit.a",也可以匹配到"rabbit.a.b.c"。
- Headers Exchange:這種交換機(jī)用的相對(duì)沒這么多。它跟上面三種有點(diǎn)區(qū)別,它的路由不是用routingKey進(jìn)行路由匹配,而是在匹配請(qǐng)求頭中所帶的鍵值進(jìn)行路由。創(chuàng)建隊(duì)列需要設(shè)置綁定的頭部信息,有兩種模式:全部匹配和部分匹配。如上圖所示,交換機(jī)會(huì)根據(jù)生產(chǎn)者發(fā)送過來的頭部信息攜帶的鍵值去匹配隊(duì)列綁定的鍵值,路由到對(duì)應(yīng)的隊(duì)列。
2.4 消費(fèi)原理
我們先看幾個(gè)基本概念:
- broker:每個(gè)節(jié)點(diǎn)運(yùn)行的服務(wù)程序,功能為維護(hù)該節(jié)點(diǎn)的隊(duì)列的增刪以及轉(zhuǎn)發(fā)隊(duì)列操作請(qǐng)求。
- master queue:每個(gè)隊(duì)列都分為一個(gè)主隊(duì)列和若干個(gè)鏡像隊(duì)列。
- mirror queue:鏡像隊(duì)列,作為master queue的備份。在master queue所在節(jié)點(diǎn)掛掉之后,系統(tǒng)把mirror queue提升為master queue,負(fù)責(zé)處理客戶端隊(duì)列操作請(qǐng)求。注意,mirror queue只做鏡像,設(shè)計(jì)目的不是為了承擔(dān)客戶端讀寫壓力。
集群中有兩個(gè)節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)上有一個(gè)broker,每個(gè)broker負(fù)責(zé)本機(jī)上隊(duì)列的維護(hù),并且borker之間可以互相通信。集群中有兩個(gè)隊(duì)列A和B,每個(gè)隊(duì)列都分為master queue和mirror queue(備份)。那么隊(duì)列上的生產(chǎn)消費(fèi)怎么實(shí)現(xiàn)的呢?
對(duì)于消費(fèi)隊(duì)列,如下圖有兩個(gè)consumer消費(fèi)隊(duì)列A,這兩個(gè)consumer連在了集群的不同機(jī)器上。RabbitMQ集群中的任何一個(gè)節(jié)點(diǎn)都擁有集群上所有隊(duì)列的元信息,所以連接到集群中的任何一個(gè)節(jié)點(diǎn)都可以,主要區(qū)別在于有的consumer連在master queue所在節(jié)點(diǎn),有的連在非master queue節(jié)點(diǎn)上。
因?yàn)閙irror queue要和master queue保持一致,故需要同步機(jī)制,正因?yàn)橐恢滦缘南拗疲瑢?dǎo)致所有的讀寫操作都必須都操作在master queue上(想想,為啥讀也要從master queue中讀?和數(shù)據(jù)庫讀寫分離是不一樣的),然后由master節(jié)點(diǎn)同步操作到mirror queue所在的節(jié)點(diǎn)。即使consumer連接到了非master queue節(jié)點(diǎn),該consumer的操作也會(huì)被路由到master queue所在的節(jié)點(diǎn)上,這樣才能進(jìn)行消費(fèi)。
對(duì)于生成隊(duì)列,原理和消費(fèi)一樣,如果連接到非 master queue 節(jié)點(diǎn),則路由過去。
所以,到這里小伙伴們就可以看到 RabbitMQ的不足:由于master queue單節(jié)點(diǎn),導(dǎo)致性能瓶頸,吞吐量受限。雖然為了提高性能,內(nèi)部使用了Erlang這個(gè)語言實(shí)現(xiàn),但是終究擺脫不了架構(gòu)設(shè)計(jì)上的致命缺陷。
2.5 高級(jí)特性
2.5.1 過期時(shí)間
Time To Live,也就是生存時(shí)間,是一條消息在隊(duì)列中的最大存活時(shí)間,單位是毫秒,下面看看RabbitMQ過期時(shí)間特性:
- RabbitMQ可以對(duì)消息和隊(duì)列設(shè)置TTL。
- RabbitMQ支持設(shè)置消息的過期時(shí)間,在消息發(fā)送的時(shí)候可以進(jìn)行指定,每條消息的過期時(shí)間可以不同。
- RabbitMQ支持設(shè)置隊(duì)列的過期時(shí)間,從消息入隊(duì)列開始計(jì)算,直到超過了隊(duì)列的超時(shí)時(shí)間配置,那么消息會(huì)變成死信,自動(dòng)清除。
- 如果兩種方式一起使用,則過期時(shí)間以兩者中較小的那個(gè)數(shù)值為準(zhǔn)。
- 當(dāng)然也可以不設(shè)置TTL,不設(shè)置表示消息不會(huì)過期;如果設(shè)置為0,則表示除非此時(shí)可以直接將消息投遞到消費(fèi)者,否則該消息將被立即丟棄。
2.5.2 消息確認(rèn)
為了保證消息從隊(duì)列可靠地到達(dá)消費(fèi)者,RabbitMQ提供了消息確認(rèn)機(jī)制。
消費(fèi)者訂閱隊(duì)列的時(shí)候,可以指定autoAck參數(shù),當(dāng)autoAck為true的時(shí)候,RabbitMQ采用自動(dòng)確認(rèn)模式,RabbitMQ自動(dòng)把發(fā)送出去的消息設(shè)置為確認(rèn),然后從內(nèi)存或者硬盤中刪除,而不管消費(fèi)者是否真正消費(fèi)到了這些消息。
當(dāng)autoAck為false的時(shí)候,RabbitMQ會(huì)等待消費(fèi)者回復(fù)的確認(rèn)信號(hào),收到確認(rèn)信號(hào)之后才從內(nèi)存或者磁盤中刪除消息。
消息確認(rèn)機(jī)制是RabbitMQ消息可靠性投遞的基礎(chǔ),只要設(shè)置autoAck參數(shù)為false,消費(fèi)者就有足夠的時(shí)間處理消息,不用擔(dān)心處理消息的過程中消費(fèi)者進(jìn)程掛掉后消息丟失的問題。
2.5.3 持久化
消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保證消息可靠性的呢?答案就是消息持久化。持久化可以防止在異常情況下丟失數(shù)據(jù)。RabbitMQ的持久化分為三個(gè)部分:交換器持久化、隊(duì)列持久化和消息的持久化。
交換器持久化可以通過在聲明隊(duì)列時(shí)將durable參數(shù)設(shè)置為true。如果交換器不設(shè)置持久化,那么在RabbitMQ服務(wù)重啟之后,相關(guān)的交換器元數(shù)據(jù)會(huì)丟失,不過消息不會(huì)丟失,只是不能將消息發(fā)送到這個(gè)交換器了。
隊(duì)列的持久化能保證其本身的元數(shù)據(jù)不會(huì)因異常情況而丟失,但是不能保證內(nèi)部所存儲(chǔ)的消息不會(huì)丟失。要確保消息不會(huì)丟失,需要將其設(shè)置為持久化。隊(duì)列的持久化可以通過在聲明隊(duì)列時(shí)將durable參數(shù)設(shè)置為true。
設(shè)置了隊(duì)列和消息的持久化,當(dāng)RabbitMQ服務(wù)重啟之后,消息依然存在。如果只設(shè)置隊(duì)列持久化或者消息持久化,重啟之后消息都會(huì)消失。
當(dāng)然,也可以將所有的消息都設(shè)置為持久化,但是這樣做會(huì)影響RabbitMQ的性能,因?yàn)榇疟P的寫入速度比內(nèi)存的寫入要慢得多。
對(duì)于可靠性不是那么高的消息可以不采用持久化處理以提高整體的吞吐量。魚和熊掌不可兼得,關(guān)鍵在于選擇和取舍。在實(shí)際中,需要根據(jù)實(shí)際情況在可靠性和吞吐量之間做一個(gè)權(quán)衡。
2.5.4 死信隊(duì)列
當(dāng)消息在一個(gè)隊(duì)列中變成死信之后,他能被重新發(fā)送到另一個(gè)交換器中,這個(gè)交換器成為死信交換器,與該交換器綁定的隊(duì)列稱為死信隊(duì)列。
消息變成死信有下面幾種情況:
- 消息被拒絕。
- 消息過期
- 隊(duì)列達(dá)到最大長度
DLX也是一個(gè)正常的交換器,和一般的交換器沒有區(qū)別,他能在任何的隊(duì)列上面被指定,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性。當(dāng)這個(gè)隊(duì)列中有死信的時(shí)候,RabbitMQ會(huì)自動(dòng)將這個(gè)消息重新發(fā)送到設(shè)置的交換器上,進(jìn)而被路由到另一個(gè)隊(duì)列,我們可以監(jiān)聽這個(gè)隊(duì)列中消息做相應(yīng)的處理。
死信隊(duì)列有什么用?當(dāng)發(fā)生異常的時(shí)候,消息不能夠被消費(fèi)者正常消費(fèi),被加入到了死信隊(duì)列中。后續(xù)的程序可以根據(jù)死信隊(duì)列中的內(nèi)容分析當(dāng)時(shí)發(fā)生的異常,進(jìn)而改善和優(yōu)化系統(tǒng)。
2.5.5 延遲隊(duì)列
一般的隊(duì)列,消息一旦進(jìn)入隊(duì)列就會(huì)被消費(fèi)者立即消費(fèi)。延遲隊(duì)列就是進(jìn)入該隊(duì)列的消息會(huì)被消費(fèi)者延遲消費(fèi),延遲隊(duì)列中存儲(chǔ)的對(duì)象是的延遲消息,“延遲消息”是指當(dāng)消息被發(fā)送以后,等待特定的時(shí)間后,消費(fèi)者才能拿到這個(gè)消息進(jìn)行消費(fèi)。
延遲隊(duì)列用于需要延遲工作的場(chǎng)景。最常見的使用場(chǎng)景:淘寶或者天貓我們都使用過,用戶在下單之后通常有30分鐘的時(shí)間進(jìn)行支付,如果這30分鐘之內(nèi)沒有支付成功,那么訂單就會(huì)自動(dòng)取消。
除了延遲消費(fèi),延遲隊(duì)列的典型應(yīng)用場(chǎng)景還有延遲重試。比如消費(fèi)者從隊(duì)列里面消費(fèi)消息失敗了,可以延遲一段時(shí)間以后進(jìn)行重試。
2.6 特性分析
這里才是內(nèi)容的重點(diǎn),不僅需要知道Rabbit的特性,還需要知道支持這些特性的原因:
- 消息路由(支持):RabbitMQ可以通過不同的交換器支持不同種類的消息路由;
- 消息有序(不支持):當(dāng)消費(fèi)消息時(shí),如果消費(fèi)失敗,消息會(huì)被放回隊(duì)列,然后重新消費(fèi),這樣會(huì)導(dǎo)致消息無序;
- 消息時(shí)序(非常好):通過延時(shí)隊(duì)列,可以指定消息的延時(shí)時(shí)間,過期時(shí)間TTL等;
- 容錯(cuò)處理(非常好):通過交付重試和死信交換器(DLX)來處理消息處理故障;
- 伸縮(一般):伸縮其實(shí)沒有非常智能,因?yàn)榧词股炜s了,master queue還是只有一個(gè),負(fù)載還是只有這一個(gè)master queue去抗,所以我理解RabbitMQ的伸縮很弱(個(gè)人理解)。
- 持久化(不太好):沒有消費(fèi)的消息,可以支持持久化,這個(gè)是為了保證機(jī)器宕機(jī)時(shí)消息可以恢復(fù),但是消費(fèi)過的消息,就會(huì)被馬上刪除,因?yàn)镽abbitMQ設(shè)計(jì)時(shí),就不是為了去存儲(chǔ)歷史數(shù)據(jù)的。
- 消息回溯(不支持):因?yàn)橄⒉恢С钟谰帽4妫宰匀痪筒恢С只厮荨?/li>
- 高吞吐(中等):因?yàn)樗械恼?qǐng)求的執(zhí)行,最后都是在master queue,它的這個(gè)設(shè)計(jì),導(dǎo)致單機(jī)性能達(dá)不到十萬級(jí)的標(biāo)準(zhǔn)。
3. RabbitMQ環(huán)境搭建
因?yàn)槲矣玫氖莔ac,所以直接可以參考官網(wǎng):
https://www.rabbitmq.com/install-homebrew.html
需要注意的是,一定需要先執(zhí)行:
brew update
然后再執(zhí)行:
brew install rabbitmq
之前沒有執(zhí)行brew update,直接執(zhí)行brew install rabbitmq時(shí),會(huì)報(bào)各種各樣奇怪的錯(cuò)誤,其中“403 Forbidde”居多。
但是在執(zhí)行“brew install rabbitmq”,會(huì)自動(dòng)安裝其它的程序,如果你使用源碼安裝Rabbitmq,因?yàn)閱?dòng)該服務(wù)依賴erlang環(huán)境,所以你還需手動(dòng)安裝erlang,但是目前官方已經(jīng)一鍵給你搞定,會(huì)自動(dòng)安裝Rabbitmq依賴的所有程序,是不是很棒!
最后執(zhí)行成功的輸出如下:
啟動(dòng)服務(wù):
# 啟動(dòng)方式1:后臺(tái)啟動(dòng)
brew services start rabbitmq
# 啟動(dòng)方式2:當(dāng)前窗口啟動(dòng)
cd /usr/local/Cellar/rabbitmq/3.8.19
rabbitmq-server
在瀏覽器輸入:
http://localhost:15672/
會(huì)出現(xiàn)RabbitMQ后臺(tái)管理界面(用戶名和密碼都為guest):
通過brew安裝,一行命令搞定,真香!
4. RabbitMQ測(cè)試
4.1 添加賬號(hào)
首先得啟動(dòng)mq
## 添加賬號(hào)
./rabbitmqctl add_user admin admin
## 添加訪問權(quán)限
./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
## 設(shè)置超級(jí)權(quán)限
./rabbitmqctl set_user_tags admin administrator
4.2 編碼實(shí)測(cè)
因?yàn)榇a中引入了JAVA 8的特性,pom引入依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.1</version>
</dependency>
<plugins>
<plugin>
<groupId>org.Apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
開始寫代碼:
public class RabbitMqTest {
//消息隊(duì)列名稱
private final static String QUEUE_NAME = "hello";
@Test
public void send() throws java.io.IOException, TimeoutException {
//創(chuàng)建連接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
//創(chuàng)建連接
Connection connection = factory.newConnection();
//創(chuàng)建消息通道
Channel channel = connection.createChannel();
//生成一個(gè)消息隊(duì)列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for (int i = 0; i < 10; i++) {
String message = "Hello World RabbitMQ count: " + i;
//發(fā)布消息,第一個(gè)參數(shù)表示路由(Exchange名稱),為""則表示使用默認(rèn)消息路由
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
//關(guān)閉消息通道和連接
channel.close();
connection.close();
}
@Test
public void consumer() throws java.io.IOException, TimeoutException {
//創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
//創(chuàng)建連接
Connection connection = factory.newConnection();
//創(chuàng)建消息信道
final Channel channel = connection.createChannel();
//消息隊(duì)列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("[*] Waiting for message. To exist press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
執(zhí)行send()后控制臺(tái)輸出:
[x] Sent 'Hello World RabbitMQ count: 0'
[x] Sent 'Hello World RabbitMQ count: 1'
[x] Sent 'Hello World RabbitMQ count: 2'
[x] Sent 'Hello World RabbitMQ count: 3'
[x] Sent 'Hello World RabbitMQ count: 4'
[x] Sent 'Hello World RabbitMQ count: 5'
[x] Sent 'Hello World RabbitMQ count: 6'
[x] Sent 'Hello World RabbitMQ count: 7'
[x] Sent 'Hello World RabbitMQ count: 8'
[x] Sent 'Hello World RabbitMQ count: 9'
執(zhí)行consumer()后:
示例中的代碼講解,可以直接參考官網(wǎng):https://www.rabbitmq.com/tutorials/tutorial-one-java.html
5. 基本使用姿勢(shì)
5.1 公共代碼封裝
封裝工廠類:
public class RabbitUtil {
public static ConnectionFactory getConnectionFactory() {
//創(chuàng)建連接工程,下面給出的是默認(rèn)的case
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
return factory;
}
}
封裝生成者:
public class MsgProducer {
public static void publishMsg(String exchange, BuiltinExchangeType exchangeType, String toutingKey, String message) throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
//創(chuàng)建連接
Connection connection = factory.newConnection();
//創(chuàng)建消息通道
Channel channel = connection.createChannel();
// 聲明exchange中的消息為可持久化,不自動(dòng)刪除
channel.exchangeDeclare(exchange, exchangeType, true, false, null);
// 發(fā)布消息
channel.basicPublish(exchange, toutingKey, null, message.getBytes());
System.out.println("Sent '" + message + "'");
channel.close();
connection.close();
}
}
封裝消費(fèi)者:
public class MsgConsumer {
public static void consumerMsg(String exchange, String queue, String routingKey)
throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
//創(chuàng)建連接
Connection connection = factory.newConnection();
//創(chuàng)建消息信道
final Channel channel = connection.createChannel();
//消息隊(duì)列
channel.queueDeclare(queue, true, false, false, null);
//綁定隊(duì)列到交換機(jī)
channel.queueBind(queue, exchange, routingKey);
System.out.println("[*] Waiting for message. To exist press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
try {
System.out.println(" [x] Received '" + message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 取消自動(dòng)ack
channel.basicConsume(queue, false, consumer);
}
}
5.2 Direct方式
5.2.1 Direct示例
生產(chǎn)者:
public class DirectProducer {
private static final String EXCHANGE_NAME = "direct.exchange";
public void publishMsg(String routingKey, String msg) {
try {
MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, routingKey, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
DirectProducer directProducer = new DirectProducer();
String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
String msg = "hello >>> ";
for (int i = 0; i < 10; i++) {
directProducer.publishMsg(routingKey[i % 3], msg + i);
}
System.out.println("----over-------");
Thread.sleep(1000 * 60 * 100);
}
}
執(zhí)行生產(chǎn)者,往消息隊(duì)列中放入10條消息,其中key分別為“aaa”、“bbb”和“ccc”,分別放入qa、qb、qc三個(gè)隊(duì)列:
下面是qa隊(duì)列的信息:
消費(fèi)者:
public class DirectConsumer {
private static final String exchangeName = "direct.exchange";
public void msgConsumer(String queueName, String routingKey) {
try {
MsgConsumer.consumerMsg(exchangeName, queueName, routingKey);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
DirectConsumer consumer = new DirectConsumer();
String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
String[] queueNames = new String[]{"qa", "qb", "qc"};
for (int i = 0; i < 3; i++) {
consumer.msgConsumer(queueNames[i], routingKey[i]);
}
Thread.sleep(1000 * 60 * 100);
}
}
執(zhí)行后的輸出:
[*] Waiting for message. To exist press CTRL+C
[x] Received 'hello >>> 0
[x] Done
[x] Received 'hello >>> 3
[x] Done
[x] Received 'hello >>> 6
[x] Done
[x] Received 'hello >>> 9
[x] Done
[*] Waiting for message. To exist press CTRL+C
[x] Received 'hello >>> 1
[x] Done
[x] Received 'hello >>> 4
[x] Done
[x] Received 'hello >>> 7
[x] Done
[*] Waiting for message. To exist press CTRL+C
[x] Received 'hello >>> 2
[x] Done
[x] Received 'hello >>> 5
[x] Done
[x] Received 'hello >>> 8
[x] Done
可以看到,分別從qa、qb、qc中將不同的key的數(shù)據(jù)消費(fèi)掉。
5.2.2 問題探討
有個(gè)疑問:這個(gè)隊(duì)列的名稱qa、qb和qc是RabbitMQ自動(dòng)生成的么,我們可以指定隊(duì)列名稱么?
我做了個(gè)簡單的實(shí)驗(yàn),我把消費(fèi)者代碼修改了一下:
public static void main(String[] args) throws InterruptedException {
DirectConsumer consumer = new DirectConsumer();
String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
String[] queueNames = new String[]{"qa", "qb", "qc1"}; // 將qc修改為qc1
for (int i = 0; i < 3; i++) {
consumer.msgConsumer(queueNames[i], routingKey[i]);
}
Thread.sleep(1000 * 60 * 100);
}
執(zhí)行后如下圖所示:
我們可以發(fā)現(xiàn),多了一個(gè)qc1,所以可以判斷這個(gè)界面中的queues,是消費(fèi)者執(zhí)行時(shí),會(huì)將消費(fèi)者指定的隊(duì)列名稱和direct.exchange綁定,綁定的依據(jù)就是key。
當(dāng)我們把隊(duì)列中的數(shù)據(jù)全部消費(fèi)掉,然后重新執(zhí)行生成者后,會(huì)發(fā)現(xiàn)qc和qc1中都有3條待消費(fèi)的數(shù)據(jù),因?yàn)榻壎ǖ膋ey都是“ccc”,所以兩者的數(shù)據(jù)是一樣的:
綁定關(guān)系如下:
注意:當(dāng)沒有Queue綁定到Exchange時(shí),往Exchange中寫入的消息也不會(huì)重新分發(fā)到之后綁定的queue上。
思考:不執(zhí)行消費(fèi)者,看不到這個(gè)Queres中信息,我其實(shí)可以把這個(gè)界面理解為消費(fèi)者信息界面。不過感覺還是怪怪的,這個(gè)queues如果是消費(fèi)者信息,就不應(yīng)該叫queues,我理解queues應(yīng)該是RabbitMQ中實(shí)際存放數(shù)據(jù)的queues,難道是我理解錯(cuò)了?
5.3 Fanout方式(指定隊(duì)列)
生產(chǎn)者封裝:
public class FanoutProducer {
private static final String EXCHANGE_NAME = "fanout.exchange";
public void publishMsg(String routingKey, String msg) {
try {
MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutProducer directProducer = new FanoutProducer();
String msg = "hello >>> ";
for (int i = 0; i < 10; i++) {
directProducer.publishMsg("", msg + i);
}
}
}
消費(fèi)者:
public class FanoutConsumer {
private static final String EXCHANGE_NAME = "fanout.exchange";
public void msgConsumer(String queueName, String routingKey) {
try {
MsgConsumer.consumerMsg(EXCHANGE_NAME, queueName, routingKey);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutConsumer consumer = new FanoutConsumer();
String[] queueNames = new String[]{"qa-2", "qb-2", "qc-2"};
for (int i = 0; i < 3; i++) {
consumer.msgConsumer(queueNames[i], "");
}
}
}
執(zhí)行生成者,結(jié)果如下:
我們發(fā)現(xiàn),生產(chǎn)者生產(chǎn)的10條數(shù)據(jù),在每個(gè)消費(fèi)者中都可以消費(fèi),這個(gè)是和Direct不同的地方,但是使用Fanout方式時(shí),有幾個(gè)點(diǎn)需要注意一下:
- 生產(chǎn)者的routkey可以為空,因?yàn)樯a(chǎn)者的所有數(shù)據(jù),會(huì)下放到每一個(gè)隊(duì)列,所以不會(huì)通過routkey去路由;
- 消費(fèi)者需要指定queues,因?yàn)橄M(fèi)者需要綁定到指定的queues才能消費(fèi)。
這幅圖就畫出了Fanout的精髓之處,exchange會(huì)和所有的queue進(jìn)行綁定,不區(qū)分路由,消費(fèi)者需要綁定指定的queue才能發(fā)起消費(fèi)。
注意:往隊(duì)列塞數(shù)據(jù)時(shí),可能通過界面看不到消息個(gè)數(shù)的增加,可能是你之前已經(jīng)開啟了消費(fèi)進(jìn)程,導(dǎo)致增加的消息馬上被消費(fèi)了。
5.4 Fanout方式(隨機(jī)獲取隊(duì)列)
上面我們是指定了隊(duì)列,這個(gè)方式其實(shí)很不友好,比如對(duì)于Fanout,我其實(shí)根本無需關(guān)心隊(duì)列的名字,如果還指定對(duì)應(yīng)隊(duì)列進(jìn)行消費(fèi),感覺這個(gè)很冗余,所以我們這里就采用隨機(jī)獲取隊(duì)列名字的方式,下面代碼直接Copy官網(wǎng)。
生成者封裝:
public static void publishMsgV2(String exchange, BuiltinExchangeType exchangeType, String message) throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
//創(chuàng)建連接
Connection connection = factory.newConnection();
//創(chuàng)建消息通道
Channel channel = connection.createChannel();
// 聲明exchange中的消息
channel.exchangeDeclare(exchange, exchangeType);
// 發(fā)布消息
channel.basicPublish(exchange, "", null, message.getBytes("UTF-8"));
System.out.println("Sent '" + message + "'");
channel.close();
connection.close();
}
消費(fèi)者封裝:
public static void consumerMsgV2(String exchange) throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare(exchange, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchange, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
生產(chǎn)者:
public class FanoutProducer {
private static final String EXCHANGE_NAME = "fanout.exchange.v2";
public void publishMsg(String msg) {
try {
MsgProducer.publishMsgV2(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutProducer directProducer = new FanoutProducer();
String msg = "hello >>> ";
for (int i = 0; i < 10000; i++) {
directProducer.publishMsg(msg + i);
}
}
}
消費(fèi)者:
public class FanoutConsumer {
private static final String EXCHANGE_NAME = "fanout.exchange.v2";
public void msgConsumer() {
try {
MsgConsumer.consumerMsgV2(EXCHANGE_NAME);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutConsumer consumer = new FanoutConsumer();
for (int i = 0; i < 3; i++) {
consumer.msgConsumer();
}
}
}
執(zhí)行后,管理界面如下:
5.5 Topic方式
代碼詳見官網(wǎng):https://www.rabbitmq.com/tutorials/tutorial-five-java.html
更多方式,請(qǐng)直接查看官網(wǎng):https://www.rabbitmq.com/getstarted.html
6. RabbitMQ 進(jìn)階
6.1 durable 和 autoDeleted
在定義Queue時(shí),可以指定這兩個(gè)參數(shù):
/**
* Declare an exchange.
* @see com.rabbitmq.client.AMQP.Exchange.Declare
* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
* @param exchange the name of the exchange
* @param type the exchange type
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
* @param autoDelete true if the server should delete the exchange when it is no longer in use
* @param arguments other properties (construction arguments) for the exchange
* @return a declaration-confirm method to indicate the exchange was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
6.1.1 durable
持久化,保證RabbitMQ在退出或者crash等異常情況下數(shù)據(jù)沒有丟失,需要將queue,exchange和Message都持久化。
若是將queue的持久化標(biāo)識(shí)durable設(shè)置為true,則代表是一個(gè)持久的隊(duì)列,那么在服務(wù)重啟之后,會(huì)重新讀取之前被持久化的queue。
雖然隊(duì)列可以被持久化,但是里面的消息是否為持久化,還要看消息的持久化設(shè)置。即重啟queue,但是queue里面還沒有發(fā)出去的消息,那隊(duì)列里面還存在該消息么?這個(gè)取決于該消息的設(shè)置。
6.1.2 autoDeleted
自動(dòng)刪除,如果該隊(duì)列沒有任何訂閱的消費(fèi)者的話,該隊(duì)列會(huì)被自動(dòng)刪除。這種隊(duì)列適用于臨時(shí)隊(duì)列。
當(dāng)一個(gè)Queue被設(shè)置為自動(dòng)刪除時(shí),當(dāng)消費(fèi)者斷掉之后,queue會(huì)被刪除,這個(gè)主要針對(duì)的是一些不是特別重要的數(shù)據(jù),不希望出現(xiàn)消息積累的情況。
6.1.3 小節(jié)
- 當(dāng)一個(gè)Queue已經(jīng)聲明好了之后,不能更新durable或者autoDelted值;當(dāng)需要修改時(shí),需要先刪除再重新聲明
- 消費(fèi)的Queue聲明應(yīng)該和投遞的Queue聲明的 durable,autoDelted屬性一致,否則會(huì)報(bào)錯(cuò)
- 對(duì)于重要的數(shù)據(jù),一般設(shè)置 durable=true, autoDeleted=false
- 對(duì)于設(shè)置 autoDeleted=true 的隊(duì)列,當(dāng)沒有消費(fèi)者之后,隊(duì)列會(huì)自動(dòng)被刪除
6.4 ACK
執(zhí)行一個(gè)任務(wù)可能需要花費(fèi)幾秒鐘,你可能會(huì)擔(dān)心如果一個(gè)消費(fèi)者在執(zhí)行任務(wù)過程中掛掉了。一旦RabbitMQ將消息分發(fā)給了消費(fèi)者,就會(huì)從內(nèi)存中刪除。在這種情況下,如果正在執(zhí)行任務(wù)的消費(fèi)者宕機(jī),會(huì)丟失正在處理的消息和分發(fā)給這個(gè)消費(fèi)者但尚未處理的消息。
但是,我們不想丟失任何任務(wù),如果有一個(gè)消費(fèi)者掛掉了,那么我們應(yīng)該將分發(fā)給它的任務(wù)交付給另一個(gè)消費(fèi)者去處理。
為了確保消息不會(huì)丟失,RabbitMQ支持消息應(yīng)答。消費(fèi)者發(fā)送一個(gè)消息應(yīng)答,告訴RabbitMQ這個(gè)消息已經(jīng)接收并且處理完畢了。RabbitMQ就可以刪除它了。
因此手動(dòng)ACK的常見手段:
// 接收消息之后,主動(dòng)ack/nak
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
try {
System.out.println(" [ " + queue + " ] Received '" + message);
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
};
// 取消自動(dòng)ack
channel.basicConsume(queue, false, consumer);