ZK (ZooKeeper) is the core piece of infrastructure for service registration and discovery in microservice solutions, and a cornerstone of microservices. It is not the only service-registry product; Eureka and Consul are also widely recognized in the industry. Here we focus on ZK only: the tool itself is tiny (the zip package is just a few megabytes), installation is almost foolproof, and it supports clustered deployment.
Background
This post covers the leader and follower concepts of ZK in a clustered environment, the problems ZK faces when nodes fail, and how it solves them. ZK itself is written in Java and open-sourced on GitHub, but the official documentation says little about its internals; there are plenty of scattered blog posts, some of them quite good.
ZK node states and roles
ZK cluster node states (each node is in exactly one state at a time); by design the cluster needs one leader node in the LEADING state. The states are listed below (an enum sketch follows the list):
- LOOKING: searching for a leader; the cluster currently has no leader, so the node enters the leader-election process.
- FOLLOWING: follower state; the node accepts synchronization from and is directed by the LEADING node.
- LEADING: leader state.
- OBSERVING: observer state, indicating that the current server is an observer.
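These four states mirror ZooKeeper's QuorumPeer.ServerState enum. A minimal standalone sketch of the same idea:

```java
// Minimal sketch of the four node states; a real peer is in exactly one
// of these at any given time.
public enum ServerState {
    LOOKING,    // no leader known, the peer is participating in leader election
    FOLLOWING,  // the peer syncs from the leader and forwards writes to it
    LEADING,    // the peer is the elected leader
    OBSERVING   // non-voting observer: receives state but casts no votes
}
```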
The ZAB protocol (atomic broadcast)
ZooKeeper uses a purpose-built crash-recoverable consistency protocol called ZooKeeper Atomic Broadcast (ZAB). ZK implements a primary-backup architecture to keep the replicas in the cluster consistent: every write must go through the leader, which appends it to its local log and then replicates it to all follower nodes. If the leader stops working, ZAB automatically elects a suitable replacement from among the followers, i.e. a new leader; this process is leader election.
In a ZK cluster, transactions are processed by the leader; followers forward them to the leader for unified handling. Put simply, all writes in ZK go through the leader while reads can be served by followers, which is why ZK is better suited to read-heavy, write-light services.
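To make the read/write split concrete, here is a small hypothetical routing sketch (the class and method names are illustrative, not ZooKeeper's real API): writes are forwarded to the leader, reads are answered from the local replica.

```java
// Hypothetical sketch, not ZooKeeper's actual API: a follower forwards writes
// to the leader and serves reads from its own (possibly slightly stale) state.
public class ReplicaRequestRouter {

    interface Peer {
        void proposeWrite(String path, byte[] data); // leader runs the ZAB broadcast
        byte[] readLocal(String path);               // any replica reads its local copy
    }

    private final Peer self;
    private final Peer leader;
    private final boolean isLeader;

    ReplicaRequestRouter(Peer self, Peer leader, boolean isLeader) {
        this.self = self;
        this.leader = leader;
        this.isLeader = isLeader;
    }

    /** Writes must be ordered by the leader, so a follower forwards them. */
    void write(String path, byte[] data) {
        if (isLeader) {
            self.proposeWrite(path, data);
        } else {
            leader.proposeWrite(path, data);
        }
    }

    /** Reads are answered locally, which is what makes ZK cheap for read-heavy loads. */
    byte[] read(String path) {
        return self.readLocal(path);
    }
}
```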
The majority (quorum) election algorithm
ZK's vote-handling strategy
A vote contains: the proposed leader's server id, its ZXID, and the election epoch (electionEpoch). Incoming votes are compared in the following order (a comparator sketch follows the list):
- Compare epochs first: our own logical clock against the vote's electionEpoch, which may be greater than, less than, or equal to ours.
- Then compare ZXIDs: the server with the larger ZXID is preferred as leader.
- If the ZXIDs are equal, compare myid: the server with the larger myid becomes the leader.
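These three rules are what the totalOrderPredicate expression in the source excerpt below encodes. A minimal standalone sketch of the same comparison:

```java
// Sketch of the vote-comparison rule: a candidate (newEpoch, newZxid, newId)
// beats the current proposal (curEpoch, curZxid, curId) if it has a higher
// epoch, or the same epoch and a higher zxid, or the same epoch and zxid
// but a higher server id.
static boolean candidateWins(long newEpoch, long newZxid, long newId,
                             long curEpoch, long curZxid, long curId) {
    if (newEpoch != curEpoch) {
        return newEpoch > curEpoch;   // rule 1: higher epoch wins
    }
    if (newZxid != curZxid) {
        return newZxid > curZxid;     // rule 2: same epoch, higher zxid wins
    }
    return newId > curId;             // rule 3: same epoch and zxid, higher id wins
}
```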
ZK has had three election algorithms: LeaderElection, FastLeaderElection and AuthLeaderElection. FastLeaderElection and AuthLeaderElection are similar; the only difference is that the latter adds authentication information. FastLeaderElection is more efficient than LeaderElection, and later versions keep only FastLeaderElection.
Understanding:
When multiple nodes start up in a cluster, ZK first has to pick one of them as the leader and put it into the LEADING state, which raises the questions of how the election works and what its rules are. The "majority election algorithm": the node that receives more than half of the votes wins, i.e. its state changes from LOOKING to LEADING, and requiring only a majority keeps the election efficient.
The official documentation covers this under "Clustered (Multi-Server) Setup".
Walking through the idea with 5 servers (a toy simulation follows this list):
- Server 1 starts; since it is the only server up, the votes it sends out get no responses, so it stays in the LOOKING state.
- Server 2 starts; it communicates with server 1 and they exchange election results. Neither has any history data, so server 2 with the larger id wins, but because more than half of the servers (more than half of 5 is 3) have not yet agreed on it, servers 1 and 2 both remain in the LOOKING state.
- Server 3 starts; by the same reasoning, three servers now vote for it, so server 3 becomes the leader among servers 1, 2 and 3 and wins this election.
- Server 4 starts; in theory it would have the largest id among servers 1-4, but a majority has already elected server 3, so it can only fall in line as a follower.
- Server 5 starts; like server 4, it joins as a follower.
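A toy simulation of this rolling start, under the simplifying assumption that all servers begin with empty histories, so votes are decided purely by server id:

```java
// Toy simulation of a 5-server rolling start, assuming equal (empty) histories,
// so every LOOKING server votes for the largest id it has seen.
public class StartupElectionDemo {
    public static void main(String[] args) {
        int total = 5;
        int quorum = total / 2 + 1;           // 3 for a 5-server ensemble
        int bestId = 0;                       // largest id seen so far
        int started = 0;
        Integer leader = null;                // null while the cluster is still LOOKING

        for (int id = 1; id <= total; id++) { // servers start one by one
            started++;
            if (leader == null) {
                bestId = Math.max(bestId, id);
                if (started >= quorum) {      // quorum reached: bestId wins
                    leader = bestId;
                    System.out.println("Server " + leader + " elected leader with "
                            + started + " of " + total + " servers up");
                }
            } else {
                System.out.println("Server " + id + " starts and follows leader " + leader);
            }
        }
    }
}
```

Running it prints that server 3 is elected once three servers are up, and servers 4 and 5 join as followers, matching the walkthrough above.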
Now suppose 2 of the 5 servers fail (servers 3 and 4), and the leader is among them:
The leader and followers exchange heartbeats and keep their data in sync. Once the leader dies, the whole ZooKeeper cluster stops serving external requests and enters a new round of leader election (a short sketch follows these steps):
1) Servers 1, 2 and 5 notice they have lost contact with the leader, switch to the LOOKING state, and start a new round of voting.
2) Servers 1, 2 and 5 each cast a vote and broadcast it, incrementing their own election epoch.
3) Servers 1, 2 and 5 each process the incoming votes, determine a leader, and broadcast the result.
4) Following the vote-handling rules, one server is chosen; with all 3 survivors voting for it, the majority quorum of 3 (out of the 5 configured servers) is met.
5) The servers switch to their new leader and follower states accordingly.
6) The cluster resumes serving requests.
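A sketch of step 4, assuming the three survivors have identical epochs and zxids so that the highest surviving id (server 5) wins once all three votes agree; note that the quorum is computed over the full configured ensemble of 5, not over the number of live servers:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch of the failover vote count among the survivors {1, 2, 5} of a
// 5-server ensemble, assuming equal epochs and zxids so the largest id wins.
public class FailoverElectionDemo {
    public static void main(String[] args) {
        int ensembleSize = 5;
        int quorum = ensembleSize / 2 + 1;        // 3, based on the configured size
        List<Integer> survivors = Arrays.asList(1, 2, 5);

        // With equal epochs and zxids, every survivor converges on the largest id.
        int proposed = Collections.max(survivors);
        int votesForProposed = survivors.size();

        if (votesForProposed >= quorum) {
            System.out.println("Server " + proposed + " becomes the new leader ("
                    + votesForProposed + "/" + ensembleSize + " votes, quorum " + quorum + ")");
        } else {
            System.out.println("No quorum reached: the cluster stays unavailable");
        }
    }
}
```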
Source code walkthrough:
```java
/**
 * Starts a new round of leader election. Whenever our QuorumPeer
 * changes its state to LOOKING, this method is invoked, and it
 * sends notifications to all other peers.
 */
public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }

    self.start_fle = Time.currentElapsedTime();
    try {
        Map<Long, Vote> recvset = new HashMap<Long, Vote>();
        Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = minNotificationInterval;

        synchronized (this) {
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id = " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();

        SyncedLearnerTracker voteSet;

        /*
         * Loop in which we exchange notifications until we find a leader
         */
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if (n == null) {
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout * 2;
                notTimeout = (tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                /*
                 * Only proceed if the vote comes from a replica in the current or next
                 * voting view for a replica in the current or next voting view.
                 */
                switch (n.state) {
                case LOOKING:
                    if (getInitLastLoggedZxid() == -1) {
                        LOG.debug("Ignoring notification as our zxid is -1");
                        break;
                    }
                    if (n.zxid == -1) {
                        LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                        break;
                    }
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding vote: from=" + n.sid
                                + ", proposed leader=" + n.leader
                                + ", proposed zxid=0x" + Long.toHexString(n.zxid)
                                + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    // don't care about the version if it's in LOOKING state
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                    if (voteSet.hasAllQuorums()) {
                        // Verify if there is any change in the proposed leader
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            setPeerState(proposedLeader, voteSet);
                            Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: {}", n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if (n.electionEpoch == logicalclock.get()) {
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            setPeerState(n.leader, voteSet);
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify that
                     * a majority are following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                    voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                        synchronized (this) {
                            logicalclock.set(n.electionEpoch);
                            setPeerState(n.leader, voteSet);
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecoginized: " + n.state + " (n.state), " + n.sid + " (n.sid)");
                    break;
                }
            } else {
                if (!validVoter(n.leader)) {
                    LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                }
                if (!validVoter(n.sid)) {
                    LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                }
            }
        }
        return null;
    } finally {
        try {
            if (self.jmxLeaderElectionBean != null) {
                MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
    }
}
```

The vote comparison used throughout lookForLeader is totalOrderPredicate; its core expression, excerpted here, encodes the three comparison rules listed earlier:

```java
/*
 * We return true if one of the following three cases hold:
 * 1- New epoch is higher
 * 2- New epoch is the same as current epoch, but new zxid is higher
 * 3- New epoch is the same as current epoch, new zxid is the same
 *    as current zxid, but server id is higher.
 */
return ((newEpoch > curEpoch)
        || ((newEpoch == curEpoch)
            && ((newZxid > curZxid)
                || ((newZxid == curZxid) && (newId > curId)))));
```
The split-brain problem
Split-brain happens when the leader dies, the followers elect a new leader, and then the original leader comes back. Because ZK's majority mechanism allows a certain number of machines to be lost while the cluster still serves normally, inconsistent judgments about whether the leader is dead can leave the cluster with more than one leader.
Mitigation:
ZK's majority mechanism already reduces the chance of split-brain to some degree; at the very least, three leaders cannot appear at the same time. On top of that, ZK's epoch mechanism (a logical clock) increments by one on every election. When peers exchange votes they compare epochs: a vote with an epoch smaller than our own is discarded, a larger one makes us reset our own state and adopt it, and an equal one proceeds to the normal vote comparison, as this excerpt shows:
```java
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
    logicalclock.set(n.electionEpoch);
    recvset.clear();
    if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
        updateProposal(n.leader, n.zxid, n.peerEpoch);
    } else {
        updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
    }
    sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                + Long.toHexString(n.electionEpoch)
                + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
    }
    break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
    updateProposal(n.leader, n.zxid, n.peerEpoch);
    sendNotifications();
}
```
Summary
In day-to-day ZK operations, watch out for the scenarios above going wrong in extreme cases, especially split-brain. The following helps:
Deployment principles under the majority election strategy (a quorum-sizing sketch follows this list):
- Deploy an odd number of servers, e.g. 3, 5, 7, ...; an odd-sized ensemble is the most economical configuration for electing a leader.
- The maximum number of nodes ZK can afford to lose follows from the principle of "keeping the majority quorum intact"; any servers beyond what that requires are wasted.
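A quorum-sizing sketch: for an ensemble of n voting servers the quorum is n/2 + 1 and the cluster tolerates (n - 1)/2 failures; 5 and 6 servers tolerate the same number of failures, which is why odd sizes are preferred.

```java
// Quorum sizing: quorum = n/2 + 1, tolerated failures = n - quorum = (n - 1)/2.
public class QuorumSizing {
    public static void main(String[] args) {
        for (int n = 3; n <= 7; n++) {
            int quorum = n / 2 + 1;
            int tolerated = n - quorum;
            System.out.printf("%d servers -> quorum %d, tolerates %d failure(s)%n",
                    n, quorum, tolerated);
        }
    }
}
```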
The full algorithm is complex and has to handle many cases. One key concept is the epoch (monotonically increasing), split into logicEpoch and electionEpoch; every vote checks whether the election epochs match, and so on. Questions like the two discussed above come up regularly when thinking about ZK's strategy, so I have written down my understanding here both to clarify it and for future reference.
Author: owen_jia (OSChina blog)
WeChat official account: 互聯網技術到家
Toutiao account: 互聯網技術到家