環境:Springboot2.3.12.RELEASE + Spring Cloud Hoxton.SR12 + RabbitMQ3.8.12
簡介
Spring Cloud Stream是一個框架,用于構建與MQ連接的高度可伸縮的事件驅動微服務。其目的是為了簡化消息在 Spring Cloud 應用程序中的開發。屏蔽了各種MQ之間的差異,使得在更換MQ的時候不需要修改代碼。
Spring Cloud Stream支持多種綁定器實現,如下:
- RabbitMQ
- Apache Kafka
- Kafka Streams
- Amazon Kinesis
- google PubSub (partner maintained)
- Solace PubSub+ (partner maintained)
- Azure Event Hubs (partner maintained)
- AWS SQS (partner maintained)
- AWS SNS (partner maintained)
- Apache RocketMQ (partner maintained)
詳細查看官方文檔,對應每一個MQ都有一個Github地址。
Spring Cloud Stream的核心構建塊是:
- 目標綁定器(Destination Binders):負責與MQ集成的組件。
- 目標綁定(Destination Bindings):MQ中間件與最終用戶提供的應用程序代碼(生產者/消費者)之間的橋梁。
- 消息(Message):生產者和消費者用來與目標綁定器(以及通過MQ與其他應用程序)通信的規范數據結構。
Stream 核心組件關系圖
快速入門
依賴:
<properties>
<spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
應用配置:
spring:
rabbitmq:
host: localhost
virtual-host: bus
port: 5672
username: xxx
password: xxx
---
spring:
cloud:
stream:
bindings:
#自定義輸入輸出
myInput:
#指定輸入通道對應的主題名
destination: demo
myOutput:
destination: demo
創建消息通道綁定的接口:
public interface StreamBinding {
String INPUT = "myInput";
String OUTPUT = "myOutput";
@Input(StreamBinding.INPUT)
SubscribableChannel input();
@Output(StreamBinding.OUTPUT)
MessageChannel output();
}
通過 @Input和 @Output注解定義輸入通道和輸出通道名稱,這里的名稱與上面配置文件中的是對應的。
當定義輸出通道的時候,需要返回 MessageChannel 接口對象,該接口定義了向消息通道發送消息的方法;定義輸入通道時,需要返回 SubscribableChannel 接口對象,該接口集成自 MessageChannel 接口,它定義了維護消息通道訂閱者的方法。
這里的Input,Output兩個方法容器會分別創建一個Bean對象
創建消費者:
@Component
@EnableBinding(value = {StreamBinding.class})
public class StreamReceiver {
private Logger logger = LoggerFactory.getLogger(StreamReceiver.class);
@StreamListener(StreamBinding.INPUT)
public void receive(String message) {
logger.info("接收到消息: {}", message);
}
}
@EnableBinding 注解用來指定一個或多個定義了 @Input 或 @Output 注解的接口,以此實現對消息通道(Channel)的綁定。上面我們通過 @EnableBinding(value = {StreamClient.class}) 綁定了 StreamClient 接口,該接口是我們自己實現的對輸入輸出消息通道綁定的定義
@StreamListener,主要定義在方法上,作用是將被修飾的方法注冊為消息中間件上數據流的事件監聽器,注解中的屬性值對應了監聽的消息通道名。上面我們將 receive 方法注冊為 myInput 消息通道的監聽處理器,當我們往這個消息通道發送信息的時候,receiver 方法會執行。
消息發送接口:
@Resource
private StreamBinding streamBinding;
@GetMApping("/send")
public void send() {
streamBinding.output().send(MessageBuilder.withPayload("First Message...").build());
}
啟動服務:
查看RabbitMQ
自動為我們創建了一個隊列,隊列的名稱是以我們在配置文件中配置的開頭,后面是隨機生成的。這個隊列會自動刪除AD,服務關閉后就自動刪除隊列;Excl:排他的,存在該隊列就不會在創建了。
修改端口后,再啟動一個服務:
創建了2個隊列,使用其中一個發送消息:
兩個服務都收到了消息。
消費者組
上面啟動了2個服務都能收到消息,在集群的環境下這樣肯定會帶來問題,如果是業務方面的就會出現重復數據,這時候我們可以通過設置分組的解決此問題。修改配置:
spring:
cloud:
stream:
bindings:
myInput:
#指定輸入通道對應的主題名
destination: demo
#指定一個組;指定分組以后,不管你啟動多少個實例,所有的實例都監聽這一個隊列
#多個實例會輪詢的接收消息
group: g_test
myOutput:
destination: demo
再次啟動服務后,兩個服務會輪詢的接收到消息。
啟動服務后,兩個服務都同時監聽同一個隊列。隊列也不是隨機生成的了,并且隊列是持久化的,服務斷開后隊列也不會自動刪除。
消息分區
通過消費組的設置,雖然能保證同一消息只被一個消費者進行接收和處理,但是對于特殊業務情況,除了要保證單一實例消費之外,還希望那些具備相同特征的消息都能被同一個實例消費,這個就可以使用 Spring Cloud Stream 提供的消息分區功能。修改配置
spring:
cloud:
stream:
bindings:
myInput:
#指定輸入通道對應的主題名
destination: demo
#指定一個組;指定分組以后,不管你啟動多少個實例,所有的實例都監聽這一個隊列
#多個實例會輪詢的接收消息
group: g_test
consumer:
#通過該參數開啟消費者分區功能
partitioned: true
myOutput:
destination: demo
producer:
#這里的配置也可以是SpEL表達式,比如:headers['partition']通過消息header獲取屬性
#這里會通過表達式及消息對象進行計算得到一個Key,然后獲取key的hashCode
# 得到hashCode以后會與partitionCount進行取模運算得到具體的分區
partitionKeyExpression: '1' #我這里給的值就是對應的instanceIndex的值,你希望誰接收就設置誰配置的值即可
partitionCount: 2
#實例總數
instanceCount: 2
#該參數設置了當前實例的索引號,從 0 開始
instanceIndex: 0
計算分區源碼:
最后得到分區信息后會在消息頭中放入一個scst_partition為key,partition為值的頭信息。
啟動多個實例后,測試發現所有的消息都只是同一個實例收到消息
交換機分別與每一個服務進行綁定使用不同的Routing Key這樣在發送消息的時候就可以根據計算處理的分區進行定向發送消息了。
通過源碼查看:
這里通過我們的配置交換機為demo。接著是獲取路由key了
這里會從消息header中獲取key = scst_partition的頭信息。
這樣針對使用RabbitMQ的中間件發送消息所需要的交換機及路由key就確定下來了。
完畢!!!