本文介紹了是否有適用于Spring Boot Kafka客戶端的斷路器?的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧!
問題描述
如果Kafka服務(wù)器(暫時)關(guān)閉,我的Spring Boot應(yīng)用程序ReactiveKafkaConsumerTemplate
一直嘗試連接不成功,從而造成不必要的流量和日志文件混亂:
2021-11-10 14:45:30.265 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2021-11-10 14:45:32.792 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Bootstrap broker localhost:29092 (id: -1 rack: null) disconnected
2021-11-10 14:45:34.845 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2021-11-10 14:45:34.845 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Bootstrap broker localhost:29092 (id: -1 rack: null) disconnected
是否可以使用斷路器(靈感here或here)之類的東西,以便Spring BootKafka客戶端在出現(xiàn)故障時(或者更好的是連續(xù)幾次故障)放慢嘗試連接的速度,只有在服務(wù)器重新啟動后才會恢復(fù)到正常速度?
是否已有現(xiàn)成的配置參數(shù)或任何其他解決方案?
我知道parameterreconnect.backoff.ms
,這是我創(chuàng)建ReactiveKafkaConsumerTemplate
Bean的方式:
@Bean
public ReactiveKafkaConsumerTemplate<String, MyEvent> kafkaConsumer(KafkaProperties properties) {
final Map<String, Object> map = new HashMap<>(properties.buildConsumerProperties());
map.put(ConsumerConfig.GROUP_ID_CONFIG, "MyGroup");
map.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, 10_000L);
final JsonDeserializer<DisplayCurrencyEvent> jsonDeserializer = new JsonDeserializer<>();
jsonDeserializer.addTrustedPackages("com.example.myapplication");
return new ReactiveKafkaConsumerTemplate<>(
ReceiverOptions
.<String, MyEvent>create(map)
.withKeyDeserializer(new ErrorHandlingDeserializer<>(new StringDeserializer()))
.withValueDeserializer(new ErrorHandlingDeserializer<>(jsonDeserializer))
.subscription(List.of("MyTopic")));
}
消費者仍嘗試每3秒連接一次。
推薦答案
參見https://kafka.apache.org/documentation/#consumerconfigs_retry.backoff.ms
嘗試重新連接到給定主機之前等待的基本時間。這避免了在緊密環(huán)路中重復(fù)連接到主機。此退避適用于客戶端到代理的所有連接嘗試。
和https://kafka.apache.org/documentation/#consumerconfigs_reconnect.backoff.max.ms
重新連接到多次連接失敗的代理時等待的最長時間(以毫秒為單位)。如果提供,則每次連續(xù)連接失敗時,每臺主機的退避將呈指數(shù)級增加,最高可達(dá)此最大值。計算退避增加后,增加20%的隨機抖動以避免連接風(fēng)暴。
和
這篇關(guān)于是否有適用于Spring Boot Kafka客戶端的斷路器?的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,