以前我們討論的消費組,都是 group 的形式,group 可以自動地幫助消費者分配分區,且在發生異常時,還能自定地進行重平衡(Rebalance)。正常來說,group 幫助用戶實現自動監聽分區消費,但是在用戶需要指定分區進行精確消費的場景下,由于 group 的重平衡機制,會打破這種消費方式,這不前段時間某項目就有個需求是這樣的:
消息源端有若干個,每個消息源都會產生不同的消息,目標端也有若干個,每個目標端需要消費指定的消息源類型。
在以往,由于消費組的重平衡機制會打亂這種消費方式,只能申請多個主題對消息進行隔離,每個消息源將消息發送到指定主題,目標端監聽指定的主題。這么做肯定沒有指定分區消費這么優雅了,每增加一種消息源,都需要新增一個 topic,且消費粒度不能靈活組合。
針對以上問題,Kafka 的提供了獨立消費者模式,可以消費者可以指定分區進行消費,如果只用一個 topic,每個消息源啟動一個生產者,分別發往不同的分區,消費者指定消費相關的分區即可,用如下圖所示:
但是 Kafka 獨立消費者也有它的限定場景:
1、 Kafka 獨立消費者模式下,Kafka 集群并不會維護消費者的消費偏移量,需要每個消費者維護監聽分區的消費偏移量,因此,獨立消費者模式與 group 模式切勿混合使用!
2、group 模式的重平衡機制在消費者異常時可將其監聽的分區重分配給其它正常的消費者,使得這些分區不會停止被監聽消費,但是獨立消費者由于是手動進行監聽指定分區,因此獨立消費者發生異常時,并不會將其監聽的分區進行重分配,這就會造成某些分區消息堆積。因此,在該模式下,獨立消費者需要實現高可用,例如獨立消費者使用 K8s Deployment 進行部署。
下面將演示如何使用 Kafka#assgin 方法手動訂閱指定分區進行消費:
public static void main(String[] args) {
Properties kafkaProperties = new Properties();
kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.Apache.kafka.common.serialization.StringDeserializer");
kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
kafkaProperties.put("bootstrap.servers", "localhost:9092");
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaProperties);
List<TopicPartition> partitions = new ArrayList<>();
partitions.add(new TopicPartition("test_topic", 0));
partitions.add(new TopicPartition("test_topic", 1));
consumer.assign(partitions); while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(3000));
for (ConsumerRecord<String, byte[]> record : records) {
System.out.printf("topic:%s, partition:%s%n", record.topic(), record.partition());
} }}