一、RabbitMQ 基礎理解
RabbitMQ,是一個使用 erlang 編寫的 AMQP(高級消息隊列協議)的服務實現,簡單來說,就是一個功能強大的消息隊列服務。
概念理解:
- Producer: 消息發送者
- RabbitMQ:
- Vhost: 相當于分組,每個vhost下數據是隔離的
- Exchange: 路由器,接收消息,本根據RoutingKey分發消息
- headers:消息頭類型 路由器,內部應用
- direct:精準匹配類型 路由器
- topic:主題匹配類型 路由器,支持正則 模糊匹配
- fanout:廣播類型 路由器,RoutingKey無效
- RoutingKey: 路由規則
- Queue: 隊列,用于存儲消息(消息的目的地)
- Consumer: 消息消費者
持久化:
一個好的消息隊列當然需要消息持久化功能,服務宕機,未消費消息不丟失,RabbitMQ持久化分為Exchange、Queue、Message
Exchange 和 Queue 持久化 指持久化Exchange、Queue 元數據,持久化的是自身,服務宕機,Exchange 和 Queue 自身就沒有了
Message 持久化 顧名思義 把每一條消息體持久化,服務宕機,消息不丟失
Durable 持久、Transient 臨時,Queue新建類似
分析理解:
便于更直觀的理解,把 RabbitMQ 的消息流對比與Http Rest接口更家熟悉形象
www.xxx.com/webAppPath/trade/getOrder -> getOrder(message) GET
RabbitMQ Server:同比 域名 www.xxx.com,只有通過域名才能到達 Server
Vhost:同比 /webappPath,一個域名可能指向多個app
Exchange:同比 /trade,trade/* 下有多個method,但是需要先到達這個Class
RoutingKey:同比 /getOrder,只有完成的 URL 才是有效的,才能確定到具體的方法 Queue:同比 getOrder(message) 消息的最終目的地
Exchange Type: 同比 GET,但是Rest MethodType是整個URL的Type,而不是 Queue
以上只是為了更好理解,千萬不要混淆
Producer / Consumer 就很好理解了,基于AMQP協議鏈接RabbitMQ Server,發送消息 / 接收消息
二、RabbitMQ 消息確認策略分析
Confrim / Transaction 概念應用
RabbitMQ 提供了兩種可靠性的確認策略 Confrim / Transaction,Producer Client僅分析Spring-Amqp,兩種機制主要影響發送:
Confrim: 簡單說就是直接傳送消息 client > mq, 接收到 mq > ack, mq 在異步的完成 接下來的事情
Transaction: client 請求開啟事務 > 發送message > client 提交事務,整個過程是同步的,mq必須完成消息持久化、消息同步等。
spring-amqp 提供的發送客戶端 默認是Confrim 異步Ack模式,不用特殊配置,Transaction 需要在默認的基礎上增加 RabbitMQ事務管理器
// 1.向Spring中注冊RabbitMQ事務管理器
@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
...
// 2.設置通道為Transaction類型
rabbitTemplate.setChannelTransacted(true);
...
// 3.對應的方法添加@Transactional
@Transactional
public void send(String exchange, String routingKey, Object object) {
rabbitTemplate.convertAndSend(exchange, routingKey, object);
}
// 這是只是舉例,具體寫法和其他說明,請具體看代碼注釋有更詳細的說明和寫法component.RabbitSender
Confrim / Transaction 測試分析
Consumer
消費的機制和發送差不多, 但流程變為 Consumer 處理消息,需要Ack MQ Server, Server 才會真正的刪除消息,通常消費者不需要開啟事務,當處理異常拋出,Ack無法發到Server到,消息就會回到隊列中,繼續重試,阻塞到直到消息被消費Ack掉,所說的消息阻塞
具體寫法和其他說明,請具體看代碼注釋有更詳細的說明和寫法component.Receiver
三、RabbitMQ 配置
RabbitMQ 安裝
RabbitMQ是基于Erlang運行的,首先選擇RabbitMQ版本,確定需要的Erlang版本,然后安裝Erlang,自行百度、谷歌、RabbitMQ官網或者Erlang官網都會有相應的資源、教程(ps: Erlang 版本請嚴格按照所選RabbitMQ版本要求的Erlang范圍安裝,否則會有各種不治之癥)
本文以 Erlang20.03,RabbitMQ 3.7為例,RabbitMQ為linux 通用包,不同安裝方式版本配置文件路徑有差異,通用包好處,可移植性、控制性好
包目錄結構:
./sbin/ rabbitmq 啟動rabbitmq-server、插件rabbitmq-plugins、功能rabbitmqctl等腳本位置
./etc/rabbitmq/ rabbitmq 啟動配置,包括隨啟動插件配置、環境配置、應用配置
RabbitMQ 配置文件
- rabbitmq-env.conf 環境配置 key = val 形式
# 指定節點的名字 默認 rabbit@${hostname},如指定了節點名,需配置 host ip cluster1
NODENAME=rabbit@cluster1
# 指定端口 默認 5672
NODE_PORT=5672
# 配置持久目錄
MNESIA_BASE=/mnt/data1/rabbitmq/store
# 配置日志目錄 默認文件名字:${NODENAME}.log 可以用配置修改
LOG_BASE=/mnt/data1/rabbitmq/logs
- rabbitmq.conf 環境配置 key = val 形式
主要配置日志、默認用戶信息、持久化相關等,沒有定制化通常不用修改
# console log config 主要測試排查問題
log.console = false
log.console.level= debug
# log config
log.file= rabbit.log
log.file.level= info
log.file.rotation.date= $D0
# web port
management.listener.port= 15672
- enabled_plugins 配置相應插件的名字 server start plugin也會啟動
[rabbitmq_management].
具體請看官方配置說明, 詳細的講解了rabbitmq-env.conf 和 rabbitmq.conf 配置 官方配置說明
RabbitMQ 常用命令:
# 后臺啟動本地服務
./rabbitmq-server –detached
# 開啟/關閉 服務
./rabbitmqctl start_app {-n node_name}
./rabbitmqctl stop_app {-n node_name}
# 開啟/關閉某個插件 (重啟服務器后生效)
./rabbitmq-plugins enable xxx
./rabbitmq-plugins disable xxx
# 更改節點類型
./rabbitmqctl change_cluster_node_type {disc/ram} {-n node_name}
# 配置用戶
./rabbitmqctl add_user username password
./rabbitmqctl change_password username newpassword
./rabbitmqctl delete_user username
./rabbitmqctl set_user_tags username administrator
Tag: none、management、policymaker、monitoring、administrator
./rabbitmqctl set_permissions -p /vhost1 username 'conf' 'write' 'read'
conf 一個正則表達式match哪些配置資源能夠被該用戶配置
write 一個正則表達式match哪些配置資源能夠被該用戶寫入
read 一個正則表達式match哪些配置資源能夠被該用戶讀取
四、高可用的集群搭建
基礎概念
RabbitMQ 集群分為兩種 普通集群 和 鏡像集群,可以說 鏡像集群 是 普通集群 的晉升版
普通集群:
以兩個節點(rabbit01、rabbit02)為例來進行說明。
rabbit01和rabbit02兩個節點僅有相同的元數據,即隊列的結構,但消息實體只存在于其中一個節點rabbit01(或者rabbit02)中。
當消息進入rabbit01節點的Queue后,consumer從rabbit02節點消費時,RabbitMQ會臨時在rabbit01、rabbit02間進行消息傳輸,把A中的消息實體取出并經過B發送給consumer。所以consumer應盡量連接每一個節點,從中取消息。即對于同一個邏輯隊列,要在多個節點建立物理Queue。否則無論consumer連rabbit01或rabbit02,出口總在rabbit01,會產生瓶頸。當rabbit01節點故障后,rabbit02節點無法取到rabbit01節點中還未消費的消息實體。如果做了消息持久化,那么得等rabbit01節點恢復,然后才可被消費;如果沒有持久化的話,就會產生消息丟失的現象。
鏡像集群:
在普通集群的基礎上,把需要的隊列做成鏡像隊列,消息實體會主動在鏡像節點間同步,而不是在客戶端取數據時臨時拉取,也就是說多少節點消息就會備份多少份。該模式帶來的副作用也很明顯,除了降低系統性能外,如果鏡像隊列數量過多,加之大量的消息進入,集群內部的網絡帶寬將會被這種同步通訊大大消耗掉。所以在對可靠性要求較高的場合中適用
由于鏡像隊列之間消息自動同步,且內部有選舉master機制,即使master節點宕機也不會影響整個集群的使用,達到去中心化的目的,從而有效的防止消息丟失及服務不可用等問題
集群搭建
RabbitMQ 集群通信的驗證機制是通過 erlang.cookie進行確認的,只有erlang.cookie一致的兩個服務才能通信,創建cookie文件:
mkdir ~/.erlang.cookie
echo 'SJJARLYPVRPMWFVGKWZZ' > ~/.erlang.cookie
chmod 400 ~/.erlang.cookie
cluster1=10.0.0.1, cluster2=10.0.0.2 2臺服務器為例,本地搭建需修改 tcp端口、web端口
寫入hostes:
10.0.0.1 cluster1
10.0.0.2 cluster2
修改rabbitmq-env.conf:
# server1
NODENAME=rabbit@cluster1
...
# server2
NODENAME=rabbit@cluster2
啟動server1:
./rabbitmq-server –detached
將server2加入到server形成集群:
./rabbitmqctl -n rabbit@cluster2 stop_app
# 重置元數據、集群配置等信息
./rabbitmqctl -n rabbit@cluster2 reset
# cluster2 加入到 cluster1 的集群中 --ram表示cluster2為RAM節點 默認為disc
./rabbitmqctl -n rabbit@cluster2 join_cluster rabbit@cluster1 --ram
./rabbitmqctl -n rabbit@cluster2 start_app
普通集群就搭建完成了,普通集群并不是高可用的,基于普通集群升級為鏡像集群RabbitMQ HA方案
./rabbitmqctl set_policy <name> [-p <vhost>] <pattern> <definition> [--apply-to <apply-to>]
name: 策略名稱
vhost: 指定vhost, 默認值 /
pattern: 需要鏡像的正則
definition:
ha-mode: 指明鏡像隊列的模式,有效值為 all/exactly/nodes
all 表示在集群所有的節點上進行鏡像,無需設置ha-params
exactly 表示在指定個數的節點上進行鏡像,節點的個數由ha-params指定
nodes 表示在指定的節點上進行鏡像,節點名稱通過ha-params指定
ha-params: ha-mode 模式需要用到的參數
ha-sync-mode: 鏡像隊列中消息的同步方式,有效值為automatic,manually
apply-to: 可選值3個,默認all
exchanges 表示鏡像 exchange (并不知道意義所在)
queues 表示鏡像 queue
all 表示鏡像 exchange和queue
eg:
./rabbitmqctl set_policy test "test" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
測試: exchange = test, queue = test
case1: pattern=test, apply-to=exchanges -> 結果 exchange被鏡像
case2: pattern=test, apply-to=queues -> 結果 queue被鏡像
case3: pattern=test, apply-to=all -> 結果 queue被鏡像
結論: 不知道exchange被鏡像的意義所在,鏡像queue才是關鍵
ps:
保證集群的高可用,至少要有1個disc節點
RabbitMQ Cluster 全部掛掉,RAM節點無法先啟動,必須先啟動disc節點
推薦 2 RAM 1 DISC 集群搭建方式
總結:
RabbitMQ高可用集群還是非常有必要的,高可用的代價就是性能的降低,對可靠性要求比較高的企業務還是值得的,據我測試2R1D鏡像集群(非壓測, 壓測結果絕對更高),達到1000QPS+還是沒問題的,如果開啟事務,保證同步發送應答,也可達500QPS+,絕對滿足大多數可靠性要求高的業務。