日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢(xún)客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

環(huán)境:Springboot2.3.12.RELEASE + Spring Cloud Hoxton.SR12 + RabbitMQ3.8.12


簡(jiǎn)介

Spring Cloud Stream是一個(gè)框架,用于構(gòu)建與MQ連接的高度可伸縮的事件驅(qū)動(dòng)微服務(wù)。其目的是為了簡(jiǎn)化消息在 Spring Cloud 應(yīng)用程序中的開(kāi)發(fā)。屏蔽了各種MQ之間的差異,使得在更換MQ的時(shí)候不需要修改代碼。

Spring Cloud Stream支持多種綁定器實(shí)現(xiàn),如下:

  • RabbitMQ
  • Apache Kafka
  • Kafka Streams
  • Amazon Kinesis
  • google PubSub (partner maintained)
  • Solace PubSub+ (partner maintained)
  • Azure Event Hubs (partner maintained)
  • AWS SQS (partner maintained)
  • AWS SNS (partner maintained)
  • Apache RocketMQ (partner maintained)

詳細(xì)查看官方文檔,對(duì)應(yīng)每一個(gè)MQ都有一個(gè)Github地址。

Spring Cloud Stream的核心構(gòu)建塊是:

  • 目標(biāo)綁定器(Destination Binders):負(fù)責(zé)與MQ集成的組件。
  • 目標(biāo)綁定(Destination Bindings):MQ中間件與最終用戶(hù)提供的應(yīng)用程序代碼(生產(chǎn)者/消費(fèi)者)之間的橋梁。
  • 消息(Message):生產(chǎn)者和消費(fèi)者用來(lái)與目標(biāo)綁定器(以及通過(guò)MQ與其他應(yīng)用程序)通信的規(guī)范數(shù)據(jù)結(jié)構(gòu)。
Spring Cloud Stream使用詳解及部分重點(diǎn)源碼分析

Stream 核心組件關(guān)系圖

快速入門(mén)

依賴(lài):

<properties>
  <spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  </dependency>
</dependencies>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>${spring-cloud.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

應(yīng)用配置:

spring:
  rabbitmq:
    host: localhost
    virtual-host: bus
    port: 5672
    username: xxx
    password: xxx
---
spring:
  cloud:
    stream:
      bindings:
        #自定義輸入輸出
        myInput:
          #指定輸入通道對(duì)應(yīng)的主題名
          destination: demo
        myOutput:
          destination: demo

創(chuàng)建消息通道綁定的接口:

public interface StreamBinding {
 
  String INPUT = "myInput";
  String OUTPUT = "myOutput";
 
  @Input(StreamBinding.INPUT)
  SubscribableChannel input();
 
  @Output(StreamBinding.OUTPUT)
  MessageChannel output();
}

通過(guò) @Input和 @Output注解定義輸入通道和輸出通道名稱(chēng),這里的名稱(chēng)與上面配置文件中的是對(duì)應(yīng)的。

當(dāng)定義輸出通道的時(shí)候,需要返回 MessageChannel 接口對(duì)象,該接口定義了向消息通道發(fā)送消息的方法;定義輸入通道時(shí),需要返回 SubscribableChannel 接口對(duì)象,該接口集成自 MessageChannel 接口,它定義了維護(hù)消息通道訂閱者的方法。

這里的Input,Output兩個(gè)方法容器會(huì)分別創(chuàng)建一個(gè)Bean對(duì)象

創(chuàng)建消費(fèi)者:

@Component
@EnableBinding(value = {StreamBinding.class})
public class StreamReceiver {
 
  private Logger logger = LoggerFactory.getLogger(StreamReceiver.class);

  @StreamListener(StreamBinding.INPUT)
  public void receive(String message) {
    logger.info("接收到消息: {}", message);
  }
}

@EnableBinding 注解用來(lái)指定一個(gè)或多個(gè)定義了 @Input 或 @Output 注解的接口,以此實(shí)現(xiàn)對(duì)消息通道(Channel)的綁定。上面我們通過(guò) @EnableBinding(value = {StreamClient.class}) 綁定了 StreamClient 接口,該接口是我們自己實(shí)現(xiàn)的對(duì)輸入輸出消息通道綁定的定義

@StreamListener,主要定義在方法上,作用是將被修飾的方法注冊(cè)為消息中間件上數(shù)據(jù)流的事件監(jiān)聽(tīng)器,注解中的屬性值對(duì)應(yīng)了監(jiān)聽(tīng)的消息通道名。上面我們將 receive 方法注冊(cè)為 myInput 消息通道的監(jiān)聽(tīng)處理器,當(dāng)我們往這個(gè)消息通道發(fā)送信息的時(shí)候,receiver 方法會(huì)執(zhí)行。

消息發(fā)送接口:

@Resource
private StreamBinding streamBinding;
@GetMApping("/send")
public void send() {
  streamBinding.output().send(MessageBuilder.withPayload("First Message...").build());
}

啟動(dòng)服務(wù):

查看RabbitMQ

Spring Cloud Stream使用詳解及部分重點(diǎn)源碼分析

 

自動(dòng)為我們創(chuàng)建了一個(gè)隊(duì)列,隊(duì)列的名稱(chēng)是以我們?cè)谂渲梦募信渲玫拈_(kāi)頭,后面是隨機(jī)生成的。這個(gè)隊(duì)列會(huì)自動(dòng)刪除AD,服務(wù)關(guān)閉后就自動(dòng)刪除隊(duì)列;Excl:排他的,存在該隊(duì)列就不會(huì)在創(chuàng)建了。

修改端口后,再啟動(dòng)一個(gè)服務(wù):

Spring Cloud Stream使用詳解及部分重點(diǎn)源碼分析

 

創(chuàng)建了2個(gè)隊(duì)列,使用其中一個(gè)發(fā)送消息:

Spring Cloud Stream使用詳解及部分重點(diǎn)源碼分析

 


Spring Cloud Stream使用詳解及部分重點(diǎn)源碼分析

 

兩個(gè)服務(wù)都收到了消息。

消費(fèi)者組

上面啟動(dòng)了2個(gè)服務(wù)都能收到消息,在集群的環(huán)境下這樣肯定會(huì)帶來(lái)問(wèn)題,如果是業(yè)務(wù)方面的就會(huì)出現(xiàn)重復(fù)數(shù)據(jù),這時(shí)候我們可以通過(guò)設(shè)置分組的解決此問(wèn)題。修改配置:

spring:
  cloud:
    stream:
      bindings:
        myInput:
          #指定輸入通道對(duì)應(yīng)的主題名
          destination: demo
          #指定一個(gè)組;指定分組以后,不管你啟動(dòng)多少個(gè)實(shí)例,所有的實(shí)例都監(jiān)聽(tīng)這一個(gè)隊(duì)列
          #多個(gè)實(shí)例會(huì)輪詢(xún)的接收消息
          group: g_test
        myOutput:
          destination: demo

再次啟動(dòng)服務(wù)后,兩個(gè)服務(wù)會(huì)輪詢(xún)的接收到消息。

Spring Cloud Stream使用詳解及部分重點(diǎn)源碼分析

 

啟動(dòng)服務(wù)后,兩個(gè)服務(wù)都同時(shí)監(jiān)聽(tīng)同一個(gè)隊(duì)列。隊(duì)列也不是隨機(jī)生成的了,并且隊(duì)列是持久化的,服務(wù)斷開(kāi)后隊(duì)列也不會(huì)自動(dòng)刪除。

消息分區(qū)

通過(guò)消費(fèi)組的設(shè)置,雖然能保證同一消息只被一個(gè)消費(fèi)者進(jìn)行接收和處理,但是對(duì)于特殊業(yè)務(wù)情況,除了要保證單一實(shí)例消費(fèi)之外,還希望那些具備相同特征的消息都能被同一個(gè)實(shí)例消費(fèi),這個(gè)就可以使用 Spring Cloud Stream 提供的消息分區(qū)功能。修改配置

spring:
  cloud:
    stream:
      bindings:
        myInput:
          #指定輸入通道對(duì)應(yīng)的主題名
          destination: demo
          #指定一個(gè)組;指定分組以后,不管你啟動(dòng)多少個(gè)實(shí)例,所有的實(shí)例都監(jiān)聽(tīng)這一個(gè)隊(duì)列
          #多個(gè)實(shí)例會(huì)輪詢(xún)的接收消息
          group: g_test
          consumer:
            #通過(guò)該參數(shù)開(kāi)啟消費(fèi)者分區(qū)功能
            partitioned: true
        myOutput:
          destination: demo
          producer:
            #這里的配置也可以是SpEL表達(dá)式,比如:headers['partition']通過(guò)消息header獲取屬性
            #這里會(huì)通過(guò)表達(dá)式及消息對(duì)象進(jìn)行計(jì)算得到一個(gè)Key,然后獲取key的hashCode
            # 得到hashCode以后會(huì)與partitionCount進(jìn)行取模運(yùn)算得到具體的分區(qū)
            partitionKeyExpression: '1' #我這里給的值就是對(duì)應(yīng)的instanceIndex的值,你希望誰(shuí)接收就設(shè)置誰(shuí)配置的值即可
            partitionCount: 2
      #實(shí)例總數(shù)
      instanceCount: 2
      #該參數(shù)設(shè)置了當(dāng)前實(shí)例的索引號(hào),從 0 開(kāi)始
      instanceIndex: 0

計(jì)算分區(qū)源碼:

Spring Cloud Stream使用詳解及部分重點(diǎn)源碼分析

 

最后得到分區(qū)信息后會(huì)在消息頭中放入一個(gè)scst_partition為key,partition為值的頭信息。

啟動(dòng)多個(gè)實(shí)例后,測(cè)試發(fā)現(xiàn)所有的消息都只是同一個(gè)實(shí)例收到消息

Spring Cloud Stream使用詳解及部分重點(diǎn)源碼分析

 


Spring Cloud Stream使用詳解及部分重點(diǎn)源碼分析

 

交換機(jī)分別與每一個(gè)服務(wù)進(jìn)行綁定使用不同的Routing Key這樣在發(fā)送消息的時(shí)候就可以根據(jù)計(jì)算處理的分區(qū)進(jìn)行定向發(fā)送消息了。

通過(guò)源碼查看:

Spring Cloud Stream使用詳解及部分重點(diǎn)源碼分析

 

這里通過(guò)我們的配置交換機(jī)為demo。接著是獲取路由key了

Spring Cloud Stream使用詳解及部分重點(diǎn)源碼分析

 

這里會(huì)從消息header中獲取key = scst_partition的頭信息。

這樣針對(duì)使用RabbitMQ的中間件發(fā)送消息所需要的交換機(jī)及路由key就確定下來(lái)了。

完畢!!!

分享到:
標(biāo)簽:Spring Cloud
用戶(hù)無(wú)頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過(guò)答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定