本文介紹了批量記錄處理后如何提交卡夫卡抵銷的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!
問題描述
我正在使用spring-kafka
并使用Kafka主題中的批量記錄,并提交偏移量AbstractMessageListenerContainer.AckMode.BATCH
。
在我的例子中,處理批處理記錄需要時間(大約20秒),而使用者線程等待批處理過程完成,然后再次執(zhí)行輪詢(在這次輪詢時提交偏移量)。在本例中,我將List
記錄分配給一個線程(名稱:ProcessThread
),該線程將處理所有記錄并將結果返回給使用者線程,然后使用者線程將記錄結果。(在此過程中,使用者線程將一直等待,直到它從ProcessThread
獲得結果,這會導致性能低下。
ProcessThread
有沒有辦法向Kafka提交Offset?因此使用者線程不需要等待,它將為每個輪詢創(chuàng)建一個新processThread
在我的例子中,我的主題有20個分區(qū)和10個Pod,每個Pod有2個消費者線程(Spring Kafka并發(fā)消費者),每個輪詢100條記錄(使用Spring Boot線程處理所有這些記錄@Async
)
通過上面的配置,我可以在2小時內(nèi)處理100萬條記錄,我需要將其拖到至少40分鐘。