一、關于 RabbitMQ
說到 RabbitMQ,相信大家都不會陌生,微服務開發中必不可少的中間件。
在上篇關于消息隊列的文章中,我們了解到 RabbitMQ 本質其實是用 Erlang 開發的 AMQP(Advanced Message Queuing Protocol )的具體實現,最初起源于金融系統,主要用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面有著不俗的表現。
2010年4月,RabbitMQ 科技公司被 VMware 旗下的 SpringSource 收購,在 2013 年 5 月被并入 Pivotal 。
其實 VMware,Pivotal 本質上是一家的。不同的是,VMware 是獨立上市子公司,而 Pivotal 是整合了EMC的某些資源,現在并沒有上市。其中我們現在使用的 Spring 系列框架,就是 Pivotal 公司熱門的產品之一。
直到后來 Pivotal 將其開源,RabbitMQ 才逐漸走向大眾!
RabbitMQ 發展到今天,已經被越來越多的人認可,尤其是互聯網公司,已經有著大規模的場景應用,今天我們就一起來深入了解一下 RabbitMQ。
二、RabbitMQ 模型介紹
2.1、內部結構分析
上面我們有說到 RabbitMQ 本質是 AMQP 協議的一個開源實現,在詳細介紹 RabbitMQ 之前,我們先來看一下 AMQP 的內部結構圖!
基本概念如下:
- Publisher:消息的生產者,也是一個向交換器發布消息的客戶端應用程序
- Exchange:交換器,用來接收生產者發送的消息并將這些消息路由給服務器中的隊列
- Binding:綁定,用于將消息隊列和交換器之間建立關聯。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將它理解成一個由綁定構成的路由表。
- Queue:消息隊列,用來保存消息直到發送給消費者
- Connection:網絡連接,比如一個 TCP 連接
- Channel:信道,多路復用連接中的一條獨立的雙向數據流通道
- Consumer:消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序
- Virtual Host:虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 /
- Broker:表示消息隊列服務器實體
- Message:消息實體,它由消息頭和消息體組成。消息頭主要由路由鍵、交換器、隊列、priority(相對于其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等屬性組成,而消息體就是指具體的業務對象
相比傳統的 JMS 模型,AMQP 主要多了 Exchange、Binding 這個新概念。
在 AMQP 模型中,消息的生產者不是直接將消息發送到Queue隊列,而是將消息發送到Exchange交換器,其中還新加了一個中間層Binding綁定,作用就是通過路由鍵Key將交換器和隊列建立綁定關系。
就好比類似用戶表和角色表,中間通過用戶角色表來將用戶和角色建立關系,從而實現關系綁定,在 RabbitMQ 中,消息生產者不直接跟隊列建立關系,而是將消息發送到交換器之后,由交換器通過已經建立好的綁定關系,將消息發送到對應的隊列!
RabbitMQ 最終的架構模型,核心部分就變成如下圖所示:
從圖中很容易看出,與 JMS 模型最明顯的差別就是消息的生產者不直接將消息發送給隊列,而是由Binding綁定決定交換器的消息應該發送到哪個隊列,進一步實現了在消息的推送方面,更加靈活!
2.2、交換器分發策略
當消息的生產者將消息發送到交換器之后,是不會存儲消息的,而是通過中間層綁定關系將消息分發到不同的隊列上,其中交換器的分發策略分為四種:Direct、Topic、Headers、Fanout!
- Direct:直連類型,即在綁定時設定一個 routing_key, 消息的 routing_key 匹配時, 才會被交換器投送到綁定的隊列中去,原則是先匹配、后投送;
- Topic:按照規則轉發類型,支持通配符匹配,和 Direct 功能一樣,但是在匹配 routing_key的時候,更加靈活,支持通配符匹配,原則也是先匹配、后投送;
- Headers:頭部信息匹配轉發類型,根據消息頭部中的 header attribute 參數類型,將消息轉發到對應的隊列,原則也是先匹配、后投送;
- Fanout:廣播類型,將消息轉發到所有與該交互機綁定的隊列上,不關心 routing_key;
2.2.1、Direct
Direct 是 RabbitMQ 默認的交換機模式,也是最簡單的模式,消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。
如果傳入的 routing key 為 black,不會轉發到black.green。Direct 類型交換器是完全匹配、單播的模式。
2.2.2、Topic
Topic 類型交換器轉發消息和 Direct 一樣,不同的是:它支持通配符轉發,相比 Direct 類型更加靈活!
兩種通配符:*只能匹配一個單詞,#可以匹配零個或多個。
如果傳入的 routing key 為 black#,不僅會轉發到black,也會轉發到black.green。
2.2.3、Headers
headers 也是根據規則匹配, 相比 direct 和 topic 固定地使用 routing_key , headers 則是通過一個自定義匹配規則的消息頭部類進行匹配。
在隊列與交換器綁定時,會設定一組鍵值對規則,消息中也包括一組鍵值對( headers 屬性),當這些鍵值對有一對, 或全部匹配時,消息被投送到對應隊列。
此外 headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了。
2.2.4、Fanout
Fanout 類型交換器與上面幾個不同,不管路由鍵或者是路由模式,會把消息發給綁定給它的全部隊列,如果配置了 routing_key 會被忽略,也被成為消息廣播模式。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息
fanout 類型轉發消息在四種類型中是最快的。
三、RabbitMQ 安裝
RabbitMQ 基于 erlang 進行通信,相比其它的軟件,安裝有些麻煩,為了跟生產環境保持一直,操作系統選擇centos7,不過本例采用rpm方式安裝,任何新手都可以完成安裝,過程如下!
3.1、安裝前命令準備
輸入如下命令,完成安裝前的環境準備。
yum install lsof build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz wget vim
3.2、下載 RabbitMQ、erlang、socat 的安裝包
本次下載的是RabbitMQ-3.6.5版本,采用rpm一鍵安裝,適合新手直接上手。
先創建一個rabbitmq目錄,本例的目錄路徑為/usr/App/rabbitmq,然后在目錄下執行如下命令,下載安裝包!
- 下載erlang
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
- 下載socat
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
- 下載rabbitMQ
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
最終目錄文件如下:
3.3、安裝軟件包
下載完之后,按順序依次安裝軟件包,這個很重要哦~
- 安裝erlang
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
- 安裝socat
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
- 安裝rabbitmq
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
安裝完成之后,修改rabbitmq的配置,默認配置文件在
/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin目錄下。
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
修改loopback_users節點的值!
最后只需通過如下命令,啟動服務即可!
rabbitmq-server start &
運行腳本之后,如果報錯,例如下圖!
解決辦法如下:
vim /etc/rabbitmq/rabbitmq-env.conf
在文件里添加一行,如下配置!
NODENAME=rabbit@localhost
然后,再保存!再次以下命令啟動服務!
rabbitmq-server start &
通過如下命令,查詢服務是否啟動成功!
lsof -i:5672
如果出現5672已經被監聽,說明已經啟動成功!
3.4、啟動可視化的管控臺
輸入如下命令,啟動控制臺!
rabbitmq-plugins enable rabbitmq_management
用瀏覽器打開http://ip:15672,這里的ip就是 CentOS 系統的 ip,結果如下:
賬號、密碼,默認為guest,如果出現無法訪問,檢測防火墻是否開啟,如果開啟將其關閉即可!
登錄之后的監控平臺,界面如下:
四、web界面使用
相比其他的消息隊列,rabbitMQ 其中一個很明顯的好處就是有 web 操作界面,而且簡單易用。
進入 web 管理界面之后,可以很清晰的看到分了 6 個菜單目錄,分別是:Overview、Connections、Channels、Exchanges、Queues、Admin。
- Overview:總覽概述,主要介紹 rabbitmq 一些基礎匯總等信息
- Connections:連接池管理,主要介紹客戶端連接等信息
- Channels:信道管理,主要介紹信道連接等信息
點擊具體某個具體的信道,可以看到對應的消費隊列等信息。
- Exchanges:交換器管理,主要介紹交換器等信息
- Queues:隊列管理,主要介紹隊列等信息
- Admin:系統管理,主要介紹用戶、虛擬主機、權限等信息
下面,我們重點介紹一些如何通過 web 頁面來操作 rabbitMQ!
4.1、交換器管理
點擊進入 Exchanges 菜單,最下面有一個Add a new exchange標簽。
點擊Add a new exchange,會展示如下信息!
- Name:交換器名稱
- Type:交換器類型
- Durability:是否持久化,Durable:持久化,Transient:不持久化
- Auto delete:是否自動刪除,當最后一個綁定(隊列或者exchange)被unbind之后,該exchange 自動被刪除
- Internal:是否是內部專用exchange,是的話,就意味著我們不能往該exchange里面發消息
- Arguments:參數,是AMQP協議留給AMQP實現做擴展使用的
我們先新建一個名稱為hello-exchange,類型為direct的交換器,結果如下。
等會用于跟隊列關聯!
4.2、隊列管理
點擊進入 Queues 菜單,最下面也有一個Add a new queue標簽。
點擊標簽,即可進入添加隊列操作界面!
- Name:隊列名稱
- Durability:是否持久化,Durable:持久化,Transient:不持久化
- Auto delete:是否自動刪除,是的話,當隊列內容為空時,會自動刪除隊列
- Arguments:參數,是AMQP協議留給AMQP實現做擴展使用的
同樣的,新建一個名稱為hello-mq的消息隊列,結果如下。
隊列新建好了之后,繼續來建立綁定關系!
4.3、綁定管理
建立綁定關系,既可以從隊列進入也可以從交換器進入。
如果是從交換器進入,那么被關聯的對象就是隊列。
如果是從隊列進入,那么被關聯的對象就是交換器。
我們選擇從隊列入手,被綁定的交換器是hello-exchange,因為類型是direct,所以還需要填寫routing key。
建立完成之后,在交換器那邊也可以看到對應的綁定關系。
4.4、發送消息
最后,我們從交換器入手,選擇對應的交換器,點擊Publish message標簽,填寫對應的路由鍵 key,發送一下數據,查看數據是否發送到對應的隊列中。
然后點擊進入 Queues 菜單,查詢消息隊列基本情況。
然后選擇hello-mq消息隊列,點擊Get messages標簽,獲取隊列中的消息。
結果如下,可以很清晰的看到,消息寫入到隊列!
五、JAVA客戶端使用
RabbitMQ 支持多種語言訪問,本次介紹 RabbitMQ Java Client 的一些簡單的api使用,如聲明 Exchange、Queue,發送消息,消費消息,一些高級 api 會在后面的文章中詳細的說明。
5.1、引入 rabbitMQ 依賴包
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
5.2、連接服務器
使用給定的參數(host name,端口等等)連接AMQP的服務器。
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();
也可以使用通過 URI 方式進行連接。
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();
Connection(連接)接口可以被用作創建一個channel(管道),利用 channel(管道)可以進行發送和接收消息,在后面我們會頻繁使用到它。
Channel channel = conn.createChannel();
注意,管道使用之后,需要進行關閉。
channel.close();
conn.close();
5.3、創建交換器
不僅可以通過 web頁面進行創建交換器,還可以通過代碼進行聲明(創建的意思)交換器。
//創建exchange,類型是direct類型
channel.exchangeDeclare("ex-hello","direct");
//第三個參數表示是否持久化,同步操作,有返回值
AMQP.Exchange.DeclareOk ok = channel.exchangeDeclare("ex-hello","direct",true);
System.out.println(ok);
//創建帶屬性的交換器
Map<String,Object> argument = new HashMap<>();
argument.put("alternate-exchange","log");
channel.exchangeDeclare("ex-hello","direct",true,false,argument);
//異步創建exchange,沒有返回值
channel.exchangeDeclareNoWait("ex-hello","direct",true,false,false,argument);
///判斷exchange是否存在,存在的返回ok,不存在的exchange則報錯
AMQP.Exchange.DeclareOk declareOk = channel.exchangeDeclarePassive("ex-hello");
System.out.println(declareOk);
//刪除exchange(可重復執行),刪除一個不存在的也不會報錯
channel.exchangeDelete("ex-hello");
創建交換器參數解讀:
- 第一個參數:表示交換器名稱
- 第二個參數:表示交換器類型
- 第三個參數:表示是否持久化,為true表示會將隊列持久化存儲到硬盤
- 第四個參數:表示是否自動刪除,當最后一個綁定(隊列或者exchange)被unbind之后,該exchange 自動被刪除
- 第五個參數:表示設置參數,參數類型為Map<String, Object>
5.4、創建隊列
同樣的,也可以通過代碼進行聲明隊列。
//同步創建隊列
channel.queueDeclare(queueName, true, false, false, null);
//異步創建隊列沒有返回值
channel.queueDeclareNoWait(queueName,true,false,false,null);
//判斷queue是否存在,不存在會拋出異常
channel.exchangeDeclarePassive(queueName);
//刪除隊列
channel.queueDelete(queueName);
創建隊列參數解讀:
- 第一個參數:表示隊列名稱
- 第二個參數:表示是否持久化,為true表示會將隊列持久化存儲到硬盤
- 第三個參數:表示是否排它性,為true表示只對首次聲明它的連接可見,會在其連接斷開的時候自動刪除
- 第四個參數:表示是否自動刪除,為true表示有過消費者并且所有消費者都解除訂閱了,自動刪除隊列
- 第五個參數:表示設置參數,參數類型為Map<String, Object>
5.5、創建綁定
當交換器和隊列都創建成功之后,就可以建立綁定關系。
//交換器和隊列進行綁定(可重復執行,不會重復創建)
channel.queueBind(queueName, exchangeName, routingKey);
//異步進行綁定,最后一個參數表示可以帶自定義參數
channel.queueBindNoWait(queueName,exchangeName,routingKey,null);
//exchange和queue進行解綁(可重復執行)
channel.queueUnbind(queueName, exchangeName, routingKey);
//exchange與exchange進行綁定(可重復執行,不會重復創建)
//第一個參數表示目標交換器
//第二個參數表示原地址交換器
//第三個參數表綁定路由key
channel.exchangeBind(destination,source,routingKey);
//exchange和exchange進行解綁(可重復執行)
channel.exchangeUnbind(destination,source,routingKey);
綁定關系參數解讀:
- queueName:隊列名稱,取自創建的隊列名稱
- exchangeName:交換器,取自創建的交換器名稱
- routingKey:路由鍵key,自定義
5.6、發送消息
發送消息到交換器就會使用我們上文所提到的channel管道。
//發送的消息內容
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
也可以在發送消息前設定一些消息屬性。
//自己構建BasicProperties的對象
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userId("zhangsan")
.build()),
messageBodyBytes);
發送指定頭信息的消息。
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("userName", '"zhangsan');
headers.put("userCode", "123");
//發送消息到交換器
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.headers(headers)
.build()),
messageBodyBytes);
發送一個有過期時間的消息,單位:ms。
//設置消息過期時間,單位ms
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.expiration("6000")
.build()),
messageBodyBytes);
更多用法,可以參見官方 API
5.7、接受消息
從消息隊列中接受消息也會使用我們上文所提到的channel管道。
//監聽隊列中的消息
channel.basicConsume(queueName,true,new SimpleConsumer(channel));
監聽隊列消息參數解讀:
- 第一個參數:表示需要監聽的隊列名稱
- 第二個參數:表示是否自動確認,如果配置false表示手動確認消息是否收到
- 第三個參數:表示消息處理類
具體的消息處理類需要繼承DefaultConsumer,并重寫handleDelivery方法,代碼如下:
public class SimpleConsumer extends DefaultConsumer{
public SimpleConsumer(Channel channel){
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//接受從隊列中發送的消息
System.out.println(consumerTag);
System.out.println("-----收到消息了---------------");
System.out.println("消息屬性為:"+properties);
System.out.println("消息內容為:"+new String(body));
}
}
如果是手工確認消息,需要在handleDelivery方法中進行相關的確認,代碼如下:
//手動確認
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);
5.8、完整demo
5.8.1、發送消息
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
//連接RabbitMQ服務器
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("197.168.24.206");
factory.setPort(5672);
//創建一個連接
Connection conn = factory.newConnection();
//獲得信道
Channel channel = conn.createChannel();
//聲明交換器
channel.exchangeDeclare("ex-hello","direct");
//發送的消息內容
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish("ex-hello", "route-hello", null, messageBodyBytes);
//關閉通道
channel.close();
conn.close();
}
}
5.8.2、接受消息
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//連接RabbitMQ服務器
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("197.168.24.206");
factory.setPort(5672);
//創建一個連接
Connection conn = factory.newConnection();
//獲得信道
Channel channel = conn.createChannel();
//聲明隊列
channel.queueDeclare("queue-hello", true, false, false, null);
//聲明綁定
channel.queueBind("queue-hello", "ex-hello", "route-hello");
//監聽隊列中的消息
channel.basicConsume("queue-hello",true,new SimpleConsumer(channel));
TimeUnit.SECONDS.sleep(10);
channel.close();
conn.close();
}
}
消息處理類SimpleConsumer
public class SimpleConsumer extends DefaultConsumer {
public SimpleConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//接受從隊列中發送的消息
System.out.println(consumerTag);
System.out.println("-----收到消息了---------------");
System.out.println("消息屬性為:"+properties);
System.out.println("消息內容為:"+new String(body));
}
}
消息發送成功之后,啟動消費者,輸出結果如下:
六、總結
整篇文章主要介紹了 RabbitMQ 內部結構、安裝步驟、使用教程,以及 java 客戶端使用等內容,內容比較長,限于筆者的才疏學淺,對本文內容可能還有理解不到位的地方,如有闡述不合理之處還望留言一起探討。