
如上圖所示,kafaka集群的 broker,和 Consumer 都需要連接 Zookeeper。 Producer 直接連接 Broker。
Producer 把數(shù)據(jù)上傳到 Broker,Producer可以指定數(shù)據(jù)有幾個(gè)分區(qū)、幾個(gè)備份。上面的圖中,數(shù)據(jù)有兩個(gè)分區(qū) 0、1,每個(gè)分區(qū)都有自己的副本:0'、 1'。
黃色的分區(qū)為 leader,白色的為 follower。
leader 處理 partition 的所有讀寫(xiě)請(qǐng)求,與此同時(shí),follower會(huì)被動(dòng)定期地去復(fù)制leader上的數(shù)據(jù)。 如下圖所示,紅色的為 leader,綠色的為 follower,leader復(fù)制自己到其他 Broker 中:

如果leader發(fā)生故障或掛掉,一個(gè)新leader被選舉并接收客戶端的消息。Kafka確保從同步副本列表中選舉一個(gè)副本為 leader。
關(guān)于follower 的同步機(jī)制可參考:
https://blog.csdn.net/lizhitao/article/details/51718185
Topic 分區(qū)被放在不同的 Broker 中,保證 Producer 和 Consumer 錯(cuò)開(kāi)訪問(wèn) Broker,避免訪問(wèn)單個(gè) Broker造成過(guò)度的IO壓力,使得負(fù)載均衡。
Zookeeper 在 Kafka 中的作用
1、Broker注冊(cè)
Broker是分布式部署并且相互之間相互獨(dú)立,但是需要有一個(gè)注冊(cè)系統(tǒng)能夠?qū)⒄麄€(gè)集群中的Broker管理起來(lái),此時(shí)就使用到了Zookeeper。在Zookeeper上會(huì)有一個(gè)專門(mén)用來(lái)進(jìn)行Broker服務(wù)器列表記錄的節(jié)點(diǎn):
/brokers/ids
每個(gè)Broker在啟動(dòng)時(shí),都會(huì)到Zookeeper上進(jìn)行注冊(cè),即到/brokers/ids下創(chuàng)建屬于自己的節(jié)點(diǎn),如/brokers/ids/[0...N]。
Kafka使用了全局唯一的數(shù)字來(lái)指代每個(gè)Broker服務(wù)器,不同的Broker必須使用不同的Broker ID進(jìn)行注冊(cè),創(chuàng)建完節(jié)點(diǎn)后,每個(gè)Broker就會(huì)將自己的IP地址和端口信息記錄到該節(jié)點(diǎn)中去。其中,Broker創(chuàng)建的節(jié)點(diǎn)類型是臨時(shí)節(jié)點(diǎn),一旦Broker宕機(jī),則對(duì)應(yīng)的臨時(shí)節(jié)點(diǎn)也會(huì)被自動(dòng)刪除。
2、Topic注冊(cè)
在Kafka中,同一個(gè)Topic的消息會(huì)被分成多個(gè)分區(qū)并將其分布在多個(gè)Broker上,這些分區(qū)信息及與Broker的對(duì)應(yīng)關(guān)系也都是由Zookeeper在維護(hù),由專門(mén)的節(jié)點(diǎn)來(lái)記錄,如:
/borkers/topics
Kafka中每個(gè)Topic都會(huì)以/brokers/topics/[topic]的形式被記錄,如/brokers/topics/login和/brokers/topics/search等。Broker服務(wù)器啟動(dòng)后,會(huì)到對(duì)應(yīng)Topic節(jié)點(diǎn)(/brokers/topics)上注冊(cè)自己的Broker ID并寫(xiě)入針對(duì)該Topic的分區(qū)總數(shù),如/brokers/topics/login/3->2,這個(gè)節(jié)點(diǎn)表示Broker ID為3的一個(gè)Broker服務(wù)器,對(duì)于"login"這個(gè)Topic的消息,提供了2個(gè)分區(qū)進(jìn)行消息存儲(chǔ),同樣,這個(gè)分區(qū)節(jié)點(diǎn)也是臨時(shí)節(jié)點(diǎn)。
3、生產(chǎn)者負(fù)載均衡
由于同一個(gè)Topic消息會(huì)被分區(qū)并將其分布在多個(gè)Broker上,因此,生產(chǎn)者需要將消息合理地發(fā)送到這些分布式的Broker上,那么如何實(shí)現(xiàn)生產(chǎn)者的負(fù)載均衡,Kafka支持傳統(tǒng)的四層負(fù)載均衡,也支持Zookeeper方式實(shí)現(xiàn)負(fù)載均衡。
(1) 四層負(fù)載均衡,根據(jù)生產(chǎn)者的IP地址和端口來(lái)為其確定一個(gè)相關(guān)聯(lián)的Broker。通常,一個(gè)生產(chǎn)者只會(huì)對(duì)應(yīng)單個(gè)Broker,然后該生產(chǎn)者產(chǎn)生的消息都發(fā)往該Broker。這種方式邏輯簡(jiǎn)單,每個(gè)生產(chǎn)者不需要同其他系統(tǒng)建立額外的TCP連接,只需要和Broker維護(hù)單個(gè)TCP連接即可。但是,其無(wú)法做到真正的負(fù)載均衡,因?yàn)閷?shí)際系統(tǒng)中的每個(gè)生產(chǎn)者產(chǎn)生的消息量及每個(gè)Broker的消息存儲(chǔ)量都是不一樣的,如果有些生產(chǎn)者產(chǎn)生的消息遠(yuǎn)多于其他生產(chǎn)者的話,那么會(huì)導(dǎo)致不同的Broker接收到的消息總數(shù)差異巨大,同時(shí),生產(chǎn)者也無(wú)法實(shí)時(shí)感知到Broker的新增和刪除。
(2) 使用Zookeeper進(jìn)行負(fù)載均衡,由于每個(gè)Broker啟動(dòng)時(shí),都會(huì)完成Broker注冊(cè)過(guò)程,生產(chǎn)者會(huì)通過(guò)該節(jié)點(diǎn)的變化來(lái)動(dòng)態(tài)地感知到Broker服務(wù)器列表的變更,這樣就可以實(shí)現(xiàn)動(dòng)態(tài)的負(fù)載均衡機(jī)制。
4、消費(fèi)者負(fù)載均衡
與生產(chǎn)者類似,Kafka中的消費(fèi)者同樣需要進(jìn)行負(fù)載均衡來(lái)實(shí)現(xiàn)多個(gè)消費(fèi)者合理地從對(duì)應(yīng)的Broker服務(wù)器上接收消息,每個(gè)消費(fèi)者分組包含若干消費(fèi)者,每條消息都只會(huì)發(fā)送給分組中的一個(gè)消費(fèi)者,不同的消費(fèi)者分組消費(fèi)自己特定的Topic下面的消息,互不干擾。
5、分區(qū) 與 消費(fèi)者 的關(guān)系
消費(fèi)組 (Consumer Group): consumer group 下有多個(gè) Consumer(消費(fèi)者)。 對(duì)于每個(gè)消費(fèi)者組 (Consumer Group),Kafka都會(huì)為其分配一個(gè)全局唯一的Group ID,Group 內(nèi)部的所有消費(fèi)者共享該 ID。訂閱的topic下的每個(gè)分區(qū)只能分配給某個(gè) group 下的一個(gè)consumer(當(dāng)然該分區(qū)還可以被分配給其他group)。 同時(shí),Kafka為每個(gè)消費(fèi)者分配一個(gè)Consumer ID,通常采用"Hostname:UUID"形式表示。
在Kafka中,規(guī)定了每個(gè)消息分區(qū) 只能被同組的一個(gè)消費(fèi)者進(jìn)行消費(fèi),因此,需要在 Zookeeper 上記錄 消息分區(qū) 與 Consumer 之間的關(guān)系,每個(gè)消費(fèi)者一旦確定了對(duì)一個(gè)消息分區(qū)的消費(fèi)權(quán)力,需要將其Consumer ID 寫(xiě)入到 Zookeeper 對(duì)應(yīng)消息分區(qū)的臨時(shí)節(jié)點(diǎn)上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
其中,[broker_id-partition_id]就是一個(gè) 消息分區(qū) 的標(biāo)識(shí),節(jié)點(diǎn)內(nèi)容就是該 消息分區(qū) 上 消費(fèi)者的Consumer ID。
6、消息 消費(fèi)進(jìn)度Offset 記錄
在消費(fèi)者對(duì)指定消息分區(qū)進(jìn)行消息消費(fèi)的過(guò)程中,需要定時(shí)地將分區(qū)消息的消費(fèi)進(jìn)度Offset記錄到Zookeeper上,以便在該消費(fèi)者進(jìn)行重啟或者其他消費(fèi)者重新接管該消息分區(qū)的消息消費(fèi)后,能夠從之前的進(jìn)度開(kāi)始繼續(xù)進(jìn)行消息消費(fèi)。Offset在Zookeeper中由一個(gè)專門(mén)節(jié)點(diǎn)進(jìn)行記錄,其節(jié)點(diǎn)路徑為:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
節(jié)點(diǎn)內(nèi)容就是Offset的值。
7、消費(fèi)者注冊(cè)
消費(fèi)者服務(wù)器在初始化啟動(dòng)時(shí)加入消費(fèi)者分組的步驟如下
注冊(cè)到消費(fèi)者分組。每個(gè)消費(fèi)者服務(wù)器啟動(dòng)時(shí),都會(huì)到Zookeeper的指定節(jié)點(diǎn)下創(chuàng)建一個(gè)屬于自己的消費(fèi)者節(jié)點(diǎn),例如/consumers/[group_id]/ids/[consumer_id],完成節(jié)點(diǎn)創(chuàng)建后,消費(fèi)者就會(huì)將自己訂閱的Topic信息寫(xiě)入該臨時(shí)節(jié)點(diǎn)。
對(duì) 消費(fèi)者分組 中的 消費(fèi)者 的變化注冊(cè)監(jiān)聽(tīng)。每個(gè) 消費(fèi)者 都需要關(guān)注所屬 消費(fèi)者分組 中其他消費(fèi)者服務(wù)器的變化情況,即對(duì)/consumers/[group_id]/ids節(jié)點(diǎn)注冊(cè)子節(jié)點(diǎn)變化的Watcher監(jiān)聽(tīng),一旦發(fā)現(xiàn)消費(fèi)者新增或減少,就觸發(fā)消費(fèi)者的負(fù)載均衡。
對(duì)Broker服務(wù)器變化注冊(cè)監(jiān)聽(tīng)。消費(fèi)者需要對(duì)/broker/ids/[0-N]中的節(jié)點(diǎn)進(jìn)行監(jiān)聽(tīng),如果發(fā)現(xiàn)Broker服務(wù)器列表發(fā)生變化,那么就根據(jù)具體情況來(lái)決定是否需要進(jìn)行消費(fèi)者負(fù)載均衡。
進(jìn)行消費(fèi)者負(fù)載均衡。為了讓同一個(gè)Topic下不同分區(qū)的消息盡量均衡地被多個(gè) 消費(fèi)者 消費(fèi)而進(jìn)行 消費(fèi)者 與 消息 分區(qū)分配的過(guò)程,通常,對(duì)于一個(gè)消費(fèi)者分組,如果組內(nèi)的消費(fèi)者服務(wù)器發(fā)生變更或Broker服務(wù)器發(fā)生變更,會(huì)發(fā)出消費(fèi)者負(fù)載均衡。
以下是kafka在zookeep中的詳細(xì)存儲(chǔ)結(jié)構(gòu)圖:

補(bǔ)充
早期版本的 kafka 用 zk 做 meta 信息存儲(chǔ),consumer 的消費(fèi)狀態(tài),group 的管理以及 offse t的值??紤]到zk本身的一些因素以及整個(gè)架構(gòu)較大概率存在單點(diǎn)問(wèn)題,新版本中確實(shí)逐漸弱化了zookeeper的作用。新的consumer使用了kafka內(nèi)部的group coordination協(xié)議,也減少了對(duì)zookeeper的依賴