背景
普通消息隊列,消息一旦入隊就會被消費者立刻消費,而延遲隊列則需要指定固定時間后被延遲消費.
Docker安裝rabbitMq延遲隊列插件版本
學習的話可以去官網下載rabbitmq并通過安裝插件的方式安裝延遲隊列 插件地址:
https://www.rabbitmq.com/community-plugins.html
但我們這里使用現成的鏡像就可以簡單學習
docker鏡像下載,使用docker-compose編排運行
docker pull lianlianyi/rabbitmq
rabbitmq:
image: lianlianyi/rabbitmq:3.9.13-management-alpine-delayed
ports:
- 15672:15672
- 5672:5672
environment:
- TIME_ZONE=Asia/Shanghai
#用戶
- RABBITMQ_DEFAULT_USER=admin
#密碼
- RABBITMQ_DEFAULT_PASS=admin
volumes:
- /etc/localtime:/etc/localtime
- /etc/timezone:/etc/timezone
- ./data/rabbitmq/db:/var/lib/rabbitmq/mnesia/rabbit@my-rabbit
啟動驗證
交換機有出現 x-delayed-message 字樣表示插件運行成功
springboot 對接rabbitmq隊列
- 添加依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.Application.xml
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
virtual-host: /
publisher-returns: true #開啟發送失敗返回
publisher-confirm-type: correlated #配置確認回調
listener:
simple:
acknowledge-mode: auto #開啟ack
concurrency: 5 #指定最小的消費者數量.
max-concurrency: 10 #指定最大的消費者數量.
prefetch: 1 # 最多一次消費多少條數據 -限流
direct:
acknowledge-mode: auto
#支持消息的確認與返回
template:
mandatory: true
生產者偽代碼
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, correlationDate -> {
//延遲3秒
correlationDate.getMessageProperties().setDelay(1000 * 3);
return correlationDate;
});
log.info("生產者發送時間:{},msg:{}", DateUtil.formatDateTime(new Date()),msg);
消費者偽代碼
@RabbitListener(queuesToDeclare = @Queue(MsgProduct.DELAYED_QUEUE_NAME))
public void receive(@Payload MsgDemo msgDemo, Message message, Channel channel) {
log.info("調用消費者時間:{},MSG:{}¨", DateUtil.formatDateTime(new Date()),msgDemo);
}
本文涉及到的demo
https://github.com/lianlianyi/toutiao_demo