本文介紹了春云Kafka StreamsUncaughtExceptionHandler的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧!
問題描述
我正在嘗試將StreamsUncaughtExceptionHandler添加到我的Kafka流處理器中。該處理器是用Kafka函數(shù)編寫的。我查看了suggestion provided by Artem Bilan以將StreamsUncaughtExceptionHandler包括到我的服務(wù)中,但我的異常從未被它捕獲/處理。
配置Bean:
@Autowired
UnCaughtExceptionHandler exceptionHandler;
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
return new StreamsBuilderFactoryBeanConfigurer() {
@Override
public void configure(StreamsBuilderFactoryBean factoryBean) {;
factoryBean.setStreamsUncaughtExceptionHandler(exceptionHandler);
}
@Override
public int getOrder() {
return Integer.MAX_VALUE;
}
};
}
自定義異常處理程序:
@Component
public class UnCaughtExceptionHandler implements StreamsUncaughtExceptionHandler {
@Autowired
private StreamBridge streamBridge;
@Override
public StreamThreadExceptionResponse handle(Throwable exception) {
return StreamThreadExceptionResponse.REPLACE_THREAD;
}
}
流處理函數(shù):
@Autowired
private MyService service;
@Bean
public Function<KStream<String, Input>, KStream<String, Output>> processor() {
final AtomicReference<KeyValue<String, Output>> result = new AtomicReference<>(null);
return kStream -> kStream
.filter((key, value) -> value != null)
.filter((key, value) -> {
Optional<Output> outputResult = service.process(value);
if (outputResult.isPresent()) {
result.set(new KeyValue<>(key, outputResult.get()));
return true;
}
return false;
})
.map((messageKey, messageValue) -> result.get());
}
我希望UnCaughtExceptionHandler處理由service.process()方法引發(fā)的任何異常。但是異常永遠(yuǎn)不會進(jìn)入Handle方法;相反,它們傳播到根并殺死客戶端。我也看過this solution,但我想以更獨(dú)立的方式處理它。
問題:如何使用StreamsUncaughtExceptionHandler處理任何處理異常?
Spring Boot版本:2.6.3
春云溪流版本:3.2.1
Spring-Cloud-Stream-Binder-Kafka-Streams:3.2.1
Kafka-Streams:3.0.0
可復(fù)制示例:spring-cloud-kafka-streams-exception
推薦答案
以下是您可以嘗試的幾種方法。
嘗試在StreamsBuilderFactoryBean
中的this line處設(shè)置斷點(diǎn),并查看配置的值是什么。這應(yīng)該會給出一些線索。
我注意到您在配置的Impl中為訂單設(shè)置了Integer.MAX_VALUE
。默認(rèn)情況下,StreamsBuilderFactoryBean
使用階段值Integer.MAX_VALUE - 1000
,因此在工廠Bean準(zhǔn)備啟動時,配置器可能還不可用,因?yàn)?code>Integer.MAX_VALUE的優(yōu)先級較低。您可以將訂單更改為類似Integer.MAX_VALUE - 5000
的內(nèi)容,以確保在啟動工廠Bean之前完全實(shí)例化配置Bean。
從這些選項(xiàng)開始,查看它們是否為該問題提供了任何跡象。如果它仍然存在,請隨時與我們分享一個可重復(fù)使用的小示例應(yīng)用程序。
這篇關(guān)于春云Kafka StreamsUncaughtExceptionHandler的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,