本文介紹了如何使用Quarkus在Kafka中設置同一主題中的多個消費者的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!
問題描述
我正在使用Quarkus框架構建一個Kafka消費者,它將讀取帶有3個分區的主題。下面的代碼片段正在工作,但根據日志,我只是啟動了具有3個分區的1個使用者。我現在的問題是,一旦我運行我的應用程序,我如何才能產生3個消費者。
@Incoming("topic-1")
public CompletionStage<Void> onMessage(KafkaRecord<String, String> message) throws IOException {
LOG.info("Kafka order message with value = {} arrived from topic {} ", message.getPayload(),
message.getTopic());
//JsonObject event = new JsonObject(message.getPayload());
try {
if (true) {
LOG.info("Kafka message: " + message);
}
} catch (Exception e) {
e.printStackTrace();
}
return message.ack();
}
請參閱示例日志:
信息[org.apa.kaf.cli.con.int.Consumer協調器](vert.x-kafka-Consumer-線程-0)[消費者客戶端ID=測試消費者,組ID=kafka-檢測-消費者]已完成第64代的組的分配:{testconsumer-bf6d314c-44e1-47b1-9439-fe4058951841=Assignment(partitions=[test_part-0,TEST_PART-1,TEST_PART-2])}
推薦答案
如果您在Containers平臺(Docker、K8s…)上運行應用程序然后,您可以橫向擴展您的服務;否則,請使用不同的端口再次運行您的應用程序。
Kafka客戶端啟動時會被分配到某個分區,因此同一個客戶端不能從多個Theme-Partition消費。
這篇關于如何使用Quarkus在Kafka中設置同一主題中的多個消費者的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,