準備工作純JAVA項目依賴
創建一個maven-java項目在pom.xml中添加以下依賴:
com.RabbitMQ:amqp-client:5.15.0org.projectlombok:lombok:1.18.24junit:junit:4.13.2org.Apache.logging.log4j:log4j-api:2.18.0org.apache.logging.log4j:log4j-core:2.18.0org.apache.logging.log4j:log4j-slf4j-impl:2.18.0org.slf4j:slf4j-api:1.7.36cn.hutool:hutool-core:5.8.5
日志配置文件
創建log4j2.xml文件,用于輸出日志:
連接工具類
- 每一次連接時,地址、端口等信息都相同,所以將這些相同的代碼寫成一個工具類。
- 這兒沒有將Connection聲明成靜態的成員變量,因為,對于生產者與消費者,應該是分開部署的,也不可能使用同一個Connection對象。
package wj.mq.utils;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class ConnUtils {public static Connection newConnection(){ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.56.61");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("123456");Connection connection = null;try {connection = factory.newConnection();} catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);return connection;
最后的項目結構如下:
SpringBoot項目依賴
配置文件server.port=8888spring.Application.name=helloworld# 配置mqspring.rabbitmq.host=192.168.56.61spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=admin# 配置日志logging.level.root=INFOlogging.file.name=./logs/log.log
相關注解
@RabbitListener(queues = {"HelloSpring"}) : 可以注解在類上,也可以注解在方法上,消費消息。
@RabbitHandler() : 注解在方法上,與@RabbitListener共同使用。
簡單模式
java項目生產者Publisher代碼package wj.rabbitmq.demo01;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import lombok.extern.slf4j.Slf4j;import wj.mq.utils.ConnUtils;import java.nio.charset.StandardCharsets;@Slf4jpublic class Publisher {public static void main(String[] args) throws Exception {String queueName = "HelloQueue";//獲取一個連接try(Connection connection = ConnUtils.newConnection()){//獲取信道Channel channel = connection.createChannel();//聲明一個隊列名稱為channel.queueDeclare(queueName, true, false,false, null);//發送消息:Exchange空默認使用/交換機channel.basicPublish("",queueName,null,"HelloWorld".getbytes(StandardCharsets.UTF_8));log.info("送信息完成");
查看UI
發送完成以后,查看RabbitMQ的UI管理界面中的Queue選項卡:
點HelloQueue這個隊列名稱,進入詳細界面:
查看Bindings,可以看到目前是默認交換機(即AMQP Default)綁定到當前這個隊列:
消費者Consumer代碼package wj.rabbitmq.demo01;import com.rabbitmq.client.*;import lombok.extern.slf4j.Slf4j;import wj.mq.utils.ConnUtils;@Slf4jpublic class Consumer {public static void main(String[] args) throws Exception {String queueName = "HelloQueue";try (Connection con = ConnUtils.newConnection()) {Channel channel = con.createChannel();channel.queueDeclare(queueName, true, false,false, null);//接收到數據以后的回調函數Delivercallback callback = (consumerTag, message) -> {byte[] body = message.getBody();String str = new String(body);log.info("接收到信息:{}", str);//取消處理信息的回調CancelCallback cancelCallback = consumerTag -> {//ignore//消費監聽channel.basicConsume(queueName, false, callback, cancelCallback);
截圖:
運行后效果,即輸出從隊列中讀取的數據:
09:40:22.886 [pool-2-thread-4] 接收到信息:HelloWorld
SpringBoot項目創建Springboot項目并添加以下依賴
- org.springframework.boot:spring-boot-starter-amqp:2.7.3
- org.springframework.boot:spring-boot-starter-thymeleaf:2.7.3
- org.springframework.boot:Spring-boot-starter-web:2.7.3
- org.springframework.boot:spring-boot-devtools:2.7.3
- org.springframework.boot:spring-boot-configuration-processor:2.7.3
- org.projectlombok:lombok:1.18.24
- org.springframework.boot:spring-boot-starter-test:2.7.3
- org.springframework.amqp:spring-rabbit-test:2.4.6
- cn.hutool:hutool-core:5.8.6
配置類的主要功能是配置Queue,Exchange等。以下我這兒僅需要一個Queue所以僅配置了一個Queue。
package wj.mq.config;import java.util.HashMap;import java.util.Map;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import lombok.extern.slf4j.Slf4j;@Slf4j@Configurationpublic class RabbitMQConfig {* 在使用隊列之前,必須要先聲明這個隊列,如果不在這兒聲明,就必須要在@RabbitListener時添加queuesToDeclare如下:@RabbitListener(queues = {"HelloSpring"},queuesToDeclare = @Queue("HelloSpring"))* @return@Beanpublic Queue helloSpringQueue() {Map args = new HashMap<>();args.put("name", "HelloSpring");Queue queue = new Queue("HelloSpring",true, false, false,args);log.info("隊列創建成功:{}",queue.getName());return queue;
啟動類
與普通的啟動的類類似,只是為了使用調度功能,添加了@EnableScheduling注解。
package wj.mq;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.scheduling.annotation.EnableScheduling;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.ResponseBody;import lombok.extern.slf4j.Slf4j;@Slf4j@Controller@EnableScheduling@SpringBootApplicationpublic class HelloWorldApplication {public static void main(String[] args) {SpringApplication.run(HelloWorldApplication.class, args);log.info("Started..");@ResponseBody@RequestMapping(value = {"/",""})public String index() {return "index";
添加EnableScheduling注解,后面用到定時任務,讓生產者定時發布消息:
生產者
convertAndSend可以發布任意的對象,默認會被轉換成指定的格式
package wj.mq.rabbitmq.helloworld;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import lombok.extern.slf4j.Slf4j;@Slf4j@Componentpublic class HelloWorldSender {@Autowiredprivate RabbitTemplate rabbitTemplate;private static int times = 1;@Scheduled(fixedDelay = 1000, initialDelay = 500)public void send() {String msg = ""+times;rabbitTemplate.convertAndSend("HelloSpring",msg);log.info("Send {} ok",msg);times++;
添加@Component注解為Spring Bean組件。
注入rabbitTemplate。
通過convertAndSend(QueueName,Object)發送數據給批定的隊列。
消費者package wj.mq.rabbitmq.helloworld;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import lombok.extern.slf4j.Slf4j;@Slf4j@Component@RabbitListener(queues = {"HelloSpring"})public class HelloWorldReceiver {@RabbitHandlerpublic void onMessage(String message) {log.info("消費者1,接收到信息: {}",message);
在類上添加注解@RabbitListener(queues=..)
在接收信息的方法上添加@RabbitHandler
消費者2(另一種使用注解的方法)
package wj.mq.rabbitmq.helloworld;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;* 第二種接收 信息的方法,就是直接在方法上添加RabbitListener@Slf4j@Componentpublic class HelloWorldReceiver2 {* 并設置為手工確認@RabbitListener(queues = "HelloSpring",ackMode ="MANUAL")public void onMessage(Message message,Channel channel) throws Exception {log.info("消費者2,接收到信息: {}",new String(message.getBody()));long id = message.getMessageProperties().getDeliveryTag();//手工確認channel.basicAck(id, false);
直接將注解寫在方法上,并設置為手工確認。
注意接收的參數為Message對象和Channel對象。
使用channel.ack進行手工確認。
生產者-發布一個對象聲明JavaBean必須實現序列化接口package wj.mq.domain;import java.io.Serializable;import lombok.Getter;import lombok.Setter;@Getter@Setterpublic class Person implements Serializable {private static final long serialVersionUID = 1L;private String name;private Integer age;生產者
注意以下通過convertAndSend直接發布了一個對象。
package wj.mq.rabbitmq.helloworld;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import wj.mq.domain.Person;@Componentpublic class HelloWorldSender {@Autowiredprivate Queue helloSpringQueue;@Autowiredprivate RabbitTemplate rabbitTemplate;private static int times = 1;@Scheduled(fixedDelay = 1000, initialDelay = 500)public void send() {Person person = new Person();//聲明對象并直接發布person.setName("Jack");person.setAge(times);rabbitTemplate.convertAndSend(helloSpringQueue.getName(),person);times++;
查看RabbitMQ UI可見編碼類型為序列化再轉base64的方式:
消費者-消費一個對象直接接收消費對象package wj.mq.rabbitmq.helloworld;import cn.hutool.core.thread.ThreadUtil;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.alibaba.fastJSON2.JSONObject;import lombok.extern.slf4j.Slf4j;import wj.mq.domain.Person;import java.util.Random;@Slf4j@Component@RabbitListener(queues = {"HelloSpring"})public class HelloWorldReceiver {private Random random = new Random();private int count =1;* 直接接收Peson對象即可@RabbitHandler()public void onMessage(Person message) {int sleep = 1000*random.nextInt(20);ThreadUtil.sleep(sleep);log.info("消費者1,休眠{}ms后處理完成信息:{},共處理{}消息",sleep,JSONObject.toJSONString(message),count++);
接收Message對象,然后自己做轉換package wj.mq.rabbitmq.helloworld;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Component;import com.alibaba.fastjson2.JSONObject;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import wj.mq.domain.Person;import java.io.ByteArrayInputStream;import java.io.ObjectInputStream;* 第二種接收 信息的方法,就是直接在方法上添加RabbitListener@Slf4j@Componentpublic class HelloWorldReceiver2 {int count = 1;* 并設置為手工確認@RabbitListener(queues = "HelloSpring",ackMode ="MANUAL")public void onMessage(@Payload()Message message,Channel channel) throws Exception {int sleep = 0;//通過ObjectReader讀取對象ObjectInputStream objIn = new ObjectInputStream(new ByteArrayInputStream(message.getBody()));Object readObject = objIn.readObject();Person p = (Person) readObject;objIn.close();log.info("消費者2,休眠{}ms后處理完成信息: {},共處理{}消息",sleep,JSONObject.toJSONString(p),count++);long id = message.getMessageProperties().getDeliveryTag();channel.basicAck(id, false);//手工確認
最后顯示的結果,與之前相同
最終項目結構
運行測試
兩個消費者,依次接收RabbitMQ發送的消息:
關于springboot中的更多設置設置MessageConvert
默認情況下,通過rabbitTemplate.sendAndConvert(..)發送的object對象會使用Java序列化方式傳輸。可以在UI界面上,通過讀取一個Queue中的消息的方式,查看隊列中的數據格式:java-serialized。
我們可以修改這種格式,如JSON
以下注意給RabbitTemplate.setMessageConverter(..)即可以設置為JSON轉換格式。
package wj.mq.config.helloworld;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class HelloWorldConfig {@Autowiredprivate ConnectionFactory connectionFactory;@Beanpublic Queue helloSpringQueue() {return new Queue("HelloSpringQueue", true, false,false);@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rt =new RabbitTemplate();rt.setConnectionFactory(connectionFactory);rt.setMessageConverter(messageConverter());return rt;
設置后,查看Message的格式
接收 deliverTag
僅通過@RabbitListener的方法上,可以通過添加@Header接收具體指定的數據,如下。
注意@header的部分的代碼。
package wj.mq.rabbitmq.helloworld;import cn.hutool.core.thread.ThreadUtil;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.messaging.handler.annotation.Header;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Component;import com.alibaba.fastjson2.JSONObject;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import wj.mq.domain.Person;import java.util.Random;@Slf4j@Component@RabbitListener(queues = {"#{helloSpringQueue.name}"})public class HelloWorldReceiver {private Random random = new Random();private int count =1;* 直接接收Peson對象即可@RabbitHandler()public void onMessage(@Payload() Person message,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag,Channel channel) {int sleep = 1000*random.nextInt(20)*0;ThreadUtil.sleep(sleep);log.info("消費者1,休眠{}ms后處理完成信息:{},共處理{}消息,消息tag={}",sleep,JSONObject.toJSONString(message),count++,deliveryTag);
或,直接將@RabbitListener注解在方法上
可也可以接收一個Message對象
最后顯示的效果如下: