本文介紹了春天·卡夫卡聽著regex的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!
問題描述
我正在嘗試使用以下代碼收聽新創(chuàng)建的主題,但不起作用。您能告訴我下面的代碼是否正確嗎?
public class KafkaMessageListener {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);
private final ProcessEventModel eventModel;
@KafkaListener(topicPattern = "betsyncDataTopic*")
public void receive(ConsumerRecord<String, String> consumerRecord) {
LOGGER.info("received payload at '{}'", consumerRecord.timestamp());
eventModel.process(consumerRecord.value());
}
推薦答案
您的正則表達式無效;它應該是betsyncDataTopic.*
。
@KafkaListener(id = "xxx", topicPattern = "kbgh.*")
public void listen(String in) {
System.out.println(in);
}
…
partitions assigned: [kbgh290-0]
編輯
如果稍后添加與模式匹配的新主題,則在重新平衡之前會有延遲。根據(jù)KafkaConsumer
javadoc…
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against topic existing at the time of check.
* <p>
* As part of group management, the consumer will keep track of the list of consumers that
* belong to a particular group and will trigger a rebalance operation if one of the
* following events trigger -
* <ul>
* <li>Number of partitions change for any of the subscribed list of topics
* <li>Topic is created or deleted
* <li>An existing member of the consumer group dies
* <li>A new member is added to an existing consumer group via the join API
* </ul>
我剛剛運行了一個測試;在12:13:32
處添加了一個新的匹配主題;結果:
2018-02-12 12:17:30.394 INFO 88028 --- [ xxx-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions revoked: [kbgh290-0]
2018-02-12 12:17:30.450 INFO 88028 --- [ xxx-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [kbgh290-0, kbghNew-0]
所以需要幾分鐘。
這篇關于春天·卡夫卡聽著regex的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,