日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長提供免費收錄網(wǎng)站服務(wù),提交前請做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

快速入門

五分鐘左右為你展示如何創(chuàng)建一個Spring Cloud Stream的應(yīng)用程序,它是如何從消息中間件中接收并輸出接收的信息到console,這里的消息中間件有兩種選擇:RabbitMQ和Kafka,本文以RabbitMQ為準(zhǔn)

這節(jié)主要簡化官方文檔為兩步:

  1. 使用idea新建項目
  2. 添加 Message Handler , Building 并運行

一、使用idea新建項目

打開項目目錄,新建一個moudle,名為FirstStream,pom文件如下

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.Apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">	<modelVersion>4.0.0</modelVersion> 	<groupId>com.cnblogs.hellxz</groupId>	<artifactId>FirstStream</artifactId>	<version>0.0.1-SNAPSHOT</version>	<packaging>jar</packaging> 	<name>FirstStream</name>	<description>Demo project for Spring Boot</description> 	<parent>		<groupId>org.springframework.cloud</groupId>		<artifactId>spring-cloud-starter-parent</artifactId>		<version>Dalston.SR5</version>		<relativePath/>	</parent> 	<dependencies>		<!-- Spring boot 測試用 -->		<dependency>			<groupId>org.springframework.boot</groupId>			<artifactId>spring-boot-starter-test</artifactId>			<scope>test</scope>		</dependency>		<!-- Stream rabbit 依賴中包含 binder-rabbit,所以只需導(dǎo)入此依賴即可 -->		<dependency>			<groupId>org.springframework.cloud</groupId>			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>			<version>2.0.0.RELEASE</version>		</dependency>	</dependencies> </project> 

二、 添加 Message Handler , Building 并運行

在com.cnblogs.hellxz包下添加啟動類,并添加

@SpringBootApplication@EnableBinding(Sink.class)public class FirstStreamApp { 	public static void mAIn(String[] args) {		SpringApplication.run(FirstStreamApp.class, args);	} 	@StreamListener(Sink.INPUT)	public void receive(Object payload) {		logger.info("Received: " + payload);	}}
  • 我們通過使用@EnableBinding(Sink.class)開啟了Sink的binding(綁定),這樣做會向框架發(fā)出信號,以啟動與消息傳遞中間件的綁定,并自動創(chuàng)建綁定到Sink.INPUT通道的目標(biāo)(即queue,topic和其他)。
  • 我們添加了一個處理方法,去監(jiān)聽消息類型為String的消息,這么做是為么向你展示框架的核心特性之一——自動轉(zhuǎn)換入?yún)⑾Ⅲw為指定類型

啟動項目,我們?nèi)ゲ榭碦abbitMQ的網(wǎng)頁 http://localhost:15672 點擊Connections,發(fā)現(xiàn)現(xiàn)在已經(jīng)有一個連接進(jìn)來了,我們剛才的項目,在Queues中也有一個隊列被創(chuàng)建,我的是
input.anonymous.L92bTj6FRTyOC0QE-Pl0HA,我們點開那個唯一的隊列,往下拉點開publish message,payload處輸入一個hello world,點Publlish message發(fā)送一個消息

 

查看控制臺,你會看到Received: hello world

對于連接非本地RabbitMQ的配置:

spring.rabbitmq.host=<rabbitMQ所在的ip>

spring.rabbitmq.port=<端口號>

spring.rabbitmq.username=<登錄用戶名>

spring.rabbitmq.password=<密碼>

Spring Cloud Stream介紹

Spring Cloud Stream是一個用于構(gòu)建消息驅(qū)動的微服務(wù)應(yīng)用程序的框架,是一個基于Spring Boot 創(chuàng)建的獨立生產(chǎn)級的,使用Spring Integration提供連接到消息代理的Spring應(yīng)用。介紹持久發(fā)布 - 訂閱(persistent publish-subscribe)的語義,消費組(consumer groups)和分區(qū)(partitions)的概念。

你可以添加@EnableBinding注解在你的應(yīng)用上,從而立即連接到消息代理,在方法上添加@StreamListener以使其接收流處理事件,下面的例子展示了一個Sink應(yīng)用接收外部信息

@SpringBootApplication@EnableBinding(Sink.class)public class VoteRecordingSinkApplication {   public static void main(String[] args) {    SpringApplication.run(VoteRecordingSinkApplication.class, args);  }   @StreamListener(Sink.INPUT)  public void processVote(Vote vote) {      votingService.recordVote(vote);  }}

@EnableBinding注解會帶著一個或多個接口作為參數(shù)(舉例中使用的是Sink的接口),一個接口往往聲名了輸入和輸出的渠道,Spring Stream提供了Source、Sink、Processor這三個接口,你也可以自己定義接口。

下面展示的是Sink的接口內(nèi)容

public interface Sink {  String INPUT = "input";   @Input(Sink.INPUT)  SubscribableChannel input();}

@Input注解區(qū)分了一個輸入channel,通過它接收消息到應(yīng)用中,使用@Output注解 區(qū)分輸出channel,消息通過它離開應(yīng)用,使用這兩個注解可以帶一個channel的名字作為參數(shù),如果未提供channel名稱,則使用帶注釋的方法的名稱。

你可以使用Spring Cloud Stream 現(xiàn)成的接口,也可以使用@Autowired注入這個接口,下面在測試類中舉例

@RunWith(SpringJUnit4ClassRunner.class)@SpringBootTestpublic class LoggingConsumerApplicationTests { 	@Autowired	private Sink sink; 	@Test	public void contextLoads() {		assertNotNull(this.sink.input());	}}

主要概念(Main Concepts)

  1. 應(yīng)用模型
  2. 應(yīng)用程序通過 inputs 或者 outputs 來與 Spring Cloud Stream 中Binder 交互,通過我們配置來綁定,而 Spring Cloud Stream 的 Binder 負(fù)責(zé)與中間件交互。所以,我們只需要搞清楚如何與 Spring Cloud Stream 交互就可以方便使用消息驅(qū)動的方式。

 

  1. 抽象綁定器(The Binder Abstraction)
  2. Spring Cloud Stream實現(xiàn)Kafkat和RabbitMQ的Binder實現(xiàn),也包括了一個TestSupportBinder,用于測試。你也可以寫根據(jù)API去寫自己的Binder.
  3. Spring Cloud Stream 同樣使用了Spring boot的自動配置,并且抽象的Binder使Spring Cloud Stream的應(yīng)用獲得更好的靈活性,比如:我們可以在application.yml或application.properties中指定參數(shù)進(jìn)行配置使用Kafka或者RabbitMQ,而無需修改我們的代碼。

在前面我們測試的項目中并沒有修改application.properties,自動配置得益于Spring Boot

通過 Binder ,可以方便地連接中間件,可以通過修改application.yml中的
spring.cloud.stream.bindings.input.destination 來進(jìn)行改變消息中間件(對應(yīng)于Kafka的topic,RabbitMQ的exchanges)

? 在這兩者間的切換甚至不需要修改一行代碼。

  1. 發(fā)布-訂閱(Persistent Publish-Subscribe Support)
  2. 如下圖是經(jīng)典的Spring Cloud Stream的 發(fā)布-訂閱 模型,生產(chǎn)者 生產(chǎn)消息發(fā)布在shared topic(共享主題)上,然后 消費者 通過訂閱這個topic來獲取消息

 

?

其中topic對應(yīng)于Spring Cloud Stream中的destinations(Kafka 的topic,RabbitMQ的 exchanges)

官方文檔這塊原理說的有點深,就沒寫,詳見官方文檔

  1. 消費組(Consumer Groups)
  2. 盡管發(fā)布-訂閱 模型通過共享的topic連接應(yīng)用變得很容易,但是通過創(chuàng)建特定應(yīng)用的多個實例的來擴展服務(wù)的能力同樣重要,但是如果這些實例都去消費這條數(shù)據(jù),那么很可能會出現(xiàn)重復(fù)消費的問題,我們只需要同一應(yīng)用中只有一個實例消費該消息,這時我們可以通過消費組來解決這種應(yīng)用場景, 當(dāng)一個應(yīng)用程序不同實例放置在一個具有競爭關(guān)系的消費組中,組里面的實例中只有一個能夠消費消息
  3. 設(shè)置消費組的配置為spring.cloud.stream.bindings.<channelName>.group,
  4. 下面舉一個DD博客中的例子:
  5. 下圖中,通過網(wǎng)絡(luò)傳遞過來的消息通過主題,按照分組名進(jìn)行傳遞到消費者組中
  6. 此時可以通過spring.cloud.stream.bindings.input.group=Group-A或spring.cloud.stream.bindings.input.group=Group-B進(jìn)行指定消費組

 

所有訂閱指定主題的組都會收到發(fā)布消息的一個備份,每個組中只有一個成員會收到該消息;如果沒有指定組,那么默認(rèn)會為該應(yīng)用分配一個匿名消費者組,與所有其它組處于 訂閱-發(fā)布 關(guān)系中。ps:也就是說如果管道沒有指定消費組,那么這個匿名消費組會與其它組一起消費消息,出現(xiàn)了重復(fù)消費的問題。

  1. 消費者類型(Consumer Types)
  2. 1)支持有兩種消費者類型:
  3. Message-driven (消息驅(qū)動型,有時簡稱為異步)
  4. Polled (輪詢型,有時簡稱為 同步)
  5. 在Spring Cloud 2.0版本前只支持 Message-driven這種異步類型的消費者,消息一旦可用就會傳遞,并且有一個線程可以處理它;當(dāng)你想控制消息的處理速度時,可能需要用到同步消費者類型。
  6. 2)持久化
  7. 一般來說所有擁有訂閱主題的消費組都是持久化的,除了匿名消費組。 Binder的實現(xiàn)確保了所有訂閱關(guān)系的消費訂閱是持久的,一個消費組中至少有一個訂閱了主題,那么被訂閱主題的消息就會進(jìn)入這個組中,無論組內(nèi)是否停止。
  8. 注意: 匿名訂閱本身是非持久化的,但是有一些Binder的實現(xiàn)(比如RabbitMQ)則可以創(chuàng)建非持久化的組訂閱
  9. 通常情況下,當(dāng)有一個應(yīng)用綁定到目的地的時候,最好指定消費消費組。擴展Spring Cloud Stream應(yīng)用程序時,必須為每個輸入綁定指定一個使用者組。這樣做可以防止應(yīng)用程序的實例接收重復(fù)的消息(除非需要這種行為,這是不尋常的)。
  10. 分區(qū)支持(Partitioning Support)
  11. 在消費組中我們可以保證消息不會被重復(fù)消費,但是在同組下有多個實例的時候,我們無法確定每次處理消息的是不是被同一消費者消費,分區(qū)的作用就是為了確保具有共同特征標(biāo)識的數(shù)據(jù)由同一個消費者實例進(jìn)行處理,當(dāng)然前邊的例子是狹義的,通信代理(broken topic)也可以被理解為進(jìn)行了同樣的分區(qū)劃分。Spring Cloud Stream 的分區(qū)概念是抽象的,可以為不支持分區(qū)Binder實現(xiàn)(例如RabbitMQ)也可以使用分區(qū)。

 

注意:要使用分區(qū)處理,你必須同時對生產(chǎn)者和消費者進(jìn)行配置。

編程模型(Programming Model)

為了理解編程模型,需要熟悉下列核心概念:

  • Destination Binders(目的地綁定器): 負(fù)責(zé)與外部消息系統(tǒng)集成交互的組件
  • Destination Bindings(目的地綁定): 在外部消息系統(tǒng)和應(yīng)用的生產(chǎn)者和消費者之間的橋梁(由Destination Binders創(chuàng)建)
  • Message (消息): 用于生產(chǎn)者、消費者通過Destination Binders溝通的規(guī)范數(shù)據(jù)。
  1. Destination Binders(目的地綁定器)
  2. Destination Binders是Spring Cloud Stream與外部消息中間件提供了必要的配置和實現(xiàn)促進(jìn)集成的擴展組件。集成了生產(chǎn)者和消費者的消息的路由、連接和委托、數(shù)據(jù)類型轉(zhuǎn)換、用戶代碼調(diào)用等。
  3. 盡管Binders幫我們處理了許多事情,我們?nèi)孕枰獙λM(jìn)行配置。之后會講
  4. Destination Bindings (目的地綁定) :
  5. 如前所述,Destination Bindings 提供連接外部消息中間件和應(yīng)用提供的生產(chǎn)者和消費者中間的橋梁。
  6. 使用@EnableBinding 注解打在一個配置類上來定義一個Destination Binding,這個注解本身包含有@Configuration,會觸發(fā)Spring Cloud Stream的基本配置。
  7. ?

接下來的例子展示完全配置且正常運行的Spring Cloud Stream應(yīng)用,由INPUT接收消息轉(zhuǎn)換成String 類型并打印在控制臺上,然后轉(zhuǎn)換出一個大寫的信息返回到OUTPUT中。

@SpringBootApplication@EnableBinding(Processor.class)public class MyApplication { 	public static void main(String[] args) {		SpringApplication.run(MyApplication.class, args);	} 	@StreamListener(Processor.INPUT)	@SendTo(Processor.OUTPUT)	public String handle(String value) {		System.out.println("Received: " + value);		return value.toUpperCase();	}}

通過SendTo注解將方法內(nèi)返回值轉(zhuǎn)發(fā)到其他消息通道中,這里因為沒有定義接收通道,提示消息已丟失,解決方法是新建一個接口,如下

public interface MyPipe{ //方法1 @Input(Processor.OUTPUT) //這里使用Processor.OUTPUT是因為要同一個管道,或者名稱相同 SubscribableChannel input(); //還可以如下這樣=====二選一即可========== //方法2 String INPUT = "output"; @Input(MyPipe.INPUT) SubscribableChannel input();}

然后在在上邊的方法下邊加一個方法,并在@EnableBinding注解中改成@EnableBinding({Processor.class, MyPipe.class})

@StreamListener(MyPipe.INPUT) public void handleMyPipe(String value) { System.out.println("Received: " + value); }

Spring Cloud Stream已經(jīng)為我們提供了三個綁定消息通道的默認(rèn)實現(xiàn)

  • Sink:通過指定消費消息的目標(biāo)來標(biāo)識消息使用者的約定。
  • Source:與Sink相反,用于標(biāo)識消息生產(chǎn)者的約定。
  • Processor:集成了Sink和Source的作用,標(biāo)識消息生產(chǎn)者和使用者

他們的源碼分別為:

public interface Sink {    String INPUT = "input";     @Input("input")    SubscribableChannel input();} public interface Source {    String OUTPUT = "output";     @Output("output")    MessageChannel output();} public interface Processor extends Source, Sink {}

Sink和Source中分別通過@Input和@Output注解定義了輸入通道和輸出通道,通過使用這兩個接口中的成員變量來定義輸入和輸出通道的名稱,Processor由于繼承自這兩個接口,所以同時擁有這兩個通道。

注意:擁有多條管道的時候不能有輸入輸出管道名相同的,否則會出現(xiàn)發(fā)送消息被自己接收或報錯的情況

我們可以根據(jù)上述源碼的方式來定義我們自己的輸入輸出通道,定義輸入通道需要返回SubscribaleChannel接口對象,這個接口繼承自MessageChannel接口,它定義了維護(hù)消息通道訂閱者的方法;定義輸出通道則需要返回MessageChannel接口對象,它定義了向消息通道發(fā)送消息的方法。

自定義消息通道 發(fā)送與接收

依照上面的內(nèi)容,我們也可以創(chuàng)建自己的綁定通道 如果你實現(xiàn)了上邊的MyPipe接口,那么直接使用這個接口就好

  1. 和主類同包下建一個MyPipe接口,實現(xiàn)如下
package com.cnblogs.hellxz; import org.springframework.cloud.stream.annotation.Input;import org.springframework.cloud.stream.messaging.Source;import org.springframework.messaging.SubscribableChannel; public interface MyPipe {     //方法1//    @Input(Source.OUTPUT) //Source.OUTPUT的值是output,我們自定義也是一樣的//    SubscribableChannel input(); //使用@Input注解標(biāo)注的輸入管道需要使用SubscribableChannel來訂閱通道     //========二選一使用===========     //方法2    String INPUT = "output";     @Input(MyPipe.INPUT)    SubscribableChannel input();}

這里用Source.OUTPUT和第二種方法 是一樣的,我們只要將消息發(fā)送到名為output的管道中,那么監(jiān)聽output管道的輸入流一端就能獲得數(shù)據(jù)

  1. 擴展主類,添加監(jiān)聽output管道方法
	@StreamListener(MyPipe.INPUT)	public void receiveFromMyPipe(Object payload){		logger.info("Received: "+payload);	}
  1. 在主類的頭上的@EnableBinding改為@EnableBinding({Sink.class, MyPipe.class}),加入了Mypipe接口的綁定
  2. 在test/JAVA下創(chuàng)建com.cnblogs.hellxz,并在包下新建一個測試類,如下
  3. package com.cnblogs.hellxz; import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.messaging.Source;import org.springframework.messaging.support.MessageBuilder;import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class)@EnableBinding(value = {Source.class})@SpringBootTestpublic class TestSendMessage { @Autowired private Source source; //注入接口和注入MessageChannel的區(qū)別在于發(fā)送時需不需要調(diào)用接口內(nèi)的方法 @Test public void testSender() { source.output().send(MessageBuilder.withPayload("Message from MyPipe").build()); //假設(shè)注入了MessageChannel messageChannel; 因為綁定的是Source這個接口, //所以會使用其中的唯一產(chǎn)生MessageChannel的方法,那么下邊的代碼會是 //messageChannel.send(MessageBuilder.withPayload("Message from MyPipe").build()); }}
  4. 啟動主類,清空輸出,運行測試類,然后你就會得到在主類的控制臺的消息以log形式輸出Message from MyPipe

我們是通過注入消息通道,并調(diào)用他的output方法聲明的管道獲得的MessageChannel實例,發(fā)送的消息

管道注入過程中可能會出現(xiàn)的問題

通過注入消息通道的方式雖然很直接,但是也容易犯錯,當(dāng)一個接口中有多個通道的時候,他們返回的實例都是MessageChannel,這樣通過@Autowired注入的時候往往會出現(xiàn)有多個實例找到無法確定需要注入實例的錯誤,我們可以通過@Qualifier指定消息通道的名稱,下面舉例:

  1. 在主類包內(nèi)創(chuàng)建一個擁有多個輸出流的管道
  2. /** * 多個輸出管道 */public interface MutiplePipe { @Output("output1") MessageChannel output1(); @Output("output2") MessageChannel output2();}
  3. 創(chuàng)建一個測試類
  4. @RunWith(SpringRunner.class)@EnableBinding(value = {MutiplePipe.class}) //開啟綁定功能@SpringBootTest //測試public class TestMultipleOutput { @Autowired private MessageChannel messageChannel; @Test public void testSender() { //向管道發(fā)送消息 messageChannel.send(MessageBuilder.withPayload("produce by multiple pipe").build()); }}
  5. 啟動測試類,會出現(xiàn)剛才說的不唯一的bean,無法注入
  6. Caused by: org.springframework.beans.factory.NoUniqueBeanDefinitionException: No qualifying bean of type 'org.springframework.messaging.MessageChannel' available: expected single matching bean but found 6: output1,output2,input,output,nullChannel,errorChannel
  7. 我們在@Autowired旁邊加上@Qualifier("output1"),然后測試就可以正常啟動了
  8. 通過上邊的錯誤,我們可以清楚的看到,每個MessageChannel都是使用消息通道的名字做為bean的名稱。
  9. 這里我們沒有使用監(jiān)聽這個管道,僅為了測試并發(fā)現(xiàn)問題

常用配置

消費組和分區(qū)的設(shè)置

給消費者設(shè)置消費組和主題

  1. 設(shè)置消費組: spring.cloud.stream.bindings.<通道名>.group=<消費組名>
  2. 設(shè)置主題: spring.cloud.stream.bindings.<通道名>.destination=<主題名>

給生產(chǎn)者指定通道的主題:
spring.cloud.stream.bindings.<通道名>.destination=<主題名>

消費者開啟分區(qū),指定實例數(shù)量與實例索引

  1. 開啟消費分區(qū): spring.cloud.stream.bindings.<通道名>.consumer.partitioned=true
  2. 消費實例數(shù)量: spring.cloud.stream.instanceCount=1 (具體指定)
  3. 實例索引: spring.cloud.stream.instanceIndex=1 #設(shè)置當(dāng)前實例的索引值

生產(chǎn)者指定分區(qū)鍵

  1. 分區(qū)鍵: spring.cloud.stream.bindings.<通道名>.producer.partitionKeyExpress=<分區(qū)鍵>
  2. 分區(qū)數(shù)量: spring.cloud.stream.bindings.<通道名>.producer.partitionCount=<分區(qū)數(shù)量>

分享到:
標(biāo)簽:Spring
用戶無頭像

網(wǎng)友整理

注冊時間:

網(wǎng)站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨大挑戰(zhàn)2018-06-03

數(shù)獨一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學(xué)四六

運動步數(shù)有氧達(dá)人2018-06-03

記錄運動步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績評定2018-06-03

通用課目體育訓(xùn)練成績評定