先熟悉下面會(huì)用到的一些名詞~
- exchange: 交換機(jī)
- routingkey: 路由key
- queue: 隊(duì)列
exchange和queue是需要綁定在一起的,然后消息發(fā)送到exchange再由exchange通過(guò)routingkey發(fā)送到對(duì)應(yīng)的隊(duì)列中。
exchange分四種
Default Exchange
這種是特殊的Direct Exchange,是rabbitmq內(nèi)部默認(rèn)的一個(gè)交換機(jī)。該交換機(jī)的name是空字符串,所有queue都默認(rèn)binding 到該交換機(jī)上。所有binding到該交換機(jī)上的queue,routing-key都和queue的name一樣。
注意: 這就是為什么你直接創(chuàng)建一個(gè)queue也能正常的生產(chǎn)與消費(fèi),因?yàn)閷?duì)應(yīng)的exchange是RabbitMQ默認(rèn)的,routingkey就是該隊(duì)列的名字
Topic Exchange
通配符交換機(jī),exchange會(huì)把消息發(fā)送到一個(gè)或者多個(gè)滿(mǎn)足通配符規(guī)則的routing-key的queue。其中表號(hào)匹配一個(gè)word,#匹配多個(gè)word和路徑,路徑之間通過(guò).隔開(kāi)。如滿(mǎn)足a..c的routing-key有a.hello.c;滿(mǎn)足#.hello的routing-key有a.b.c.helo。
Fanout Exchange
扇形交換機(jī),該交換機(jī)會(huì)把消息發(fā)送到所有binding到該交換機(jī)上的queue。這種是publisher/subcribe模式。用來(lái)做廣播最好。
所有該exchagne上指定的routing-key都會(huì)被ignore掉。
Header Exchange
設(shè)置header attribute參數(shù)類(lèi)型的交換機(jī)。
簡(jiǎn)單的了解之后,下面就是延遲隊(duì)列的實(shí)現(xiàn)方式
延遲隊(duì)列的實(shí)現(xiàn)
延遲分兩種
- 在msg上設(shè)置過(guò)期時(shí)間
- 在隊(duì)列上設(shè)置過(guò)期時(shí)間
一定要看懂這張圖!!!
如上圖創(chuàng)建三個(gè)exchange和三個(gè)隊(duì)列
@Bean
public DirectExchange delayExchange() {
return new DirectExchange(DELAY_EXCHANGE_NAME);
}
@Bean
public DirectExchange processExchange() {
return new DirectExchange(PROCESS_EXCHANGE_NAME);
}
@Bean
public DirectExchange delayQueueExchange() {
return new DirectExchange(DELAY_QUEUE_EXCHANGE_NAME);
}
/**
* 存放延遲消息的隊(duì)列 最后將會(huì)轉(zhuǎn)發(fā)給exchange(實(shí)際消費(fèi)隊(duì)列對(duì)應(yīng)的)
* @return
*/
@Bean
Queue delayQueue4Msg(){
return QueueBuilder.durable(DELAY_QUEUE_MSG)
.withArgument("x-dead-letter-exchange", PROCESS_EXCHANGE_NAME)
.withArgument("x-dead-letter-routing-key", ROUTING_KEY)
.build();
}
@Bean
public Queue processQueue() {
return QueueBuilder.durable(PROCESS_QUEUE)
.build();
}
/**
* 存放消息的延遲隊(duì)列 最后將會(huì)轉(zhuǎn)發(fā)給exchange(實(shí)際消費(fèi)隊(duì)列對(duì)應(yīng)的)
* @return
*/
@Bean
public Queue delayQueue4Queue() {
return QueueBuilder.durable(DELAY_QUEUE_NAME)
.withArgument("x-dead-letter-exchange", PROCESS_EXCHANGE_NAME) // DLX
.withArgument("x-dead-letter-routing-key", ROUTING_KEY)
.withArgument("x-message-ttl", 3000) // 設(shè)置隊(duì)列的過(guò)期時(shí)間 單位毫秒
.build();
}
接下來(lái)將每個(gè)exchange和對(duì)應(yīng)的mq綁定
@Bean
Binding delayBinding() {
return BindingBuilder.bind(delayQueue4Msg())
.to(delayExchange())
.with(ROUTING_KEY);
}
@Bean
Binding queueBinding() {
return BindingBuilder.bind(processQueue())
.to(processExchange())
.with(ROUTING_KEY);
}
@Bean
Binding delayQueueBind() {
return BindingBuilder.bind(delayQueue4Queue())
.to(delayQueueExchange())
.with(ROUTING_KEY);
}
發(fā)送消息的方式
public void sendDelayMsg(Msg msg) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(msg.getId() + " 延遲消息發(fā)送時(shí)間:" + sdf.format(new Date()));
rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE_NAME, "delay", msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(msg.getTtl() + "");
return message;
}
});
}
public void sendDelayQueue(Msg msg) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(msg.getId() + " 延遲隊(duì)列消息發(fā)送時(shí)間:" + sdf.format(new Date()));
rabbitTemplate.convertAndSend(RabbitConfig.DELAY_QUEUE_EXCHANGE_NAME,"delay", msg);
}
驗(yàn)證結(jié)果
為每個(gè)消息設(shè)置過(guò)期時(shí)間
為隊(duì)列設(shè)置過(guò)期時(shí)間
如果你把設(shè)置了過(guò)期時(shí)間的消息發(fā)送到設(shè)置了過(guò)期時(shí)間的隊(duì)里中的時(shí)候,以最短的時(shí)間為準(zhǔn)。
最后
其實(shí)我在實(shí)現(xiàn)的過(guò)程中也花了很長(zhǎng)的時(shí)間,主要就是被exchange和queue搞亂掉了,最后索性自己畫(huà)了個(gè)圖,按照?qǐng)D來(lái)一個(gè)一個(gè)創(chuàng)建與綁定。之后就很清晰很容易的實(shí)現(xiàn)了。
強(qiáng)調(diào)!!! 如果在開(kāi)發(fā)的過(guò)程中發(fā)現(xiàn)exchange和queue綁定錯(cuò)誤了,建議從管理界面將queue和exchange unbind或者刪除重新創(chuàng)建!