How do I listen to dynamically created queues?
Problem description
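I have a rabbitListener that continuously and asynchronously listens to the queue "user-Messages" for user messages. Everything works fine unless the queue is loaded with messages in bulk. When messages are published to the queue in bulk, one user's messages are processed first while every other user's messages wait their turn.
I cannot use a priority queue, because all users have equal priority. Instead, I want to create new queues and listen to them at runtime. All of these queues will be short-lived: once its messages have been consumed, a queue is deleted.
While browsing, I found that queues can be created dynamically with RabbitAdmin. My questions are:
- How do I make the listener listen to new short-lived (TTL) queues that are created at runtime?
- How do I make the listener stop listening to a queue that has been deleted (after its TTL has elapsed), so that no exceptions are thrown?
I am currently using SimpleRabbitListenerContainerFactory. I could also use DirectMessageListenerContainer. My only concern is how to tell the listener about the creation and deletion of dynamic queues. I am considering https://www.rabbitmq.com/event-exchange.html (the event exchange plugin).
Does Spring AMQP support starting and stopping listening on dynamic queues? Thanks in advance.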
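To make the starvation described above concrete, here is a small broker-free simulation in plain Java. It is only a sketch under assumptions: the class and method names are hypothetical, each message is reduced to the id of the user who sent it, and the per-user queues are drained in strict round-robin order, which a real broker and listener container do not guarantee.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, no RabbitMQ involved: a message is just its sender's user id.
class FairnessSketch {

    // One shared queue: messages are handled strictly in arrival order.
    static List<String> drainSingleQueue(List<String> arrivals) {
        return new ArrayList<>(arrivals);
    }

    // One queue per user, visited round-robin: at most one message per user per pass.
    static List<String> drainPerUserQueues(List<String> arrivals) {
        Map<String, Deque<String>> queues = new LinkedHashMap<>();
        for (String user : arrivals) {
            queues.computeIfAbsent(user, k -> new ArrayDeque<>()).add(user);
        }
        List<String> processed = new ArrayList<>();
        while (!queues.isEmpty()) {
            // Iterate over a snapshot of the keys so drained queues can be removed safely.
            for (String user : new ArrayList<>(queues.keySet())) {
                Deque<String> queue = queues.get(user);
                processed.add(queue.poll());
                if (queue.isEmpty()) {
                    queues.remove(user); // mirrors deleting a short-lived queue once it is empty
                }
            }
        }
        return processed;
    }
}
```

With the arrivals A, A, A, B, C, the single queue makes users B and C wait behind all of A's messages, while the per-user variant serves B and C after A's first message.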
@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(config.getConnectionFactory());
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    factory.setConcurrentConsumers(1);
    factory.setMaxConcurrentConsumers(3);
    return factory;
}

@RabbitListener(id = "listener", queues = { "#{receiver.queues()}" },
        containerFactory = "myRabbitListenerContainerFactory")
public void listen(QueueMessage message, Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) long tag, MessageHeaders headers) {
    // process message
}
Recommended answer
This person seems to be doing exactly that.
Code from the link:
RabbitMQ configuration
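import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

@Configuration
public class RabbitMqConfiguration implements RabbitListenerConfigurer {

    @Autowired
    private ConnectionFactory connectionFactory;

    // JSON converter used when publishing through the RabbitTemplate.
    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // JSON converter used when mapping payloads to listener method arguments.
    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        return rabbitTemplate;
    }

    // Used to declare and delete queues and bindings at runtime.
    @Bean
    public RabbitAdmin rabbitAdmin() {
        return new RabbitAdmin(connectionFactory);
    }

    // Registry through which listener containers are looked up by listener id.
    @Bean
    public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }

    @Bean
    public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(consumerJackson2MessageConverter());
        return factory;
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Override
    public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setPrefetchCount(1);
        factory.setConsecutiveActiveTrigger(1);
        factory.setConsecutiveIdleTrigger(1);
        factory.setConnectionFactory(connectionFactory);
        registrar.setContainerFactory(factory);
        registrar.setEndpointRegistry(rabbitListenerEndpointRegistry());
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }
}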
Interface
public interface RabbitQueueService {
    void addNewQueue(String queueName, String exchangeName, String routingKey);
    void addQueueToListener(String listenerId, String queueName);
    void removeQueueFromListener(String listenerId, String queueName);
    Boolean checkQueueExistOnListener(String listenerId, String queueName);
}
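As an illustration of the contract this interface implies, the following is a minimal in-memory stand-in that could be used to exercise calling code without a broker. It is a hypothetical sketch, not part of the linked code: the class name InMemoryRabbitQueueService is invented here, the interface is repeated only so the block compiles on its own, and addNewQueue assumes the listener id equals the exchange name, which is how the service in this answer calls addQueueToListener.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Repeated from the interface above so this sketch is self-contained.
interface RabbitQueueService {
    void addNewQueue(String queueName, String exchangeName, String routingKey);
    void addQueueToListener(String listenerId, String queueName);
    void removeQueueFromListener(String listenerId, String queueName);
    Boolean checkQueueExistOnListener(String listenerId, String queueName);
}

// Hypothetical fake: no broker, only a map of listener id -> attached queue names.
class InMemoryRabbitQueueService implements RabbitQueueService {
    private final Map<String, Set<String>> queuesByListener = new HashMap<>();

    @Override
    public void addNewQueue(String queueName, String exchangeName, String routingKey) {
        // Assumption: the listener id is the exchange name; the routing key is ignored here.
        addQueueToListener(exchangeName, queueName);
    }

    @Override
    public void addQueueToListener(String listenerId, String queueName) {
        // A set makes attaching the same queue twice a no-op.
        queuesByListener.computeIfAbsent(listenerId, k -> new LinkedHashSet<>()).add(queueName);
    }

    @Override
    public void removeQueueFromListener(String listenerId, String queueName) {
        Set<String> names = queuesByListener.get(listenerId);
        if (names != null) {
            names.remove(queueName);
        }
    }

    @Override
    public Boolean checkQueueExistOnListener(String listenerId, String queueName) {
        Set<String> names = queuesByListener.get(listenerId);
        return names != null && names.contains(queueName);
    }
}
```

A caller would typically invoke addNewQueue when a user's batch arrives and removeQueueFromListener once that user's messages have been processed.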
Service
import com.google.gson.Gson;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Log4j2
public class RabbitQueueServiceImpl implements RabbitQueueService {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Autowired
    private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

    @Override
    public void addNewQueue(String queueName, String exchangeName, String routingKey) {
        // durable = true, exclusive = false, autoDelete = false
        Queue queue = new Queue(queueName, true, false, false);
        Binding binding = new Binding(
                queueName,
                Binding.DestinationType.QUEUE,
                exchangeName,
                routingKey,
                null
        );
        rabbitAdmin.declareQueue(queue);
        rabbitAdmin.declareBinding(binding);
        // Note: the exchange name is passed as the listener id, so this only works
        // when the listener is registered with an id equal to the exchange name.
        this.addQueueToListener(exchangeName, queueName);
    }

    @Override
    public void addQueueToListener(String listenerId, String queueName) {
        log.info("adding queue : " + queueName + " to listener with id : " + listenerId);
        if (!checkQueueExistOnListener(listenerId, queueName)) {
            this.getMessageListenerContainerById(listenerId).addQueueNames(queueName);
            log.info("queue : " + queueName + " added to listener with id : " + listenerId);
        } else {
            log.info("given queue name : " + queueName + " already exists on given listener id : " + listenerId);
        }
    }

    @Override
    public void removeQueueFromListener(String listenerId, String queueName) {
        log.info("removing queue : " + queueName + " from listener : " + listenerId);
        if (checkQueueExistOnListener(listenerId, queueName)) {
            // Detach the queue from the container first, then delete it on the broker.
            this.getMessageListenerContainerById(listenerId).removeQueueNames(queueName);
            log.info("deleting queue from rabbit management");
            this.rabbitAdmin.deleteQueue(queueName);
        } else {
            log.info("given queue name : " + queueName + " not exist on given listener id : " + listenerId);
        }
    }

    @Override
    public Boolean checkQueueExistOnListener(String listenerId, String queueName) {
        try {
            log.info("checking queueName : " + queueName + " exist on listener id : " + listenerId);
            log.info("getting queueNames");
            String[] queueNames = this.getMessageListenerContainerById(listenerId).getQueueNames();
            log.info("queueNames : " + new Gson().toJson(queueNames));
            if (queueNames != null) {
                log.info("checking " + queueName + " exist on active queues");
                for (String name : queueNames) {
                    log.info("name : " + name + " with checking name : " + queueName);
                    if (name.equals(queueName)) {
                        log.info("queue name exist on listener, returning true");
                        return Boolean.TRUE;
                    }
                }
                return Boolean.FALSE;
            } else {
                log.info("there is no queue exist on listener");
                return Boolean.FALSE;
            }
        } catch (Exception e) {
            // Also reached when no container is registered under listenerId (null lookup).
            log.error("Error on checking queue exist on listener");
            log.error("error message : " + ExceptionUtils.getMessage(e));
            log.error("trace : " + ExceptionUtils.getStackTrace(e));
            return Boolean.FALSE;
        }
    }

    private AbstractMessageListenerContainer getMessageListenerContainerById(String listenerId) {
        log.info("getting message listener container by id : " + listenerId);
        return (AbstractMessageListenerContainer) this.rabbitListenerEndpointRegistry
                .getListenerContainer(listenerId);
    }
}
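The service above removes a queue only when removeQueueFromListener is called explicitly; it does not by itself cover the "stop listening before the queue disappears" part of the question. One possible approach, sketched below in plain Java, is for the application to record a deadline for each dynamic queue and detach it shortly before that deadline. Everything here is an assumption for illustration: the ExpiringQueueTracker class, its methods, and the safety margin are hypothetical, and the detach callback is expected to delegate to something like removeQueueFromListener(listenerId, queueName).

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper: tracks application-defined deadlines for dynamic queues
// and detaches each queue from its listener shortly before the deadline.
class ExpiringQueueTracker {
    private final Map<String, Instant> deadlineByQueue = new ConcurrentHashMap<>();
    private final Consumer<String> detach;  // e.g. name -> service.removeQueueFromListener(id, name)
    private final Duration safetyMargin;    // detach this long before the deadline

    ExpiringQueueTracker(Consumer<String> detach, Duration safetyMargin) {
        this.detach = detach;
        this.safetyMargin = safetyMargin;
    }

    // Remember that queueName was created at 'createdAt' and should live for 'ttl'.
    void track(String queueName, Instant createdAt, Duration ttl) {
        deadlineByQueue.put(queueName, createdAt.plus(ttl));
    }

    // Detach every queue whose deadline minus the margin has been reached.
    // Returns the names that were detached during this sweep.
    List<String> sweep(Instant now) {
        List<String> detached = new ArrayList<>();
        Iterator<Map.Entry<String, Instant>> it = deadlineByQueue.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Instant> entry = it.next();
            if (!now.isBefore(entry.getValue().minus(safetyMargin))) {
                detach.accept(entry.getKey());
                it.remove();
                detached.add(entry.getKey());
            }
        }
        return detached;
    }
}
```

In an application, sweep(Instant.now()) could be called from a scheduled task. Taking the current time as a parameter keeps the helper easy to test.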