本文介紹了KafkaConsumer Assignment()返回空的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!
問題描述
我正在使用
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.1</version>
</dependency>
以下代碼返回非空分配的分區,但poll(0)
已棄用。
val records = kafkaConsumer.poll(0) // <= deprecated
logInfo(s"Dummy call ${records.count()}")
val partitions = kafkaConsumer.assignment()
logInfo(s"partitions=${partitions}")
以下返回空分區:
val records = kafkaConsumer.poll(Duration.ofMillis(0)) // <= not working
logInfo(s"Dummy call ${records.count()}")
val partitions = kafkaConsumer.assignment()
logInfo(s"partitions=${partitions}")
為什么?有什么主意嗎?謝謝
推薦答案
這兩個調用的不同之處在于獲取元數據的方式。被棄用的poll
會無限期地等待,直到成功檢索元數據,而另一個poll
只嘗試一次,通常在非常短的時間間隔內(對于您的情況是0)無法連接到協調器,并返回任何有用的東西。這就是為什么您在調用poll(Duration.ofMillis(0))
一次后會看到一個空的賦值。
這篇關于KafkaConsumer Assignment()返回空的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,