本文作者:何建輝(公眾號:org_yijiaoqian)
本文 GitHub org_hejianhui/JAVAStudy 已收錄,有我的系列文章。
前言
- Zookeeper特性與節點說明
- Zookeeper客戶端使用與集群原理
- Zookeeper典型使用場景實踐
前三篇講了Zookeeper的特性、客戶端使用和集群原理、典型使用場景實踐,本篇重點深入了解ZAB協議以及源碼實現的解析。
Zookeeper ZAB協議
什么是Zab協議?
Zab協議的全稱是 Zookeeper Atomic Broadcast (Zookeeper原子廣播)。
Zookeeper 是通過 Zab 協議來保證分布式事務的最終一致性。
- Zab協議是為分布式協調服務Zookeeper專門設計的一種 支持崩潰恢復 的 原子廣播協議 ,是Zookeeper保證數據一致性的核心算法。Zab借鑒了Paxos算法,但又不像Paxos那樣,是一種通用的分布式一致性算法。它是特別為Zookeeper設計的支持崩潰恢復的原子廣播協議。
- 在Zookeeper中主要依賴Zab協議來實現數據一致性,基于該協議,zk實現了一種主備模型(即Leader和Follower模型)的系統架構來保證集群中各個副本之間數據的一致性。這里的主備系統架構模型,就是指只有一臺客戶端(Leader)負責處理外部的寫事務請求,然后Leader客戶端將數據同步到其他Follower節點。
Zookeeper 客戶端會隨機的鏈接到 zookeeper 集群中的一個節點,如果是讀請求,就直接從當前節點中讀取數據;如果是寫請求,那么節點就會向 Leader 提交事務,Leader 接收到事務提交,會廣播該事務,只要超過半數節點寫入成功,該事務就會被提交。
Zab 協議實現的作用
- 使用一個單一的主進程(Leader)來接收并處理客戶端的事務請求(也就是寫請求),并采用了Zab的原子廣播協議,將服務器數據的狀態變更以 事務proposal (事務提議)的形式廣播到所有的副本(Follower)進程上去。
- 保證一個全局的變更序列被順序引用
Zookeeper是一個樹形結構,很多操作都要先檢查才能確定是否可以執行,比如P1的事務t1可能是創建節點"/a",t2可能是創建節點"/a/bb",只有先創建了父節點"/a",才能創建子節點"/a/b"。
為了保證這一點,Zab要保證同一個Leader發起的事務要按順序被Apply,同時還要保證只有先前Leader的事務被apply之后,新選舉出來的Leader才能再次發起事務。
- 當主進程出現異常的時候,整個zk集群依舊能正常工作。
Zab協議原理
Zab協議要求每個 Leader 都要經歷三個階段:發現,同步,廣播。
- 發現:要求zookeeper集群必須選舉出一個 Leader 進程,同時 Leader 會維護一個 Follower 可用客戶端列表。將來客戶端可以和這些 Follower節點進行通信。
- 同步:Leader 要負責將本身的數據與 Follower 完成同步,做到多副本存儲。這樣也是體現了CAP中的高可用和分區容錯。Follower將隊列中未處理完的請求消費完成后,寫入本地事務日志中。
- 廣播:Leader 可以接收客戶端新的事務Proposal請求,將新的Proposal請求廣播給所有的 Follower。
Zab協議核心
Zab協議的核心:定義了事務請求的處理方式
- 所有的事務請求必須由一個全局唯一的服務器來協調處理,這樣的服務器被叫做 Leader服務器。其他剩余的服務器則是 Follower服務器。
- Leader服務器 負責將一個客戶端事務請求,轉換成一個 事務Proposal,并將該 Proposal 分發給集群中所有的 Follower 服務器,也就是向所有 Follower 節點發送數據廣播請求(或數據復制)
- 分發之后Leader服務器需要等待所有Follower服務器的反饋(Ack請求),在Zab協議中,只要超過半數的Follower服務器進行了正確的反饋后(也就是收到半數以上的Follower的Ack請求),那么 Leader 就會再次向所有的 Follower服務器發送 Commit 消息,要求其將上一個 事務proposal 進行提交。
Zab協議內容
Zab 協議包括兩種基本的模式:崩潰恢復 和 消息廣播
協議過程
當整個集群啟動過程中,或者當 Leader 服務器出現網絡中弄斷、崩潰退出或重啟等異常時,Zab協議就會 進入崩潰恢復模式,選舉產生新的Leader。
當選舉產生了新的 Leader,同時集群中有過半的機器與該 Leader 服務器完成了狀態同步(即數據同步)之后,Zab協議就會退出崩潰恢復模式,進入消息廣播模式。
這時,如果有一臺遵守Zab協議的服務器加入集群,因為此時集群中已經存在一個Leader服務器在廣播消息,那么該新加入的服務器自動進入恢復模式:找到Leader服務器,并且完成數據同步。同步完成后,作為新的Follower一起參與到消息廣播流程中。
協議狀態切換
當Leader出現崩潰退出或者機器重啟,亦或是集群中不存在超過半數的服務器與Leader保存正常通信,Zab就會再一次進入崩潰恢復,發起新一輪Leader選舉并實現數據同步。同步完成后又會進入消息廣播模式,接收事務請求。
保證消息有序
在整個消息廣播中,Leader會將每一個事務請求轉換成對應的 proposal 來進行廣播,并且在廣播 事務Proposal 之前,Leader服務器會首先為這個事務Proposal分配一個全局單遞增的唯一ID,稱之為事務ID(即zxid),由于Zab協議需要保證每一個消息的嚴格的順序關系,因此必須將每一個proposal按照其zxid的先后順序進行排序和處理。
消息廣播
- 在zookeeper集群中,數據副本的傳遞策略就是采用消息廣播模式。zookeeper中數據副本的同步方式與二段提交相似,但是卻又不同。二段提交要求協調者必須等到所有的參與者全部反饋ACK確認消息后,再發送commit消息。要求所有的參與者要么全部成功,要么全部失敗。二段提交會產生嚴重的阻塞問題。
- Zab協議中 Leader 等待 Follower 的ACK反饋消息是指“只要半數以上的Follower成功反饋即可,不需要收到全部Follower反饋”
消息廣播具體步驟
- 客戶端發起一個寫操作請求。
- Leader 服務器將客戶端的請求轉化為事務 Proposal 提案,同時為每個 Proposal 分配一個全局的ID,即zxid。
- Leader 服務器為每個 Follower 服務器分配一個單獨的隊列,然后將需要廣播的 Proposal 依次放到隊列中去,并且根據 FIFO 策略進行消息發送。
- Follower 接收到 Proposal 后,會首先將其以事務日志的方式寫入本地磁盤中,寫入成功后向 Leader 反饋一個 Ack 響應消息。
- Leader 接收到超過半數以上 Follower 的 Ack 響應消息后,即認為消息發送成功,可以發送 commit 消息。
- Leader 向所有 Follower 廣播 commit 消息,同時自身也會完成事務提交。Follower 接收到 commit 消息后,會將上一條事務提交。
zookeeper 采用 Zab 協議的核心,就是只要有一臺服務器提交了 Proposal,就要確保所有的服務器最終都能正確提交 Proposal。這也是 CAP/BASE 實現最終一致性的一個體現。
Leader 服務器與每一個 Follower 服務器之間都維護了一個單獨的 FIFO 消息隊列進行收發消息,使用隊列消息可以做到異步解耦。 Leader 和 Follower 之間只需要往隊列中發消息即可。如果使用同步的方式會引起阻塞,性能要下降很多。
崩潰恢復
一旦 Leader 服務器出現崩潰或者由于網絡原因導致 Leader 服務器失去了與過半 Follower 的聯系,那么就會進入崩潰恢復模式。
在 Zab 協議中,為了保證程序的正確運行,整個恢復過程結束后需要選舉出一個新的 Leader 服務器。因此 Zab 協議需要一個高效且可靠的 Leader 選舉算法,從而確保能夠快速選舉出新的 Leader 。
Leader 選舉算法不僅僅需要讓 Leader 自己知道自己已經被選舉為 Leader ,同時還需要讓集群中的所有其他機器也能夠快速感知到選舉產生的新 Leader 服務器。
崩潰恢復主要包括兩部分:Leader選舉 和 數據恢復
Zab 協議如何保證數據一致性
假設兩種異常情況:
- 一個事務在 Leader 上提交了,并且過半的 Folower 都響應 Ack 了,但是 Leader 在 Commit 消息發出之前掛了。
- 假設一個事務在 Leader 提出之后,Leader 掛了。
要確保如果發生上述兩種情況,數據還能保持一致性,那么 Zab 協議選舉算法必須滿足以下要求:
Zab 協議崩潰恢復要求滿足以下兩個要求:
- 確保已經被 Leader 提交的 Proposal 必須最終被所有的 Follower 服務器提交。
- 確保丟棄已經被 Leader 提出的但是沒有被提交的 Proposal。
根據上述要求 Zab協議需要保證選舉出來的Leader需要滿足以下條件:
- 新選舉出來的 Leader 不能包含未提交的 Proposal 。
即新選舉的 Leader 必須都是已經提交了 Proposal 的 Follower 服務器節點。
- 新選舉的 Leader 節點中含有最大的 zxid 。
這樣做的好處是可以避免 Leader 服務器檢查 Proposal 的提交和丟棄工作。
Zab 如何數據同步
- 完成 Leader 選舉后(新的 Leader 具有最高的zxid),在正式開始工作之前(接收事務請求,然后提出新的 Proposal),Leader 服務器會首先確認事務日志中的所有的 Proposal 是否已經被集群中過半的服務器 Commit。
- Leader 服務器需要確保所有的 Follower 服務器能夠接收到每一條事務的 Proposal ,并且能將所有已經提交的事務 Proposal 應用到內存數據中。等到 Follower 將所有尚未同步的事務 Proposal 都從 Leader 服務器上同步過啦并且應用到內存數據中以后,Leader 才會把該 Follower 加入到真正可用的 Follower 列表中。
Zab 數據同步過程中,如何處理需要丟棄的 Proposal
在 Zab 的事務編號 zxid 設計中,zxid是一個64位的數字。
其中低32位可以看成一個簡單的單增計數器,針對客戶端每一個事務請求,Leader 在產生新的 Proposal 事務時,都會對該計數器加1。而高32位則代表了 Leader 周期的 epoch 編號。
epoch 編號可以理解為當前集群所處的年代,或者周期。每次Leader變更之后都會在 epoch 的基礎上加1,這樣舊的 Leader 崩潰恢復之后,其他Follower 也不會聽它的了,因為 Follower 只服從epoch最高的 Leader 命令。
每當選舉產生一個新的 Leader ,就會從這個 Leader 服務器上取出本地事務日志最大編號 Proposal 的 zxid,并從 zxid 中解析得到對應的 epoch 編號,然后再對其加1,之后該編號就作為新的 epoch 值,并將低32位數字歸零,由0開始重新生成zxid。
Zab 協議通過 epoch 編號來區分 Leader 變化周期,能夠有效避免不同的 Leader 錯誤的使用了相同的 zxid 編號提出了不一樣的 Proposal 的異常情況。
基于以上策略: 當一個包含了上一個 Leader 周期中尚未提交過的事務 Proposal 的服務器啟動時,當這臺機器加入集群中,以 Follower 角色連上 Leader 服務器后,Leader 服務器會根據自己服務器上最后提交的 Proposal 來和 Follower 服務器的 Proposal 進行比對,比對的結果肯定是 Leader 要求 Follower 進行一個回退操作,回退到一個確實已經被集群中過半機器 Commit 的最新 Proposal。
Zab實現原理
Zab 節點有三種狀態
- Following:當前節點是跟隨者,服從 Leader 節點的命令。
- Leading:當前節點是 Leader,負責協調事務。
- Election/Looking:節點處于選舉狀態,正在尋找 Leader。
代碼實現中,多了一種狀態:Observing 狀態 這是 Zookeeper 引入 Observer 之后加入的,Observer 不參與選舉,是只讀節點,跟 Zab 協議沒有關系。
節點的持久狀態
- history:當前節點接收到事務 Proposal 的Log
- acceptedEpoch:Follower 已經接收的 Leader 更改 epoch 的 newEpoch 提議。
- currentEpoch:當前所處的 Leader 年代
- lastZxid:history 中最近接收到的Proposal 的 zxid(最大zxid)
Zab 的四個階段
- 選舉階段(Leader Election)
節點在一開始都處于選舉節點,只要有一個節點得到超過半數節點的票數,它就可以當選準 Leader,只有到達第三個階段(也就是同步階段),這個準 Leader 才會成為真正的 Leader。
Zookeeper 規定所有有效的投票都必須在同一個 輪次 中,每個服務器在開始新一輪投票時,都會對自己維護的 logicalClock 進行自增操作。
每個服務器在廣播自己的選票前,會將自己的投票箱(recvset)清空。該投票箱記錄了所收到的選票。
例如:Server_2 投票給 Server_3,Server_3 投票給 Server_1,則Server_1的投票箱為(2,3)、(3,1)、(1,1)。(每個服務器都會默認給自己投票)
前一個數字表示投票者,后一個數字表示被選舉者。票箱中只會記錄每一個投票者的最后一次投票記錄,如果投票者更新自己的選票,則其他服務器收到該新選票后會在自己的票箱中更新該服務器的選票。
這一階段的目的就是為了選出一個準 Leader ,然后進入下一個階段。
協議并沒有規定詳細的選舉算法,后面會提到實現中使用的 Fast Leader Election。
- 發現階段(Descovery)
在這個階段,Followers 和上一輪選舉出的準 Leader 進行通信,同步 Followers 最近接收的事務 Proposal 。
一個 Follower 只會連接一個 Leader,如果一個 Follower 節點認為另一個 Follower 節點,則會在嘗試連接時被拒絕。被拒絕之后,該節點就會進入 Leader Election階段。
這個階段的主要目的是發現當前大多數節點接收的最新 Proposal,并且準 Leader 生成新的 epoch ,讓 Followers 接收,更新它們的 acceptedEpoch。
- 同步階段(Synchronization)
同步階段主要是利用 Leader 前一階段獲得的最新 Proposal 歷史,同步集群中所有的副本。
只有當 quorum(超過半數的節點) 都同步完成,準 Leader 才會成為真正的 Leader。Follower 只會接受 zxid 比自己 lastZxid 大的 Proposal。
- 廣播階段(Broadcast)
到了這個階段,Zookeeper 集群才能正式對外提供事務服務,并且 Leader 可以進行消息廣播。同時,如果有新的節點加入,還需要對新節點進行同步。
需要注意的是,Zab 提交事務并不像 2PC 一樣需要全部 Follower 都 Ack,只需要得到 quorum(超過半數的節點)的Ack 就可以。
Zab協議實現
協議的 Java 版本實現跟上面的定義略有不同,選舉階段使用的是 Fast Leader Election(FLE),它包含了步驟1的發現職責。因為FLE會選舉擁有最新提議的歷史節點作為 Leader,這樣就省去了發現最新提議的步驟。
實際的實現將發現和同步階段合并為 Recovery Phase(恢復階段),所以,Zab 的實現實際上有三個階段。
Zab協議三個階段:
- 選舉(Fast Leader Election)
- 恢復(Recovery Phase)
- 廣播(Broadcast Phase)
Fast Leader Election(快速選舉)
前面提到的 FLE 會選舉擁有最新Proposal history (lastZxid最大)的節點作為 Leader,這樣就省去了發現最新提議的步驟。這是基于擁有最新提議的節點也擁有最新的提交記錄
成為 Leader 的條件:
- 選 epoch 最大的
- 若 epoch 相等,選 zxid 最大的
- 若 epoch 和 zxid 相等,選擇 server_id 最大的(zoo.cfg中的myid)
節點在選舉開始時,都默認投票給自己,當接收其他節點的選票時,會根據上面的 Leader條件 判斷并且更改自己的選票,然后重新發送選票給其他節點。當有一個節點的得票超過半數,該節點會設置自己的狀態為 Leading ,其他節點會設置自己的狀態為 Following。
Recovery Phase(恢復階段)
這一階段 Follower 發送他們的 lastZxid 給 Leader,Leader 根據 lastZxid 決定如何同步數據。這里的實現跟前面的 Phase 2 有所不同:Follower 收到 TRUNC 指令會終止 L.lastCommitedZxid 之后的 Proposal ,收到 DIFF 指令會接收新的 Proposal。
history.lastCommitedZxid:最近被提交的 Proposal zxid history.oldThreshold:被認為已經太舊的已經提交的 Proposal zxid
Zookeeper ZAB協議實現源碼
啟動流程
知識點:
- 工程結構介紹
- 啟動流程宏觀圖
- 集群啟動詳細流程
- netty 服務工作機制
工程結構介紹
項目地址:https://github.com/Apache/zookeeper.git
分支tag :3.6.2
- zookeeper-recipes: 示例源碼
- zookeeper-client: C語言客戶端
- zookeeper-server:主體源碼(包含客戶端)
啟動宏觀流程圖
源碼啟動:
- 服務端:ZooKeeperServerMain
- 客戶端:ZooKeeperMain
為方便閱讀,以下代碼均省略包名
集群啟動詳細流程
裝載配置:
# zookeeper 啟動流程堆棧
>QuorumPeerMain#initializeAndRun //啟動工程
>QuorumPeerConfig#parse // 加載config 配置
>QuorumPeerConfig#parseProperties// 解析config配置
>new DatadirCleanupManager // 構造一個數據清器
>DatadirCleanupManager#start // 啟動定時任務 清除過期的快照
代碼堆棧:
>QuorumPeerMain#main //啟動main方法
>QuorumPeerConfig#parse // 加載zoo.cfg 文件
>QuorumPeerConfig#parseProperties // 解析配置
>DatadirCleanupManager#start // 啟動定時任務清除日志
>QuorumPeerConfig#isDistributed // 判斷是否為集群模式
>ServerCnxnFactory#createFactory() // 創建服務默認為NIO,推薦netty
//***創建 初始化集群管理器**/
>QuorumPeerMain#getQuorumPeer
>QuorumPeer#setTxnFactory
>new FileTxnSnapLog // 數據文件管理器,用于檢測快照與日志文件
/** 初始化數據庫*/
>new ZKDatabase
>ZKDatabase#createDataTree //創建數據樹,所有的節點都會存儲在這
// 啟動集群:同時啟動線程
> QuorumPeer#start //
> QuorumPeer#loadDataBase // 從快照文件以及日志文件 加載節點并填充到dataTree中去
> QuorumPeer#startServerCnxnFactory // 啟動netty 或java nio 服務,對外開放2181 端口
> AdminServer#start// 啟動管理服務,netty http服務,默認端口是8080
> QuorumPeer#startLeaderElection // 開始執行選舉流程
> quorumPeer.join() // 防止主進程退出
流程說明:
- main方法啟動
- 加載zoo.cfg 配置文件
- 解析配置
- 創建服務工廠
- 創建集群管理線程
- 設置數據庫文件管理器
- 設置數據庫
- ....設置設置
- start啟動集群管理線程
- 加載數據節點至內存
- 啟動netty 服務,對客戶端開放端口
- 啟動管理員Http服務,默認8080端口
- 啟動選舉流程
- join 管理線程,防止main 進程退出
netty 服務啟動流程
服務UML類圖
設置netty啟動參數
-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
初始化:
關鍵代碼:
#初始化管道流
#channelHandler 是一個內部類是具體的消息處理器。protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); if (secure) {
initSSL(pipeline); } pipeline.addLast("servercnxnfactory", channelHandler);
}
channelHandler 類結構
執行堆棧:
NettyServerCnxnFactory#NettyServerCnxnFactory // 初始化netty服務工廠
> NettyUtils.newNioOrEpollEventLoopGroup // 創建IO線程組
> NettyUtils#newNioOrEpollEventLoopGroup() // 創建工作線程組
>ServerBootstrap#childHandler(io.netty.channel.ChannelHandler) // 添加管道流
>NettyServerCnxnFactory#start // 綁定端口,并啟動netty服務
創建連接:
每當有客戶端新連接進來,就會進入該方法 創建 NettyServerCnxn對象。并添加至cnxns隊列
執行堆棧
CnxnChannelHandler#channelActive
>new NettyServerCnxn // 構建連接器
>NettyServerCnxnFactory#addCnxn // 添加至連接器,并根據客戶端IP進行分組
>ipMap.get(addr) // 基于IP進行分組
讀取消息:
執行堆棧
CnxnChannelHandler#channelRead
>NettyServerCnxn#processMessage // 處理消息
>NettyServerCnxn#receiveMessage // 接收消息
>ZooKeeperServer#processPacket //處理消息包
>org.apache.zookeeper.server.Request // 封裝request 對象
>org.apache.zookeeper.server.ZooKeeperServer#submitRequest // 提交request
>org.apache.zookeeper.server.RequestProcessor#processRequest // 處理請求
快照與事務日志存儲結構
概要
ZK中所有的數據都是存儲在內存中,即zkDataBase中。但同時所有對ZK數據的變更都會記錄到事務日志中,并且當寫入到一定的次數就會進行一次快照的生成。已保證數據的備份。其后綴就是ZXID(唯一事務ID)。
- 事務日志:每次增刪改,的記錄日志都會保存在文件當中
- 快照日志:存儲了在指定時間節點下的所有的數據
存儲結構
zkDdataBase 是zk數據庫基類,所有節點都會保存在該類當中,而對Zk進行任何的數據變更都會基于該類進行。zk數據的存儲是通過DataTree 對象進行,其用了一個map 來進行存儲。
UML 類圖:
讀取快照日志:
org.apache.zookeeper.server.SnapshotFormatter
讀取事務日志:
org.apache.zookeeper.server.LogFormatter
快照相關配置
快照裝載流程
>ZooKeeperServer#loadData // 加載數據
>FileTxnSnapLog#restore // 恢復數據
>FileSnap#deserialize() // 反序列化數據
>FileSnap#findNValidSnapshots // 查找有效的快照
>Util#sortDataDir // 基于后綴排序文件