本文介紹了Google PubSub重新發送的消息未被處理的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!
問題描述
我使用了Google PubSub文檔中的訂閱者示例
我所做的唯一修改是注釋掉了對消息的確認。
訂閱者不再向隊列中添加消息,而應根據Google云控制臺中設置的間隔重新發送消息。
為什么會發生這種情況,還是我遺漏了什么?
public class SubscriberExample {
use the default project id
private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();
private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>();
static class MessageReceiverExample implements MessageReceiver {
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
messages.offer(message);
//consumer.ack();
}
}
/** Receive messages over a subscription. */
public static void main(String[] args) throws Exception {
// set subscriber id, eg. my-sub
String subscriptionId = args[0];
ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(
PROJECT_ID, subscriptionId);
Subscriber subscriber = null;
try {
// create a subscriber bound to the asynchronous message receiver
subscriber = Subscriber.newBuilder(subscriptionName, new MessageReceiverExample()).build();
subscriber.startAsync().awaitRunning();
// Continue to listen to messages
while (true) {
PubsubMessage message = messages.take();
System.out.println("Message Id: " + message.getMessageId());
System.out.println("Data: " + message.getData().toStringUtf8());
}
} finally {
if (subscriber != null) {
subscriber.stopAsync();
}
}
}
}
JAVA
當您不確認消息時,推薦答案客戶端庫將對該消息調用modifyAckDeadline,直到maxAckExtensionPeriod傳遞。默認情況下,此值為一小時。因此,如果您不對消息進行確認/取消確認或更改此值,則消息很可能在一小時內不會重新傳遞。如果要更改最大確認擴展期限,請在構建器上進行設置:
subscriber = Subscriber.newBuilder(subscriptionName, new MessageReceiverExample())
.setMaxAckExtensionPeriod(Duration.ofSeconds(60))
.build();
還值得注意的是,如果不對消息進行ACK或NACK,則flow control可能會阻止更多消息的傳遞。默認情況下,Java客戶端庫允許最多1000條消息未完成,即等待ACK或NACK或等待最大ACK擴展期限過去。
這篇關于Google PubSub重新發送的消息未被處理的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,