【51CTO.com原創(chuàng)稿件】本文主要講解 Kafka 是什么、Kafka 的架構(gòu)包括工作流程和存儲機(jī)制,以及生產(chǎn)者和消費(fèi)者。

圖片來自 Pexels
最終大家會掌握 Kafka 中最重要的概念,分別是 Broker、Producer、Consumer、Consumer Group、Topic、Partition、Replica、Leader、Follower,這是學(xué)會和理解 Kafka 的基礎(chǔ)和必備內(nèi)容。
定義
Kafka 是一個分布式的基于發(fā)布/訂閱模式的消息隊列(Message Queue),主要應(yīng)用與大數(shù)據(jù)實時處理領(lǐng)域。
消息隊列
Kafka 本質(zhì)上是一個 MQ(Message Queue),使用消息隊列的好處?(面試會問)
- 解耦:允許我們獨立的擴(kuò)展或修改隊列兩邊的處理過程。
- 可恢復(fù)性:即使一個處理消息的進(jìn)程掛掉,加入隊列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。
- 緩沖:有助于解決生產(chǎn)消息和消費(fèi)消息的處理速度不一致的情況。
- 靈活性&峰值處理能力:不會因為突發(fā)的超負(fù)荷的請求而完全崩潰,消息隊列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力。
- 異步通信:消息隊列允許用戶把消息放入隊列但不立即處理它。
發(fā)布/訂閱模式

一對多,生產(chǎn)者將消息發(fā)布到 Topic 中,有多個消費(fèi)者訂閱該主題,發(fā)布到 Topic 的消息會被所有訂閱者消費(fèi),被消費(fèi)的數(shù)據(jù)不會立即從 Topic 清除。
架構(gòu)

Kafka 存儲的消息來自任意多被稱為 Producer 生產(chǎn)者的進(jìn)程。數(shù)據(jù)從而可以被發(fā)布到不同的 Topic 主題下的不同 Partition 分區(qū)。
在一個分區(qū)內(nèi),這些消息被索引并連同時間戳存儲在一起。其它被稱為 Consumer 消費(fèi)者的進(jìn)程可以從分區(qū)訂閱消息。
Kafka 運(yùn)行在一個由一臺或多臺服務(wù)器組成的集群上,并且分區(qū)可以跨集群結(jié)點分布。
下面給出 Kafka 一些重要概念,讓大家對 Kafka 有個整體的認(rèn)識和感知,后面還會詳細(xì)的解析每一個概念的作用以及更深入的原理:
- Producer: 消息生產(chǎn)者,向 Kafka Broker 發(fā)消息的客戶端。
- Consumer:消息消費(fèi)者,從 Kafka Broker 取消息的客戶端。
- Consumer Group:消費(fèi)者組(CG),消費(fèi)者組內(nèi)每個消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),提高消費(fèi)能力。一個分區(qū)只能由組內(nèi)一個消費(fèi)者消費(fèi),消費(fèi)者組之間互不影響。所有的消費(fèi)者都屬于某個消費(fèi)者組,即消費(fèi)者組是邏輯上的一個訂閱者。
- Broker:一臺 Kafka 機(jī)器就是一個 Broker。一個集群由多個 Broker 組成。一個 Broker 可以容納多個 Topic。
- Topic:可以理解為一個隊列,Topic 將消息分類,生產(chǎn)者和消費(fèi)者面向的是同一個 Topic。
- Partition:為了實現(xiàn)擴(kuò)展性,提高并發(fā)能力,一個非常大的 Topic 可以分布到多個 Broker (即服務(wù)器)上,一個 Topic 可以分為多個 Partition,每個 Partition 是一個 有序的隊列。
- Replica:副本,為實現(xiàn)備份的功能,保證集群中的某個節(jié)點發(fā)生故障時,該節(jié)點上的 Partition 數(shù)據(jù)不丟失,且 Kafka 仍然能夠繼續(xù)工作,Kafka 提供了副本機(jī)制,一個 Topic 的每個分區(qū)都有若干個副本,一個 Leader 和若干個 Follower。
- Leader:每個分區(qū)多個副本的“主”副本,生產(chǎn)者發(fā)送數(shù)據(jù)的對象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對象,都是 Leader。
- Follower:每個分區(qū)多個副本的“從”副本,實時從 Leader 中同步數(shù)據(jù),保持和 Leader 數(shù)據(jù)的同步。Leader 發(fā)生故障時,某個 Follower 還會成為新的 Leader。
- Offset:消費(fèi)者消費(fèi)的位置信息,監(jiān)控數(shù)據(jù)消費(fèi)到什么位置,當(dāng)消費(fèi)者掛掉再重新恢復(fù)的時候,可以從消費(fèi)位置繼續(xù)消費(fèi)。
- Zookeeper:Kafka 集群能夠正常工作,需要依賴于 Zookeeper,Zookeeper 幫助 Kafka 存儲和管理集群信息。
工作流程
Kafka集群將 Record 流存儲在稱為 Topic 的類別中,每個記錄由一個鍵、一個值和一個時間戳組成。

Kafka 是一個分布式流平臺,這到底是什么意思?
- 發(fā)布和訂閱記錄流,類似于消息隊列或企業(yè)消息傳遞系統(tǒng)。
- 以容錯的持久方式存儲記錄流。
- 處理記錄流。
Kafka 中消息是以 Topic 進(jìn)行分類的,生產(chǎn)者生產(chǎn)消息,消費(fèi)者消費(fèi)消息,面向的都是同一個 Topic。
Topic 是邏輯上的概念,而 Partition 是物理上的概念,每個 Partition 對應(yīng)于一個 log 文件,該 log 文件中存儲的就是 Producer 生產(chǎn)的數(shù)據(jù)。
Producer 生產(chǎn)的數(shù)據(jù)會不斷追加到該 log 文件末端,且每條數(shù)據(jù)都有自己的 Offset。
消費(fèi)者組中的每個消費(fèi)者,都會實時記錄自己消費(fèi)到了哪個 Offset,以便出錯恢復(fù)時,從上次的位置繼續(xù)消費(fèi)。
存儲機(jī)制

由于生產(chǎn)者生產(chǎn)的消息會不斷追加到 log 文件末尾,為防止 log 文件過大導(dǎo)致數(shù)據(jù)定位效率低下,Kafka 采取了分片和索引機(jī)制。
它將每個 Partition 分為多個 Segment,每個 Segment 對應(yīng)兩個文件:“.index” 索引文件和 “.log” 數(shù)據(jù)文件。
這些文件位于同一文件下,該文件夾的命名規(guī)則為:topic 名-分區(qū)號。例如,first 這個 topic 有三分分區(qū),則其對應(yīng)的文件夾為 first-0,first-1,first-2。
# ls /root/data/kafka/first-0 00000000000000009014.index 00000000000000009014.log 00000000000000009014.timeindex 00000000000000009014.snapshot leader-epoch-checkpoint
index 和 log 文件以當(dāng)前 Segment 的第一條消息的 Offset 命名。下圖為 index 文件和 log 文件的結(jié)構(gòu)示意圖:

“.index” 文件存儲大量的索引信息,“.log” 文件存儲大量的數(shù)據(jù),索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中 Message 的物理偏移量。
生產(chǎn)者
分區(qū)策略
分區(qū)原因:
- 方便在集群中擴(kuò)展,每個 Partition 可以通過調(diào)整以適應(yīng)它所在的機(jī)器,而一個 Topic 又可以有多個 Partition 組成,因此可以以 Partition 為單位讀寫了。
- 可以提高并發(fā),因此可以以 Partition 為單位讀寫了。
分區(qū)原則:我們需要將 Producer 發(fā)送的數(shù)據(jù)封裝成一個 ProducerRecord 對象。
該對象需要指定一些參數(shù):
- topic:string 類型,NotNull。
- partition:int 類型,可選。
- timestamp:long 類型,可選。
- key:string 類型,可選。
- value:string 類型,可選。
- headers:array 類型,Nullable。
①指明 Partition 的情況下,直接將給定的 Value 作為 Partition 的值。
②沒有指明 Partition 但有 Key 的情況下,將 Key 的 Hash 值與分區(qū)數(shù)取余得到 Partition 值。
③既沒有 Partition 有沒有 Key 的情況下,第一次調(diào)用時隨機(jī)生成一個整數(shù)(后面每次調(diào)用都在這個整數(shù)上自增),將這個值與可用的分區(qū)數(shù)取余,得到 Partition 值,也就是常說的 Round-Robin 輪詢算法。
數(shù)據(jù)可靠性保證
為保證 Producer 發(fā)送的數(shù)據(jù),能可靠地發(fā)送到指定的 Topic,Topic 的每個 Partition 收到 Producer 發(fā)送的數(shù)據(jù)后,都需要向 Producer 發(fā)送 ACK(ACKnowledge 確認(rèn)收到)。
如果 Producer 收到 ACK,就會進(jìn)行下一輪的發(fā)送,否則重新發(fā)送數(shù)據(jù)。

①副本數(shù)據(jù)同步策略
何時發(fā)送 ACK?確保有 Follower 與 Leader 同步完成,Leader 再發(fā)送 ACK,這樣才能保證 Leader 掛掉之后,能在 Follower 中選舉出新的 Leader 而不丟數(shù)據(jù)。
多少個 Follower 同步完成后發(fā)送 ACK?全部 Follower 同步完成,再發(fā)送 ACK。

②ISR
采用第二種方案,所有 Follower 完成同步,Producer 才能繼續(xù)發(fā)送數(shù)據(jù),設(shè)想有一個 Follower 因為某種原因出現(xiàn)故障,那 Leader 就要一直等到它完成同步。
這個問題怎么解決?Leader維護(hù)了一個動態(tài)的 in-sync replica set(ISR):和 Leader 保持同步的 Follower 集合。
當(dāng) ISR 集合中的 Follower 完成數(shù)據(jù)的同步之后,Leader 就會給 Follower 發(fā)送 ACK。
如果 Follower 長時間未向 Leader 同步數(shù)據(jù),則該 Follower 將被踢出 ISR 集合,該時間閾值由 replica.lag.time.max.ms 參數(shù)設(shè)定。Leader 發(fā)生故障后,就會從 ISR 中選舉出新的 Leader。
③ACK 應(yīng)答機(jī)制
對于某些不太重要的數(shù)據(jù),對數(shù)據(jù)的可靠性要求不是很高,能夠容忍數(shù)據(jù)的少量丟失,所以沒必要等 ISR 中的 Follower 全部接受成功。
所以 Kafka 為用戶提供了三種可靠性級別,用戶根據(jù)可靠性和延遲的要求進(jìn)行權(quán)衡,選擇以下的配置。

Ack 參數(shù)配置:
- 0:Producer 不等待 Broker 的 ACK,這提供了最低延遲,Broker 一收到數(shù)據(jù)還沒有寫入磁盤就已經(jīng)返回,當(dāng) Broker 故障時有可能丟失數(shù)據(jù)。
- 1:Producer 等待 Broker 的 ACK,Partition 的 Leader 落盤成功后返回 ACK,如果在 Follower 同步成功之前 Leader 故障,那么將會丟失數(shù)據(jù)。
- -1(all):Producer 等待 Broker 的 ACK,Partition 的 Leader 和 Follower 全部落盤成功后才返回 ACK。但是在 Broker 發(fā)送 ACK 時,Leader 發(fā)生故障,則會造成數(shù)據(jù)重復(fù)。
④故障處理細(xì)節(jié)

LEO:每個副本最大的 Offset。HW:消費(fèi)者能見到的最大的 Offset,ISR 隊列中最小的 LEO。
Follower 故障:Follower 發(fā)生故障后會被臨時踢出 ISR 集合,待該 Follower 恢復(fù)后,F(xiàn)ollower 會 讀取本地磁盤記錄的上次的 HW,并將 log 文件高于 HW 的部分截取掉,從 HW 開始向 Leader 進(jìn)行同步數(shù)據(jù)操作。
等該 Follower 的 LEO 大于等于該 Partition 的 HW,即 Follower 追上 Leader 后,就可以重新加入 ISR 了。
Leader 故障:Leader 發(fā)生故障后,會從 ISR 中選出一個新的 Leader,之后,為保證多個副本之間的數(shù)據(jù)一致性,其余的 Follower 會先將各自的 log 文件高于 HW 的部分截掉,然后從新的 Leader 同步數(shù)據(jù)。
注意:這只能保證副本之間的數(shù)據(jù)一致性,并不能保證數(shù)據(jù)不丟失或者不重復(fù)。
Exactly Once 語義
將服務(wù)器的 ACK 級別設(shè)置為 -1,可以保證 Producer 到 Server 之間不會丟失數(shù)據(jù),即 At Least Once 語義。
相對的,將服務(wù)器 ACK 級別設(shè)置為 0,可以保證生產(chǎn)者每條消息只會被發(fā)送一次,即 At Most Once 語義。
At Least Once 可以保證數(shù)據(jù)不丟失,但是不能保證數(shù)據(jù)不重復(fù);相對的,At Most Once 可以保證數(shù)據(jù)不重復(fù),但是不能保證數(shù)據(jù)不丟失。
但是,對于一些非常重要的信息,比如交易數(shù)據(jù),下游數(shù)據(jù)消費(fèi)者要求數(shù)據(jù)既不重復(fù)也不丟失,即 Exactly Once 語義。
0.11 版本的 Kafka,引入了冪等性:Producer 不論向 Server 發(fā)送多少重復(fù)數(shù)據(jù),Server 端都只會持久化一條。
即:
At Least Once + 冪等性 = Exactly Once
要啟用冪等性,只需要將 Producer 的參數(shù)中 enable.idompotence 設(shè)置為 true 即可。
開啟冪等性的 Producer 在初始化時會被分配一個 PID,發(fā)往同一 Partition 的消息會附帶 Sequence Number。
而 Borker 端會對
但是 PID 重啟后就會變化,同時不同的 Partition 也具有不同主鍵,所以冪等性無法保證跨分區(qū)會話的 Exactly Once。
消費(fèi)者
消費(fèi)方式
Consumer 采用 Pull(拉取)模式從 Broker 中讀取數(shù)據(jù)。
Consumer 采用 Push(推送)模式,Broker 給 Consumer 推送消息的速率是由 Broker 決定的,很難適應(yīng)消費(fèi)速率不同的消費(fèi)者。
它的目標(biāo)是盡可能以最快速度傳遞消息,但是這樣很容易造成 Consumer 來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。
而 Pull 模式則可以根據(jù) Consumer 的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息。Pull 模式不足之處是,如果 Kafka 沒有數(shù)據(jù),消費(fèi)者可能會陷入循環(huán)中,一直返回空數(shù)據(jù)。
因為消費(fèi)者從 Broker 主動拉取數(shù)據(jù),需要維護(hù)一個長輪詢,針對這一點, Kafka 的消費(fèi)者在消費(fèi)數(shù)據(jù)時會傳入一個時長參數(shù) timeout。
如果當(dāng)前沒有數(shù)據(jù)可供消費(fèi),Consumer 會等待一段時間之后再返回,這段時長即為 timeout。
分區(qū)分配策略
一個 Consumer Group 中有多個 Consumer,一個 Topic 有多個 Partition,所以必然會涉及到 Partition 的分配問題,即確定哪個 Partition 由哪個 Consumer 來消費(fèi)。
Kafka 有兩種分配策略,一個是 RoundRobin,一個是 Range,默認(rèn)為Range,當(dāng)消費(fèi)者組內(nèi)消費(fèi)者發(fā)生變化時,會觸發(fā)分區(qū)分配策略(方法重新分配)。
①RoundRobin

RoundRobin 輪詢方式將分區(qū)所有作為一個整體進(jìn)行 Hash 排序,消費(fèi)者組內(nèi)分配分區(qū)個數(shù)最大差別為 1,是按照組來分的,可以解決多個消費(fèi)者消費(fèi)數(shù)據(jù)不均衡的問題。
但是,當(dāng)消費(fèi)者組內(nèi)訂閱不同主題時,可能造成消費(fèi)混亂,如下圖所示,Consumer0 訂閱主題 A,Consumer1 訂閱主題 B。

將 A、B 主題的分區(qū)排序后分配給消費(fèi)者組,TopicB 分區(qū)中的數(shù)據(jù)可能分配到 Consumer0 中。
②Range

Range 方式是按照主題來分的,不會產(chǎn)生輪詢方式的消費(fèi)混亂問題。
但是,如下圖所示,Consumer0、Consumer1 同時訂閱了主題 A 和 B,可能造成消息分配不對等問題,當(dāng)消費(fèi)者組內(nèi)訂閱的主題越多,分區(qū)分配可能越不均衡。

Offset 的維護(hù)
由于 Consumer 在消費(fèi)過程中可能會出現(xiàn)斷電宕機(jī)等故障,Consumer 恢復(fù)后,需要從故障前的位置繼續(xù)消費(fèi)。
所以 Consumer 需要實時記錄自己消費(fèi)到了哪個 Offset,以便故障恢復(fù)后繼續(xù)消費(fèi)。
Kafka 0.9 版本之前,Consumer 默認(rèn)將 Offset 保存在 Zookeeper 中,從 0.9 版本開始,Consumer 默認(rèn)將 Offset 保存在 Kafka 一個內(nèi)置的 Topic 中,該 Topic 為 __consumer_offsets。
總結(jié)
上面和大家一起深入探討了 Kafka 的架構(gòu),比較偏重理論和基礎(chǔ),這是掌握 Kafka 的必要內(nèi)容,接下來我會以代碼和實例的方式,更新 Kafka 有關(guān) API 以及事務(wù)、攔截器、監(jiān)控等高級篇,讓大家徹底理解并且會用 Kafka。
作者:臧遠(yuǎn)慧
簡介:就職于中科星圖股份有限公司(北京),研發(fā)部后端技術(shù)組。個人擅長 Python/JAVA 開發(fā),了解前端基礎(chǔ);熟練掌握 MySQL,MongoDB,了解 redis;熟悉 linux 開發(fā)環(huán)境,掌握 Shell 編程,有良好的 Git 源碼管理習(xí)慣;精通 Nginx ,F(xiàn)lask、Swagger 開發(fā)框架;有 Docker+Kubernetes 云服務(wù)開發(fā)經(jīng)驗。對人工智能、云原生技術(shù)有較大的興趣。
【51CTO原創(chuàng)稿件,合作站點轉(zhuǎn)載請注明原文作者和出處為51CTO.com】