消息總線的定義
前面在1.4.2節中強調過,在微服務架構中,經常會使用REST 服務或基于消息的通信機制。
在3.6節中也詳細介紹了消息通信的實現方式。消息總線就是一種基于消息的通信機制。
消息總線是一種通信工具,可以在機器之間互相傳輸消息、文件等,它扮演著—種消息路由的角色,擁有一套完備的路由機制來決定消息傳輸方向。發送端只需要向消息總線發出消息,而不用管消息被如何轉發。
Spring Cloud Bus通過輕量消息代理連接各個分布的節點。管理和傳播所有分布式項目中的消息,本質是利用了MQ的廣播機制在分布式的系統中傳播消息,目前常用的有Kafka和RabbitMQ等。
消息總線常見的設計模式
在消息總線中,常見的設計模式有點對點模式及訂閱/發布模式。
1.點對點(P2P)
點對點模式包含三個角色。
- 消息隊列( Queue )。
- 生產者( Producer ) 。
- 消費者(Consumer )。
點對點模式中的每個消息都被發送到一個特定的隊列,消費者從隊列中獲取消息。隊列保留著消息,直到它們被消費或超時。圖16-1展示了點對點模式的運行流程圖。
點對點模式具有以下特點。
- 每個消息只有一個消費者,即消息一旦被消費,就不在消息隊列中了。
- 生產者和消費者之間在時間上沒有依賴性,也就是說當生產者發送了消息之后,不管消費者有沒有正在運行,都不會影響到消息被發送到隊列。
- 消費者在成功接收消息之后需向隊列應答成功,這樣消息隊列才能知道消息是否被成功消費。
2.訂閱/發布(PublSub )
訂閱/發布模式包含三個角色。
- 主題(Topic )。
- 發布者(Publisher )。
- 訂閱者(Subscriber )。
訂閱/發布模式中,多個發布者將消息發送到對應的主題,系統將這些消息傳遞給多個訂閱者。圖16-2展示了訂閱/發布模式的運行流程圖。
訂閱/發布模式具有以下特點。
- 每個消息可以有多個消費者。
- 主題可以被認為是消息的傳輸中介,發布者發布消息到主題,訂閱者從主題訂閱消息。
- 主題使得消息訂閱者和消息發布者保持互相獨立,不需要接觸即可保證消息的傳送。
消息總線的意義
在微服務架構中,經常會使用REST服務作為服務間的通信機制。REST以其輕量、簡單、易理解而著稱,但這種通信機制也并非適合所有的場景。例如,在一些高并發、高可靠、實時的場景,則需要消息總線來幫忙。
概括起來,消息總線具有以下幾個優點。
1.實時性高
與REST 服務的“請求—響應”模式不同,消息總線的實時性非常高。使用了消息總線,生產者一方只要把消息往隊列里一扔,就可以立馬返回,響應用戶了。無須等待處理結果,實現了異步處理。
同時,對于消費者而言,消費者對于消息的到達感知也非常及時。消費者會對消息總線進行監聽,只要有消息進入隊列,就可以馬上得到通知。這種優勢是REST 服務所不能具備的。在REST服務中,要想及時獲取到更新通知,就不得不進行輪詢。這往往非常低效。
2生產者與消費者解耦
在消息總線中,生產者負責將消息發送到隊列中,而消費者把消息從隊列中取出來。生產者無須等待消費者啟動,消費者也無須關心生產者是否已經處于就緒狀態。所以,這種模式能很好地實現生產者與消費者的解耦。
然而,如果是在REST服務中,服務調用方必須等待服務的提供方準備好了才能調用,否則就會調用失敗。
3.故障率低
消息總線擁有對其他通信方式更高的成功率。一方面,生產者與消費者之間實現了解耦,所以,生產者與消費者之間不存在強關聯關系,即便是生產者或消費者任意一方掉線了,也不會影響消息最終的送達;另一方面,消息總線往往會結合數據庫來實現消息的持久化,并設置狀態標識。只有消息消費成功,才會去修改狀態標識。
消息總線同時還承擔著緩沖區的作用。大量業務消息首先會進入消息隊列進行緩存,消息的消費者可以根據自己的處理能力來進行消費,所以不管消息的數據量有多少,都不會對消費者造成沖擊。
消息總線常見的實現方式
《分布式系統常用技術及案例分析》一書列舉了非常多的流行的、開源的分布式消息服務,如Apache ActiveMQ、RabbitMQ、Apache RocketMQ、Apache Kafka等。這些消息中間件都實現了點對點模式及訂閱/發布模式等常見的消息模式。
以下例子演示的是使用ActiveMQ實現生產者—消費者的JAVA實現方式。
生產者程序Producer.java:
public class Producer{
private static final Logger LOGGER=LoggerFactory.getLogger (Producer.
class);
private static final string BROKER_URE = ActiveMQConnection.DEFAULT_
BROKER URL;private static final String SUBJECT= "waylau-queue";
public static void main (String[] args) throws JMSException f
//初始化連接工廠
ConnectionFactory connectionFactory= new ActiveMQConnection
Factory(BROKER_URL);
//獲得連接
Connection conn = connectionFactory.createConnection();
//啟動連接
conn.start(;
//創建session,第一個參數表示會話是否在事務中執行,第二個參數設定會話的應答模式
Session session = conn.createSession(false, Session.AUTO_
ACKNOWLEDGE);
//創建隊列
Destination dest = session.createQueue(SUBJECT);
//createTopic方法用來創建Topic
//session.createTopic ("TOPIC");
//通過session 可以創建消息的生產者
MessageProducer producer = session.createProducer(dest);
for(int i=0;i<100;i++){
//初始化一個MQ消息
TextMessage message= session.createTextMessage ("Welcome to
waylau.com"+i);
//發送消息
producer. send(message);
LOGGER.info("send message {}",i);
//關閉 MQ 連接
conn.close();
}
}
消費者程序Consumer.java:
public class Consumer implements MessageListener {
private static finalLogger LOGGER = LoggerFactory.getLogger
(Consumer.class);
private static final String BROKER_URL = ActiveMQConnection.DEFAULT
BROKER URL;private static final string SUBJECT = "waylau-queue";
public static void main(String[] args) throws JMSExceptionf
//初始化 ConnectionFactory
ConnectionFactory connectionFactory =new ActiveMOConnection
Factory(BROKER_URL);
//創建Mo連接
Connection conn = connectionFactory.createConnection();
//啟動連接
conn .start(;
//創建會話
Session session= conn.createSession (false,Session.AUTO_
ACKNOWLEDGE);
//通過會話創建目標
Destination dest = session.createQueue(SUBJECT);
//創建 MO 消息的消費者
MessageConsumer consumer = session.createConsumer(dest);
//初始化 MessageListener
consumer me=newConsumer();
//給消費者設定監聽對象
consumer .setMessageListener (me);
@override
public void onMessage(Message message){
TextMessage txtMessage =(TextMessage)message;
try{
LOGGER.info("get message " + txtMessage.getText());
}catch (JMSException e) {
LOGGER.error("error {}",e));
}
}
執行命令來啟動ActiveMQa:
bin/activemg start
生產者執行如下命令:
mvn clean compile exec:java -Dexec.mainClass=com.waylau.activemq.ProducerApp
輸出如下。
20:12:10.807 [ActiveMQ Task-1]INEO org.apache.activemq.transport.
failover.FailoverTransport- Successfully connected to tcp://localhost:61616
20:12:10.928[main] INFOcom.waylau.activemq.Producer- send message 0
20:12:10.963 [main] INPO com.waylau.activemq.Producer- send message 1
20:12:10.992 [main] INFO com.waylau.activemq.Producer - send message 2
20:12:11.019[main] INFO com.waylau.activemq.Producer - send message 3
20:12:11.036[main] INFOcom.waylau.activemq.Producer- send message 4
20:12:11.058 [main] INFO com.waylau.activemq.producer -send message 5
20:12:11.085[main] INFOcom.waylau.activemq.Producer - send message6
20:12:11.113 [main] INFOcom.waylau.activemq.Producer - send message 7
20:12:11.141[main] INFOcom.waylau.activemq.Producer - send message 8
20:12:11.191 [main] INFO com.waylau.activemq.Producer- send message 9
消費者執行如下命令:
mvn clean compile exec:java-Dexec.mainClass=com.waylau.activemq. ConsumerApp
輸出如下。
20:12:05.262[ActiveMQ Task-1] INFO org.apache.activemq.transport.
failover.FailoverTransport- Successfully connected to tcp://localhost:
61616
20:12:10.875 [ActiveMQ Session Task-1] INEOcom.waylau.activemg.Consumer -
get message welcome to waylau.com o
20:12:10.939 [ActiveMQ Session Task-1]INFO com.waylau.activemq.Consumer-
get message welcome to waylau.com 1
20:12:10.965 [ActiveMQ Session Task-1] INFO com.waylau.activemq.Consumer-
get message Welcome to waylau.com 2
20:12:10.994 [ActiveMQ Session Task-1] INFO com.waylau.activemq. Consumer -
get message Welcome to waylau .com 3
20:12:11.020 [ActiveMQ Session Task-1] INFO com.waylau.activemq. Consumer-
get message Welcome to waylau.com 4
20:12:11.038 [ActiveMQ Session Task-1] INFO com.waylau.activemq.Consumer-
get message Welcome to waylau.com 5
20:12:11.059 [ActiveMQ Session Task-1] INFO com.waylau.activemq. Consumer -
get message Welcome to waylau.com6
20:12:11.086[ActiveMQ Session Task-1] INEO com.waylau.activemq. Consumer-
get message welcome to waylau.com 7
20:12:11.114[ActiveMQ Session Task-1] INFO com.waylau.activemq.Consumer-
get message Welcome to waylau.com 8
20:12:11.142 [ActiveMQ Session Task-1] INFO com.waylau.activemq. Consumer-
get message Welcome to waylau.com 9
上述例子的源碼,可以在 https://github.com/waylau/distributed-systems-technologies-and-cas-es-analysis網址的samples目錄下找到。
Spring Cloud Bus 實現消息總線
Spring Cloud Bus通過輕量消息代理連接各個分布的節點,管理和傳播所有分布式項目中的消息,本質是利用了消息中間件的廣播機制在分布式的系統中傳播消息。
目前Spring Cloud Bus所支持的常用的消息中間件有RabbitMQ和Kafka,使用時,只須添加spring-cloud-starter-bus-amqp或spring-cloud-starter-bus-kafka依賴即可。同時,需要確保相關的消息中間件連接配置正確。
下面是使用RabbitMQ作為Spring Cloud Bus 的application.yml配置情況。
spring:
rabbitmg:
host: mybroker .com
port:5672
username:user
password:secret
其中,spring.rabbitmq.host配置項用于指定RabbitMQ的主機位置。
Spring Cloud Bus支持消息發送到所有已監聽的節點,或者某個特定服務的所有節點。同時,Spring Cloud Bus提供了一些HTTP接口/bus/*,用于觸發Spring Cloud Bus內部的事件。
目前,Spring Cloud Bus主要有以下兩個接口實現。
- ./bus/env:發送鍵值對去更新每個節點的Spring Environment。
- ./bus/refresh:重新加載每一個應用的配置信息,類似于/refresh。
所以,Spring Cloud Bus結合Spring Cloud Config 的使用,可以實現配置文件的自動更新。