
ZooKeeper Overview

ZooKeeper is open-source software that provides coordination services for distributed applications. It exposes a simple set of atomic operations that distributed applications can build on to implement higher-level services for synchronization, configuration maintenance, group membership, and naming. ZooKeeper is designed to be easy to program against: anyone familiar with a file system's directory-tree structure will find its data model familiar. It runs in Java and has bindings for both Java and C.

Coordination services are notoriously hard to get right; they are especially prone to errors such as race conditions and deadlocks. The motivation behind ZooKeeper is to relieve distributed applications of the burden of implementing coordination services from scratch.

Features of ZooKeeper

1. Simple: ZooKeeper lets distributed processes coordinate with each other through a shared hierarchical namespace organized much like a standard file system. The namespace consists of data registers, called znodes, which to ZooKeeper look similar to files and directories. Unlike a typical file system, which is designed for storage, ZooKeeper keeps its data in memory, which allows it to achieve high throughput and low latency.

ZooKeeper's implementation emphasizes high performance, high availability, and strictly ordered access. Its performance means it can be used in large distributed systems; its reliability keeps it from being a single point of failure; and its strict ordering lets clients implement sophisticated synchronization primitives.

2. 可復(fù)制:類(lèi)似于分布式進(jìn)程的協(xié)作,zookeeper本身很容易在一組主機(jī)(稱(chēng)之為集合)中實(shí)現(xiàn)復(fù)制。zookeeper服務(wù)示意圖:

[Figure: ZooKeeper Service]

The servers that make up the ZooKeeper service must all know about each other. They maintain an in-memory image of the state, together with transaction logs and snapshots in persistent storage. As long as a majority of the servers are available, the ZooKeeper service is available.

A client connects to a single ZooKeeper server. The client maintains a TCP connection through which it sends requests, receives responses, receives watch events, and sends heartbeats. If the TCP connection to the server breaks, the client can connect to a different server.

3. Ordered: ZooKeeper stamps each update with a number that reflects the order of all ZooKeeper transactions. Subsequent operations can use this ordering to implement higher-level abstractions, such as synchronization primitives (see the sketch after this list for how these stamps are structured).

4. Fast: ZooKeeper is especially fast in "read-dominant" workloads. ZooKeeper applications run on thousands of machines, and it performs best where reads outnumber writes by a wide margin, around a 10:1 ratio.
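
Concretely, each update is stamped with a zxid (ZooKeeper transaction id): a 64-bit number whose high 32 bits carry the leader epoch and whose low 32 bits are a counter within that epoch. The startup code later in this article relies on this split via ZxidUtils.getEpochFromZxid. A minimal sketch of the decomposition:

 // Sketch: decomposing a zxid into its epoch and counter halves.
 long zxid = 0x100000002L;          // illustrative value: epoch 1, counter 2
 long epoch = zxid >>> 32;          // high 32 bits: leader epoch
 long counter = zxid & 0xffffffffL; // low 32 bits: per-epoch counter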

Data Model and Hierarchical Namespace

The namespace provided by ZooKeeper is much like that of a standard file system. A name is a sequence of path elements separated by a slash (/). Every node in ZooKeeper's namespace is identified by a path.

[Figure: ZooKeeper's Hierarchical Namespace]

Nodes and Ephemeral Nodes

Unlike a standard file system, each node in a ZooKeeper namespace can have data associated with it as well as children. It is like a file system in which a file can also be a directory. (ZooKeeper is designed to store coordination data: status information, configuration, location information, and so on, so the data stored at each node is usually small, in the byte-to-kilobyte range.) We use the term znode to make it clear that we are talking about ZooKeeper data nodes.

Znodes maintain a stat structure that includes version numbers for data changes and ACL changes, plus timestamps, to allow cache validation and coordinated updates. Each time a znode's data changes, its version number increases. For instance, whenever a client retrieves data, it also receives the data's version.

The data stored at each node in a namespace is read and written atomically. A read gets all the data bytes of a znode; a write replaces all of them. Each node has an Access Control List (ACL) that restricts who can do what.

ZooKeeper also has the notion of ephemeral nodes. These znodes exist only as long as the session that created them is active. When the session ends, they are deleted.
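
For example, with the standard Java client an ephemeral znode can be created as follows (a minimal sketch; the connection string and paths are illustrative, not from this article):

 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;

 public class EphemeralExample {
     public static void main(String[] args) throws Exception {
         // Assumes a server at 127.0.0.1:2181 and an existing /workers parent.
         ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 3000, event -> { });
         // An ephemeral znode lives only as long as this session.
         zk.create("/workers/worker-1", "payload".getBytes(),
                 Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
         // Closing the session (or letting it expire) deletes the znode.
         zk.close();
     }
 }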

Conditional Updates and Watches

ZooKeeper supports the concept of watches. A client can set a watch on a znode. The watch is triggered and removed when the znode changes. When a watch is triggered, the client receives a packet saying that the znode has changed. And if the connection between the client and one of the ZooKeeper servers is broken, the client receives a local notification.
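
A minimal watch sketch with the Java client (connection string and path are illustrative). Note that a watch fires once and must be re-registered for further notifications:

 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;

 public class WatchExample {
     public static void main(String[] args) throws Exception {
         ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 3000, event -> { });
         // Register a one-shot watch on /config via exists().
         zk.exists("/config", new Watcher() {
             public void process(WatchedEvent event) {
                 // Fires once when /config is created, deleted, or its data changes.
                 System.out.println("watch: " + event.getType() + " on " + event.getPath());
             }
         });
         Thread.sleep(60000); // keep the session alive long enough to observe an event
         zk.close();
     }
 }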

Guarantees

ZooKeeper is very fast and very simple. Since its goal is to be a basis for building more complicated services, such as synchronization, it provides a set of guarantees. These are:

  • Sequential consistency: updates from a client are applied in the order in which they were sent.
  • Atomicity: updates either succeed or fail; there are no partial results.
  • Single system image: a client sees the same view of the service no matter which server it connects to.
  • Reliability: once an update has been applied, it persists from that time forward until a client overwrites it.
  • Timeliness: the client's view of the system is guaranteed to be up to date within a certain time bound.

Simple API

One of ZooKeeper's design goals is to provide a very simple programming interface. As a result, it supports only the operations below (a usage sketch follows the list):

create

creates a node at a location in the tree

delete

deletes a node

exists

tests if a node exists at a location

get data

reads the data from a node

set data

writes data to a node

get children

retrieves a list of children of a node

sync

waits for data to be propagated
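
To make the list concrete, here is a sketch that exercises each operation through the standard Java client (server address and paths are illustrative):

 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;

 public class SimpleApiTour {
     public static void main(String[] args) throws Exception {
         ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 3000, event -> { });

         // create: add a node at a location in the tree
         zk.create("/demo", "v1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

         // exists: test whether a node is present (returns its Stat, or null)
         Stat stat = zk.exists("/demo", false);

         // get data / set data: read and replace the node's bytes atomically
         byte[] data = zk.getData("/demo", false, stat);
         zk.setData("/demo", "v2".getBytes(), stat.getVersion()); // conditional on version

         // get children: list the node's children
         System.out.println(zk.getChildren("/demo", false));

         // sync: wait (asynchronously) for this server to catch up with the leader
         zk.sync("/demo", (rc, path, ctx) -> System.out.println("synced " + path), null);

         // delete: remove the node, again conditioned on its version
         zk.delete("/demo", zk.exists("/demo", false).getVersion());
         zk.close();
     }
 }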

Implementation

The ZooKeeper Components figure below shows the high-level components of the ZooKeeper service. With the exception of the request processor, each of the servers that make up the ZooKeeper service replicates its own copy of every component.

[Figure: ZooKeeper Components]

The replicated database is an in-memory database containing the entire data tree. For recoverability, updates are logged to disk, and writes are serialized to disk before they are applied to the in-memory database.

Every ZooKeeper server services clients, and a client connects to exactly one server to submit requests. Read requests are serviced from the local replica of each server's database. Write requests, which change the state of the service, are handled by the request processor.

As part of the agreement protocol, all write requests from clients are forwarded to a single server, the ZooKeeper leader. The remaining ZooKeeper servers, called followers, receive message proposals from the leader and agree on message delivery. The messaging layer takes care of replacing the leader on failure and syncing the followers with the leader.

ZooKeeper uses a custom atomic messaging protocol. Since the messaging layer is atomic, ZooKeeper can guarantee that the local replicas never diverge. When the leader receives a write request, it computes what the state of the system will be when the write is applied and turns this into a transaction that captures the new state.

ZooKeeper Startup

Server startup

bin/zkServer.sh start

The relevant start command in the script is as follows:

start)
    echo -n "Starting zookeeper ... "
    if [ -f "$ZOOPIDFILE" ]; then
      if kill -0 `cat "$ZOOPIDFILE"` > /dev/null 2>&1; then
         echo $command already running as process `cat "$ZOOPIDFILE"`.
         exit 0
      fi
    fi
    nohup "$JAVA" $ZOO_DATADIR_AUTOCREATE "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" \
    "-Dzookeeper.log.file=${ZOO_LOG_FILE}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
    -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' \
    -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &
    if [ $? -eq 0 ]
    then
      if /bin/echo -n $! > "$ZOOPIDFILE"
      then
        sleep 1
        pid=$(cat "${ZOOPIDFILE}")
        if ps -p "${pid}" > /dev/null 2>&1; then
          echo STARTED
        else
          echo FAILED TO START
          exit 1
        fi
      else
        echo FAILED TO WRITE PID
        exit 1
      fi
    else
      echo SERVER DID NOT START
      exit 1
    fi
    ;;

Here, ZOOMAIN is the entry point of the startup program; its class is:

org.apache.zookeeper.server.quorum.QuorumPeerMain

Its main method is:

/**
 * To start the replicated server specify the configuration file name on
 * the command line.
 * @param args path to the configfile
 */
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        LOG.error("Invalid arguments, exiting abnormally", e);
        LOG.info(USAGE);
        System.err.println(USAGE);
        System.exit(2);
    } catch (ConfigException e) {
        LOG.error("Invalid config, exiting abnormally", e);
        System.err.println("Invalid config, exiting abnormally");
        System.exit(2);
    } catch (DatadirException e) {
        LOG.error("Unable to access datadir, exiting abnormally", e);
        System.err.println("Unable to access datadir, exiting abnormally");
        System.exit(3);
    } catch (AdminServerException e) {
        LOG.error("Unable to start AdminServer, exiting abnormally", e);
        System.err.println("Unable to start AdminServer, exiting abnormally");
        System.exit(4);
    } catch (Exception e) {
        LOG.error("Unexpected exception, exiting abnormally", e);
        System.exit(1);
    }
    LOG.info("Exiting normally");
    System.exit(0);
}

main then calls the initialization-and-run method:

protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }

    // Start and schedule the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only one server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}

The code above does three main things:

1. Parse the configuration file. The default configuration file is config/zookeeper.properties or config/zookeeper.cfg in the parent directory.
/**
 * Parse a ZooKeeper configuration file
 * @param path the path of the configuration file
 * @throws ConfigException error processing configuration
 */
public void parse(String path) throws ConfigException {
    LOG.info("Reading configuration from: " + path);

    try {
        File configFile = (new VerifyingFileFactory.Builder(LOG)
            .warnForRelativePath()
            .failForNonExistingPath()
            .build()).create(path);

        Properties cfg = new Properties();
        FileInputStream in = new FileInputStream(configFile);
        try {
            cfg.load(in);
            configFileStr = path;
        } finally {
            in.close();
        }

        parseProperties(cfg);
    } catch (IOException e) {
        throw new ConfigException("Error processing " + path, e);
    } catch (IllegalArgumentException e) {
        throw new ConfigException("Error processing " + path, e);
    }

    if (dynamicConfigFileStr != null) {
        try {
            Properties dynamicCfg = new Properties();
            FileInputStream inConfig = new FileInputStream(dynamicConfigFileStr);
            try {
                dynamicCfg.load(inConfig);
                if (dynamicCfg.getProperty("version") != null) {
                    throw new ConfigException("dynamic file shouldn't have version inside");
                }

                String version = getVersionFromFilename(dynamicConfigFileStr);
                // If there isn't any version associated with the filename,
                // the default version is 0.
                if (version != null) {
                    dynamicCfg.setProperty("version", version);
                }
            } finally {
                inConfig.close();
            }
            setupQuorumPeerConfig(dynamicCfg, false);
        } catch (IOException e) {
            throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
        } catch (IllegalArgumentException e) {
            throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
        }

        File nextDynamicConfigFile = new File(configFileStr + nextDynamicConfigFileSuffix);
        if (nextDynamicConfigFile.exists()) {
            try {
                Properties dynamicConfigNextCfg = new Properties();
                FileInputStream inConfigNext = new FileInputStream(nextDynamicConfigFile);
                try {
                    dynamicConfigNextCfg.load(inConfigNext);
                } finally {
                    inConfigNext.close();
                }
                boolean isHierarchical = false;
                for (Entry<Object, Object> entry : dynamicConfigNextCfg.entrySet()) {
                    String key = entry.getKey().toString().trim();
                    if (key.startsWith("group") || key.startsWith("weight")) {
                        isHierarchical = true;
                        break;
                    }
                }
                lastSeenQuorumVerifier = createQuorumVerifier(dynamicConfigNextCfg, isHierarchical);
            } catch (IOException e) {
                LOG.warn("NextQuorumVerifier is initiated to null");
            }
        }
    }
}
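
For reference, a minimal standalone configuration of the kind parse() consumes might look like the following (the values are illustrative; the keys are standard ZooKeeper settings). With no server.N lines, config.isDistributed() is false and initializeAndRun falls through to ZooKeeperServerMain:

 # zoo.cfg (standalone) -- illustrative values
 tickTime=2000               # basic time unit, in milliseconds
 dataDir=/var/lib/zookeeper  # where snapshots (and, by default, logs) live
 clientPort=2181             # port clients connect to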

2. Start and schedule the purge task

// Start and schedule the purge task
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
        .getDataDir(), config.getDataLogDir(), config
        .getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();

This calls the start method:

/**
 * Validates the purge configuration and schedules the purge task. Purge
 * task keeps the most recent <code>snapRetainCount</code> number of
 * snapshots and deletes the remaining for every <code>purgeInterval</code>
 * hour(s).
 * <p>
 * <code>purgeInterval</code> of <code>0</code> or
 * <code>negative integer</code> will not schedule the purge task.
 * </p>
 *
 * @see PurgeTxnLog#purge(File, File, int)
 */
public void start() {
    if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
        LOG.warn("Purge task is already running.");
        return;
    }
    // Don't schedule the purge task with zero or negative purge interval.
    if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled.");
        return;
    }

    timer = new Timer("PurgeTask", true);
    TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
    timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));

    purgeTaskStatus = PurgeTaskStatus.STARTED;
}
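
The snapRetainCount and purgeInterval passed in here come from two standard configuration keys. With settings like the following (illustrative values), the purge task keeps the three most recent snapshots and runs every hour:

 autopurge.snapRetainCount=3   # number of snapshots to retain (minimum 3)
 autopurge.purgeInterval=1     # purge interval in hours; 0 disables the task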

As this code shows, the purge work starts a timer. PurgeTask extends TimerTask (a task that can be scheduled by a timer for one-time or repeated execution) and is implemented as follows:

static class PurgeTask extends TimerTask {
    private File logsDir;
    private File snapsDir;
    private int snapRetainCount;

    public PurgeTask(File dataDir, File snapDir, int count) {
        logsDir = dataDir;
        snapsDir = snapDir;
        snapRetainCount = count;
    }

    @Override
    public void run() {
        LOG.info("Purge task started.");
        try {
            PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
        } catch (Exception e) {
            LOG.error("Error occurred while purging.", e);
        }
        LOG.info("Purge task completed.");
    }
}

run() calls the purge method:

/**
 * Purges the snapshot and logs keeping the last num snapshots and the
 * corresponding logs. If logs are rolling or a new snapshot is created
 * during this process, these newest N snapshots or any data logs will be
 * excluded from current purging cycle.
 *
 * @param dataDir the dir that has the logs
 * @param snapDir the dir that has the snapshots
 * @param num the number of snapshots to keep
 * @throws IOException
 */
public static void purge(File dataDir, File snapDir, int num) throws IOException {
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }

    FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);

    List<File> snaps = txnLog.findNRecentSnapshots(num);
    retainNRecentSnapshots(txnLog, snaps);
}

It first finds the N most recent snapshots, then calls retainNRecentSnapshots to do the actual cleanup:

static void retainNRecentSnapshots(FileTxnSnapLog txnLog, List<File> snaps) {
    // found any valid recent snapshots?
    if (snaps.size() == 0)
        return;
    File snapShot = snaps.get(snaps.size() - 1);
    final long leastZxidToBeRetain = Util.getZxidFromName(
            snapShot.getName(), PREFIX_SNAPSHOT);

    class MyFileFilter implements FileFilter {
        private final String prefix;
        MyFileFilter(String prefix) {
            this.prefix = prefix;
        }
        public boolean accept(File f) {
            if (!f.getName().startsWith(prefix + "."))
                return false;
            long fZxid = Util.getZxidFromName(f.getName(), prefix);
            if (fZxid >= leastZxidToBeRetain) {
                return false;
            }
            return true;
        }
    }
    // add all non-excluded log files
    List<File> files = new ArrayList<File>(Arrays.asList(txnLog
            .getDataDir().listFiles(new MyFileFilter(PREFIX_LOG))));
    // add all non-excluded snapshot files to the deletion list
    files.addAll(Arrays.asList(txnLog.getSnapDir().listFiles(
            new MyFileFilter(PREFIX_SNAPSHOT))));
    // remove the old files
    for (File f : files) {
        System.out.println("Removing file: " +
                DateFormat.getDateTimeInstance().format(f.lastModified()) +
                "\t" + f.getPath());
        if (!f.delete()) {
            System.err.println("Failed to remove " + f.getPath());
        }
    }
}
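
The filter above works because transaction log and snapshot files embed their starting zxid, in hex, in the file name (for example log.100000001 and snapshot.100000000). A sketch of the parsing that Util.getZxidFromName performs (the real implementation lives in org.apache.zookeeper.server.persistence.Util; this re-implementation is for illustration only):

 // Hypothetical re-implementation, for illustration only.
 static long zxidFromName(String name, String prefix) {
     String[] parts = name.split("\\.");
     if (parts.length == 2 && parts[0].equals(prefix)) {
         return Long.parseLong(parts[1], 16); // the zxid is hex-encoded
     }
     return -1L;
 }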

3. Start the ZooKeeper server

3.1 Standalone startup

/*
 * Start up the ZooKeeper server.
 *
 * @param args the configfile or the port datadir [ticktime]
 */
public static void main(String[] args) {
    ZooKeeperServerMain main = new ZooKeeperServerMain();
    try {
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        LOG.error("Invalid arguments, exiting abnormally", e);
        LOG.info(USAGE);
        System.err.println(USAGE);
        System.exit(2);
    } catch (ConfigException e) {
        LOG.error("Invalid config, exiting abnormally", e);
        System.err.println("Invalid config, exiting abnormally");
        System.exit(2);
    } catch (DatadirException e) {
        LOG.error("Unable to access datadir, exiting abnormally", e);
        System.err.println("Unable to access datadir, exiting abnormally");
        System.exit(3);
    } catch (AdminServerException e) {
        LOG.error("Unable to start AdminServer, exiting abnormally", e);
        System.err.println("Unable to start AdminServer, exiting abnormally");
        System.exit(4);
    } catch (Exception e) {
        LOG.error("Unexpected exception, exiting abnormally", e);
        System.exit(1);
    }
    LOG.info("Exiting normally");
    System.exit(0);
}

This calls:

protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    ServerConfig config = new ServerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    } else {
        config.parse(args);
    }

    runFromConfig(config);
}

The startup process:

/**
 * Run from a ServerConfig.
 * @param config ServerConfig to use.
 * @throws IOException
 * @throws AdminServerException
 */
public void runFromConfig(ServerConfig config) throws IOException, AdminServerException {
    LOG.info("Starting server");
    FileTxnSnapLog txnLog = null;
    try {
        // Note that this thread isn't going to be doing anything else,
        // so rather than spawning another thread, we will just call
        // run() in this thread.
        // create a file logger url from the command line args
        txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
        ZooKeeperServer zkServer = new ZooKeeperServer(txnLog,
                config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null);

        // Start Admin server
        adminServer = AdminServerFactory.createAdminServer();
        adminServer.setZooKeeperServer(zkServer);
        adminServer.start();

        boolean needStartZKServer = true;
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);
            cnxnFactory.startup(zkServer);
            // zkServer has been started. So we don't need to start it again in secureCnxnFactory.
            needStartZKServer = false;
        }
        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true);
            secureCnxnFactory.startup(zkServer, needStartZKServer);
        }

        containerManager = new ContainerManager(zkServer.getZKDatabase(), zkServer.firstProcessor,
                Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
                Integer.getInteger("znode.container.maxPerMinute", 10000)
        );
        containerManager.start();

        if (cnxnFactory != null) {
            cnxnFactory.join();
        }
        if (secureCnxnFactory != null) {
            secureCnxnFactory.join();
        }
        if (zkServer.isRunning()) {
            zkServer.shutdown();
        }
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Server interrupted", e);
    } finally {
        if (txnLog != null) {
            txnLog.close();
        }
    }
}
cnxnFactory.startup(zkServer) then proceeds as follows (NettyServerCnxnFactory.startup, ZooKeeperServer.startup, and setupRequestProcessors):
@Override
public void startup(ZooKeeperServer zks, boolean startServer)
        throws IOException, InterruptedException {
    start();
    setZooKeeperServer(zks);
    if (startServer) {
        zks.startdata();
        zks.startup();
    }
}

public synchronized void startup() {
    if (sessionTracker == null) {
        createSessionTracker();
    }
    startSessionTracker();
    setupRequestProcessors();
    registerJMX();
    state = State.RUNNING;
    notifyAll();
}

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this,
            finalProcessor);
    ((SyncRequestProcessor) syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor) firstProcessor).start();
}
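
So in standalone mode the request pipeline wired up by setupRequestProcessors is, in effect:

 PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor

Roughly speaking, PrepRequestProcessor validates requests and turns them into transactions, SyncRequestProcessor logs transactions durably to disk, and FinalRequestProcessor applies them to the in-memory database and sends responses.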

3.2 集群?jiǎn)?dòng)

public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer");
    try {
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns(),
                    false);
        }
        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                    config.getMaxClientCnxns(),
                    true);
        }

        quorumPeer = new QuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(
                config.getDataLogDir(),
                config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(
                config.isLocalSessionsUpgradingEnabled());
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());

        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    }
}
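
The QuorumPeerConfig consumed here comes from a clustered configuration file; a minimal illustrative example (host names and ids are placeholders):

 # zoo.cfg (3-node ensemble) -- illustrative values
 tickTime=2000
 initLimit=10                # ticks a follower may take to connect and sync to the leader
 syncLimit=5                 # ticks a follower may lag before it is dropped
 dataDir=/var/lib/zookeeper
 clientPort=2181
 server.1=zk1:2888:3888      # host : peer port : election port
 server.2=zk2:2888:3888
 server.3=zk3:2888:3888

Each server also needs a myid file under dataDir containing its own id (1, 2, or 3); that id is what config.getServerId() supplies to quorumPeer.setMyid() above.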

As the code above shows, QuorumPeer's start() and join() methods form the main flow.

QuorumPeer extends ZooKeeperThread, and ZooKeeperThread extends Thread, so QuorumPeer is indirectly a Thread.

@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    startLeaderElection();
    super.start();
}

3.2.1 On startup, first load the data from disk into the in-memory database

private void loadDataBase() {
    try {
        zkDb.loadDataBase();

        // load the epochs
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
        try {
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            currentEpoch = epochOfZxid;
            LOG.info(CURRENT_EPOCH_FILENAME
                    + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                    currentEpoch);
            writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
        }
        if (epochOfZxid > currentEpoch) {
            throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
        }
        try {
            acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            acceptedEpoch = epochOfZxid;
            LOG.info(ACCEPTED_EPOCH_FILENAME
                    + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                    acceptedEpoch);
            writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
        }
        if (acceptedEpoch < currentEpoch) {
            throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch));
        }
    } catch (IOException ie) {
        LOG.error("Unable to load database on disk", ie);
        throw new RuntimeException("Unable to run quorum server ", ie);
    }
}

This calls:

/**
 * load the database from the disk onto memory and also add
 * the transactions to the committedlog in memory.
 * @return the last valid zxid on disk
 * @throws IOException
 */
public long loadDataBase() throws IOException {
    PlayBackListener listener = new PlayBackListener() {
        public void onTxnLoaded(TxnHeader hdr, Record txn) {
            Request r = new Request(0, hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
            addCommittedProposal(r);
        }
    };

    long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, listener);
    initialized = true;
    return zxid;
}

/**
 * maintains a list of last <i>committedLog</i>
 * or so committed requests. This is used for
 * fast follower synchronization.
 * @param request committed request
 */
public void addCommittedProposal(Request request) {
    WriteLock wl = logLock.writeLock();
    try {
        wl.lock();
        if (committedLog.size() > commitLogCount) {
            committedLog.removeFirst();
            minCommittedLog = committedLog.getFirst().packet.getZxid();
        }
        if (committedLog.isEmpty()) {
            minCommittedLog = request.zxid;
            maxCommittedLog = request.zxid;
        }

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        try {
            request.getHdr().serialize(boa, "hdr");
            if (request.getTxn() != null) {
                request.getTxn().serialize(boa, "txn");
            }
            baos.close();
        } catch (IOException e) {
            LOG.error("This really should be impossible", e);
        }
        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
                baos.toByteArray(), null);

        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;
        committedLog.add(p);
        maxCommittedLog = p.packet.getZxid();
    } finally {
        wl.unlock();
    }
}

3.2.2 Start NettyServerCnxnFactory to bind the service

@Override
public void start() {
    LOG.info("binding to port " + localAddress);
    parentChannel = bootstrap.bind(localAddress);
}

3.2.3 The election algorithm

synchronized public void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }

    // if (!getView().containsKey(myid)) {
    //     throw new RuntimeException("My id " + myid + " not in the peer list");
    // }

    if (electionType == 0) {
        try {
            udpSocket = new DatagramSocket(myQuorumAddr.getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    this.electionAlg = createElectionAlgorithm(electionType);
}

This calls:

@SuppressWarnings("deprecation")
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        qcm = new QuorumCnxManager(this);
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            listener.start();
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}

which in turn invokes the election method:

/**
 * Starts a new round of leader election. Whenever our QuorumPeer
 * changes its state to LOOKING, this method is invoked, and it
 * sends notifications to all other peers.
 */
public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(
                self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }
    if (self.start_fle == 0) {
        self.start_fle = Time.currentElapsedTime();
    }
    try {
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = finalizeWait;

        synchronized (this) {
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id = " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();

        /*
         * Loop in which we exchange notifications until we find a leader
         */
        while ((self.getPeerState() == ServerState.LOOKING) &&
                (!stop)) {
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            Notification n = recvqueue.poll(notTimeout,
                    TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if (n == null) {
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout * 2;
                notTimeout = (tmpTimeOut < maxNotificationInterval ?
                        tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            }
            else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {
                /*
                 * Only proceed if the vote comes from a replica in the current or next
                 * voting view.
                 */
                switch (n.state) {
                case LOOKING:
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding vote: from=" + n.sid +
                                ", proposed leader=" + n.leader +
                                ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid,
                                    logicalclock.get(), proposedEpoch))) {
                        // Verify if there is any change in the proposed leader
                        while ((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING : learningState());

                            Vote endVote = new Vote(proposedLeader,
                                    proposedZxid, proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if (n.electionEpoch == logicalclock.get()) {
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        if (termPredicate(recvset, new Vote(n.leader,
                                n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING : learningState());

                            Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify that
                     * a majority are following the same leader.
                     * Only peer epoch is used to check that the votes come
                     * from the same ensemble. This is because there is at
                     * least one corner case in which the ensemble can be
                     * created with inconsistent zxid and election epoch
                     * info. However, given that only one ensemble can be
                     * running at a single point in time and that each
                     * epoch is used only once, using only the epoch to
                     * compare the votes is sufficient.
                     *
                     * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
                     */
                    outofelection.put(n.sid, new Vote(n.leader,
                            IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
                    if (termPredicate(outofelection, new Vote(n.leader,
                            IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
                            && checkLeader(outofelection, n.leader, IGNOREVALUE)) {
                        synchronized (this) {
                            logicalclock.set(n.electionEpoch);
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING : learningState());
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecognized: " + n.state
                            + " (n.state), " + n.sid + " (n.sid)");
                    break;
                }
            } else {
                LOG.warn("Ignoring notification from non-cluster member " + n.sid);
            }
        }
        return null;
    } finally {
        try {
            if (self.jmxLeaderElectionBean != null) {
                MBeanRegistry.getInstance().unregister(
                        self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
    }
}
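
The vote comparison driving this loop, totalOrderPredicate, prefers a higher epoch, then a higher zxid, then a higher server id. A simplified sketch of that ordering (the real method also consults the quorum verifier's weights):

 // Sketch of FastLeaderElection.totalOrderPredicate's ordering, simplified.
 boolean newVoteWins(long newEpoch, long newZxid, long newId,
                     long curEpoch, long curZxid, long curId) {
     return (newEpoch > curEpoch)
             || (newEpoch == curEpoch
                 && (newZxid > curZxid
                     || (newZxid == curZxid && newId > curId)));
 }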

4. Summary

This article first introduced ZooKeeper, an open-source distributed coordination system, along with its features and typical uses, and then traced ZooKeeper's entry point from the way it is launched. From that entry method, standalone startup goes through ZooKeeperServerMain and ultimately calls ZooKeeperServer's startup() method to set up the RequestProcessor chain; cluster startup calls QuorumPeer's start() method, which likewise ends up in ZooKeeperServer's startup() method to set up the RequestProcessors, and finally runs the election algorithm to elect a leader.

