本文介紹了春云Kafka StreamsUncaughtExceptionHandler的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!
問題描述
我正在嘗試將StreamsUncaughtExceptionHandler添加到我的Kafka流處理器中。該處理器是用Kafka函數編寫的。我查看了suggestion provided by Artem Bilan以將StreamsUncaughtExceptionHandler包括到我的服務中,但我的異常從未被它捕獲/處理。
配置Bean:
@Autowired
UnCaughtExceptionHandler exceptionHandler;
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
return new StreamsBuilderFactoryBeanConfigurer() {
@Override
public void configure(StreamsBuilderFactoryBean factoryBean) {;
factoryBean.setStreamsUncaughtExceptionHandler(exceptionHandler);
}
@Override
public int getOrder() {
return Integer.MAX_VALUE;
}
};
}
自定義異常處理程序:
@Component
public class UnCaughtExceptionHandler implements StreamsUncaughtExceptionHandler {
@Autowired
private StreamBridge streamBridge;
@Override
public StreamThreadExceptionResponse handle(Throwable exception) {
return StreamThreadExceptionResponse.REPLACE_THREAD;
}
}
流處理函數:
@Autowired
private MyService service;
@Bean
public Function<KStream<String, Input>, KStream<String, Output>> processor() {
final AtomicReference<KeyValue<String, Output>> result = new AtomicReference<>(null);
return kStream -> kStream
.filter((key, value) -> value != null)
.filter((key, value) -> {
Optional<Output> outputResult = service.process(value);
if (outputResult.isPresent()) {
result.set(new KeyValue<>(key, outputResult.get()));
return true;
}
return false;
})
.map((messageKey, messageValue) -> result.get());
}
我希望UnCaughtExceptionHandler處理由service.process()方法引發的任何異常。但是異常永遠不會進入Handle方法;相反,它們傳播到根并殺死客戶端。我也看過this solution,但我想以更獨立的方式處理它。
問題:如何使用StreamsUncaughtExceptionHandler處理任何處理異常?
Spring Boot版本:2.6.3
春云溪流版本:3.2.1
Spring-Cloud-Stream-Binder-Kafka-Streams:3.2.1
Kafka-Streams:3.0.0
可復制示例:spring-cloud-kafka-streams-exception
推薦答案
以下是您可以嘗試的幾種方法。
嘗試在StreamsBuilderFactoryBean
中的this line處設置斷點,并查看配置的值是什么。這應該會給出一些線索。
我注意到您在配置的Impl中為訂單設置了Integer.MAX_VALUE
。默認情況下,StreamsBuilderFactoryBean
使用階段值Integer.MAX_VALUE - 1000
,因此在工廠Bean準備啟動時,配置器可能還不可用,因為Integer.MAX_VALUE
的優先級較低。您可以將訂單更改為類似Integer.MAX_VALUE - 5000
的內容,以確保在啟動工廠Bean之前完全實例化配置Bean。
從這些選項開始,查看它們是否為該問題提供了任何跡象。如果它仍然存在,請隨時與我們分享一個可重復使用的小示例應用程序。
這篇關于春云Kafka StreamsUncaughtExceptionHandler的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,