日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

準備工作純JAVA項目依賴

創建一個maven-java項目在pom.xml中添加以下依賴:

com.RabbitMQ:amqp-client:5.15.0org.projectlombok:lombok:1.18.24junit:junit:4.13.2org.Apache.logging.log4j:log4j-api:2.18.0org.apache.logging.log4j:log4j-core:2.18.0org.apache.logging.log4j:log4j-slf4j-impl:2.18.0org.slf4j:slf4j-api:1.7.36cn.hutool:hutool-core:5.8.5日志配置文件

創建log4j2.xml文件,用于輸出日志:

連接工具類

  • 每一次連接時,地址、端口等信息都相同,所以將這些相同的代碼寫成一個工具類。
  • 這兒沒有將Connection聲明成靜態的成員變量,因為,對于生產者與消費者,應該是分開部署的,也不可能使用同一個Connection對象。
package wj.mq.utils;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class ConnUtils {public static Connection newConnection(){ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.56.61");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("123456");Connection connection = null;try {connection = factory.newConnection();} catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);return connection;

 

最后的項目結構如下:


 

SpringBoot項目依賴


 

配置文件server.port=8888spring.Application.name=helloworld# 配置mqspring.rabbitmq.host=192.168.56.61spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=admin# 配置日志logging.level.root=INFOlogging.file.name=./logs/log.log相關注解

@RabbitListener(queues = {"HelloSpring"}) : 可以注解在類上,也可以注解在方法上,消費消息。

@RabbitHandler() : 注解在方法上,與@RabbitListener共同使用。

簡單模式


 

java項目生產者Publisher代碼package wj.rabbitmq.demo01;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import lombok.extern.slf4j.Slf4j;import wj.mq.utils.ConnUtils;import java.nio.charset.StandardCharsets;@Slf4jpublic class Publisher {public static void main(String[] args) throws Exception {String queueName = "HelloQueue";//獲取一個連接try(Connection connection = ConnUtils.newConnection()){//獲取信道Channel channel = connection.createChannel();//聲明一個隊列名稱為channel.queueDeclare(queueName, true, false,false, null);//發送消息:Exchange空默認使用/交換機channel.basicPublish("",queueName,null,"HelloWorld".getbytes(StandardCharsets.UTF_8));log.info("送信息完成");


 

查看UI

發送完成以后,查看RabbitMQ的UI管理界面中的Queue選項卡:


 

點HelloQueue這個隊列名稱,進入詳細界面:


 

查看Bindings,可以看到目前是默認交換機(即AMQP Default)綁定到當前這個隊列:


 

消費者Consumer代碼package wj.rabbitmq.demo01;import com.rabbitmq.client.*;import lombok.extern.slf4j.Slf4j;import wj.mq.utils.ConnUtils;@Slf4jpublic class Consumer {public static void main(String[] args) throws Exception {String queueName = "HelloQueue";try (Connection con = ConnUtils.newConnection()) {Channel channel = con.createChannel();channel.queueDeclare(queueName, true, false,false, null);//接收到數據以后的回調函數Delivercallback callback = (consumerTag, message) -> {byte[] body = message.getBody();String str = new String(body);log.info("接收到信息:{}", str);//取消處理信息的回調CancelCallback cancelCallback = consumerTag -> {//ignore//消費監聽channel.basicConsume(queueName, false, callback, cancelCallback);

截圖:


 

運行后效果,即輸出從隊列中讀取的數據:

09:40:22.886 [pool-2-thread-4] 接收到信息:HelloWorld

SpringBoot項目創建Springboot項目并添加以下依賴

  • org.springframework.boot:spring-boot-starter-amqp:2.7.3
  • org.springframework.boot:spring-boot-starter-thymeleaf:2.7.3
  • org.springframework.boot:Spring-boot-starter-web:2.7.3
  • org.springframework.boot:spring-boot-devtools:2.7.3
  • org.springframework.boot:spring-boot-configuration-processor:2.7.3
  • org.projectlombok:lombok:1.18.24
  • org.springframework.boot:spring-boot-starter-test:2.7.3
  • org.springframework.amqp:spring-rabbit-test:2.4.6
  • cn.hutool:hutool-core:5.8.6
配置類

 

配置類的主要功能是配置Queue,Exchange等。以下我這兒僅需要一個Queue所以僅配置了一個Queue。

package wj.mq.config;import java.util.HashMap;import java.util.Map;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import lombok.extern.slf4j.Slf4j;@Slf4j@Configurationpublic class RabbitMQConfig {* 在使用隊列之前,必須要先聲明這個隊列,如果不在這兒聲明,就必須要在@RabbitListener時添加queuesToDeclare如下:@RabbitListener(queues = {"HelloSpring"},queuesToDeclare = @Queue("HelloSpring"))* @return@Beanpublic Queue helloSpringQueue() {Map args = new HashMap<>();args.put("name", "HelloSpring");Queue queue = new Queue("HelloSpring",true, false, false,args);log.info("隊列創建成功:{}",queue.getName());return queue;


 

啟動類

與普通的啟動的類類似,只是為了使用調度功能,添加了@EnableScheduling注解。

package wj.mq;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.scheduling.annotation.EnableScheduling;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.ResponseBody;import lombok.extern.slf4j.Slf4j;@Slf4j@Controller@EnableScheduling@SpringBootApplicationpublic class HelloWorldApplication {public static void main(String[] args) {SpringApplication.run(HelloWorldApplication.class, args);log.info("Started..");@ResponseBody@RequestMapping(value = {"/",""})public String index() {return "index";

添加EnableScheduling注解,后面用到定時任務,讓生產者定時發布消息:


 

生產者

convertAndSend可以發布任意的對象,默認會被轉換成指定的格式

package wj.mq.rabbitmq.helloworld;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import lombok.extern.slf4j.Slf4j;@Slf4j@Componentpublic class HelloWorldSender {@Autowiredprivate RabbitTemplate rabbitTemplate;private static int times = 1;@Scheduled(fixedDelay = 1000, initialDelay = 500)public void send() {String msg = ""+times;rabbitTemplate.convertAndSend("HelloSpring",msg);log.info("Send {} ok",msg);times++;

添加@Component注解為Spring Bean組件。

注入rabbitTemplate。

通過convertAndSend(QueueName,Object)發送數據給批定的隊列。


 

消費者package wj.mq.rabbitmq.helloworld;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import lombok.extern.slf4j.Slf4j;@Slf4j@Component@RabbitListener(queues = {"HelloSpring"})public class HelloWorldReceiver {@RabbitHandlerpublic void onMessage(String message) {log.info("消費者1,接收到信息: {}",message);

在類上添加注解@RabbitListener(queues=..)

在接收信息的方法上添加@RabbitHandler


 

消費者2(另一種使用注解的方法)

package wj.mq.rabbitmq.helloworld;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;* 第二種接收 信息的方法,就是直接在方法上添加RabbitListener@Slf4j@Componentpublic class HelloWorldReceiver2 {* 并設置為手工確認@RabbitListener(queues = "HelloSpring",ackMode ="MANUAL")public void onMessage(Message message,Channel channel) throws Exception {log.info("消費者2,接收到信息: {}",new String(message.getBody()));long id = message.getMessageProperties().getDeliveryTag();//手工確認channel.basicAck(id, false);

直接將注解寫在方法上,并設置為手工確認。

注意接收的參數為Message對象和Channel對象。

使用channel.ack進行手工確認。


 

生產者-發布一個對象聲明JavaBean必須實現序列化接口package wj.mq.domain;import java.io.Serializable;import lombok.Getter;import lombok.Setter;@Getter@Setterpublic class Person implements Serializable {private static final long serialVersionUID = 1L;private String name;private Integer age;生產者

注意以下通過convertAndSend直接發布了一個對象。

package wj.mq.rabbitmq.helloworld;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import wj.mq.domain.Person;@Componentpublic class HelloWorldSender {@Autowiredprivate Queue helloSpringQueue;@Autowiredprivate RabbitTemplate rabbitTemplate;private static int times = 1;@Scheduled(fixedDelay = 1000, initialDelay = 500)public void send() {Person person = new Person();//聲明對象并直接發布person.setName("Jack");person.setAge(times);rabbitTemplate.convertAndSend(helloSpringQueue.getName(),person);times++;

查看RabbitMQ UI可見編碼類型為序列化再轉base64的方式:


 

消費者-消費一個對象直接接收消費對象package wj.mq.rabbitmq.helloworld;import cn.hutool.core.thread.ThreadUtil;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.alibaba.fastJSON2.JSONObject;import lombok.extern.slf4j.Slf4j;import wj.mq.domain.Person;import java.util.Random;@Slf4j@Component@RabbitListener(queues = {"HelloSpring"})public class HelloWorldReceiver {private Random random = new Random();private int count =1;* 直接接收Peson對象即可@RabbitHandler()public void onMessage(Person message) {int sleep = 1000*random.nextInt(20);ThreadUtil.sleep(sleep);log.info("消費者1,休眠{}ms后處理完成信息:{},共處理{}消息",sleep,JSONObject.toJSONString(message),count++);接收Message對象,然后自己做轉換package wj.mq.rabbitmq.helloworld;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Component;import com.alibaba.fastjson2.JSONObject;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import wj.mq.domain.Person;import java.io.ByteArrayInputStream;import java.io.ObjectInputStream;* 第二種接收 信息的方法,就是直接在方法上添加RabbitListener@Slf4j@Componentpublic class HelloWorldReceiver2 {int count = 1;* 并設置為手工確認@RabbitListener(queues = "HelloSpring",ackMode ="MANUAL")public void onMessage(@Payload()Message message,Channel channel) throws Exception {int sleep = 0;//通過ObjectReader讀取對象ObjectInputStream objIn = new ObjectInputStream(new ByteArrayInputStream(message.getBody()));Object readObject = objIn.readObject();Person p = (Person) readObject;objIn.close();log.info("消費者2,休眠{}ms后處理完成信息: {},共處理{}消息",sleep,JSONObject.toJSONString(p),count++);long id = message.getMessageProperties().getDeliveryTag();channel.basicAck(id, false);//手工確認

最后顯示的結果,與之前相同


 

最終項目結構


 

運行測試

兩個消費者,依次接收RabbitMQ發送的消息:


 

關于springboot中的更多設置設置MessageConvert

默認情況下,通過rabbitTemplate.sendAndConvert(..)發送的object對象會使用Java序列化方式傳輸。可以在UI界面上,通過讀取一個Queue中的消息的方式,查看隊列中的數據格式:java-serialized。


 

我們可以修改這種格式,如JSON

以下注意給RabbitTemplate.setMessageConverter(..)即可以設置為JSON轉換格式。

package wj.mq.config.helloworld;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class HelloWorldConfig {@Autowiredprivate ConnectionFactory connectionFactory;@Beanpublic Queue helloSpringQueue() {return new Queue("HelloSpringQueue", true, false,false);@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rt =new RabbitTemplate();rt.setConnectionFactory(connectionFactory);rt.setMessageConverter(messageConverter());return rt;

設置后,查看Message的格式


 

接收 deliverTag

僅通過@RabbitListener的方法上,可以通過添加@Header接收具體指定的數據,如下。

注意@header的部分的代碼。

package wj.mq.rabbitmq.helloworld;import cn.hutool.core.thread.ThreadUtil;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.messaging.handler.annotation.Header;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Component;import com.alibaba.fastjson2.JSONObject;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import wj.mq.domain.Person;import java.util.Random;@Slf4j@Component@RabbitListener(queues = {"#{helloSpringQueue.name}"})public class HelloWorldReceiver {private Random random = new Random();private int count =1;* 直接接收Peson對象即可@RabbitHandler()public void onMessage(@Payload() Person message,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag,Channel channel) {int sleep = 1000*random.nextInt(20)*0;ThreadUtil.sleep(sleep);log.info("消費者1,休眠{}ms后處理完成信息:{},共處理{}消息,消息tag={}",sleep,JSONObject.toJSONString(message),count++,deliveryTag);


 

或,直接將@RabbitListener注解在方法上


 

可也可以接收一個Message對象


 

最后顯示的效果如下:


 

分享到:
標簽:RabbitMQ
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定