Kafka 是一個基于發布-訂閱模式的消息系統,它可以在多個生產者和消費者之間傳遞大量的數據。Kafka 的一個顯著特點是它的高吞吐率,即每秒可以處理百萬級別的消息。那么 Kafka 是如何實現這樣高得性能呢?本文將從七個方面來分析 Kafka 的速度優勢。
-
零拷貝技術 -
僅可追加日志結構 -
消息批處理 -
消息批量壓縮 -
消費者優化 -
未刷新的緩沖寫入 -
GC 優化
以下是對本文中使用得一些英文單詞解釋:
Broker:Kafka 集群中的一臺或多臺服務器統稱 broker
Producer:消息生產者
Consumer:消息消費者
zero copy:零拷貝
1. 零拷貝技術
零拷貝技術是指在讀寫數據時,避免將數據在內核空間和用戶空間之間進行拷貝,而是直接在內核空間進行數據傳輸。對于 Kafka 來說,它使用了零拷貝技術來加速磁盤文件的網絡傳輸,以提高讀取速度和降低 CPU 消耗。下圖說明了數據如何在生產者和消費者之間傳輸,以及零拷貝原理。
步驟 1.1~1.3:生產者將數據寫入磁盤
步驟 2:消費者不使用零拷貝方式讀取數據
2.1:數據從磁盤加載到 OS 緩存
2.2:將數據從 OS 緩存復制到 Kafka 應用程序
2.3:Kafka 應用程序將數據復制到 socket 緩沖區
2.4:將數據從 socket 緩沖區復制到網卡
2.5:網卡將數據發送給消費者
步驟 3:消費者以零拷貝方式讀取數據
3.1:數據從磁盤加載到 OS 緩存
3.2:OS 緩存通過 sendfile() 命令直接將數據復制到網卡
3.3:網卡將數據發送到消費者
可以看到,零拷貝技術避免了多余得兩步操作,數據直接從OS 緩存復制到網卡再到消費者。這樣做的好處是極大地提高了I/O效率,降低了CPU和內存的消耗。
2. 僅可追加日志結構
Kafka 中存在大量的網絡數據持久化到磁盤(生產者到代理)和磁盤文件通過網絡發送(代理到消費者)的過程。這一過程的性能會直接影響 Kafka 的整體吞吐量。為了優化 Kafka 的數據存儲和傳輸,Kafka 采用了一種僅可追加日志結構方式來持久化數據。僅可追加日志結構是指將數據以順序追加(Append-only)的方式寫入到文件中,而不是進行隨機寫入或更新。這樣做的好處是可以減少磁盤 I/O 的開銷,提高寫入速度。
人們普遍認為磁盤的讀寫速度很慢,但實際上存儲介質(尤其是旋轉介質)的性能很大程度上取決于訪問模式。常見的 7,200 RPM SATA 磁盤上的隨機 I / O 的性能要比順序 I / O 慢 3 ~ 4 個數量級。此外,現代操作系統提供了預讀和延遲寫入技術,可以預先取出大塊的數據,并將較小的邏輯寫入組合成較大的物理寫入。因此,即使在閃存和其他形式的固態非易失性介質中,隨機 I/O 和順序 I/O 的差異仍然很明顯,盡管與旋轉介質相比,這種差異性已經很小了。
3. 消息批處理
Kafka 的高吞吐率設計的核心要點之一是批處理,即 Kafka 在消息發送端和接收端都引入了一個緩沖區,將多條消息打包成一個批次(Batch),然后一次性發送或接收。這樣做的好處是可以減少網絡請求的次數,減少了網絡壓力,提高了傳輸效率。
Kafka 的消息批處理優化主要涉及以下幾個方面:
發送端(Producer)
Kafka 的 Producer 只提供了單條發送的 send()方法,并沒有提供任何批量發送的接口。當調用 send()方法發送一條消息之后,無論是同步還是異步發送,這條消息不會立即發送出去,而是先放入到一個雙端隊列中,然后 Kafka 使用一個異步線程從隊列中成批發送消息。
Kafka 提供了以下幾個參數來控制發送端的批處理策略:
-
batch.size:指定每個批次可以收集的消息數量的最大值。默認是 16KB。 -
buffer.memory:指定每個 Producer 可以使用的緩沖區內存的總量。默認是 32MB。 -
linger.ms:指定每個批次可以等待的時間的最大值。默認是 0ms。 -
compression.type:指定是否對每個批次進行壓縮,以及使用哪種壓縮算法。默認是 none。
接收端(Broker)
Kafka 的 Broker 在接收到 Producer 發送過來的批次后,不會把批次再還原成多條消息,而是直接將整個批次寫入到磁盤中。這樣做的好處是可以減少磁盤 I/O 的開銷,提高寫入速度。
Kafka 利用了操作系統提供的內存映射文件(memory mapped file)功能,將文件映射到內存中,使得對文件的讀寫操作就相當于對內存的讀寫操作。這樣就避免了用戶空間和內核空間之間的數據拷貝,也避免了系統調用的開銷。
消費端(Consumer)
Kafka 的 Consumer 在從 Broker 拉取數據時,也是以批次為單位進行傳遞的。Consumer 從 Broker 拉到一批消息后,客戶端把批次解開,再一條一條交給用戶代碼處理。
Kafka 提供了以下幾個參數來控制消費端的批處理策略:
-
fetch.min.bytes:指定每次拉取請求至少要獲取多少字節的數據。默認是 1B。 -
fetch.max.bytes:指定每次拉取請求最多能獲取多少字節的數據。默認是 50MB。 -
fetch.max.wAIt.ms:指定每次拉取請求最多能等待多長時間。默認是 500ms。 -
max.partition.fetch.bytes:指定每個分區每次拉取請求最多能獲取多少字節的數據。默認是 1MB。
4. 消息批量壓縮
消息批量壓縮通常與消息批處理一起使用。Kafka 會將多個消息打包成一個批次(Batch),并對批次進行壓縮(例如使用 gzip 或 snappy 算法),然后再發送給消費者。這樣做的好處是可以節省網絡帶寬,提高傳輸效率。
當然,壓縮也有一定的代價,即需要消耗 CPU 資源來進行壓縮和解壓縮。但是對于 Kafka 這樣的高吞吐量的系統來說,網絡帶寬往往是更大的瓶頸,所以壓縮是值得的。
Kafka 還提供了一種靈活的壓縮策略,即可以讓生產者、代理和消費者之間協商壓縮格式和級別。生產者可以選擇是否對消息進行壓縮,以及使用哪種壓縮算法;代理可以選擇是否保留生產者壓縮的消息,或者對其進行重新壓縮;消費者可以選擇是否對收到的消息進行解壓縮。這樣可以根據不同的場景和需求來平衡性能和資源的消耗。
5. 消費者優化
Kafka 的消費者是基于拉模式(pull)的,即消費者主動向服務器請求數據,而不是服務器主動推送數據給消費者。這樣做的好處是可以讓消費者自己控制消費的速度和時機,也可以減輕服務器的負擔,提高整體的吞吐量。
Kafka 的消費者所實現的功能是比較簡潔的,即它們不需要維護太多的狀態和資源,也不需要和服務器進行復雜的交互。Kafka 的消費者只需要做以下幾件事:
-
訂閱一個或多個主題(topic),并加入一個消費者組(consumer group)。向群組協調器(group coordinator)發送心跳,表明自己還活著,并參與分區再均衡(partition rebalance)。 -
向分區所在的代理(broker)發送拉取請求(fetch request),獲取消息數據。 -
提交自己消費到的偏移量(offset),以便在出現故障時恢復消費位置。
可以看到,Kafka 的消費者并不需要保存消息數據,也不需要對消息進行確認或回復,也不需要處理重試或重復的問題。這些都由服務器端來負責。Kafka 的消費者只需要關注如何從服務器獲取數據,并進行業務處理即可。
6. 未刷新的緩沖寫入
Kafka 在寫入數據時,使用了一種未刷新(flush)的緩沖寫入技術,即它不會立即將數據寫入硬盤,而是先寫入內存緩存中,然后由操作系統在適當的時候刷新到硬盤上。這樣做的好處是可以提高寫入速度,減少磁盤 I/O 的開銷。
Kafka 利用了操作系統提供的內存映射文件(memory mapped file)功能,將文件映射到內存中,使得對文件的讀寫操作就相當于對內存的讀寫操作。這樣就避免了用戶空間和內核空間之間的數據拷貝,也避免了系統調用的開銷。
當生產者向 Kafka 發送消息時,Kafka 會將消息追加到內存映射文件中,并返回一個確認給生產者。此時消息并沒有真正寫入硬盤,而是由操作系統負責將內存中的數據刷新到硬盤上。操作系統會根據一些策略來決定何時刷新數據,例如定期刷新、緩存滿了刷新、系統空閑時刷新等。
當然,這種技術也有一定的風險,即如果操作系統在刷新數據之前發生崩潰或斷電,那么內存中未刷新的數據就會丟失。為了解決這個問題,Kafka 提供了一些參數來控制刷新策略,例如:
-
log.flush.interval.messages:指定多少條消息后強制刷新數據。 -
log.flush.interval.ms:指定多少毫秒后強制刷新數據。 -
producer.type:指定生產者是同步還是異步模式。同步模式下,生產者會等待服務器刷新數據后再返回確認;異步模式下,生產者不會等待服務器刷新數據,而是立即返回確認。
7. GC 優化
Kafka 作為一個 JAVA 編寫得高性能的分布式消息系統,它需要處理大量的數據讀寫和網絡傳輸。這些操作都會涉及到 Java 虛擬機(JVM)的內存管理和垃圾回收(GC)機制。如果 GC 不合理或不及時,就會導致 Kafka 的性能下降,甚至出現內存溢出或頻繁的停頓。為了幫助使用者優化 GC,Kakfa 有如下建議。
堆內存大小
堆內存是 JVM 用來存儲對象實例的內存區域,它會受到 GC 的管理和回收。堆內存的大小會影響 Kafka 的性能和穩定性,如果堆內存太小,就會導致頻繁的 GC,影響吞吐量和延遲;如果堆內存太大,就會導致 GC 時間過長,影響響應速度和可用性。
通常來說,Kafka 并不需要設置太大的堆內存,因為它主要依賴于操作系統的文件緩存(page cache)來緩存和讀寫數據,而不是將數據保存在堆內存中。因此 Kafka 建議將堆內存大小設置為 4GB 到 6GB 之間。
堆外內存大小
堆外內存是 JVM 用來存儲非對象實例的內存區域,它不會受到 GC 的管理和回收。堆外內存主要用于網絡 I/O 緩沖區、直接內存映射文件、壓縮庫等。
Kafka 在進行網絡 I/O 時,會使用堆外內存作為緩沖區,以減少數據在用戶空間和內核空間之間的拷貝。同時,Kafka 在進行數據壓縮時,也會使用堆外內存作為臨時空間,以減少 CPU 資源的消耗。
因此,堆外內存對于 Kafka 的性能也很重要,如果堆外內存不足,就會導致緩沖區分配失敗或壓縮失敗,影響吞吐量和延遲。通常來說,Kafka 建議將堆外內存大小設置為 8GB 左右。
GC 算法和參數
GC 算法是 JVM 用來回收無用對象占用的堆內存空間的方法,它會影響 Kafka 的停頓時間和吞吐量。GC 算法有多種選擇,例如串行 GC、并行 GC、CMS GC、G1 GC 等。
不同的 GC 算法有不同的優缺點和適用場景,例如串行 GC 適合小型應用和低延遲場景;并行 GC 適合大型應用和高吞吐量場景;CMS GC 適合大型應用和低停頓時間場景;G1 GC 適合大型應用和平衡停頓時間和吞吐量場景等。
通常來說,Kafka 建議使用 G1 GC 作為默認的 GC 算法,因為它可以在保證較高吞吐量的同時,控制停頓時間在 200ms 以內。同時,Kafka 還建議根據具體情況調整一些 GC 參數,例如:
-
-XX:MaxGCPauseMillis:指定最大停頓時間目標,默認是 200ms。 -
-XX:InitiatingHeapOccupancyPercent:指定觸發并發標記周期的堆占用百分比,默認是 45%。 -
-XX:G1ReservePercent:指定為拷貝存活對象預留的空間百分比,默認是 10%。 -
-XX:G1HeapRegionSize:指定每個堆區域的大小,默認是 2MB。
本文參考
-
https://medium.com/swlh/why-kafka-is-so-fast-bde0d987cd03 -
https://blog.bytebytego.com/p/why-is-kafka-fast -
https://blog.csdn.NET/csdnnews/article/details/104471147
總結
最后感謝大家閱讀,希望本文能對你有所幫助。