前言
我們基于Hadoop 1.2.1源碼分析MapReduce V1的處理流程。這篇文章的內(nèi)容,更多地主要是描述處理/交互流程性的東西,大部分流程圖都是經(jīng)過我梳理后畫出來的(開始我打算使用序列圖來描述流程,但是發(fā)現(xiàn)很多流程在單個(gè)對象內(nèi)部都已經(jīng)非常復(fù)雜,想要通過序列圖表達(dá)有點(diǎn)擔(dān)心描述不清,所以選擇最基本的程序流程圖),可能看起來比較枯燥,重點(diǎn)還是關(guān)注主要的處理流程要點(diǎn),特別的地方我會(huì)刻意標(biāo)示出來,便于理解。JobTracker與TaskTracker之間通過org.Apache.hadoop.mapred.InterTrackerProtocol協(xié)議來進(jìn)行通信,TaskTracker通過該接口進(jìn)行遠(yuǎn)程調(diào)用實(shí)現(xiàn)Heartbeat消息的發(fā)送,協(xié)議方法定義如下所示:
HeartbeatResponse heartbeat(TaskTrackerStatus status,
boolean restarted,
boolean initialContact,
boolean acceptNewTasks,
short responseId) throws IOException;
通過該方法可以看出,最核心的Heartbeat報(bào)告數(shù)據(jù)都封裝在TaskTrackerStatus對象中,JobTracker端會(huì)接收TaskTracker周期性地發(fā)送的心跳報(bào)告,根據(jù)這些心跳信息來更新整個(gè)Hadoop集群中計(jì)算資源的狀態(tài)/數(shù)量,以及Task的運(yùn)行狀態(tài)。另外,在JobTracker端維護(hù)的對象的數(shù)據(jù)結(jié)構(gòu),主要包括如下3個(gè):
- TaskTracker:這個(gè)類是在JobTracker端定義的,描述了TaskTracker的基本信息和狀態(tài)(需要注意的是:它與TaskTracker進(jìn)程的實(shí)現(xiàn)類同名,但是含義完全不同)
- JobInProgress:簡寫JIP,在JobTracker端用來描述,JobClient提交的Job運(yùn)行狀態(tài)的數(shù)據(jù)結(jié)構(gòu),一個(gè)JIP對象還包含了組成一個(gè)Job的Task對應(yīng)的一組TIP的信息
- TaskInProgress:簡寫TIP,在JobTracker端用來描述,在TaskTracker上運(yùn)行的Task狀態(tài)的數(shù)據(jù)結(jié)構(gòu)(需要注意的是:在TaskTracker端也對應(yīng)一個(gè)TaskInProgress實(shí)現(xiàn)類,它與JobTracker端的同名,但是所包含的內(nèi)容也并不完全相同)
- TaskAttemptID:簡寫TAID,它是唯一標(biāo)識(shí)了組成一個(gè)Job的Task的一個(gè)運(yùn)行實(shí)例,一個(gè)Task(MapTask/ReduceTask)可能運(yùn)行多次,比如第一次運(yùn)行失敗,對應(yīng)一個(gè)失敗的TAID,第二次調(diào)度又運(yùn)行,又對應(yīng)一個(gè)新的TAID;再比如,推測執(zhí)行,可能會(huì)對應(yīng)著同一個(gè)Task的、具有2個(gè)不同TAID的Task運(yùn)行實(shí)例
- TaskTrackerStatus結(jié)構(gòu)TaskTrackerStatus對象要在網(wǎng)絡(luò)間進(jìn)行序列化傳輸,所以實(shí)現(xiàn)了接口org.apache.hadoop.io.Writable,該對象的數(shù)據(jù)結(jié)構(gòu),如下圖所示:
TaskTrackerStatus中各個(gè)數(shù)據(jù)項(xiàng)的含義,說明如下表所示:
下面,主要對ResourceStatus、TaskStatus、TaskTrackerHealthStatus進(jìn)行說明:
- ResourceStatus
ResourceStatus封裝了一個(gè)TaskTracker節(jié)點(diǎn)的資源信息,結(jié)構(gòu)如下圖所示:
TaskStatus
TaskStatus封裝了一個(gè)TaskTracker節(jié)點(diǎn)上運(yùn)行的Task的狀態(tài)信息,結(jié)構(gòu)如下圖所示:
上圖將TaskStatus的包含的數(shù)據(jù)結(jié)構(gòu)全部展開,可以根據(jù)字段含義來了解它所描述的一些信息。
- TaskTrackerHealthStatus
TaskTrackerHealthStatus封裝了TaskTracker的健康狀態(tài)信息,如下圖所示:
JobTracker處理Heartbeat流程JobTracker處理Heartbeat的流程,如果把每個(gè)處理細(xì)節(jié)都詳細(xì)地展開,非常地復(fù)雜,可能從頭到尾描述下來會(huì)感覺枯燥無味,所以這里我先概要地描述JobTracker處理Heartbeat的整體流程,然后再按照功能劃分出一個(gè)個(gè)看似還算獨(dú)立的子處理流程,單獨(dú)地進(jìn)行詳細(xì)說明,這樣能夠更容易理解。整體處理流程,如下圖所示:
下面,我們根據(jù)上面的Heartbeat處理流程圖,概要地說明Heartbeat是如何處理的,流程描述如下所示:
TaskTracker創(chuàng)建一個(gè)TaskTrackerStatus對象,TaskTrackerStatus內(nèi)部封裝的信息包括:TaskTracker所在節(jié)點(diǎn)的基本信息、運(yùn)行在TaskTracker上的Task的狀態(tài)信息、TaskTracker服務(wù)的健康狀態(tài)信息、TaskTracker的資源信息,另外發(fā)送心跳的RPC方法還包括restarted(TaskTracker是否重啟)、initialContact(TaskTracker是否初次連接JobTracker)、acceptNewTasks(TaskTracker是否能夠運(yùn)行新的Task)、responseId(心跳響應(yīng)ID),通過InterTrackerProtocol協(xié)議的heartbeat方法發(fā)送給JobTracker。
JobTracker接收到TaskTracker發(fā)送的心跳數(shù)據(jù)。
JobTracker檢查TaskTracker的host是否在黑名單中,如果TaskTracker在黑名單中,則直接拋出異常終止RPC調(diào)用,否則繼續(xù)下一步流程。
檢查TaskTracker RPC調(diào)用參數(shù)restarted的值,如果TaskTracker重啟了,則標(biāo)記TaskTracker狀態(tài)為健康狀態(tài);如果TaskTracker沒有重啟,則檢查是否可以指派任務(wù)在該TaskTracker上運(yùn)行。
如果TaskTracker不是初次連接JobTracker,檢查JobTracker是否存在上一次向該TaskTracker發(fā)送的Heartbeat響應(yīng)數(shù)據(jù),存在的話則說明TaskTracker因?yàn)槭チ伺cJobTracker之間的RPC連接而沒有接收到,JobTracker直接再給TaskTracker重新發(fā)送該響應(yīng)數(shù)據(jù);不存在的話,若JobTracker重啟了,使TaskTracker重新加入集群,需要通知Recovery Manager從恢復(fù)列表中移除該TaskTracker,若JobTracker未重啟,這種情況幾乎是不可能存在的(既然TaskTracker不是初次連接,JobTracker也沒有重啟,JobTracker端不可能沒有保存Heartbeat響應(yīng)數(shù)據(jù))。
處理JobTracker接收到的TaskTracker的Heartbeat信息,主要是TaskTrackerStatus封裝的數(shù)據(jù)。
根據(jù)處理Heartbeat數(shù)據(jù)結(jié)果,如果TaskTracker需要重新初始化,則發(fā)送一個(gè)帶有ReinitTrackerAction指令的Heartbeat響應(yīng)數(shù)據(jù),否則TaskTracker不需要重新初始化則繼續(xù)下一步流程。
檢查是否可以向該TaskTracker指派任務(wù),如果可以可以向該TaskTracker指派任務(wù),則直接使用TaskScheduler指定的調(diào)度策略,選擇當(dāng)前可以指派給TaskTracker的一組需要啟動(dòng)的Task(對應(yīng)指令LaunchTaskAction)。
根據(jù)TaskScheduler調(diào)度策略選擇的需要啟動(dòng)的Task,并根據(jù)TaskTracker發(fā)送的Task狀態(tài)報(bào)告,繼續(xù)選擇一些已經(jīng)完成/需要被清理的Task分配給TaskTracker:先檢查在該TaskTracker上是否有完成的Job,計(jì)算屬于這些Job的需要被Kill掉(對應(yīng)指令KillTaskAction)的Task;再檢查是否有完成的Job,并且對應(yīng)在該TaskTracker上的Task需要被清理(對應(yīng)指令KillTaskAction);最后檢查是否有已經(jīng)完成需要被提交的Task(以此來通知TaskTracker提交Task完成并更新狀態(tài),對應(yīng)指令CommitTaskAction)。
構(gòu)造一個(gè)包含可調(diào)度Task(LaunchTaskAction/KillTaskAction/CommitTaskAction)的HeartbeatResponse對象,更新JobTracker內(nèi)部維護(hù)的trackerToHeartbeatResponseMap映射。根據(jù)TaskTracker的Heartbeat報(bào)告的Task狀態(tài)信息,對標(biāo)記為完成的Task,更新JobTracker內(nèi)部維護(hù)的多個(gè)隊(duì)列和Map:trackerToMarkedTasksMap、taskidToTrackerMap、trackerToTaskMap、taskidToTIPMap。最后,返回TaskTracker調(diào)用的結(jié)果:HeartbeatResponse對象。
上面流程圖中,黑色虛線所表示的處理流程,我們說明一下:這種情況是不可能出現(xiàn)的,因?yàn)門askTracker不是第一次連接JobTracker,而JobTracker端還沒有上一次TaskTracker發(fā)送的Heartbeat對應(yīng)的HeartbeatResponse,同時(shí)JobTracker又沒有重啟動(dòng)過,所以這種條件是不存在的,那么該流程分支也不可能執(zhí)行,故而用虛線描述,指向發(fā)送一個(gè)帶有ReinitTrackerAction的HeartbeatResponse。下面,我們細(xì)化整個(gè)流程,將一些比較重要的流程詳細(xì)分析說明:TaskTracker與JobTracker失去連接,更新狀態(tài)JobTracker如果在給定超時(shí)時(shí)間范圍之內(nèi)沒有收到TaskTracker的Heartbeat報(bào)告,會(huì)認(rèn)為該TaskTracker已經(jīng)無法執(zhí)行/指派任務(wù),那么在JobTracker端與該TaskTracker相關(guān)的數(shù)據(jù)結(jié)構(gòu)都需要更新,受到影響的Job和Task的數(shù)據(jù)結(jié)構(gòu)也需要更新,具體處理流程如下圖所示:
上述流程圖描述的流程,如下所示:
從隊(duì)列Map<String, Set<JobID>> trackerToJobsToCleanup中移除在該TaskTracker上已經(jīng)完成且需要清理的所有Job。
從隊(duì)列Map<String, Set<TaskAttemptID>> trackerToTasksToCleanup中移除在TaskTracker上已經(jīng)運(yùn)行完成且需要清理的所有Task。
通知Recovery Manager從其維護(hù)的Set<String>類型的恢復(fù)列表JobTracker.RecoveryManager.recoveredTrackers中移除該TaskTracker。
從TreeMap<String, Set<TaskAttemptID>> trackerToTaskMap中刪除在該TaskTracker上運(yùn)行的所有Task。
對在該TaskTracker上的運(yùn)行的每一個(gè)Task(在隊(duì)列trackerToTaskMap中),進(jìn)行如下2步處理:
(1)從隊(duì)列Map<TaskAttemptID, TaskInProgress> taskidToTIPMap中取出TaskAttemptID對應(yīng)的TaskInProgress tip結(jié)構(gòu),再根據(jù)tip獲取到JobInProgress:JobInProgress job = tip.getJob();;
(2)如果ReduceTask已經(jīng)完成,以及具有0個(gè)ReduceTask的所有MapTask已經(jīng)完成,則將這些Task放入到隊(duì)列TreeMap<String, Set<TaskAttemptID>> trackerToMarkedTasksMap中;如果tip標(biāo)記Task沒有完成,或者滿足條件tip.isMapTask() && !tip.isJobSetupTask() && job.desiredReduces() != 0,檢查Job運(yùn)行狀態(tài),當(dāng)job.getStatus().getRunState() == JobStatus.RUNNING || job.getStatus().getRunState() == JobStatus.PREP成立時(shí),則該Task運(yùn)行失敗,并更新Task狀態(tài),同時(shí)收集這類Job,放入集合Set<JobInProgress> jobsWithFailures中,后續(xù)對這些Job進(jìn)行處理;
由于該TaskTracker被JobTracker標(biāo)記為lost狀態(tài),則對上面收集到的jobsWithFailures集合中的Job,只要存在屬于該Job的Task被分配到該TaskTracker上運(yùn)行,會(huì)通過累加計(jì)算在該TaskTracker上失敗的Task計(jì)數(shù),給該TaskTracker以懲罰,并釋放所有在該TaskTracker上預(yù)留的Slot。
從隊(duì)列TreeMap<String, Set<TaskAttemptID>> trackerToMarkedTasksMap中移除所有被標(biāo)記完成的Task,同時(shí)更新JobTracker內(nèi)部維護(hù)的如下3個(gè)隊(duì)列:TreeMap<TaskAttemptID, String> taskidToTrackerMap、TreeMap<String, Set<TaskAttemptID>> trackerToTaskMap、Map<TaskAttemptID, TaskInProgress> taskidToTIPMap。
如果在該TaskTracker上的運(yùn)行的Task還有沒處理的,則轉(zhuǎn)第6步進(jìn)行處理;否則,流程結(jié)束。
檢查是否可以向TaskTracker指派運(yùn)行Task當(dāng)TaskTracker發(fā)送Heartbeat標(biāo)志其沒有重啟,那么會(huì)執(zhí)行該子流程,如下圖所示:
在JobTracker端,既然TaskTracker匯報(bào)狀態(tài)表明其沒有重啟,那么就需要檢查該TaskTracker對應(yīng)的黑名單和灰名單情況,如果TaskTracker狀態(tài)一切正常,則恢復(fù)其正常被指派任務(wù)并運(yùn)行Task的能力。標(biāo)記TaskTracker為Health狀態(tài)當(dāng)TaskTracker重啟了,然后再次連接JobTracker時(shí),發(fā)送Heartbeat的過程中,會(huì)執(zhí)行該流程。重啟的TaskTracker,JobTracker會(huì)將一個(gè)TaskTracker標(biāo)記為Health狀態(tài),說明該TaskTracker對應(yīng)的資源信息(內(nèi)存/CPU)應(yīng)該在JobTracker端做記錄,表示這些資源是可用的,更新JobTracker端的幾個(gè)可用資源的變量計(jì)數(shù)。但是,很有可能TaskTracker重啟之前,其上運(yùn)行Task失敗了很多次,在JobTracker端記錄該失敗計(jì)數(shù),當(dāng)滿足一定條件后,會(huì)將TaskTracker加入灰名單,如果TaskTracker重啟了,應(yīng)該將其從灰名單中移除,以便不影響任務(wù)分派,具體處理流程如下圖所示:
上述流程圖比較簡單不再累述。更新TaskTracker狀態(tài)如果TaskTracker不是第一次連接JobTracker,那么在JobTracker端的隊(duì)列HashMap<String, TaskTracker> taskTrackers中會(huì)保存上一次TaskTracker向JobTracker匯報(bào)的狀態(tài)TaskTrackerStatus,如果該TaskTrackerStatus不存在,則直接處理當(dāng)前匯報(bào)的TaskTracker的狀態(tài)報(bào)告,使得JobTracker端維護(hù)的該TaskTracker的狀態(tài)是最新的,具體的處理流程,如下圖所示:
上圖中處理流程,描述如下所示:
檢查是否JobTracker端存在該TaskTracker上一次匯報(bào)的狀態(tài)報(bào)告,如果不存在,則直接處理當(dāng)前發(fā)送的狀態(tài)報(bào)告;否則,會(huì)更新JobTracker端維護(hù)的如下4個(gè)全局計(jì)數(shù)器:totalMaps(MapTask總數(shù))、totalReduces(ReduceTask總數(shù))、occupiedMapSlots(占用的Map Slot總數(shù))、occupiedReduceSlots(占用的Reduce Slot總數(shù)),在當(dāng)前計(jì)數(shù)值的基礎(chǔ)上,減去上次匯報(bào)的報(bào)告中的數(shù)量(實(shí)際上是假定上次匯報(bào)的全部指標(biāo)都已完成,如果沒完成,再通過本次匯報(bào)的狀態(tài)報(bào)告再加回去);如果TaskTracker沒有被加入到黑名單中,還需要更新下面2個(gè)JobTracker端全局計(jì)數(shù)器:totalMapTaskCapacity(該TaskTracker上最大Map Slot總數(shù))、totalReduceTaskCapacity(該TaskTracker上最大Reduce Slot總數(shù))。
處理TaskTracker當(dāng)前匯報(bào)的狀態(tài)報(bào)告,更新JobTracker內(nèi)部維護(hù)的6個(gè)全局計(jì)數(shù)器:totalMaps、totalReduces、occupiedMapSlots、occupiedReduceSlots、totalMapTaskCapacity、totalReduceTaskCapacity,各個(gè)計(jì)數(shù)器具體含義見上一步說明。
如果TaskTracker是第一次匯報(bào)狀態(tài)報(bào)告,則需要在JobTracker內(nèi)部注冊,構(gòu)造一個(gè)org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker對象(該TaskTracker對象是在JobTracker的視角看到的結(jié)構(gòu)),加入到隊(duì)列HashMap<String, TaskTracker> taskTrackers中,同時(shí)還要計(jì)算該TaskTracker所在的host節(jié)點(diǎn)上TaskTracker進(jìn)程的個(gè)數(shù),更新隊(duì)列Map<String, Integer> uniqueHostsMap。
更新TaskTracker上所有Task狀態(tài)在JobTracker處理TaskTracker發(fā)送的Heartbeat的過程中,首先會(huì)更新JobTracker維護(hù)的TaskTracker的狀態(tài)信息,因?yàn)橐粋€(gè)TaskTracker上可能運(yùn)行著很多Task,那么需要更新這些Task的狀態(tài),可以通過上面介紹的TaskTrackerStatus的結(jié)構(gòu)看出,對應(yīng)著一個(gè)TaskStatus的狀態(tài)報(bào)告集合,所以這里有一個(gè)批量更新TaskStatus狀態(tài)的操作,實(shí)際上會(huì)對每一個(gè)Task的狀態(tài)分別進(jìn)行更新,整體處理流程如下圖所示:
具體處理流程,描述如下所示:
從TaskTracker發(fā)送的TaskTrackerStatus對象可以提取Task狀態(tài)報(bào)告集合,然后對每一個(gè)狀態(tài)報(bào)告進(jìn)行處理,直到所有的Task的狀態(tài)都已經(jīng)被更新到JobTracker內(nèi)部維護(hù)的狀態(tài)對象上,下面描述每一個(gè)TaskStatus的處理過程:
(1)如果一個(gè)Task的運(yùn)行狀態(tài)不為TaskStatus.State.UNASSIGNED,說明該Task還沒有在TaskTracker上獲得運(yùn)行機(jī)會(huì),則并不讓該Task失敗(當(dāng)一個(gè)Task指派給一個(gè)TaskTracker運(yùn)行時(shí),會(huì)首先在JobTracker端加入到一個(gè)超時(shí)列表中,由一個(gè)獨(dú)立的線程JobTracker.ExpireLaunchingTasks去檢測,該Task是否在給定的時(shí)間內(nèi)(默認(rèn)是10分鐘 )是否在TaskTracker上啟動(dòng)而且一直沒有報(bào)告狀態(tài),如果沒有報(bào)告,則會(huì)將該Task標(biāo)記為失敗),等待下一次被調(diào)度分配給TaskTracker去運(yùn)行。
(2)根據(jù)Task的ID,獲取到它對應(yīng)的JobInProgress信息,如果沒有獲取到則將該Task對應(yīng)的JobInProgress對象加入到cleanup列表Map<String, Set<JobID>> trackerToJobsToCleanup中,直接返回繼續(xù)處理下一個(gè)TaskStatus報(bào)告;如果能夠獲取到對應(yīng)的JobInProgress信息,則檢查該JobInProgress中包含的Job是否設(shè)置初始化完成狀態(tài),如果沒有設(shè)置,則直接將該Task加入到隊(duì)列Map<String, Set<TaskAttemptID>> trackerToTasksToCleanup中,等待JobTracker調(diào)度Kill掉該Task,直接返回繼續(xù)處理下一個(gè)TaskStatus報(bào)告。
(3)檢查該TaskStatus報(bào)告中對應(yīng)的TaskAttemptID,是否在JobTracker端存在對應(yīng)的TaskInProgress對象,很有可能JobTracker重啟,內(nèi)存中維護(hù)的Map<TaskAttemptID, TaskInProgress> taskidToTIPMap隊(duì)列中沒有TaskInProgress對象,這時(shí)JobInProgress對象一定存在,可以通過JobInProgress對象獲取到該Task對應(yīng)的TaskInProgress對象(因?yàn)樵贘obTracker端創(chuàng)建Job的時(shí)候,會(huì)分別創(chuàng)建4類TIP:map、reduce、cleanup、setup),再將其加入到Map<TaskAttemptID, TaskInProgress> taskidToTIPMap隊(duì)列中,同時(shí)觸發(fā)已知的一組JobInProgressListener的jobUpdated方法,去更新Job狀態(tài)。
(4)根據(jù)TaskStatus能夠獲取到所有Fetch失敗的Task,查詢該Task對應(yīng)的TaskInProgress對象,從而進(jìn)一步通知JobInProgress對象,根據(jù)設(shè)定的允許Task Fetch失敗的最大次數(shù)限制,確定是否要讓該Task失敗,并更新TaskInProgress狀態(tài)。
更新Task狀態(tài)當(dāng)Task的狀態(tài)發(fā)生變化的情況下,可能需要更新Task的狀態(tài),我們根據(jù)JobTracker定義的updateTaskStatus方法,方法聲明如下所示:1public synchronized void updateTaskStatus(TaskInProgress tip, TaskStatus status)其中,tip是當(dāng)前在JobTracker端維護(hù)的Task的狀態(tài),status是TaskTracker匯報(bào)的Task狀態(tài),更新JobTracker端Task狀態(tài)主要是根據(jù)心跳匯報(bào)的status來更新tip數(shù)據(jù)結(jié)構(gòu)。更新Task狀態(tài)的具體流程,如下圖所示:
更新Task狀態(tài),主要是更新每個(gè)Task對應(yīng)的在JobTracker端維護(hù)的TaskInProgress結(jié)構(gòu),處理流程描述如下:
如果心跳匯報(bào)的status中,Task運(yùn)行狀態(tài)為SUCCEEDED,當(dāng)tip標(biāo)識(shí)已經(jīng)完成或標(biāo)識(shí)被Kill掉,則統(tǒng)一修改status的運(yùn)行狀態(tài)為KILLED;如果心跳匯報(bào)的status對應(yīng)的TaskAttemptID不是cleanup task,當(dāng)該TaskAttemptID 對應(yīng)的JobInProgress表示Job已經(jīng)完成,或失敗,或被Kill掉,那么status運(yùn)行狀態(tài)為FAILED_UNCLEAN則修改為FAILED,運(yùn)行狀態(tài)為KILLED_UNCLEAN則修改為KILLED。
調(diào)用TaskInProgress的updateStatus方法,傳入當(dāng)前TaskTracker匯報(bào)的status狀態(tài)對象,更新tip的狀態(tài)。TaskInProgress會(huì)維護(hù)每個(gè)Task對應(yīng)的TaskStatus對象oldStatus,并根據(jù)匯報(bào)的status對更新替換oldStatus。有3種情況不需要更新:第一種是當(dāng)status的運(yùn)行狀態(tài)不等于RUNNING/COMMIT_PENDING/FAILED_UNCLEAN/KILLED_UNCLEAN/UNASSIGNED中的任何一種狀態(tài);第二種是status的運(yùn)行狀態(tài)為RUNNING、UNASSIGNED中的任意一種狀態(tài),并且oldStatus的運(yùn)行狀態(tài)為FAILED/KILLED/FAILED_UNCLEAN/KILLED_UNCLEAN/SUCCEEDED/COMMIT_PENDING中任意一種狀態(tài);第三種是oldStatus的運(yùn)行狀態(tài)為FAILED/KILLED中的任意一種狀態(tài),這種情況會(huì)把該TaskAttemptID加入到隊(duì)列TreeMap<TaskAttemptID, Boolean> tasksToKill中標(biāo)識(shí)需要Kill掉該Task。
如果status的運(yùn)行狀態(tài)為FAILED狀態(tài),并且JobTracker在Safe模式下,則設(shè)置status的運(yùn)行狀態(tài)為KILLED。
此時(shí),如果oldStatus與status不相等,即TaskAttemptID的狀態(tài)已經(jīng)發(fā)生變化,則會(huì)根據(jù)status的運(yùn)行狀態(tài)創(chuàng)建不同的TaskCompletionEvent事件(SUCCEEDED/FAILED/KILLED),這些 TaskCompletionEvent事件會(huì)被加入到JobInProgress的taskCompletionEvents列表中,供JobClient查詢或供JobTracker檢索;或者執(zhí)行相應(yīng)的操作:如果運(yùn)行狀態(tài)為FAILED_UNCLEAN/KILLED_UNCLEAN,則tip中該TaskAttemptID標(biāo)記為失敗并更新相關(guān)結(jié)構(gòu),然后加入到mapCleanupTasks/reduceCleanupTasks列表中等待被清理,同時(shí)將該TaskAttemptID對應(yīng)的數(shù)據(jù)從JobTracker的taskidToTIPMap、taskidToTrackerMap、trackerToTaskMap這3個(gè)隊(duì)列中刪除。
根據(jù)構(gòu)造的TaskCompletionEvent對象,并且如果status的運(yùn)行狀態(tài)為SUCCEEDED,則更新其對應(yīng)的JobInProgress的狀態(tài)為成功。(原創(chuàng):時(shí)延軍(包含鏈接:http://shiyanjun.cn))