大家好呀,我是樓仔。
RabbitMQ 的文章之前寫過,但是當(dāng)時給的示例是 Demo 版的,這篇文章主要是結(jié)合之前寫的理論知識,將 RabbitMQ 集成到技術(shù)派項目中。
不 BB,上文章目錄:
下面我們先回顧一下理論知識,如果對這塊知識已經(jīng)清楚的同學(xué),可以直接跳到實戰(zhàn)部分。
1. 消息隊列
1.1 消息隊列模式
消息隊列目前主要 2 種模式,分別為“點對點模式”和“發(fā)布/訂閱模式”。
點對點模式
一個具體的消息只能由一個消費者消費,多個生產(chǎn)者可以向同一個消息隊列發(fā)送消息,但是一個消息在被一個消息者處理的時候,這個消息在隊列上會被鎖住或者被移除并且其他消費者無法處理該消息。
需要額外注意的是,如果消費者處理一個消息失敗了,消息系統(tǒng)一般會把這個消息放回隊列,這樣其他消費者可以繼續(xù)處理。
發(fā)布/訂閱模式
單個消息可以被多個訂閱者并發(fā)的獲取和處理。一般來說,訂閱有兩種類型:
- 臨時(ephemeral)訂閱:這種訂閱只有在消費者啟動并且運行的時候才存在。一旦消費者退出,相應(yīng)的訂閱以及尚未處理的消息就會丟失。
- 持久(durable)訂閱:這種訂閱會一直存在,除非主動去刪除。消費者退出后,消息系統(tǒng)會繼續(xù)維護該訂閱,并且后續(xù)消息可以被繼續(xù)處理。
1.2 RabbitMQ 特征
- 消息路由(支持):RabbitMQ可以通過不同的交換器支持不同種類的消息路由;
- 消息有序(不支持):當(dāng)消費消息時,如果消費失敗,消息會被放回隊列,然后重新消費,這樣會導(dǎo)致消息無序;
- 消息時序(非常好):通過延時隊列,可以指定消息的延時時間,過期時間TTL等;
- 容錯處理(非常好):通過交付重試和死信交換器(DLX)來處理消息處理故障;
- 伸縮(一般):伸縮其實沒有非常智能,因為即使伸縮了,master queue還是只有一個,負(fù)載還是只有這一個master queue去抗,所以我理解RabbitMQ的伸縮很弱(個人理解)。
- 持久化(不太好):沒有消費的消息,可以支持持久化,這個是為了保證機器宕機時消息可以恢復(fù),但是消費過的消息,就會被馬上刪除,因為RabbitMQ設(shè)計時,就不是為了去存儲歷史數(shù)據(jù)的。
- 消息回溯(支持):因為消息不支持永久保存,所以自然就不支持回溯。
- 高吞吐(中等):因為所有的請求的執(zhí)行,最后都是在master queue,它的這個設(shè)計,導(dǎo)致單機性能達不到十萬級的標(biāo)準(zhǔn)。
2. RabbitMQ 原理初探
RabbitMQ 2007 年發(fā)布,是使用 Erlang 語言開發(fā)的開源消息隊列系統(tǒng),基于 AMQP 協(xié)議來實現(xiàn)。
2.1 基本概念
提到RabbitMQ,就不得不提AMQP協(xié)議。AMQP協(xié)議是具有現(xiàn)代特征的二進制協(xié)議。是一個提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級消息隊列協(xié)議,是應(yīng)用層協(xié)議的一個開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計。
先了解一下AMQP協(xié)議中間的幾個重要概念:
- Server:接收客戶端的連接,實現(xiàn)AMQP實體服務(wù)。
- Connection:連接,應(yīng)用程序與Server的網(wǎng)絡(luò)連接,TCP連接。
- Channel:信道,消息讀寫等操作在信道中進行。客戶端可以建立多個信道,每個信道代表一個會話任務(wù)。
- Message:消息,應(yīng)用程序和服務(wù)器之間傳送的數(shù)據(jù),消息可以非常簡單,也可以很復(fù)雜。由Properties和Body組成。Properties為外包裝,可以對消息進行修飾,比如消息的優(yōu)先級、延遲等高級特性;Body就是消息體內(nèi)容。
- Virtual Host:虛擬主機,用于邏輯隔離。一個虛擬主機里面可以有若干個Exchange和Queue,同一個虛擬主機里面不能有相同名稱的Exchange或Queue。
- Exchange:交換器,接收消息,按照路由規(guī)則將消息路由到一個或者多個隊列。如果路由不到,或者返回給生產(chǎn)者,或者直接丟棄。RabbitMQ常用的交換器常用類型有direct、topic、fanout、headers四種,后面詳細(xì)介紹。
- Binding:綁定,交換器和消息隊列之間的虛擬連接,綁定中可以包含一個或者多個RoutingKey。
- RoutingKey:路由鍵,生產(chǎn)者將消息發(fā)送給交換器的時候,會發(fā)送一個RoutingKey,用來指定路由規(guī)則,這樣交換器就知道把消息發(fā)送到哪個隊列。路由鍵通常為一個“.”分割的字符串,例如“com.rabbitmq”。
- Queue:消息隊列,用來保存消息,供消費者消費。
2.2 工作原理
AMQP 協(xié)議模型由三部分組成:生產(chǎn)者、消費者和服務(wù)端,執(zhí)行流程如下:
- 生產(chǎn)者是連接到 Server,建立一個連接,開啟一個信道。
- 生產(chǎn)者聲明交換器和隊列,設(shè)置相關(guān)屬性,并通過路由鍵將交換器和隊列進行綁定。
- 消費者也需要進行建立連接,開啟信道等操作,便于接收消息。
- 生產(chǎn)者發(fā)送消息,發(fā)送到服務(wù)端中的虛擬主機。
- 虛擬主機中的交換器根據(jù)路由鍵選擇路由規(guī)則,發(fā)送到不同的消息隊列中。
- 訂閱了消息隊列的消費者就可以獲取到消息,進行消費。
2.3 常用交換器
RabbitMQ常用的交換器類型有direct、topic、fanout、headers四種:
- Direct Exchange:見文知意,直連交換機意思是此交換機需要綁定一個隊列,要求該消息與一個特定的路由鍵完全匹配。簡單點說就是一對一的,點對點的發(fā)送。
- Fanout Exchange:這種類型的交換機需要將隊列綁定到交換機上。一個發(fā)送到交換機的消息都會被轉(zhuǎn)發(fā)到與該交換機綁定的所有隊列上。很像子網(wǎng)廣播,每臺子網(wǎng)內(nèi)的主機都獲得了一份復(fù)制的消息。簡單點說就是發(fā)布訂閱。
- Topic Exchange:直接翻譯的話叫做主題交換機,如果從用法上面翻譯可能叫通配符交換機會更加貼切。這種交換機是使用通配符去匹配,路由到對應(yīng)的隊列。通配符有兩種:"*" 、 "#"。需要注意的是通配符前面必須要加上"."符號。
- *符號:有且只匹配一個詞。比如 a.*可以匹配到"a.b"、"a.c",但是匹配不了"a.b.c"。
- #符號:匹配一個或多個詞。比如"rabbit.#"既可以匹配到"rabbit.a.b"、"rabbit.a",也可以匹配到"rabbit.a.b.c"。
- Headers Exchange:這種交換機用的相對沒這么多。它跟上面三種有點區(qū)別,它的路由不是用routingKey進行路由匹配,而是在匹配請求頭中所帶的鍵值進行路由。創(chuàng)建隊列需要設(shè)置綁定的頭部信息,有兩種模式:全部匹配和部分匹配。如上圖所示,交換機會根據(jù)生產(chǎn)者發(fā)送過來的頭部信息攜帶的鍵值去匹配隊列綁定的鍵值,路由到對應(yīng)的隊列。
3. RabbitMQ環(huán)境搭建
因為我用的是mac,所以直接可以參考官網(wǎng):
https://www.rabbitmq.com/install-homebrew.html
需要注意的是,一定需要先執(zhí)行:
brew update
然后再執(zhí)行:
brew install rabbitmq
之前沒有執(zhí)行brew update,直接執(zhí)行brew install rabbitmq時,會報各種各樣奇怪的錯誤,其中“403 Forbidde”居多。
但是在執(zhí)行“brew install rabbitmq”,會自動安裝其它的程序,如果你使用源碼安裝Rabbitmq,因為啟動該服務(wù)依賴erlang環(huán)境,所以你還需手動安裝erlang,但是目前官方已經(jīng)一鍵給你搞定,會自動安裝Rabbitmq依賴的所有程序,是不是很棒!
最后執(zhí)行成功的輸出如下:
啟動服務(wù):
# 啟動方式1:后臺啟動
brew services start rabbitmq
# 啟動方式2:當(dāng)前窗口啟動
cd /usr/local/Cellar/rabbitmq/3.8.19
rabbitmq-server
在瀏覽器輸入:
http://localhost:15672/
會出現(xiàn)RabbitMQ后臺管理界面(用戶名和密碼都為guest):
通過brew安裝,一行命令搞定,真香!
4. RabbitMQ 集成
4.1 前置工作
添加賬號:
## 添加賬號
./rabbitmqctl add_user admin admin
## 添加訪問權(quán)限
./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
## 設(shè)置超級權(quán)限
./rabbitmqctl set_user_tags admin administrator
pom 引入依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.1</version>
</dependency>
4.2 代碼實現(xiàn)
核心代碼
先整一個 ConnectionFactory 單例,每臺機器都有自己的 ConnectionFactory,防止每次都初始化(在后面的迭代中,我會把這個去掉,整成連接池)。
/**
* @author Louzai
* @date 2023/5/10
*/
public class RabbitmqUtil {
/**
* 每個key都有自己的工廠
*/
private static Map<String, ConnectionFactory> executors = new ConcurrentHashMap<>();
/**
* 初始化一個工廠
*
* @param host
* @param port
* @param username
* @param passport
* @param virtualhost
* @return
*/
public static ConnectionFactory init(String host,
Integer port,
String username,
String passport,
String virtualhost) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(passport);
factory.setVirtualHost(virtualhost);
return factory;
}
/**
* 工廠單例,每個key都有屬于自己的工廠
*
* @param key
* @param host
* @param port
* @param username
* @param passport
* @param virtualhost
* @return
*/
public static ConnectionFactory getOrInitConnectionFactory(String key,
String host,
Integer port,
String username,
String passport,
String virtualhost) {
ConnectionFactory connectionFactory = executors.get(key);
if (null == connectionFactory) {
synchronized (RabbitmqUtil.class) {
connectionFactory = executors.get(key);
if (null == connectionFactory) {
connectionFactory = init(host, port, username, passport, virtualhost);
executors.put(key, connectionFactory);
}
}
}
return connectionFactory;
}
}
獲取 RabbitmqClient:
/**
* @author Louzai
* @date 2023/5/10
*/
@Component
public class RabbitmqClient {
@Autowired
private RabbitmqProperties rabbitmqProperties;
/**
* 創(chuàng)建一個工廠
* @param key
* @return
*/
public ConnectionFactory getConnectionFactory(String key) {
String host = rabbitmqProperties.getHost();
Integer port = rabbitmqProperties.getPort();
String userName = rabbitmqProperties.getUsername();
String password = rabbitmqProperties.getPassport();
String virtualhost = rabbitmqProperties.getVirtualhost();
return RabbitmqUtil.getOrInitConnectionFactory(key, host, port, userName,password, virtualhost);
}
}
重點!敲黑板!!!這里就是 RabbmitMQ 的核心邏輯了。
我們使用的交換機類型是 Direct Exchange,此交換機需要綁定一個隊列,要求該消息與一個特定的路由鍵完全匹配,簡單點說就是一對一的,點對點的發(fā)送。
至于為什么不用廣播和主題交換機模式,因為技術(shù)派的使用場景就是發(fā)送單個消息,點到點發(fā)送和消費的模式完全可以滿足我們的需求。
下面 3 個方法都很簡單:
- 發(fā)送消息:拿到工廠 -> 創(chuàng)建鏈接 -> 創(chuàng)建通道 -> 聲明交換機 -> 發(fā)送消息 -> 關(guān)閉鏈接;
- 消費消息:拿到工廠 -> 創(chuàng)建鏈接 -> 創(chuàng)建通道 -> 確定消息隊列 -> 綁定隊列到交換機 -> 接受并消費消息;
- 消費消息永動模式:非阻塞模式消費 RabbitMQ 消息。
@Component
public class RabbitmqServiceImpl implements RabbitmqService {
@Autowired
private RabbitmqClient rabbitmqClient;
@Autowired
private NotifyService notifyService;
@Override
public void publishMsg(String exchange,
BuiltinExchangeType exchangeType,
String toutingKey,
String message) throws IOException, TimeoutException {
ConnectionFactory factory = rabbitmqClient.getConnectionFactory(toutingKey);
// TODO: 這種并發(fā)量起不來,需要改造成連接池
//創(chuàng)建連接
Connection connection = factory.newConnection();
//創(chuàng)建消息通道
Channel channel = connection.createChannel();
// 聲明exchange中的消息為可持久化,不自動刪除
channel.exchangeDeclare(exchange, exchangeType, true, false, null);
// 發(fā)布消息
channel.basicPublish(exchange, toutingKey, null, message.getBytes());
System.out.println("Publish msg:" + message);
channel.close();
connection.close();
}
@Override
public void consumerMsg(String exchange,
String queue,
String routingKey) throws IOException, TimeoutException {
ConnectionFactory factory = rabbitmqClient.getConnectionFactory(routingKey);
// TODO: 這種并發(fā)量起不來,需要改造成連接池
//創(chuàng)建連接
Connection connection = factory.newConnection();
//創(chuàng)建消息信道
final Channel channel = connection.createChannel();
//消息隊列
channel.queueDeclare(queue, true, false, false, null);
//綁定隊列到交換機
channel.queueBind(queue, exchange, routingKey);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Consumer msg:" + message);
// 獲取Rabbitmq消息,并保存到DB
// 說明:這里僅作為示例,如果有多種類型的消息,可以根據(jù)消息判定,簡單的用 if...else 處理,復(fù)雜的用工廠 + 策略模式
notifyService.saveArticleNotify(JsonUtil.toObj(message, UserFootDO.class), NotifyTypeEnum.PRAISE);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 取消自動ack
channel.basicConsume(queue, false, consumer);
}
@Override
public void processConsumerMsg() {
System.out.println("Begin to processConsumerMsg.");
Integer stepTotal = 1;
Integer step = 0;
// TODO: 這種方式非常 Low,后續(xù)會改造成阻塞 I/O 模式
while (true) {
step ++;
try {
System.out.println("processConsumerMsg cycle.");
consumerMsg(CommonConstants.EXCHANGE_NAME_DIRECT, CommonConstants.QUERE_NAME_PRAISE,
CommonConstants.QUERE_KEY_PRAISE);
if (step.equals(stepTotal)) {
Thread.sleep(10000);
step = 0;
}
} catch (Exception e) {
}
}
}
}
這里只是給個示例,如果要真正用到生產(chǎn)環(huán)境,你覺得有哪些問題呢? 你自己先想想,文末再告訴你。
調(diào)用入口
其實之前我們是通過 JAVA 的內(nèi)置異步調(diào)用方式,為了方便驗證,我把文章點贊的功能遷移到 RabbitMQ 中,只要是點贊,就走 RabbitMQ 模式。
// 點贊消息走 RabbitMQ,其它走 Java 內(nèi)置消息機制
if (notifyType.equals(NotifyTypeEnum.PRAISE) && rabbitmqProperties.getSwitchFlag()) {
rabbitmqService.publishMsg(
CommonConstants.EXCHANGE_NAME_DIRECT,
BuiltinExchangeType.DIRECT,
CommonConstants.QUERE_KEY_PRAISE,
JsonUtil.toStr(foot));
} else {
Optional.ofNullable(notifyType).ifPresent(notify -> SpringUtil.publishEvent(new NotifyMsgEvent<>(this, notify, foot)));
}
那消費入口放哪里呢?其實是在程序啟動的時候,我們就啟動 RabbitMQ 進行消費,然后整個進程一直在程序中跑。
@Override
public void run(ApplicationArguments args) {
// 設(shè)置類型轉(zhuǎn)換, 主要用于MyBatis讀取varchar/json類型數(shù)據(jù)據(jù),并寫入到j(luò)son格式的實體Entity中
JacksonTypeHandler.setObjectMapper(new ObjectMapper());
// 應(yīng)用啟動之后執(zhí)行
GlobalViewConfig config = SpringUtil.getBean(GlobalViewConfig.class);
if (webPort != null) {
config.setHost("http://127.0.0.1:" + webPort);
}
// 啟動 RabbitMQ 進行消費
if (rabbitmqProperties.getSwitchFlag()) {
taskExecutor.execute(() -> rabbitmqService.processConsumerMsg());
}
log.info("啟動成功,點擊進入首頁: {}", config.getHost());
}
4.3 演示一下
我們多次點擊“點贊”按鈕,觸發(fā) RammitMQ 消息發(fā)送。
可以通過日志,也可以看到發(fā)送和消費過的消息。
我靠!好多沒有關(guān)閉的鏈接。。。
還有一堆沒有關(guān)閉的 channel。。。
估計再多跑一會,內(nèi)存全部吃光,機器就死機了,怎么破?答案是連接池!
4.4 代碼分支
為了方便大家學(xué)習(xí)功能演變的過程,每個模塊都會單獨開個分支,包括后面的升級版:
- 代碼倉庫:https://Github.com/itwanger/paicoding
- 代碼分支:feature/add_rabbitmq_20230506
如果需要運行 RabbitMQ,下面的配置需要改成 true,因為代碼默認(rèn)是 false。
5 后記
這篇文章,讓大家知道 RabbitMQ 的基本原理,以及如何去集成 RabbitMQ,但是還不能用到實際生產(chǎn)環(huán)境,但是這個確實是我寫的第一個版本,存粹是搞著玩的,因為里面存在的問題還非常多。
我簡單列舉一下:
- 需要給 Connection 加個連接池,否則內(nèi)存會持續(xù)消耗,機器肯定扛不住;
- 需要對 RabbitMQ 的消費方式進行改造,因為 while + sleep 的方式過于簡單粗暴;
- 假如消費的任務(wù)掛掉了,你需要有重啟 RabbitMQ 的消費機制;
- 假如機器掛了,重啟后,RabbitMQ 內(nèi)部的消息不能丟失。
如果你對上面的問題也非常感興趣,可以直接基于分支 feature/add_rabbitmq_20230506,然后給我提 PR,技術(shù)嘛,我喜歡邊玩邊學(xué)。