延遲隊列是指當消息被發送以后,并不是立即執行,而是等待特定的時間后,消費者才會執行該消息。延遲隊列的使用場景有以下幾種:
-
未按時支付的訂單,30 分鐘過期之后取消訂單。 -
給活躍度比較低的用戶間隔 N 天之后推送消息,提高活躍度。 -
新注冊會員的用戶,等待幾分鐘之后發送歡迎郵件等。
1.如何實現延遲隊列?
延遲隊列有以下兩種實現方式:
-
通過消息過期后進入死信交換器,再由交換器轉發到延遲消費隊列,實現延遲功能; -
使用官方提供的延遲插件實現延遲功能。
早期,大部分公司都會采用第一種方式,而隨著 RabbitMQ 3.5.7(2015 年底發布)的延遲插件的發布,因為其使用更簡單、更方便,所以它現在才是大家普通會采用的,實現延遲隊列的方式,所以本文也只講第二種方式。
2.實現延遲隊列
2.1 安裝并啟動延遲隊列
2.1.1 下載延遲插件
https://Github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
注意:需要根據你自己的 RabbitMQ 服務器端版本選擇相同版本的延遲插件,可以在 RabbitMQ 控制臺查看:
2.1.2 將插件放到插件目錄
接下來,將上一步下載的插件放到 RabbitMQ 服務器安裝目錄,如果是 Docker,使用一下命令復制:
docker cp 宿主機文件 容器名稱或ID:容器目錄
如下圖所示:
之后,進入 docker 容器,查看插件中是否包含延遲隊列:
docker exec -it 容器名稱或ID /bin/bash rabbitmq-plugins list
如下圖所示:
2.1.3 啟動插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
如下圖所示:
2.1.4 重啟RabbitMQ服務
安裝完 RabbitMQ 插件之后,需要重啟 RabbitMQ 服務才能生效。如果使用的是 Docker,只需要重啟 Docker 容器即可:
docker restart 容器名稱或ID
如下圖所示:
2.1.5 驗收結果
在 RabbitMQ 控制臺查看,新建交換機時是否有延遲消息選項,如果有就說明延遲消息插件已經正常運行了,如下圖所示:
2.1.6 手動創建延遲交換器(可選)
此步驟可選(非必須),因為某些版本下通過程序創建延遲交換器可能會出錯,如果出錯了,手動創建延遲隊列即可,如下圖所示:
2.2 編寫延遲消息實現代碼
2.2.1 配置交換器和隊列
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
/**
* 延遲交換器和隊列
*/
@Configuration
public class DelayedExchangeConfig {
public static final String EXCHANGE_NAME = "myDelayedExchange";
public static final String QUEUE_NAME = "delayed.queue";
public static final String ROUTING_KEY = "delayed.routing.key";
@Bean
public CustomExchange delayedExchange() {
return new CustomExchange(EXCHANGE_NAME,
"x-delayed-message", // 消息類型
true, // 是否持久化
false); // 是否自動刪除
}
@Bean
public Queue delayedQueue() {
return QueueBuilder.durable(QUEUE_NAME)
.withArgument("x-delayed-type", "direct")
.build();
}
@Bean
public Binding delayedBinding(Queue delayedQueue,CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
}
}
2.1.2 定義消息發送方法
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class DelayedMessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Scheduled(fixedDelay = 5000)
public void sendDelayedMessage(String message) {
rabbitTemplate.convertAndSend(DelayedExchangeConfig.EXCHANGE_NAME,
DelayedExchangeConfig.ROUTING_KEY,
message,
messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setDelay(10000); // 設置延遲時間,單位毫秒
return messagePostProcessor;
});
}
}
2.1.3 發送延遲消息
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMApping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/delayed")
public class DelayedMessageController {
@Autowired
private DelayedMessageProducer delayedMessageProducer;
@GetMapping("/send")
public String sendDirectMessage(@RequestParam String message) {
delayedMessageProducer.sendDelayedMessage(message);
return "Delayed message sent to Exchange: " + message;
}
}
2.1.4 接收延遲消息
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DelayedMessageConsumer {
@RabbitListener(queues = DelayedExchangeConfig.QUEUE_NAME)
public void receiveDelayedMessage(String message) {
System.out.println("Received delayed message: " + message);
}
}
PS:獲取本文延遲隊列的實現 Demo,請加我:GG_Stone【備注:延遲隊列】
小結
實現 RabbitMQ 延遲隊列目前主流的實現方式,是采用官方提供的延遲插件來實現。而延遲插件需要先下載插件、然后配置并重啟 RabbitMQ 服務,之后就可以通過編寫代碼的方式實現延遲隊列了。