channel.basicQos(0, 1, false):0表示對(duì)消息的大小無(wú)限制,1表示每次只允許消費(fèi)一條,false表示該限制不作用于channel。同時(shí),我們采用手工ACK的方式,因?yàn)槲覀兣渲梦募渲昧?spring.rabbitmq.listener.simple.acknowledge-mode=manual。
一、發(fā)送消息的幾種方式
1.1、默認(rèn)交換機(jī)和routingKey----(個(gè)人不推薦使用)
使用默認(rèn)的交換機(jī)exchange或routingKey。
圖片
調(diào)用方法:
圖片
1.2、使用指定routingKey的方式發(fā)送(默認(rèn)的交換機(jī))
使用默認(rèn)的交換機(jī),routingKey必須為quenue隊(duì)列的名稱(chēng)。
調(diào)用方法:
圖片
案例:
/**
* @Author yangyalin
* @Description 測(cè)試發(fā)送消息(直接使用隊(duì)列發(fā)送,使用默認(rèn)的交換機(jī)) routingKey:即為對(duì)列的名稱(chēng)即可
**/
public void testSendMsg(String message){
rabbitTemplate.convertAndSend(RabbitMQConvertConfig.TEST_QUEUE,message);
}
1.3、指定交換機(jī)和routingKey的方式發(fā)送
使用指定的交換機(jī),若綁定routingKey,必須使用指定的模式;若沒(méi)有綁定,可設(shè)置為""。
調(diào)用方法:
圖片
案例:
public void sendDecreStockMessage(DecreStockFromRabbit decreStockFromRabbit){
CorrelationData correlationData = new CorrelationData();
correlationData.setId(decreStockFromRabbit.getMessageId());
/**
* exchange:交換機(jī) routingKey:路由鍵 message:消息體內(nèi)容 correlationData:消息唯一ID
**/
rabbitTemplate.convertAndSend(RabbitMQConvertConfig.ORDER_EXCHANGE,
RabbitMQConvertConfig.ORDER_ROUTINGKEY, decreStockFromRabbit,correlationData);
}
或:
rabbitTemplate.convertAndSend("test-exchange","",message);
二、接收消息的幾種方式
2.1、默認(rèn)交換機(jī),提前創(chuàng)建好隊(duì)列(TestDirectQueue)
/**
* 功能描述:當(dāng)消費(fèi)同一個(gè)隊(duì)列的時(shí)候,可通過(guò)設(shè)置實(shí)現(xiàn)能則多勞,
* 消息輪詢(xún)方式訂閱
* @MethodName: process11
* @MethodParam: [testMessage]
* @Return: void
* @Author: yyalin
* @CreateDate: 2022/4/9 17:10
*/
@RabbitListener(queues = "TestDirectQueue") //監(jiān)聽(tīng)的隊(duì)列名稱(chēng) TestDirectQueue
public void process11(Map testMessage) throws InterruptedException {
log.info("消費(fèi)者收到消息222:" + testMessage.toString());
Thread.sleep(200);
}
2.2、默認(rèn)交換機(jī),自動(dòng)創(chuàng)建隊(duì)列(TEST_QUEUE2)
@RabbitListener(queuesToDeclare=@Queue(TopicExchangeConfig.TEST_QUEUE2))
@RabbitHandler
public void receiveTestMsg2(@Payload String str) throws Exception{
log.info("開(kāi)始接收消息。。。。。");
log.info("接收到的消息:"+str);
}
2.3、自動(dòng)創(chuàng)建交換機(jī)和隊(duì)列----(個(gè)人推薦)
自動(dòng)創(chuàng)建且交換機(jī)和隊(duì)列綁定,key可指定也可不指定(默認(rèn)為隊(duì)列名稱(chēng))。
/******************方案二:使用注解的方式綁定隊(duì)列在交換機(jī)上*******************/
@RabbitListener(bindings = @QueueBinding(value=@Queue(name="directQueue"),
exchange=@Exchange(name="directExchange",type = ExchangeTypes.DIRECT),
key={"red", "blue"}
)) //監(jiān)聽(tīng)的隊(duì)列名稱(chēng) TestDirectQueue
public void directConsumer(String message) {
log.info("消費(fèi)者收到direct消息555 : " + message);
}
@RabbitListener(bindings = @QueueBinding(value=@Queue(name="topicQueue2"),
exchange=@Exchange(name="topicExchange",type = ExchangeTypes.TOPIC,ignoreDeclarationExceptions = "true"),
key="#.new"
))
public void topicConsumer2(String message) {
log.info("消費(fèi)者收到topic消息888 : " + message);
}
備注:ignoreDeclarationExceptions = "true" : 即使配置出現(xiàn)了錯(cuò)誤也不至于整個(gè)應(yīng)用程序都啟動(dòng)失敗的情況。
1、channel.basicQos(0, 1, false):0表示對(duì)消息的大小無(wú)限制,1表示每次只允許消費(fèi)一條,false表示該限制不作用于channel。
同時(shí),我們采用手工ACK的方式,因?yàn)槲覀兣渲梦募渲昧? spring.rabbitmq.listener.simple.acknowledge-mode=manual:
2、channel.basicAck(deliveryTag, false):deliveryTag表示處理的消息條數(shù)(一般為1),從heaers中取,false表示不批量ack。
/**
* 功能描述: 消費(fèi)端加上手動(dòng)確認(rèn)消息被接收
* @MethodName: process
* @MethodParam: [message]
* @Return: void
* @Author: yyalin
* @CreateDate: 2022/4/18 19:10
*/
@RabbitListener(queues = "TestDirectQueue3") //監(jiān)聽(tīng)的隊(duì)列名稱(chēng) TestDirectQueue
public void process(String message, Channel channel) throws IOException {
log.info("DirectReceiver消費(fèi)者收到消息1 : " + message);
long msgId=1111L; //消息ID
try {
//手動(dòng)確認(rèn)消息已消費(fèi)
channel.basicAck(msgId,false);
} catch (IOException e) {
//把消息失敗的消息重新放入到隊(duì)列
channel.basicNack(msgId,false,true);
e.printStackTrace();
}
}