本文介紹了如何配置Spring Boot Kafka客戶端以使其不嘗試連接的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!
問題描述
這與Is there a "Circuit Breaker" for Spring Boot Kafka client?有關,但我仍然認為這是一個不同的問題:)
我們需要配置Spring Boot Kafka客戶端,以便它根本不會嘗試連接。
用例是,在測試環境中,我們沒有運行Kafka,但我們仍然需要構建完整的Spring Boot上下文,因此使該Bean以配置文件為條件是不起作用的。我們不在乎BEY是否未連接,但我們需要它存在。
問題是不成功的連接嘗試需要大約30-40秒,我們的測試會顯著減慢。
哪種configuration parameters或哪種組合完全禁止連接嘗試,或至少強制客戶端僅嘗試一次?
多次重試連接的代碼為:
@Bean
public KafkaAdmin.NewTopics topics() {
return new KafkaAdmin.NewTopics(
TopicBuilder.name("MyTopic").build()
);
}
它反復生成此警告:
WARN ... org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
以下代碼僅嘗試連接一次:
@Bean
public ReactiveKafkaConsumerTemplate<String, MyEvent> myConsumer(KafkaProperties properties) {
return createConsumer(properties, "MyTopic", "MyConsumerGroup");
}
public <E> ReactiveKafkaConsumerTemplate<String, E> createConsumer(KafkaProperties properties, String topic, String consumerGroup) {
final Map<String, Object> map = configureKafkaProperties(properties, consumerGroup);
return new ReactiveKafkaConsumerTemplate<>(
ReceiverOptions.<String, E>create(map)
.subscription(List.of(topic)));
}
制作
WARN 7268 ... org.apache.kafka.clients.NetworkClient : Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
我也嘗試過設置屬性
spring.kafka.admin.fail-fast=true
但這似乎完全沒有效果。
推薦答案
Spring Boot自動配置默認情況下將連接到代理以創建任何NewTopic
Bean。您可以將其autoCreate
屬性設置為False。
/**
* Set to false to suppress auto creation of topics during context initialization.
* @param autoCreate boolean flag to indicate creating topics or not during context initialization
* @see #initialize()
*/
public void setAutoCreate(boolean autoCreate) {
編輯
若要獲取對KafkaAdmin
的引用,只需將其作為參數添加到任何Bean定義。
例如
@Bean
public KafkaAdmin.NewTopics topics(KafkaAdmin admin) {
admin.setAutoCreate(false);
return new KafkaAdmin.NewTopics(
TopicBuilder.name("MyTopic").build()
);
}
另請參閱KafkaAdmin.initialize()
。
/**
* Call this method to check/add topics; this might be needed if the broker was not
* available when the application context was initialized, and
* {@link #setFatalIfBrokerNotAvailable(boolean) fatalIfBrokerNotAvailable} is false,
* or {@link #setAutoCreate(boolean) autoCreate} was set to false.
* @return true if successful.
* @see #setFatalIfBrokerNotAvailable(boolean)
* @see #setAutoCreate(boolean)
*/
public final boolean initialize() {
使用@KafkaListener
設置autoStartup = "false"
以防止使用者在上下文初始化時啟動。
使用Reactive,只是不要訂閱receive*()
方法返回的Flux
(這是觸發消費者創建的內容)。
這篇關于如何配置Spring Boot Kafka客戶端以使其不嘗試連接的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,