概述
Kafka的強大功能之一是每個分區都有一個Consumer的偏移值。該偏移值是消費者將讀取的下一條消息的值。可以自動或手動增加該值。如果我們由于錯誤而無法處理消息并想重試,我們可以選擇手動管理,并在成功的情況下增加偏移量。但是,這會暫時阻止隊列消息的處理。我們可以選擇異步方法。
為什么我們需要它?
如果發生錯誤,而不是停止隊列消息的處理;我們可以將錯誤消息轉移到不同的主題并再次處理。
如果在處理 Kafka 消息時出現錯誤,可以使用 RetryableTopic 注解以一定的時間間隔和一定的次數再次處理消息。如果完成嘗試次數后錯誤仍然存在,則消息將發送到 DLT 隊列。
如何使用?
我們首先回顧一下RetryableTopic注解可以取的一些值,以便您可以做出最適合您的設置:
attempts:嘗試處理消息的次數。它的默認值為 3。如果完成所有嘗試后仍然收到錯誤,則消息將發送到 DLT 隊列。
backoff:用于確定處理消息的時間間隔。從 Backoff 類獲取一個值。您可以在下面找到退避的詳細示例。
排除/排除名稱:允許您排除指定的異常類。當您添加到列表中的任何錯誤被拋出時,重試機制將不會被激活。
include / includeNames:僅當拋出指定的異常時才會激活重試機制。
kafkaTemplate:雖然您可以給出現有 kafkaTemplate bean 的名稱,但您也可以為特定于重試的 Kafka 模板定義不同的 bean。
autoCreateTopics:決定是否自動創建Retry和DLT主題。
retryTopicSuffix / dltTopicSuffix:用于確定要添加到自動創建的主題末尾的后綴。
dltStrategy:如果不需要DLT,可以定義為NO_DLT。
SameIntervalTopicReuseStrategy/fixedDelayTopicStrategy(3.0.4之前):用于確定要創建的重試主題策略。創建 (SINGLE_TOPIC) 或盡可能多的嘗試值 (MULTIPLE_TOPICS) 重試主題。
Backoff的示例:
- 具有固定的增量值
Backoff(delay = 600000 ) // 每 10 分鐘
- 具有指數價值
Backoff(delay = 60000 , multiplier = 2 ) // 1、2、4、8... 分鐘后重復。
- 用占位符定義值
Backoff(delayExpression = "${delay}", multiplierExpression = "${multiplier}")
@RetryableTopic 示例:
@RetryableTopic(
backoff = @Backoff(delay = 300000),
attempts = 12,
sameIntervalTopicReuseStrategy =
SameIntervalTopicReuseStrategy.SINGLE_TOPIC,
kafkaTemplate = "kafkaRetryableTopicTemplate",
exclude = { SerializationException.class,
DeserializationException.class,
NullPointerException.class
}
)
@KafkaListener(topics = "my-topic")
public void processMessage(RetryableDto retryableDto) {
log.info("Retrying process RetryableDto : {}", retryableDto);
// process message
}
在上面的例子中,消息將每5分鐘重新處理一次,總共12次,即1小時。如果任何嘗試均順利完成,則試用將終止。
由于定義了 SINGLE_TOPIC,因此將創建單個主題以進行重試。如果沒有進行此定義,則會創建 12 個重試主題。
如果拋出了排除中定義的任何錯誤,則不會執行重做。
如果需要,您可以編寫自己的 RetryableException 并在包含中定義此值,以便僅在引發此錯誤時才重試。
DLT隊列處理
如果完成了定義的嘗試次數并且繼續收到錯誤,則消息將發送到 DLT 隊列。如果要處理這些消息,可以使用DltHandler注解。
用法示例:
@DltHandler
public void handleDltMessage (RetryableDto retryableDto) {
log.error("DLT處理程序消息:{}", retryableDto);
}
注意事項
雖然使用 RetryableTopic 的異步處理優勢為我們帶來了性能提升,但這種使用也有一些缺點。
使用RetryableTopic可能會破壞消息的處理順序。
讓我們用一個例子來解釋這種情況:當主主題在時間 t 處理時,一條消息出錯并被發送到重試主題。在時間 t + 1 時,另一條消息來到主主題并成功處理。讓我們在重試主題中的消息在時間 t + 2 時被成功處理。在這種情況下,第一條傳入消息將在第二條消息之后處理。如果訂購對您很重要,我建議您在消息處理過程中進行必要的檢查。
另一個缺點是消息雙重處理的風險。您可以通過考慮這種可能性來進行改進。