導(dǎo)讀:本文我們將重點(diǎn)介紹集群運(yùn)行時(shí)中ResourceManager的設(shè)計(jì)和實(shí)現(xiàn),了解如何通過ResourceManager對集群的計(jì)算資源進(jìn)行有效管理。
作者:張利兵
來源:華章科技
01 ResourceManager詳解
ResourceManager作為統(tǒng)一的集群資源管理器,用于管理整個(gè)集群的計(jì)算資源,包括CPU資源、內(nèi)存資源等。
同時(shí),ResourceManager負(fù)責(zé)向集群資源管理器中申請容器資源啟動(dòng)TaskManager實(shí)例,并對TaskManager進(jìn)行集中管理。當(dāng)新的作業(yè)提交到集群后,JobManager會(huì)向ResourceManager申請作業(yè)執(zhí)行需要的計(jì)算資源,進(jìn)而完成整個(gè)作業(yè)的運(yùn)行。
如圖3-12所示,為了兼容Hadoop Yarn、Kubernetes、Mesos等集群資源管理器,在ResourceManager抽象實(shí)現(xiàn)類的基礎(chǔ)上,分別實(shí)現(xiàn)了ActiveResourceManager、
Standalone-ResourceManager以及MesosResourceManager等子類。
其中ActiveResourceManager實(shí)現(xiàn)了動(dòng)態(tài)資源管理,可以根據(jù)提交的作業(yè)動(dòng)態(tài)選擇啟動(dòng)或停止TaskManager實(shí)例。目前支持TaskManager動(dòng)態(tài)管理和啟動(dòng)的ResourceManager主要有KubernetesResourceManager和Yarn-ResourceManager實(shí)現(xiàn)類。
▲圖3-12 ResourceManager UML關(guān)系圖
從圖3-12中可以看出,ResourceManager通過實(shí)現(xiàn)ResourceManagerGateway接口,向其他組件提供RPC遠(yuǎn)程訪問能力,如TaskManager服務(wù)和JobManager服務(wù)的Resource-ManagerGateway會(huì)將RPC訪問請求發(fā)送到ResourceManager服務(wù)中。
另外,Resource-Manager繼承了FencedRpcEndpoint基本實(shí)現(xiàn)類,使得ResourceManager可以作為一個(gè)RpcEndpoint節(jié)點(diǎn),通過ResourceManagerGateway接口提供給其他服務(wù)節(jié)點(diǎn),使之能夠以RPC的方式訪問ResourceManager服務(wù)。
同時(shí),ResourceManager實(shí)現(xiàn)了LeaderContender接口,可以作為競爭節(jié)點(diǎn)讓LeaderElectionService進(jìn)行Leader節(jié)點(diǎn)的選舉,保證整個(gè)集群ResourceManager組件服務(wù)的高可用。
從圖3-12中也可以看出,ResourceManager主要包含如下成員變量。
- resourceId:ResourceManager對應(yīng)的唯一資源ID。
- jobManagerRegistrations:專門存儲JobManager注冊信息。其中Key為JobID;Value為JobManagerRegistration,當(dāng)啟動(dòng)JobManager服務(wù)時(shí),就會(huì)將JobManager信息注冊在jobManagerRegistrations實(shí)例中。
- jmResourceIdRegistrations:用于存儲JobManager注冊信息,與jobManagerRegistrations的區(qū)別在于Key為ResourceID。
- jobLeaderIdService:用于獲取Job Leader ID的服務(wù),在開啟的高可用集群中,當(dāng)JobManager的Leader節(jié)點(diǎn)發(fā)生切換時(shí),會(huì)借助jobLeaderIdService獲取當(dāng)前作業(yè)有效的JobID和地址信息。
- taskExecutors:注冊在ResourceManager的TaskExecutor列表中,其中Key為Task-Executor對應(yīng)的ResourceID,Value為WorkRegistration,即TaskExecutor向Resource-Manager注冊過程中所提供的信息。
- taskExecutorGatewayFutures:專門存儲TaskExecutorGateway的CompletableFuture對象,Key為TaskExecutor對應(yīng)的ResourceID,Value為CompletableFuture,用于獲取Task-ExecutorGateway,實(shí)現(xiàn)與TaskExecutor之間的RPC通信。
- highAvailabilityServices:系統(tǒng)高可用服務(wù),基于highAvailabilityServices服務(wù)支持組件高可用。
- heartbeatServices:用于創(chuàng)建HeartbeatManager服務(wù),和其他組件之間建立心跳連接。
- fatalErrorHandler:系統(tǒng)異常錯(cuò)誤處理,當(dāng)ResourceManager出現(xiàn)異常時(shí)調(diào)用fatal-ErrorHandler處理異常錯(cuò)誤。
- slotManager:ResourceManager的內(nèi)部組件,用于管理集群的可用Slot資源,同時(shí)接收并處理TaskExecutor的SlotReport。
- clusterInformation:存儲整個(gè)Flink集群共享的信息,包括blobServerHostname和blobServerPort等配置。
- resourceManagerMetricGroup:ResourceManager的MetricGroup,用于收集和Resource-Manager相關(guān)的監(jiān)控指標(biāo)。
- leaderElectionService:基于ZooKeeper實(shí)現(xiàn)的Leader選舉服務(wù),在這里用于實(shí)現(xiàn)Resource-Manager組件高可用。
- taskManagerHeartbeatManager:管理與TaskManager之間的心跳信息。
- jobManagerHeartbeatManager:管理與JobManager之間的心跳信息。
- clearStateFuture:用于停止ResourceManager后進(jìn)行數(shù)據(jù)異步清理。
02 ResourceManagerGateway接口實(shí)現(xiàn)
ResourceManagerGateway接口提供了ResourceManager需要的RPC方法,供其他集群組件調(diào)用。例如在TaskExecutor中調(diào)用ResourceManagerGateway完成在ResourceManager中注冊TaskExecutor的操作。
如圖3-13所示,通過對ResourceManagerGateway中提供的RPC方法進(jìn)行梳理,得到JobManager、TaskExecutor、WebMonitorEndpoint和Dispatcher等組件與ResourceManager-Gateway之間的RPC調(diào)用關(guān)系圖。
▲圖3-13 ResourceManager調(diào)用關(guān)系圖
從圖3-13中可以看出,JobManager、TaskExecutor、WebMonitorEndpoint和Dispatcher組件分別使用如下方法與ResourceManager服務(wù)進(jìn)行交互。
1. JobManager和ResourceManager 的RPC調(diào)用
- registerJobManager():在ResourceManager中注冊JobManager服務(wù),此時(shí)會(huì)在job-LeaderIdService服務(wù)中添加注冊的JobManager信息。
- requestSlot():JobManager向ResourceManager申請運(yùn)行Task所需的Slot資源。
- heartbeatFromJobManager():用于在JobManager與ResourceManager之間建立長期的心跳連接。
- disconnectJobManager():根據(jù)JobID刪除之前注冊在ResourceManager中的Job-Manager信息,并且關(guān)閉JobManager與ResourceManager之間的RPC連接。
2. TaskExecutor和ResourceManager 的RPC調(diào)用
- heartbeatFromTaskManager():在TaskExecutor中調(diào)用heartbeatFromTaskManager()方法,構(gòu)建TaskExecutor與ResourceManager之間的心跳連接。
- disconnectTaskManager():停止TaskExecutor組件時(shí)會(huì)調(diào)用disconnectTaskManager()方法斷開TaskExecutor與ResourceManager之間的RPC連接。
- registerTaskExecutor():當(dāng)新的TaskExecutor啟動(dòng)時(shí),會(huì)調(diào)用該方法向Resource-Manager注冊TaskExecutor信息。
- sendSlotReport():當(dāng)TaskExecutor啟動(dòng)并注冊成功后,會(huì)調(diào)用sendSlotReport()方法向ResourceManager上報(bào)SlotReport。SlotReport中包含TaskExecutor的資源數(shù)量和配置信息等內(nèi)容。
- notifySlotAvailable():當(dāng)TaskExecutor中具有空閑Slot計(jì)算資源時(shí),會(huì)調(diào)用notify-SlotAvailable()方法通知ResourceManager將該Slot資源變?yōu)锳vailable狀態(tài)。
- cancelSlotRequest():取消JobManager已經(jīng)分配的資源。
3. Dispatcher和ResourceManager的RPC調(diào)用
- requestResourceOverview():用于在Dispatcher中獲取集群資源信息,包括集群中的TaskManager、numberRegisteredSlots以及numberFreeSlots數(shù)量。
- requestTaskManagerMetricQueryServiceAddresses():從ResourceManager獲取Task-Manager的MetricQueryService路徑,主要用于前端獲取TaskManager的監(jiān)控指標(biāo)。
4. WebMonitorEndpoint和ResourceManager 的RPC調(diào)用
- requestTaskManagerInfo():用于獲取TaskManager的相關(guān)信息,即TaskExecutor啟動(dòng)過程中注冊在ResourceManager的信息,包括TaskExecutor的網(wǎng)關(guān)地址、端口以及TaskExecutor的硬件信息。
- requestTaskManagerFileUpload():請求上傳文件到BlobServer上,返回Transient-BlobKey。
03 Slot計(jì)算資源管理
如圖3-14所示,ResourceManager內(nèi)部主要通過SlotManager服務(wù)統(tǒng)一對整個(gè)集群的Slot計(jì)算資源進(jìn)行管理。Slot被稱為資源卡槽,用于表示可以分配的最小計(jì)算資源單位,提交的Task最終會(huì)運(yùn)行在Slot表示的計(jì)算資源中。
▲圖3-14 Slot計(jì)算資源管理
從圖3-14中可以看出,ResourceManager包含了Register Slot和Free Slot兩個(gè)鍵值對集合。其中Register Slot專門存儲ResourceManager中所有已經(jīng)注冊的TaskManagerSlot信息,F(xiàn)ree Slot集合則存儲了當(dāng)前SlotManager中處于空閑狀態(tài)且還沒有被分配和使用的Slot集合。
TaskManagerSlot對象包含了SlotID、ResourceProfile以及TaskExecutorConnection等信息。如果Slot被分配使用,在TaskManagerSlot中還會(huì)存儲AllocationID和JobID等分配信息,表明當(dāng)前Slot已經(jīng)被指定JobID對應(yīng)的JobManager使用。
另外,SlotManager還包含了pendingSlotRequests和fulfilledSlotRequests兩個(gè)鍵值對集合。其中pendingSlotRequests存儲了所有處于pending和unfulfilled狀態(tài)的Slot請求,fulfilledSlotRequests存儲了所有已經(jīng)分配完成的Slot請求。
Slot資源申請都會(huì)以Pending-SlotRequest的形式存儲在pendingSlotRequests集合中,等待SlotManager根據(jù)當(dāng)前集群的Slot資源進(jìn)行分配。
當(dāng)符合條件的Slot資源分配給指定的PendingSlotRequest后,會(huì)為其創(chuàng)建AllocationId,并將分配了AllocationId和SlotId信息的SlotRequest存儲到fulfilled-SlotRequests集合中。
對Slot計(jì)算資源的注冊和管理,主要是在TaskManager和ResourceManager服務(wù)之間進(jìn)行的,TaskManager作為Slot計(jì)算資源的提供方,ResourceManager則作為Slot計(jì)算資源的接收和管理方。這里我們簡單梳理一下TaskManager向SlotManager中注冊Slot資源的整個(gè)過程。
- 啟動(dòng)TaskManager后,調(diào)用ResourceManagerGateway.registerTaskExecutor()方法向ResourceManager中注冊TaskManager連接信息。
- 創(chuàng)建TaskManager和ResourceManager之間的RPC連接,TaskManager調(diào)用Resource-ManagerGateway.sendSlotReport()方法向ResourceManager發(fā)送SlotReport信息,接著ResourceManager調(diào)用SlotManager.registerTaskManager()方法,將TaskManager的資源信息寫入SlotManager。
- 在SlotManager中根據(jù)SlotReport中的Slot信息創(chuàng)建TaskManagerSlot,并注冊到SlotManager的HashMap<SlotID, TaskManagerSlot> slots集合中。
- SlotManager含有HashMap<SlotID, TaskManagerSlot> slots和LinkedHashMap<SlotID, TaskManagerSlot> freeSlots兩個(gè)Slot集合。前者維護(hù)所有注冊到SlotManager中的Slot計(jì)算資源,后者存儲當(dāng)前SlotManager中可用的Slot資源。
在SlotManager中完成Slot資源注冊后,等待集群提交和運(yùn)行作業(yè)。JobManager通過調(diào)用ResourceManagerGateway中的相關(guān)方法為作業(yè)申請Slot計(jì)算資源,整個(gè)申請過程如下。
- JobManager調(diào)用ResourceManagerGateway.requestSlot()方法向ResourceManager發(fā)起Slot計(jì)算資源申請。
- ResourceManager內(nèi)部會(huì)調(diào)用SlotManager.registerSlotRequest()方法,向SlotManager申請作業(yè)需要的Slot計(jì)算資源。
- SlotManager中維護(hù)了HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests集合,將所有的PendingSlotRequest存儲在該集合中,并根據(jù)SlotRequest的Resource-Profile匹配合適的Slot計(jì)算資源,然后對Slot進(jìn)行分配。
- 當(dāng)SlotRequest需要的Slot計(jì)算資源分配完畢后,將已經(jīng)分配的SlotID信息寫入HashMap<AllocationID, SlotID> fulfilledSlotRequests集合。
SlotManager組件會(huì)對Slot進(jìn)行統(tǒng)一的管理,在內(nèi)部構(gòu)建一個(gè)Slot計(jì)算資源池,有新的Slot注冊時(shí),會(huì)優(yōu)先從pendingSlotRequests集合中獲取處于Pending狀態(tài)的SlotRequest,并為該SlotRequest分配Slot計(jì)算資源。
以上就是在ResourceManager中注冊和分配Slot計(jì)算資源的全部過程,本文篇幅有限,Slot注冊和分配過程中涉及的核心代碼的詳細(xì)介紹,請見《Flink設(shè)計(jì)與實(shí)現(xiàn):核心原理與源碼解析》第3.3.3節(jié)。
關(guān)于作者:張利兵,資深大數(shù)據(jù)專家和架構(gòu)師,現(xiàn)任第四范式AI數(shù)據(jù)平臺架構(gòu)師,曾就職于明略數(shù)據(jù)。Apache Flink的貢獻(xiàn)者,對Flink有非常深入的研究。長期從事大數(shù)據(jù)架構(gòu)落地以及機(jī)器學(xué)習(xí)平臺與數(shù)據(jù)平臺研發(fā)架構(gòu)工作,在Hadoop、Spark、機(jī)器學(xué)習(xí)等方面積累了豐富的經(jīng)驗(yàn)。先后參與和主導(dǎo)了銀行、證券、地鐵等領(lǐng)域的大數(shù)據(jù)平臺的架構(gòu)設(shè)計(jì)與實(shí)現(xiàn)。《Flink原理、實(shí)戰(zhàn)與性能優(yōu)化》作者,極客時(shí)間《Flink原理與實(shí)戰(zhàn)》專欄作者。
本書摘編自《Flink設(shè)計(jì)與實(shí)現(xiàn):核心原理與源碼解析》,經(jīng)出版方授權(quán)發(fā)布。