Introduction to ZooKeeper
ZooKeeper is open-source software that provides coordination services for distributed applications. It exposes a simple set of primitives on which distributed applications can build higher-level services for synchronization, configuration maintenance, group membership, and naming. It is designed to be easy to program against: anyone comfortable with a directory-tree file system will find its data model natural. It runs on the JVM and ships with Java and C clients.
Coordination services are notoriously hard to get right; they are especially prone to errors such as race conditions and deadlock. The motivation behind ZooKeeper is to relieve distributed applications of the burden of implementing coordination services from scratch.
Features of ZooKeeper
1. Simple: ZooKeeper lets distributed processes coordinate with one another through a shared hierarchical namespace organized much like a standard file system. The namespace consists of data registers, called znodes, which to ZooKeeper resemble files and directories. Unlike a typical file system, which is designed for storage, ZooKeeper keeps its data in memory, which is how it achieves high throughput and low latency.
The ZooKeeper implementation puts a premium on high performance, high availability, and strictly ordered access. The performance aspect means it can be used in large distributed systems; the reliability aspect keeps it from being a single point of failure; and the strict ordering means sophisticated synchronization primitives can be implemented at the client.
2. Replicated: like the distributed processes it coordinates, ZooKeeper itself is intended to be replicated over a set of hosts called an ensemble. (Figure: the ZooKeeper service.)
The servers that make up the ZooKeeper service must all know about each other. They maintain an in-memory image of state, along with transaction logs and snapshots in persistent storage. As long as a majority of the servers are available, the ZooKeeper service will be available.
Clients connect to a single ZooKeeper server. The client maintains a TCP connection through which it sends requests, gets responses, receives watch events, and sends heartbeats. If the TCP connection to the server breaks, the client connects to a different server.
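This failover works because a client handle is usually constructed with a connection string listing several ensemble members. A minimal sketch (hostnames and timeout below are hypothetical, not from this article):

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ConnectExample {
    public static void main(String[] args) throws Exception {
        // Listing several servers lets the client fail over to another
        // ensemble member if its current TCP connection breaks.
        ZooKeeper zk = new ZooKeeper(
                "host1:2181,host2:2181,host3:2181", // hypothetical hosts
                5000,                               // session timeout in ms
                new Watcher() {
                    public void process(WatchedEvent event) {
                        System.out.println("Event: " + event);
                    }
                });
        System.out.println("State: " + zk.getState());
        zk.close();
    }
}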
3. Ordered: ZooKeeper stamps each update with a number that reflects the order of all ZooKeeper transactions. Subsequent operations can use this ordering to implement higher-level abstractions, such as synchronization primitives.
4. Fast: it is especially fast in "read-mostly" workloads. ZooKeeper applications run on thousands of machines, and they perform best where reads outnumber writes, at ratios of around 10:1.
Data model and hierarchical namespace
The namespace provided by ZooKeeper is much like that of a standard file system: a name is a sequence of path elements separated by a slash (/), and every node in ZooKeeper's namespace is identified by its path.
Nodes and ephemeral nodes
Unlike a standard file system, each node in a ZooKeeper namespace can have both data and children associated with it; it is as if a file could also be a directory. (ZooKeeper was designed to store coordination data: status information, configuration, location information, and so on, so the data stored at each node is usually small, in the byte-to-kilobyte range.) We use the term znode to make clear that we are talking about ZooKeeper data nodes.
Each znode maintains a stat structure that includes version numbers for data changes and ACL changes, along with timestamps; this allows cache validation and coordinated updates. Every time a znode's data changes, its version number increases, and whenever a client retrieves data it also receives the data's version.
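These versions are what make optimistic, compare-and-swap style updates possible. A minimal sketch of such a conditional update, assuming an already-connected handle and a hypothetical /app/counter znode:

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

class VersionedUpdate {
    // Optimistic update: the write succeeds only if the znode is still
    // at the version we read; otherwise we back off and retry.
    static void increment(ZooKeeper zk) throws Exception {
        Stat stat = new Stat();
        byte[] data = zk.getData("/app/counter", false, stat); // fills in stat.version
        byte[] next = String.valueOf(Integer.parseInt(new String(data)) + 1).getBytes();
        try {
            zk.setData("/app/counter", next, stat.getVersion());
        } catch (KeeperException.BadVersionException e) {
            // another client changed the znode first; re-read and retry
        }
    }
}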
Reads and writes of the data stored at each node are atomic: a read returns all of a znode's data bytes, and a write replaces all of them. Each node also has an access control list (ACL) that restricts who can do what.
ZooKeeper also has the notion of ephemeral nodes. An ephemeral znode exists only as long as the session that created it is active; when the session ends, the znode is deleted.
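A minimal sketch of how an ephemeral znode is typically used for group membership (the /app/members parent path is hypothetical and must already exist):

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

class MembershipExample {
    // The znode vanishes automatically when the session ends, which is
    // the usual building block for group membership and liveness tracking.
    static void register(ZooKeeper zk, String memberId) throws Exception {
        zk.create("/app/members/" + memberId,
                  new byte[0],                 // payload is optional
                  ZooDefs.Ids.OPEN_ACL_UNSAFE, // open ACL, for the sketch only
                  CreateMode.EPHEMERAL);
    }
}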
Conditional updates and watches
ZooKeeper supports the concept of watches. A client can set a watch on a znode; the watch is triggered and removed when the znode changes. When a watch triggers, the client receives a packet saying that the znode has changed, and if the connection between the client and the ZooKeeper server breaks, the client receives a local notification.
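A small sketch of watching a hypothetical /app/config znode; because ZooKeeper watches fire only once, the callback re-registers itself:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

class ConfigWatcher {
    // Watches are one-shot: after firing they must be set again,
    // which the callback below does by re-invoking watch().
    static void watch(final ZooKeeper zk) throws Exception {
        zk.getData("/app/config", new Watcher() {
            public void process(WatchedEvent event) {
                System.out.println("Config changed: " + event.getType());
                try {
                    watch(zk); // re-arm the watch and re-read the data
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, null);
    }
}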
Guarantees
ZooKeeper is very fast and very simple. Since its goal, though, is to be a basis for building more complicated services such as synchronization, it provides a set of guarantees. These are:
- Sequential consistency: updates from a client are applied in the order in which they were sent.
- Atomicity: updates either succeed or fail; there are no partial results.
- Single system image: a client sees the same view of the service regardless of which server it connects to.
- Reliability: once an update has been applied, it persists from that time forward until a client overwrites it.
- Timeliness: the client's view of the system is guaranteed to be up to date within a certain time bound.
Simple API
One of ZooKeeper's design goals is to provide a very simple programming interface, so it supports only the operations below (a short usage sketch follows the list):
create
creates a node at a location in the tree
delete
deletes a node
exists
tests whether a node exists at a location
get data
reads the data from a node
set data
writes data to a node
get children
retrieves a list of children of a node
sync
waits for data to be propagated (to a consistent view)
Implementation
The ZooKeeper components figure shows the high-level components of the ZooKeeper service. With the exception of the request processor, each of the servers that make up the service replicates its own copy of each component.
(Figure: ZooKeeper components.)
The replicated database is an in-memory database containing the entire data tree. For recoverability, updates are logged to disk, and a write is serialized to disk before it is applied to the in-memory database.
Every ZooKeeper server services clients, and a client connects to exactly one server to submit requests. Read requests are served from the local replica of that server's database; write requests, which change the state of the service, are handled by the request processor.
As part of the agreement protocol, all write requests from clients are forwarded to a single server, called the leader. The remaining ZooKeeper servers, called followers, receive message proposals from the leader and agree on message delivery. The messaging layer takes care of replacing a failed leader and synchronizing the followers with the leader.
ZooKeeper uses a custom atomic messaging protocol. Because the messaging layer is atomic, ZooKeeper can guarantee that the local replicas never diverge. When the leader receives a write request, it computes what the state of the system will be after the write is applied and transforms this into a transaction that captures the new state.
ZooKeeper startup
Server startup
bin/zkServer.sh start
The start clause of the script is as follows:
start)
    echo -n "Starting zookeeper ... "
    if [ -f "$ZOOPIDFILE" ]; then
      if kill -0 `cat "$ZOOPIDFILE"` > /dev/null 2>&1; then
         echo $command already running as process `cat "$ZOOPIDFILE"`.
         exit 0
      fi
    fi
    nohup "$JAVA" $ZOO_DATADIR_AUTOCREATE "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" \
    "-Dzookeeper.log.file=${ZOO_LOG_FILE}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
    -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' \
    -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &
    if [ $? -eq 0 ]
    then
      if /bin/echo -n $! > "$ZOOPIDFILE"
      then
        sleep 1
        pid=$(cat "${ZOOPIDFILE}")
        if ps -p "${pid}" > /dev/null 2>&1; then
          echo STARTED
        else
          echo FAILED TO START
          exit 1
        fi
      else
        echo FAILED TO WRITE PID
        exit 1
      fi
    else
      echo SERVER DID NOT START
      exit 1
    fi
    ;;
Here, ZOOMAIN is the entry point of the startup program; its class is:
org.apache.zookeeper.server.quorum.QuorumPeerMain
Its main method is:
/**
 * To start the replicated server specify the configuration file name on
 * the command line.
 * @param args path to the configfile
 */
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        LOG.error("Invalid arguments, exiting abnormally", e);
        LOG.info(USAGE);
        System.err.println(USAGE);
        System.exit(2);
    } catch (ConfigException e) {
        LOG.error("Invalid config, exiting abnormally", e);
        System.err.println("Invalid config, exiting abnormally");
        System.exit(2);
    } catch (DatadirException e) {
        LOG.error("Unable to access datadir, exiting abnormally", e);
        System.err.println("Unable to access datadir, exiting abnormally");
        System.exit(3);
    } catch (AdminServerException e) {
        LOG.error("Unable to start AdminServer, exiting abnormally", e);
        System.err.println("Unable to start AdminServer, exiting abnormally");
        System.exit(4);
    } catch (Exception e) {
        LOG.error("Unexpected exception, exiting abnormally", e);
        System.exit(1);
    }
    LOG.info("Exiting normally");
    System.exit(0);
}
main() calls the initialization-and-run method:
protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }

    // Start and schedule the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                 + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}
The code above does three main things:
1. Parse the configuration file. By default the configuration file sits one directory up, at config/zookeeper.properties or config/zookeeper.cfg.
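For orientation, a minimal quorum configuration of the kind this parser accepts might look like the following (hostnames and paths are hypothetical):

tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=5
syncLimit=2
server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888

The parse method reads such a file as follows: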
/**
 * Parse a ZooKeeper configuration file
 * @param path the patch of the configuration file
 * @throws ConfigException error processing configuration
 */
public void parse(String path) throws ConfigException {
    LOG.info("Reading configuration from: " + path);

    try {
        File configFile = (new VerifyingFileFactory.Builder(LOG)
            .warnForRelativePath()
            .failForNonExistingPath()
            .build()).create(path);

        Properties cfg = new Properties();
        FileInputStream in = new FileInputStream(configFile);
        try {
            cfg.load(in);
            configFileStr = path;
        } finally {
            in.close();
        }

        parseProperties(cfg);
    } catch (IOException e) {
        throw new ConfigException("Error processing " + path, e);
    } catch (IllegalArgumentException e) {
        throw new ConfigException("Error processing " + path, e);
    }

    if (dynamicConfigFileStr != null) {
        try {
            Properties dynamicCfg = new Properties();
            FileInputStream inConfig = new FileInputStream(dynamicConfigFileStr);
            try {
                dynamicCfg.load(inConfig);
                if (dynamicCfg.getProperty("version") != null) {
                    throw new ConfigException("dynamic file shouldn't have version inside");
                }

                String version = getVersionFromFilename(dynamicConfigFileStr);
                // If there isn't any version associated with the filename,
                // the default version is 0.
                if (version != null) {
                    dynamicCfg.setProperty("version", version);
                }
            } finally {
                inConfig.close();
            }
            setupQuorumPeerConfig(dynamicCfg, false);

        } catch (IOException e) {
            throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
        } catch (IllegalArgumentException e) {
            throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
        }
        File nextDynamicConfigFile = new File(configFileStr + nextDynamicConfigFileSuffix);
        if (nextDynamicConfigFile.exists()) {
            try {
                Properties dynamicConfigNextCfg = new Properties();
                FileInputStream inConfigNext = new FileInputStream(nextDynamicConfigFile);
                try {
                    dynamicConfigNextCfg.load(inConfigNext);
                } finally {
                    inConfigNext.close();
                }
                boolean isHierarchical = false;
                for (Entry<Object, Object> entry : dynamicConfigNextCfg.entrySet()) {
                    String key = entry.getKey().toString().trim();
                    if (key.startsWith("group") || key.startsWith("weight")) {
                        isHierarchical = true;
                        break;
                    }
                }
                lastSeenQuorumVerifier = createQuorumVerifier(dynamicConfigNextCfg, isHierarchical);
            } catch (IOException e) {
                LOG.warn("NextQuorumVerifier is initiated to null");
            }
        }
    }
}
2. Start and schedule the purge task
// Start and schedule the purge task
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
        .getDataDir(), config.getDataLogDir(), config
        .getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
This calls the start method:
/**
 * Validates the purge configuration and schedules the purge task. Purge
 * task keeps the most recent <code>snapRetainCount</code> number of
 * snapshots and deletes the remaining for every <code>purgeInterval</code>
 * hour(s).
 * <p>
 * <code>purgeInterval</code> of <code>0</code> or
 * <code>negative integer</code> will not schedule the purge task.
 * </p>
 *
 * @see PurgeTxnLog#purge(File, File, int)
 */
public void start() {
    if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
        LOG.warn("Purge task is already running.");
        return;
    }
    // Don't schedule the purge task with zero or negative purge interval.
    if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled.");
        return;
    }

    timer = new Timer("PurgeTask", true);
    TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
    timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));

    purgeTaskStatus = PurgeTaskStatus.STARTED;
}
As the code above shows, the cleanup work starts a Timer. PurgeTask extends TimerTask (a task that a timer can schedule for one-time or repeated execution) and is implemented as follows:
static class PurgeTask extends TimerTask {
    private File logsDir;
    private File snapsDir;
    private int snapRetainCount;

    public PurgeTask(File dataDir, File snapDir, int count) {
        logsDir = dataDir;
        snapsDir = snapDir;
        snapRetainCount = count;
    }

    @Override
    public void run() {
        LOG.info("Purge task started.");
        try {
            PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
        } catch (Exception e) {
            LOG.error("Error occured while purging.", e);
        }
        LOG.info("Purge task completed.");
    }
}
The task calls the purge method:
/**
 * Purges the snapshot and logs keeping the last num snapshots and the
 * corresponding logs. If logs are rolling or a new snapshot is created
 * during this process, these newest N snapshots or any data logs will be
 * excluded from current purging cycle.
 *
 * @param dataDir the dir that has the logs
 * @param snapDir the dir that has the snapshots
 * @param num the number of snapshots to keep
 * @throws IOException
 */
public static void purge(File dataDir, File snapDir, int num) throws IOException {
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }

    FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);

    List<File> snaps = txnLog.findNRecentSnapshots(num);
    retainNRecentSnapshots(txnLog, snaps);
}
It first locates the transaction logs and the most recent snapshots, then calls retainNRecentSnapshots to do the actual deletion:
static void retainNRecentSnapshots(FileTxnSnapLog txnLog, List<File> snaps) {
    // found any valid recent snapshots?
    if (snaps.size() == 0)
        return;
    File snapShot = snaps.get(snaps.size() - 1);
    final long leastZxidToBeRetain = Util.getZxidFromName(
            snapShot.getName(), PREFIX_SNAPSHOT);

    class MyFileFilter implements FileFilter {
        private final String prefix;
        MyFileFilter(String prefix) {
            this.prefix = prefix;
        }
        public boolean accept(File f) {
            if (!f.getName().startsWith(prefix + "."))
                return false;
            long fZxid = Util.getZxidFromName(f.getName(), prefix);
            if (fZxid >= leastZxidToBeRetain) {
                return false;
            }
            return true;
        }
    }
    // add all non-excluded log files
    List<File> files = new ArrayList<File>(Arrays.asList(txnLog
            .getDataDir().listFiles(new MyFileFilter(PREFIX_LOG))));
    // add all non-excluded snapshot files to the deletion list
    files.addAll(Arrays.asList(txnLog.getSnapDir().listFiles(
            new MyFileFilter(PREFIX_SNAPSHOT))));
    // remove the old files
    for (File f : files) {
        System.out.println("Removing file: " +
                DateFormat.getDateTimeInstance().format(f.lastModified()) +
                "\t" + f.getPath());
        if (!f.delete()) {
            System.err.println("Failed to remove " + f.getPath());
        }
    }
}
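As an aside, deployments normally do not invoke this purge by hand; the cleanup is driven by the autopurge settings in the configuration file, which are the values handed to the DatadirCleanupManager shown earlier. A typical snippet:

# keep the 3 most recent snapshots and their logs,
# purging every 24 hours (0 or a negative value disables the task)
autopurge.snapRetainCount=3
autopurge.purgeInterval=24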
3. Start the ZooKeeper server
3.1 Standalone startup
/*
 * Start up the ZooKeeper server.
 *
 * @param args the configfile or the port datadir [ticktime]
 */
public static void main(String[] args) {
    ZooKeeperServerMain main = new ZooKeeperServerMain();
    try {
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        LOG.error("Invalid arguments, exiting abnormally", e);
        LOG.info(USAGE);
        System.err.println(USAGE);
        System.exit(2);
    } catch (ConfigException e) {
        LOG.error("Invalid config, exiting abnormally", e);
        System.err.println("Invalid config, exiting abnormally");
        System.exit(2);
    } catch (DatadirException e) {
        LOG.error("Unable to access datadir, exiting abnormally", e);
        System.err.println("Unable to access datadir, exiting abnormally");
        System.exit(3);
    } catch (AdminServerException e) {
        LOG.error("Unable to start AdminServer, exiting abnormally", e);
        System.err.println("Unable to start AdminServer, exiting abnormally");
        System.exit(4);
    } catch (Exception e) {
        LOG.error("Unexpected exception, exiting abnormally", e);
        System.exit(1);
    }
    LOG.info("Exiting normally");
    System.exit(0);
}
main() calls:
protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    ServerConfig config = new ServerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    } else {
        config.parse(args);
    }

    runFromConfig(config);
}
The startup sequence is:
/**
 * Run from a ServerConfig.
 * @param config ServerConfig to use.
 * @throws IOException
 * @throws AdminServerException
 */
public void runFromConfig(ServerConfig config)
        throws IOException, AdminServerException {
    LOG.info("Starting server");
    FileTxnSnapLog txnLog = null;
    try {
        // Note that this thread isn't going to be doing anything else,
        // so rather than spawning another thread, we will just call
        // run() in this thread.
        // create a file logger url from the command line args
        txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
        ZooKeeperServer zkServer = new ZooKeeperServer(txnLog,
                config.tickTime, config.minSessionTimeout,
                config.maxSessionTimeout, null);

        // Start Admin server
        adminServer = AdminServerFactory.createAdminServer();
        adminServer.setZooKeeperServer(zkServer);
        adminServer.start();

        boolean needStartZKServer = true;
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns(), false);
            cnxnFactory.startup(zkServer);
            // zkServer has been started. So we don't need to start it again in secureCnxnFactory.
            needStartZKServer = false;
        }
        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                    config.getMaxClientCnxns(), true);
            secureCnxnFactory.startup(zkServer, needStartZKServer);
        }

        containerManager = new ContainerManager(zkServer.getZKDatabase(), zkServer.firstProcessor,
                Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
                Integer.getInteger("znode.container.maxPerMinute", 10000)
        );
        containerManager.start();

        if (cnxnFactory != null) {
            cnxnFactory.join();
        }
        if (secureCnxnFactory != null) {
            secureCnxnFactory.join();
        }
        if (zkServer.isRunning()) {
            zkServer.shutdown();
        }
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Server interrupted", e);
    } finally {
        if (txnLog != null) {
            txnLog.close();
        }
    }
}
The call cnxnFactory.startup(zkServer) lands in NettyServerCnxnFactory:
@Override
public void startup(ZooKeeperServer zks, boolean startServer)
        throws IOException, InterruptedException {
    start();
    setZooKeeperServer(zks);
    if (startServer) {
        zks.startdata();
        zks.startup();
    }
}
zks.startup() then wires up the request-processor chain:

public synchronized void startup() {
    if (sessionTracker == null) {
        createSessionTracker();
    }
    startSessionTracker();
    setupRequestProcessors();

    registerJMX();

    state = State.RUNNING;
    notifyAll();
}

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this,
            finalProcessor);
    ((SyncRequestProcessor)syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor)firstProcessor).start();
}
3.2 Cluster startup
public void runFromConfig(QuorumPeerConfig config)
        throws IOException, AdminServerException
{
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer");
    try {
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns(), false);
        }

        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                    config.getMaxClientCnxns(), true);
        }

        quorumPeer = new QuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(
                config.getDataLogDir(),
                config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(
                config.isLocalSessionsUpgradingEnabled());
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());

        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    }
}
As the code above shows, QuorumPeer's start() and join() methods form the main flow.
QuorumPeer extends ZooKeeperThread, which in turn extends Thread, so QuorumPeer is indirectly a Thread.
@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    startLeaderElection();
    super.start();
}
3.2.1 On startup, first restore the in-memory database from the snapshots and transaction logs on disk
private void loadDataBase() {
    try {
        zkDb.loadDataBase();

        // load the epochs
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
        try {
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            currentEpoch = epochOfZxid;
            LOG.info(CURRENT_EPOCH_FILENAME
                    + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                    currentEpoch);
            writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
        }
        if (epochOfZxid > currentEpoch) {
            throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch)
                    + ", is older than the last zxid, " + lastProcessedZxid);
        }
        try {
            acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            acceptedEpoch = epochOfZxid;
            LOG.info(ACCEPTED_EPOCH_FILENAME
                    + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                    acceptedEpoch);
            writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
        }
        if (acceptedEpoch < currentEpoch) {
            throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch)
                    + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch));
        }
    } catch (IOException ie) {
        LOG.error("Unable to load database on disk", ie);
        throw new RuntimeException("Unable to run quorum server ", ie);
    }
}
which calls
/**
 * load the database from the disk onto memory and also add
 * the transactions to the committedlog in memory.
 * @return the last valid zxid on disk
 * @throws IOException
 */
public long loadDataBase() throws IOException {
    PlayBackListener listener = new PlayBackListener() {
        public void onTxnLoaded(TxnHeader hdr, Record txn) {
            Request r = new Request(0, hdr.getCxid(), hdr.getType(),
                    hdr, txn, hdr.getZxid());
            addCommittedProposal(r);
        }
    };

    long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, listener);
    initialized = true;
    return zxid;
}

/**
 * maintains a list of last <i>committedLog</i>
 * or so committed requests. This is used for
 * fast follower synchronization.
 * @param request committed request
 */
public void addCommittedProposal(Request request) {
    WriteLock wl = logLock.writeLock();
    try {
        wl.lock();
        if (committedLog.size() > commitLogCount) {
            committedLog.removeFirst();
            minCommittedLog = committedLog.getFirst().packet.getZxid();
        }
        if (committedLog.isEmpty()) {
            minCommittedLog = request.zxid;
            maxCommittedLog = request.zxid;
        }

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        try {
            request.getHdr().serialize(boa, "hdr");
            if (request.getTxn() != null) {
                request.getTxn().serialize(boa, "txn");
            }
            baos.close();
        } catch (IOException e) {
            LOG.error("This really should be impossible", e);
        }
        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
                baos.toByteArray(), null);

        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;
        committedLog.add(p);
        maxCommittedLog = p.packet.getZxid();
    } finally {
        wl.unlock();
    }
}
3.2.2 Start NettyServerCnxnFactory to bind the service
@Override
public void start() {
    LOG.info("binding to port " + localAddress);
    parentChannel = bootstrap.bind(localAddress);
}
3.2.3 Leader election
synchronized public void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }

    // if (!getView().containsKey(myid)) {
    //     throw new RuntimeException("My id " + myid + " not in the peer list");
    // }
    if (electionType == 0) {
        try {
            udpSocket = new DatagramSocket(myQuorumAddr.getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    this.electionAlg = createElectionAlgorithm(electionType);
}
which calls
@SuppressWarnings("deprecation")
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        qcm = new QuorumCnxManager(this);
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            listener.start();
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}
and then the election method itself (electionType defaults to 3, so FastLeaderElection's lookForLeader is the one normally invoked):
/**
 * Starts a new round of leader election. Whenever our QuorumPeer
 * changes its state to LOOKING, this method is invoked, and it
 * sends notifications to all other peers.
 */
public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(
                self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }
    if (self.start_fle == 0) {
        self.start_fle = Time.currentElapsedTime();
    }
    try {
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = finalizeWait;

        synchronized (this) {
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id = " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();

        /*
         * Loop in which we exchange notifications until we find a leader
         */
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if (n == null) {
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout * 2;
                notTimeout = (tmpTimeOut < maxNotificationInterval ?
                        tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            }
            else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {
                /*
                 * Only proceed if the vote comes from a replica in the current or next
                 * voting view.
                 */
                switch (n.state) {
                case LOOKING:
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding vote: from=" + n.sid +
                                ", proposed leader=" + n.leader +
                                ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid,
                                    logicalclock.get(), proposedEpoch))) {

                        // Verify if there is any change in the proposed leader
                        while ((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING : learningState());

                            Vote endVote = new Vote(proposedLeader,
                                    proposedZxid, proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if (n.electionEpoch == logicalclock.get()) {
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        if (termPredicate(recvset, new Vote(n.leader,
                                n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING : learningState());

                            Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify that
                     * a majority are following the same leader.
                     * Only peer epoch is used to check that the votes come
                     * from the same ensemble. This is because there is at
                     * least one corner case in which the ensemble can be
                     * created with inconsistent zxid and election epoch
                     * info. However, given that only one ensemble can be
                     * running at a single point in time and that each
                     * epoch is used only once, using only the epoch to
                     * compare the votes is sufficient.
                     *
                     * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
                     */
                    outofelection.put(n.sid, new Vote(n.leader,
                            IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
                    if (termPredicate(outofelection, new Vote(n.leader,
                            IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
                            && checkLeader(outofelection, n.leader, IGNOREVALUE)) {
                        synchronized (this) {
                            logicalclock.set(n.electionEpoch);
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING : learningState());
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecoginized: " + n.state
                            + " (n.state), " + n.sid + " (n.sid)");
                    break;
                }
            } else {
                LOG.warn("Ignoring notification from non-cluster member " + n.sid);
            }
        }
        return null;
    } finally {
        try {
            if (self.jmxLeaderElectionBean != null) {
                MBeanRegistry.getInstance().unregister(
                        self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
    }
}
4. Summary
This article first introduced ZooKeeper, an open-source distributed coordination system, along with its characteristics and typical uses, and then traced ZooKeeper's entry point starting from the way it is launched. From that entry point, standalone startup goes through ZooKeeperServerMain and ultimately calls ZooKeeperServer's startup() method to set up the RequestProcessor chain; cluster startup calls QuorumPeer's start() method, which likewise ends up in ZooKeeperServer's startup() to set up the RequestProcessor chain and finally runs the election algorithm to choose a leader.
References:
[1] http://zookeeper.apache.org/doc/r3.4.6/zookeeperOver.html
[2] http://zookeeper.apache.org/doc/r3.4.6/zookeeperStarted.html