本文介紹了如何使用Spring Kafka實現有狀態消息監聽器?的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!
問題描述
我希望使用Spring Kafka API實現有狀態監聽器。
提供以下信息:
ConCurrentKafkaListenerContainerFactory,并發設置為”n”
Spring@Service類上的@KafkaListener批注方法
然后創建”n”個KafkaMessageListenerContainers。它們中的每一個都將有自己的KafkaConsumer,因此將有”n”個使用者線程-每個使用者一個線程。
消費消息時,將使用輪詢底層KafkaConsumer的同一線程調用@KafkaListener方法。由于只有一個監聽程序實例,因此此監聽程序需要是線程安全的,因為將有來自”n”個線程的并發訪問。
我不想考慮并發訪問,并在我知道只能由一個線程訪問的偵聽器中保留狀態。
如何使用Spring Kafka API為每個Kafka消費者創建單獨的監聽器?
推薦答案
您說得對;每個容器都有一個監聽器實例(無論是配置為@KafkaListener
還是MessageListener
)。
一種解決方法是將作用域為MessageListener
的原型與n個KafkaMessageListenerContainer
Bean(每個Bean有一個線程)一起使用。
然后,每個容器將獲得其自己的偵聽器實例。
@KafkaListener
POJO抽象不可能做到這一點。
不過,使用無狀態Bean通常更好。
編輯
我找到了另一個解決方法,使用SimpleThreadScope
…
@SpringBootApplication
public class So51658210Application {
public static void main(String[] args) {
SpringApplication.run(So51658210Application.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template, ApplicationContext context,
KafkaListenerEndpointRegistry registry) {
return args -> {
template.send("so51658210", 0, "", "foo");
template.send("so51658210", 1, "", "bar");
template.send("so51658210", 2, "", "baz");
template.send("so51658210", 0, "", "foo");
template.send("so51658210", 1, "", "bar");
template.send("so51658210", 2, "", "baz");
};
}
@Bean
public ActualListener actualListener() {
return new ActualListener();
}
@Bean
@Scope("threadScope")
public ThreadScopedListener listener() {
return new ThreadScopedListener();
}
@Bean
public static CustomScopeConfigurer scoper() {
CustomScopeConfigurer configurer = new CustomScopeConfigurer();
configurer.addScope("threadScope", new SimpleThreadScope());
return configurer;
}
@Bean
public NewTopic topic() {
return new NewTopic("so51658210", 3, (short) 1);
}
public static class ActualListener {
@Autowired
private ObjectFactory<ThreadScopedListener> listener;
@KafkaListener(id = "foo", topics = "so51658210")
public void listen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
this.listener.getObject().doListen(in, partition);
}
}
public static class ThreadScopedListener {
private void doListen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(in + ":"
+ Thread.currentThread().getName() + ":"
+ this.hashCode() + ":"
+ partition);
}
}
}
(容器并發數為3)。
工作正常:
bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2
bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2
唯一的問題是作用域不會自動清理(例如,當容器停止并且線程離開時)。這可能并不重要,具體取決于您的用例。
要解決這個問題,我們需要來自容器的一些幫助(例如,在偵聽器線程停止時在其上發布一個事件)。GH-762。
這篇關于如何使用Spring Kafka實現有狀態消息監聽器?的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,