Kafka是一個能夠支持高并發以及流式消息處理的消息中間件,并且Kafka天生就是支持集群的,今天就主要來介紹一下如何搭建Kafka集群。
Kafka目前支持使用Zookeeper模式搭建集群以及KRaft模式(即無Zookeeper)模式這兩種模式搭建集群,這兩種模式各有各的好處,今天就來分別介紹一下這兩種方式。
1,Kafka集群中的節點類型
我們首先需要了解一下,一個Kafka集群是由下列幾種類型的節點構成的,它們充當著不同的作用:
Broker節點:即代理節點,是Kafka中的工作節點,充當消息隊列的角色,負責儲存和處理消息,每個Broker都是一個獨立的Kafka服務器,可以在不同的機器上運行,除此之外Broker還負責分區(partition)的管理,將主題(topic)劃分為多個分區,并分布在集群的不同Broker上
Controller節點:即控制器節點,是集群中的特殊節點,負責儲存和管理整個集群元數據和狀態,它能夠監控整個集群中的Broker,在需要時還能夠進行平衡操作
混合節點:即同時擔任Broker和Controller節點角色的節點
2,兩種模式集群的搭建方式
接下來,我就來介紹一下兩種模式的集群架構和搭建方式,即Zookeeper模式集群和KRaft模式集群。
(1) Zookeeper模式集群
這是一種比較簡單,相對“傳統”的搭建方式了!在這種模式下,每個Kafka節點都是依賴于Zookeeper的,使用Zookeeper存儲集群中所有節點的元數據。
只要所有的Kafka節點連接到同一個Zookeeper上面(或者同一個Zookeeper集群),這些Kafka節點就構成了一個集群。所以說就算是只有一個Kafka節點在運行,這一個節點也可以稱作一個集群。
在Zookeeper模式集群中,Zookeeper節點(或者集群)就充當了Controller的角色,而所有的Kafka節點就充當著Broker的角色。
下面就來介紹一下搭建過程,這里我在4臺linux虛擬機上分別運行Zookeeper和Kafka來模擬一個集群,一共一個Zookeeper節點和三個Kafka節點構成,如下:
上述地址例如kafka1等等,是通過修改虛擬機的主機名(hostname)實現的,這樣虛擬機之間可以直接通過這些主機名相互訪問,這個主機名我們就可以視作實際在服務器上面搭建時,服務器的外網地址或者域名,這里就不再贅述如何修改虛擬機的主機名了,需要保證上述所有虛擬機在一個虛擬機網段中并且能夠互相ping通,即上述所有虛擬機需要兩兩之間可以通過網絡互相訪問。
運行Kafka和Zookeeper都需要JAVA 8及其以上運行環境,大家要首先在虛擬機中安裝并配置好。
① 搭建Zookeeper
首先我們要運行起一個Zookeeper節點,這里就不再贅述Zookeeper節點如何搭建了!搭建可以查看 官方文檔,或者使用 Docker的方式搭建。
搭建完成并運行Zookeeper之后,我們會把所有的Kafka節點都配置到這一個Zookeeper節點上。
② 配置并運行所有Kafka節點
首先去 Kafka官網下載最新版并解壓,然后將解壓出來的Kafka分別復制到三臺虛擬機中。
然后修改每臺虛擬機的Kafka目錄中的配置文件,配置文件位于解壓的Kafka文件夾中的config/server.properties,使用文本編輯器打開,并找到下列配置項進行配置:
- broker.id 表示每個節點的id,每個節點需要設置為不一樣的整數,我這里分別設置為1,2和3
- zookeeper.connect 表示要使用的Zookeeper的地址和端口,我這里都配置為zookeeper:2181
- advertised.listeners 表示這個Kafka節點的外網地址,這里分別配置為PLAINTEXT://kafka1:9092,PLAINTEXT://kafka2:9092和PLAINTEXT://kafka3:9092,需要注意的是這個配置項必須要配置為其所在服務器的外網地址,如果說你是在一臺服務器上面配置Kafka并要通過外網訪問,這個配置就需要配置為服務器外網地址(域名),并且都以PLAINTEXT://開頭
注意上述advertised.listeners這個配置項默認情況下是被注釋掉了的,大家需要去仔細找一下并去掉注釋(開頭的#)然后再進行配置。
三臺虛擬機配置完成后,分別使用終端進入到Kafka目錄下并啟動,執行下列命令:
bash
bin/kafka-server-start.sh config/server.properties
在上述三臺虛擬機上面都通過這個命令啟動Kafka,如圖則啟動成功:
到此,整個集群就搭建完成了!大家需要保證上述三臺虛擬機中的終端不被關閉。
③ 創建話題測試
我們先在kafka1的虛擬機上面再開一個終端并進入Kafka目錄,執行下列命令創建Topic:
bash
bin/kafka-server-start.sh config/server.properties
然后去kafka2的虛擬機上面再開一個終端并進入Kafka目錄,執行下列命令列出Topic:
bash
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
可見我們在第一個節點上創建了話題,但是在第二個節點上仍然可以獲取這個話題,說明集群創建成功,數據在集群之間可以共享。
(2) KRaft模式集群
在上述傳統方案中,Kafka需要依賴Zookeeper完成元數據存放和共享,這樣也就暴露出了一些問題:
搭建Kafka集群時還需要額外搭建Zookeeper,增加了運維成本
Zookeeper是強一致性的組件(符合CP理論),如果集群中數據發生變化,那么必須要等到其它節點都同步,至少超過一半同步完成,這樣節點數多性能差
那么KRaft模式是新版本Kafka中推出的集群模式,這種模式下就完全不需要Zookeeper了!只需要數個Kafka節點就可以直接構成集群,在這時集群中的Kafka節點既有可能是Controller節點也可能是Broker節點,在這個模式中,我們不僅可以手動配置某個節點的角色(是Controller還是Broker),還可以使其同時擔任Broker和Controller角色(混合節點)。
在KRaft模式中,集群的節點會通過投票選舉的方式,選擇出一個主要的Controller節點,這個節點也稱作領導者,它將負責維護整個集群的元數據和狀態信息,那么其它的Controller節點或者混合節點就稱之為追隨者,它們會從領導者同步集群元數據和狀態信息。如果領導者宕機了,所有的節點會重新投票選舉一個新的領導者。
在選舉過程中,所有的節點都會參與投票過程,而候選節點只會是Controller節點或者混合節點(即Broker節點不會被選舉為領導者)。
需要注意的是,在默認情況下Kafka集群中的Broker節點和Controller節點通常會監聽不同的端口:
- Broker節點是Kafka集群中的數據節點(消息隊列),它們負責接收客戶端的消息和傳遞消息給客戶端,默認情況下,每個Broker節點會監聽9092端口,該端口用于與客戶端進行通信,客戶端可以將消息發送到這個端口,或者從這個端口接收消息,這個端口可以稱作客戶端通信端口
- Controller節點是Kafka集群中的控制器節點,負責管理集群的狀態和元數據,Controller節點監聽的端口通常是9093,該端口用于集群中其他節點獲取元數據或在混合節點選舉新的Controller時進行通信,通過該端口,其他節點可以與Controller節點交互,獲取集群的元數據信息或參與控制器的選舉過程,這個端口可以稱作控制器端口
- 混合節點(即同時擔任Broker和Controller角色的節點)中,這兩個端口都會被使用,默認情況下混合節點將監聽9092端口接收和傳遞消息給客戶端,并監聽9093端口用于與其他節點進行元數據交換和控制器選舉通信,可見混合節點會同時使用兩個端口分別作為客戶端通信端口與控制器端口
所以需要根據實際情況配置網絡設置和防火墻規則,以確保Kafka集群中的節點能夠在正確的端口上進行通信。上述提到的兩種端口也是可以修改的,當然不建議修改。
同樣地,就算是你只是搭建了一個Kafka節點,這一個節點也仍然被視為一個Kafka集群,并且KRaft模式下如果只需要建立一個節點,那么這個節點必須是混合節點。
下面同樣是開啟三臺虛擬機,搭建三個Kafka節點構成的KRaft模式集群如下:
這里就不再贅述下載Kafka的過程了!
① 修改配置文件
在KRaft模式下,配置文件位于Kafka目錄中的config/kraft/server.properties,使用文本編輯器打開并找到下列配置以修改:
node.id 表示這個節點的id,一個集群中每個節點id不能重復,需要是不小于1的整數,這里三臺虛擬機的配置分別為1,2和3(類似上述Zookeeper的broker.id配置)
controller.quorum.voters 設定投票者列表,即需要配置所有的Controller節點id及其地址端口,配置格式為節點1的id@節點1地址:節點1端口,節點2的id@節點2地址:節點2端口,節點3的id@節點3地址:節點3端口...,這里的端口需要是控制器端口,默認都是9093,上面也提到過了,默認不需要修改,我這里三臺虛擬機的都配置為1@kafka1:9093,2@kafka2:9093,3@kafka3:9093(實際在服務器上搭建時替換為服務器的外網地址或者域名)
advertised.listeners 表示這個Kafka節點的外網地址,這里分別配置為PLAINTEXT://kafka1:9092,PLAINTEXT://kafka2:9092和PLAINTEXT://kafka3:9092(和上述Zookeeper模式中的一樣,實際在服務器上搭建時替換為服務器的外網地址或者域名)
上述是必須要進行配置的,還有下面配置是可以選擇性配置的:
process.roles 表示設定這個節點的類型,設定為broker表示設定這個節點為Broker節點,同樣地設定controller表示設定為Controller節點,默認是broker,controller表示這個節點會自動切換節點類型,這里先保持默認不變,下面再來詳細討論
② 生成集群ID并使用集群ID格式化數據目錄
在KRaft模式下,一個集群需要設定一個id,我們可以使用自帶的命令生成,先進入上述任意一臺虛擬機并使用終端進入Kafka目錄中,執行下列命令生成一個UUID:
arduino
bin/kafka-storage.sh random-uuid
我們這里記錄下這個ID以備用。
這個集群ID事實上是一個長度16位的字符串通過Base64編碼后得來的,因此你也可以不使用上述命令,直接自定義一個16位長度的純英文和數字組成的字符串,然后將這個字符串編碼為Base64格式作為這個集群ID也可以。可以使用 菜鳥工具中的在線Base64編碼工具。
然后在上述三臺虛擬機中,都使用終端進入Kafka目錄后,執行下列命令:
bash
bin/kafka-storage.sh format -t 生成的集群ID -c config/kraft/server.properties
這樣,三個Kafka節點都使用了這一個ID完成了集群元數據配置,表示這三個Kafka節點構成一個集群。
③ 啟動Kafka
同樣地,在三臺虛擬機中,都使用終端進入Kafka目錄后,執行下列命令:
bash
bin/kafka-server-start.sh config/kraft/server.properties
三臺虛擬機全部啟動后,這個集群才啟動完畢。
④ 創建話題測試
同樣地,現在第一個虛擬機的Kafka目錄下執行下列命令:
bash
bin/kafka-topics.sh --create --topic my-topic-kraft --bootstrap-server localhost:9092
然后在第二個虛擬機的Kafka目錄下查看話題:
bash
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
可見集群節點之間可以互相通信。
無論是在虛擬機還是服務器上,都要保證9092和9093端口開放,且所有虛擬機/服務器之間都能夠兩兩互相訪問(網絡連通)!
3,重要配置介紹
無論是那種模式的集群,我們都涉及到了許多配置項,大家通過上述的搭建示例也能夠了解到每個配置項的意義,這里就專門來著重介紹一下,Kafka中一些重要的配置項。
(1) listeners
這個配置項用于指定Kafka服務器監聽客戶端連接的地址和端口,當 Kafka 服務器啟動時,它將監聽listeners配置項中指定的地址和端口,等待客戶端的連接請求。
一般情況下這個配置以PLAINTEXT://或者CONTROLLER://開頭,意義如下:
- 若這個節點是Broker節點,則以PLAINTEXT://開頭
- 若這個節點是Controller節點,則以CONTROLLER://開頭
- 若這個節點是混合節點,則需要同時配置兩者開頭的地址
這個配置項通常不需要修改,下面給出幾個配置示例
- PLAINTEXT://:9092 本節點作為Broker節點,監聽本機所有可用網卡的9092端口(使用9092端口作為客戶端通信端口),也是默認配置
- PLAINTEXT://127.0.0.1:9092 本節點作為Broker節點,監聽本地的9092端口,這樣僅接受來自本地的請求
- CONTROLLER://:10000 本節點作為Controller節點,監聽本機所有可用網卡的10000端口(使用10000端口作為控制器端口)
- PLAINTEXT://:9092,CONTROLLER://:9093 本節點作為混合節點,監聽本機所有可用網卡的9092和9093端口,其中9092作為客戶端通信端口,9093作為控制器端口
(2) advertise.listeners
這個配置容易和listeners混淆,事實上它們是有較大的區別的。
該配置項指定Kafka服務器廣播給客戶端的地址和端口,通常配置為Kafka所在服務器的外網地址。
當客戶端(生產者或消費者)嘗試連接到Kafka服務器時,它首先會獲取Kafka服務器廣播的地址和端口,也就是advertise.listeners配置所指定的地址和端口,然后才會使用advertise.listeners配置所指定的地址和端口來建立與Kafka服務器的連接。
相信這時大家會有個疑問:既然客戶端要連接Kafka(例如Spring Boot集成Kafka客戶端),那一定是已經知道了Kafka對外的地址端口了,那為什么連接的時候還需要獲取一下廣播的地址端口再進行連接呢?這樣是不是有一些多此一舉?
事實上,Kafka設計這個配置是為了解決下面較為復雜的網絡場景:
- 多網絡接口的主機部署:在一個多網絡接口的主機部署Kafka時,Kafka服務器可能會監聽多個地址和端口,這些地址和端口可能與客戶端實際訪問的地址和端口不同,advertise.listeners允許服務器指定一個公開的、可訪問的地址和端口,以便客戶端能夠正確連接
- NAT/代理環境:在某些網絡環境下,Kafka服務器位于一個私有網絡中,客戶端位于一個公共網絡中,兩者之間可能存在網絡地址轉換(NAT)或代理,在這種情況下,Kafka服務器的內部地址和端口對客戶端來說是不可訪問的。通過使用advertise.listeners,Kafka服務器可以將一個公共地址和端口廣播給客戶端,使得客戶端能夠通過公共網絡連接到服務器
- 容器環境:例如你把Kafka放在Docker容器中運行,按照默認配置,Kafka服務端只會監聽容器網絡的9092端口,我們知道外部不能直接訪問容器的網絡,而是需要使用網絡映射,假設你把Kafka容器的9092端口映射至了宿主機9095端口,也就是說外部需要通過9095端口訪問到Kafka容器的9092端口,那么你就配置advertise.listeners為PLAINTEXT://服務器外網地址:9095,客戶端就可以正確訪問容器中的Kafka了
總之,這個配置設置為Kafka服務器所在的外網地址即可!例如PLAINTEXT://69.54.112.239:9092。
(3) process.roles
這是KRaft模式下專門的配置,用于配置這個節點的類型,可以配置為下列值:
- broker 表示這個節點是Broker節點,充當消息隊列的角色
- controller 表示這個節點是Controller節點,充當元數據存放和管理的角色
- broker,controller 表示這個節點同時擔任Broker和Controller的角色,也稱作混合節點
如果沒有配置這個選項,則Kafka會以Zookeeper模式運行。
這里有下列注意事項:
- 如果設定節點為controller:
則不能配置advertised.listeners,可以將其注釋掉或者刪掉
listeners需要配置為CONTROLLER://開頭,建議配置為CONTROLLER://:9093
- 如果設定節點為broker:
則需要配置advertised.listeners為服務器外網地址和端口,這和Zookeeper模式中相同
listeners需要配置為PLAINTEXT://開頭,建議配置為PLAINTEXT://:9092
- 如果設定節點為混合節點:
同樣需要配置advertised.listeners為服務器外網地址和端口
listeners需要同時配置CONTROLLER://和PLAINTEXT://,建議配置為PLAINTEXT://:9092,CONTROLLER://:9093
在開發環境或者小規模集群,可以全部使用混合節點,如果是生產環境就建議設定好每個節點的類型了!并且通常需要先啟動Controller節點再啟動Broker節點。
事實上,我們發現Kafka的KRaft配置目錄config/kraft下有三個配置文件,其中server.properties是混合節點的配置模板,而broker.properties和controller.properties分別是Broker節點和Controller節點的配置模板,大家如果要設定節點類型,可以直接使用對應的配置文件,將對應配置文件需要修改的部分修改一下,然后將上述格式化數據目錄命令和啟動命令中的配置文件路徑改變一下即可,這樣可以省略我們設定process.roles和listeners或者控制器節點刪除advertise.listeners配置的操作。
(4) controller.quorum.voters
該配置項用于配置集群中Controller節點選舉過程中的投票者,集群中所有的Controller節點都需要被羅列在這個配置項中,其配置格式為id1@host1:port1,id2@host2:port2,id3@host3:port3...。
有的同學可能認為這里需要把集群中所有節點都寫進去,事實上這是錯誤的,這里只需要寫所有的Controller節點和混合節點的id、地址和端口即可,這個配置中配置的端口當然是控制器端口。
上述集群搭建的例子中,由于所有的節點都是混合節點,因此就全部寫在其中了!如果我們手動設定每個節點的類型,例如:
那么所有節點的controller.quorum.voters都需要配置為1@kafka1:9093。
事實上,所有的節點都是通過這個配置中的節點列表,來得知所有的控制器節點信息(以獲取集群元數據)并得到投票候選者的,因此集群中所有節點,不論是Broker還是Controller,還是混合節點,都需要配置這一項。
(5) 其它配置
除了上述我們涉及到的一些配置之外,還有下列配置大家可以進行修改:
- socket.send.buffer.bytes 每次發送的數據包的最大大小(單位:字節)
- socket.receive.buffer.bytes 每次接收的數據包的最大大小(單位:字節)
- socket.request.max.bytes 接收的最大請求大小(單位:字節)
- num.partitions 每個Topic的默認分區數
上述無論是哪個模式的集群,都可以在配置文件中找到這些配置,如果找不到可手動加入。除了修改配置文件之外,我們還可以在啟動Kafka的命令中指定配置和值,例如:
bash
bin/kafka-server-start.sh config/server.properties --override zookeeper.connect=127.0.0.1:2181 --override broker.id=1
上述命令在啟動時通過命令指定了zookeeper.connect配置值為127.0.0.1:2181,以及broker.id為1,可見在后面追加--override 配置名=值即可,注意命令行中指定的配置值會覆蓋掉配置文件中的配置值!