本文介紹了如何將@Transaction與@KafkaListener一起使用?的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!
問題描述
有沒有可能將聲明性TX管理(通過@Transaction)與@KafkaListener注釋方法一起使用?
例如,我想使用它來為每個監聽器定義單獨的發送超時。
我的設置如下:
TransactionManager:
@Bean
@ConditionalOnBean(value = {HibernateTransactionManager.class})
public ChainedKafkaTransactionManager<Object, Object> chainedHibernateTm(KafkaTransactionManager<String, String> kafkaTransactionManager,
org.springframework.orm.hibernate5.HibernateTransactionManager hibernateTransactionManager) {
return new ChainedKafkaTransactionManager<>(
kafkaTransactionManager,
hibernateTransactionManager);
}
KafkaListener:
@KafkaListener(topic = "my_topic")
@Transactional(timeout = 5)
public void handleMessage(SomeMessage message){
}
問題是-KafkaMessageListenerContainer在調用此類方法之前創建自己的事務-它使用自己的TransactionTemplate:
@Nullable
private TransactionTemplate determineTransactionTemplate() {
return this.transactionManager != null
? new TransactionTemplate(this.transactionManager)
: null;
}
未使用TransactionInterceptor。那么具體的@KafkaListener方法如何設置具體的TX超時時間呢?
推薦答案
可以這樣做,但有點復雜,因為您必須將消耗的偏移量發送到Kafka交易。
不使用ChainedKafkaTransactionManager
,您可以為容器使用KafkaTransactionManager
,為HibernateTransactionManager
使用@Transactional
。
這將產生類似的結果,因為Hibernate Tx將在Kafka事務之前提交(如果Hibernate提交失敗,則Kafka Tx將回滾)。
編輯
若要將不同的鏈式TM配置到每個偵聽器容器中,可以執行以下操作。
@組件
類ContainerFactoryCustomizer{
ContainerFactoryCustomizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory,
ChainedKafkaTransactionManager<?, ?> chainedOne,
ChainedKafkaTransactionManager<?, ?> chainedTwo) {
factory.setContainerCustomizer(
container -> {
String groupId = container.getContainerProperties().getGroupId();
if (groupId.equals("foo")) {
container.getContainerProperties().setTransactionManager(chainedOne);
}
else {
container.getContainerProperties().setTransactionManager(chainedTwo);
}
});
}
}
Where each chained TM has a Hibernate TM with a different default timeout.
The `groupid` is populated from the `@KafkaListener` `id` or `groupId` property.
這篇關于如何將@Transaction與@KafkaListener一起使用?的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,