kube-scheduler is one of the core components of Kubernetes. It is responsible for scheduling across the whole cluster: based on specific scheduling algorithms and policies it places Pods onto the most suitable worker nodes, so that cluster resources are used more sensibly and more fully, which is also one of the important reasons for choosing Kubernetes in the first place. If a new technology cannot help a company cut costs or improve efficiency, it will be very hard to push forward.
Scheduling Process
By default, the scheduler shipped as kube-scheduler satisfies the vast majority of requirements. The examples we have worked through so far basically used the default policies, which are enough to guarantee that our Pods end up on nodes with sufficient resources. But in real production projects we often understand our own applications better than Kubernetes does. For example, we may want a Pod to run only on a few specific nodes, or want those nodes to run only a particular type of application, and that requires the scheduler to be controllable.
The main job of kube-scheduler is to place Pods onto suitable Nodes according to specific scheduling algorithms and policies. It is a standalone binary that, once started, keeps watching the API Server, picks up Pods whose PodSpec.NodeName is empty, and creates a binding for each of them.
kube-scheduler structure
This process looks fairly simple from the outside, but in a real production environment there are many questions to consider:
- How do we guarantee scheduling fairness across all nodes? Keep in mind that not every node has the same resource configuration
- How do we make sure every node can have workloads assigned to it?
- How can cluster resources be used efficiently?
- How can cluster resources be utilized to the greatest extent?
- How do we guarantee the performance and efficiency of Pod scheduling?
- Can users customize their own scheduling policies based on their actual needs?
Given all the complex situations that show up in real environments, the Kubernetes scheduler is implemented as a set of plugins, which makes it easy to customize or extend: we can write our own scheduler and integrate it with Kubernetes in plugin form.
The scheduler source code lives under kubernetes/pkg/scheduler. The core logic that creates and runs the Scheduler is in pkg/scheduler/scheduler.go, and the kube-scheduler entry point is in cmd/kube-scheduler/scheduler.go.
Scheduling is mainly divided into the following parts:
- First comes the pre-selection step, which filters out nodes that do not satisfy the conditions; this is called Predicates (filtering)
- Then comes the ranking step, which sorts the nodes that passed by priority; this is called Priorities (scoring)
- Finally, the node with the highest priority is chosen; if any step in between fails, the error is returned immediately
The Predicates phase first iterates over all nodes and filters out those that do not satisfy the conditions; these are hard rules. All nodes that pass this phase are recorded and used as input for the second phase. If no node satisfies the conditions, the Pod stays Pending until some node does, and during that time the scheduler keeps retrying.
So when you deploy an application and see a Pod stuck in the Pending state, it means no node satisfied the scheduling conditions; at that point check whether node resources are available (kubectl describe pod on the Pod shows the FailedScheduling event with the reason).
The Priorities phase then screens the nodes again: if more than one node satisfies the conditions, the system ranks them by priority and finally picks the node with the highest priority to run the Pod.
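To make the two phases concrete, here is a tiny, self-contained Go sketch of the filter-then-score idea. It is a toy model, not the real kube-scheduler code; the node capacities, Pod requests, and the "most free CPU wins" scoring rule are made-up values for illustration only:
package main

import "fmt"

type Node struct {
	Name             string
	FreeCPU, FreeMem int64 // remaining allocatable resources (toy numbers, millicores / MiB)
}

type PodRequest struct {
	CPU, Mem int64
}

func main() {
	nodes := []Node{{"node1", 500, 1024}, {"node2", 2000, 4096}, {"node3", 100, 512}}
	pod := PodRequest{CPU: 400, Mem: 800}

	// Predicates: keep only the nodes that can fit the Pod's requests.
	var feasible []Node
	for _, n := range nodes {
		if n.FreeCPU >= pod.CPU && n.FreeMem >= pod.Mem {
			feasible = append(feasible, n)
		}
	}

	// Priorities: score the feasible nodes (here simply by CPU left over) and pick the best.
	var best Node
	bestScore := int64(-1)
	for _, n := range feasible {
		if score := n.FreeCPU - pod.CPU; score > bestScore {
			best, bestScore = n, score
		}
	}
	if bestScore < 0 {
		fmt.Println("no feasible node, the pod stays Pending")
		return
	}
	fmt.Printf("pod scheduled to %s\n", best.Name)
}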
Below is a simple diagram of the scheduling process:
kube-scheduler filter
The more detailed flow goes like this:
- First, a client creates a Pod resource through the API Server's REST API or with the kubectl tool
- The API Server receives the request and stores the relevant data in etcd
- The scheduler watches the API Server for the list of Pods that have not yet been scheduled (bound) and, looping over them, tries to assign a node to each Pod; this assignment is exactly the two phases mentioned above:
- Predicates: filter the nodes. The scheduler filters out unqualified nodes with a set of rules; for example, if the Pod sets resource requests, hosts with less available capacity than the Pod needs are obviously filtered out
- Priorities: score the nodes. The node list produced by the previous phase is scored; the scheduler also takes some overall optimization strategies into account, such as spreading the multiple Pod replicas controlled by one Deployment across different hosts, preferring the least-loaded host, and so on
- After these phases, the highest-scoring Node is selected and a binding operation between it and the Pod is performed, and the result is stored in etcd; the kubelet on the selected Node then carries out the actual Pod creation (it too learns about it by watching the API Server). A toy client-go sketch of this "find unscheduled Pods, then bind" loop follows below.
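The following is a rough client-go sketch of that loop, purely for illustration: it lists instead of watches, skips the Predicates/Priorities logic entirely, and pickNode is a hypothetical placeholder. It is not how kube-scheduler is implemented internally, but it shows that a binding is just another API object recording the Pod-to-Node decision:
package main

import (
	"context"
	"log"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	cfg, err := rest.InClusterConfig() // assumes this toy scheduler runs inside the cluster
	if err != nil {
		log.Fatal(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)
	ctx := context.Background()

	// Pods that no scheduler has bound yet have an empty spec.nodeName.
	pending, err := client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
		FieldSelector: "spec.nodeName=",
	})
	if err != nil {
		log.Fatal(err)
	}

	for _, pod := range pending.Items {
		node := pickNode(&pod) // placeholder for the filtering/scoring described above
		binding := &corev1.Binding{
			ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID},
			Target:     corev1.ObjectReference{Kind: "Node", Name: node},
		}
		if err := client.CoreV1().Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{}); err != nil {
			log.Printf("bind %s/%s failed: %v", pod.Namespace, pod.Name, err)
		}
	}
}

// pickNode is a hypothetical stand-in for real node selection.
func pickNode(pod *corev1.Pod) string { return "node1" }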
Today the scheduler is implemented entirely through plugins on top of the scheduling framework; the built-in plugins enabled by default are shown in the code below:
// pkg/scheduler/framework/plugins/registry.go
// NewInTreeRegistry builds the registry with all the in-tree plugins.
// Out-of-tree plugins can be registered additionally through the WithFrameworkOutOfTreeRegistry option.
func NewInTreeRegistry() runtime.Registry {
	fts := plfeature.Features{
		EnableDynamicResourceAllocation:              feature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation),
		EnableReadWriteOncePod:                       feature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod),
		EnableVolumeCapacityPriority:                 feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),
		EnableMinDomainsInPodTopologySpread:          feature.DefaultFeatureGate.Enabled(features.MinDomainsInPodTopologySpread),
		EnableNodeInclusionPolicyInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.NodeInclusionPolicyInPodTopologySpread),
		EnableMatchLabelKeysInPodTopologySpread:      feature.DefaultFeatureGate.Enabled(features.MatchLabelKeysInPodTopologySpread),
		EnablePodSchedulingReadiness:                 feature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness),
	}

	registry := runtime.Registry{
		dynamicresources.Name:                runtime.FactoryAdapter(fts, dynamicresources.New),
		selectorspread.Name:                  selectorspread.New,
		imagelocality.Name:                   imagelocality.New,
		tainttoleration.Name:                 tainttoleration.New,
		nodename.Name:                        nodename.New,
		nodeports.Name:                       nodeports.New,
		nodeaffinity.Name:                    nodeaffinity.New,
		podtopologyspread.Name:               runtime.FactoryAdapter(fts, podtopologyspread.New),
		nodeunschedulable.Name:               nodeunschedulable.New,
		noderesources.Name:                   runtime.FactoryAdapter(fts, noderesources.NewFit),
		noderesources.BalancedAllocationName: runtime.FactoryAdapter(fts, noderesources.NewBalancedAllocation),
		volumebinding.Name:                   runtime.FactoryAdapter(fts, volumebinding.New),
		volumerestrictions.Name:              runtime.FactoryAdapter(fts, volumerestrictions.New),
		volumezone.Name:                      volumezone.New,
		nodevolumelimits.CSIName:             runtime.FactoryAdapter(fts, nodevolumelimits.NewCSI),
		nodevolumelimits.EBSName:             runtime.FactoryAdapter(fts, nodevolumelimits.NewEBS),
		nodevolumelimits.GCEPDName:           runtime.FactoryAdapter(fts, nodevolumelimits.NewGCEPD),
		nodevolumelimits.AzurediskName:       runtime.FactoryAdapter(fts, nodevolumelimits.NewAzureDisk),
		nodevolumelimits.CinderName:          runtime.FactoryAdapter(fts, nodevolumelimits.NewCinder),
		interpodaffinity.Name:                interpodaffinity.New,
		queuesort.Name:                       queuesort.New,
		defaultbinder.Name:                   defaultbinder.New,
		defaultpreemption.Name:               runtime.FactoryAdapter(fts, defaultpreemption.New),
		schedulinggates.Name:                 runtime.FactoryAdapter(fts, schedulinggates.New),
	}

	return registry
}
From the code above we can see that the scheduler's algorithms are carried out by various plugins at different stages of scheduling, so let's first get to know the scheduling framework.
Scheduling Framework
The scheduling framework defines a set of extension points. Users implement the interfaces defined for an extension point to provide their own scheduling logic (which we call an extension) and register the extension at that point; when the framework reaches the corresponding extension point while executing the scheduling workflow, it calls the registered extensions. Each extension point is reserved with a specific purpose: extensions at some points can change the scheduler's decisions, while extensions at other points are purely notifications.
We know that every time a Pod is scheduled, two cycles are executed: the scheduling cycle and the binding cycle.
The scheduling cycle selects a suitable node for the Pod, and the binding cycle applies that decision to the cluster (that is, runs the Pod on the chosen node). Together the scheduling cycle and the binding cycle are called the scheduling context. Note that scheduling cycles run synchronously (only one Pod is scheduled at a time), while binding cycles may run asynchronously (bindings for multiple Pods can be performed concurrently).
A scheduling or binding cycle is aborted midway when:
- the scheduler decides there is currently no feasible node for the Pod, or
- an internal error occurs.
In that case, the Pod is put back into the pending (to-be-scheduled) queue and waits for the next retry.
Extension Points
The figure below shows the scheduling context within the scheduling framework and the extension points in it. A single extension can register at multiple extension points so that it can perform more complex, stateful tasks.
scheduling framework extensions
- PreEnqueue extensions are called before a Pod is added to the internal active queue, where Pods are marked as ready for scheduling. A Pod is allowed into the active queue only if every PreEnqueue plugin returns Success; otherwise it is placed in the internal list of unschedulable Pods and does not get the unschedulable condition. (A .spec.schedulingGates field can be added to the Pod API to mark whether the Pod is ready for scheduling; when the Pod is ready, the controlling service can change this field to signal the scheduler.)
- QueueSort extensions sort the queue of pending Pods to decide which Pod is scheduled first. A QueueSort extension essentially only needs to implement one method, Less(*QueuedPodInfo, *QueuedPodInfo), which compares which of two Pods should get scheduled first; only one QueueSort plugin can be in effect at a time.
- Pre-filter extensions pre-process the Pod's information or check preconditions that the cluster or the Pod must satisfy; if a pre-filter returns an error, the scheduling cycle is aborted.
- Filter extensions rule out nodes that cannot run the Pod. For each node, the scheduler calls the filter extensions in order; if any filter marks the node as infeasible, the remaining filters are not executed for it. The scheduler may evaluate filters on multiple nodes concurrently.
- Post-filter extensions run after the Filter phase, but only when no feasible node was found for the Pod; a typical use is preemption, which tries to make the Pod schedulable by evicting other Pods. (In earlier designs Post-filter was an informational extension point that received the list of nodes that passed filtering; in the current framework that role is played by Pre-score, whose extensions can use the filtered node list to update internal state or emit logs and metrics, as the interface definitions below also show.)
- Scoring extensions score every feasible node. The scheduler calls the Score extension for each node and the result is an integer within a fixed range. In the normalize-scoring step, the scheduler combines each scoring extension's result for a node with that extension's weight to produce the final score. (A small Score/NormalizeScore sketch follows this list.)
- Normalize-scoring extensions adjust each node's score before the scheduler computes the final node ranking. An extension registered here receives, as input, the scores produced by the Score extension of the same plugin; the framework calls each plugin's normalize-scoring extension once per scheduling cycle.
- Reserve is a notification-style extension point that stateful plugins can use to reserve resources on the node for the Pod. It happens before the scheduler actually binds the Pod to the node, and its purpose is to avoid the scheduler, while waiting for that binding to complete, placing new Pods onto the node and exceeding the actually available resources (binding a Pod to a node happens asynchronously). This is the last step of the scheduling cycle; once the Pod enters the reserved state, either the Unreserve extension is triggered when binding fails, or the Post-bind extension finishes the binding cycle when it succeeds.
- Permit extensions can block or delay the binding of a Pod to a node. A permit extension can do one of the following three things:
- approve: once all permit extensions approve the binding of the Pod to the node, the scheduler proceeds with the binding.
- deny: if any permit extension denies the binding, the Pod is put back into the scheduling queue and the Unreserve extensions are triggered.
- wait: if a permit extension returns wait, the Pod stays in the permit phase until some other extension approves it; if a timeout occurs, wait turns into deny, the Pod is returned to the scheduling queue, and the Unreserve extensions are triggered.
- Pre-bind extensions run logic before the Pod is bound, for example mounting a network volume on the node so the Pod can use it. If any pre-bind extension returns an error, the Pod is put back into the scheduling queue and the Unreserve extensions are triggered.
- Bind extensions bind the Pod to a node:
- bind extensions run only after every pre-bind extension has completed successfully
- the framework calls the bind extensions one by one in the order they are registered
- a given bind extension may choose whether or not to handle the Pod
- if one bind extension handles the binding of the Pod to the node, the remaining bind extensions are skipped
- Post-bind is a notification-style extension:
- post-bind extensions are called after the Pod has been successfully bound to a node
- post-bind is the last step of the binding cycle and can be used for resource cleanup
- Unreserve is a notification-style extension: if resources were reserved for a Pod and the Pod is later rejected during the binding cycle, the unreserve extensions are called. An unreserve extension should release the compute resources that were reserved for the Pod on the node. Within one plugin, the reserve and unreserve extensions should come in pairs.
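As an illustration of the Score and normalize-scoring points described above, here is a hedged sketch of a hypothetical LeastPods plugin (not part of this article's sample repository) that scores nodes by how many Pods they already run and then normalizes the raw counts into the framework's score range:
package plugins

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

const LeastPodsName = "least-pods"

// LeastPods prefers nodes that currently run fewer Pods.
type LeastPods struct {
	handle framework.Handle
}

func NewLeastPods(_ runtime.Object, h framework.Handle) (framework.Plugin, error) {
	return &LeastPods{handle: h}, nil
}

func (p *LeastPods) Name() string { return LeastPodsName }

// Score returns the raw Pod count on the node; lower is better here.
func (p *LeastPods) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
	nodeInfo, err := p.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
	if err != nil {
		return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q: %v", nodeName, err))
	}
	return int64(len(nodeInfo.Pods)), nil
}

func (p *LeastPods) ScoreExtensions() framework.ScoreExtensions { return p }

// NormalizeScore flips the raw counts into [0, MaxNodeScore] so that emptier nodes score higher.
func (p *LeastPods) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
	var max int64
	for _, s := range scores {
		if s.Score > max {
			max = s.Score
		}
	}
	for i := range scores {
		if max == 0 {
			scores[i].Score = framework.MaxNodeScore
			continue
		}
		scores[i].Score = framework.MaxNodeScore - scores[i].Score*framework.MaxNodeScore/max
	}
	return nil
}
Such a plugin would be registered with app.WithPlugin(LeastPodsName, NewLeastPods) and enabled under the score extension point, in the same way the sample-plugin later in this article is wired up.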
To implement our own plugin we must register it with the scheduling framework and complete its configuration, and we must also implement the extension point interfaces. These interfaces are defined in pkg/scheduler/framework/interface.go in the source tree, as shown below:
// Plugin is the parent type for all the scheduling framework plugins.
type Plugin interface {
Name() string
}
// PreEnqueuePlugin is an interface that must be implemented by "PreEnqueue" plugins.
// These plugins are called prior to adding Pods to activeQ.
// Note: an preEnqueue plugin is expected to be lightweight and efficient, so it's not expected to
// involve expensive calls like accessing external endpoints; otherwise it'd block other
// Pods' enqueuing in event handlers.
type PreEnqueuePlugin interface {
Plugin
// PreEnqueue is called prior to adding Pods to activeQ.
PreEnqueue(ctx context.Context, p *v1.Pod) *Status
}
// LessFunc is the function to sort pod info
type LessFunc func(podInfo1, podInfo2 *QueuedPodInfo) bool
// QueueSortPlugin is an interface that must be implemented by "QueueSort" plugins.
// These plugins are used to sort pods in the scheduling queue. Only one queue sort
// plugin may be enabled at a time.
type QueueSortPlugin interface {
Plugin
// Less are used to sort pods in the scheduling queue.
Less(*QueuedPodInfo, *QueuedPodInfo) bool
}
// EnqueueExtensions is an optional interface that plugins can implement to efficiently
// move unschedulable Pods in internal scheduling queues. Plugins
// that fail pod scheduling (e.g., Filter plugins) are expected to implement this interface.
type EnqueueExtensions interface {
// EventsToRegister returns a series of possible events that may cause a Pod
// failed by this plugin schedulable.
// The events will be registered when instantiating the internal scheduling queue,
// and leveraged to build event handlers dynamically.
// Note: the returned list needs to be static (not depend on configuration parameters);
// otherwise it would lead to undefined behavior.
EventsToRegister() []ClusterEvent
}
// PreFilterExtensions is an interface that is included in plugins that allow specifying
// callbacks to make incremental updates to its supposedly pre-calculated
// state.
type PreFilterExtensions interface {
// AddPod is called by the framework while trying to evaluate the impact
// of adding podToAdd to the node while scheduling podToSchedule.
AddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podInfoToAdd *PodInfo, nodeInfo *NodeInfo) *Status
// RemovePod is called by the framework while trying to evaluate the impact
// of removing podToRemove from the node while scheduling podToSchedule.
RemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podInfoToRemove *PodInfo, nodeInfo *NodeInfo) *Status
}
// PreFilterPlugin is an interface that must be implemented by "PreFilter" plugins.
// These plugins are called at the beginning of the scheduling cycle.
type PreFilterPlugin interface {
Plugin
// PreFilter is called at the beginning of the scheduling cycle. All PreFilter
// plugins must return success or the pod will be rejected. PreFilter could optionally
// return a PreFilterResult to influence which nodes to evaluate downstream. This is useful
// for cases where it is possible to determine the subset of nodes to process in O(1) time.
PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) (*PreFilterResult, *Status)
// PreFilterExtensions returns a PreFilterExtensions interface if the plugin implements one,
// or nil if it does not. A Pre-filter plugin can provide extensions to incrementally
// modify its pre-processed info. The framework guarantees that the extensions
// AddPod/RemovePod will only be called after PreFilter, possibly on a cloned
// CycleState, and may call those functions more than once before calling
// Filter again on a specific node.
PreFilterExtensions() PreFilterExtensions
}
// FilterPlugin is an interface for Filter plugins. These plugins are called at the
// filter extension point for filtering out hosts that cannot run a pod.
// This concept used to be called 'predicate' in the original scheduler.
// These plugins should return "Success", "Unschedulable" or "Error" in Status.code.
// However, the scheduler accepts other valid codes as well.
// Anything other than "Success" will lead to exclusion of the given host from
// running the pod.
type FilterPlugin interface {
Plugin
// Filter is called by the scheduling framework.
// All FilterPlugins should return "Success" to declare that
// the given node fits the pod. If Filter doesn't return "Success",
// it will return "Unschedulable", "UnschedulableAndUnresolvable" or "Error".
// For the node being evaluated, Filter plugins should look at the passed
// nodeInfo reference for this particular node's information (e.g., pods
// considered to be running on the node) instead of looking it up in the
// NodeInfoSnapshot because we don't guarantee that they will be the same.
// For example, during preemption, we may pass a copy of the original
// nodeInfo object that has some pods removed from it to evaluate the
// possibility of preempting them to schedule the target pod.
Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status
}
// PostFilterPlugin is an interface for "PostFilter" plugins. These plugins are called
// after a pod cannot be scheduled.
type PostFilterPlugin interface {
Plugin
// PostFilter is called by the scheduling framework.
// A PostFilter plugin should return one of the following statuses:
// - Unschedulable: the plugin gets executed successfully but the pod cannot be made schedulable.
// - Success: the plugin gets executed successfully and the pod can be made schedulable.
// - Error: the plugin aborts due to some internal error.
//
// Informational plugins should be configured ahead of other ones, and always return Unschedulable status.
// Optionally, a non-nil PostFilterResult may be returned along with a Success status. For example,
// a preemption plugin may choose to return nominatedNodeName, so that framework can reuse that to update the
// preemptor pod's .spec.status.nominatedNodeName field.
PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
}
// PreScorePlugin is an interface for "PreScore" plugin. PreScore is an
// informational extension point. Plugins will be called with a list of nodes
// that passed the filtering phase. A plugin may use this data to update internal
// state or to generate logs/metrics.
type PreScorePlugin interface {
Plugin
// PreScore is called by the scheduling framework after a list of nodes
// passed the filtering phase. All prescore plugins must return success or
// the pod will be rejected
PreScore(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) *Status
}
// ScoreExtensions is an interface for Score extended functionality.
type ScoreExtensions interface {
// NormalizeScore is called for all node scores produced by the same plugin's "Score"
// method. A successful run of NormalizeScore will update the scores list and return
// a success status.
NormalizeScore(ctx context.Context, state *CycleState, p *v1.Pod, scores NodeScoreList) *Status
}
// ScorePlugin is an interface that must be implemented by "Score" plugins to rank
// nodes that passed the filtering phase.
type ScorePlugin interface {
Plugin
// Score is called on each filtered node. It must return success and an integer
// indicating the rank of the node. All scoring plugins must return success or
// the pod will be rejected.
Score(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (int64, *Status)
// ScoreExtensions returns a ScoreExtensions interface if it implements one, or nil if does not.
ScoreExtensions() ScoreExtensions
}
// ReservePlugin is an interface for plugins with Reserve and Unreserve
// methods. These are meant to update the state of the plugin. This concept
// used to be called 'assume' in the original scheduler. These plugins should
// return only Success or Error in Status.code. However, the scheduler accepts
// other valid codes as well. Anything other than Success will lead to
// rejection of the pod.
type ReservePlugin interface {
Plugin
// Reserve is called by the scheduling framework when the scheduler cache is
// updated. If this method returns a failed Status, the scheduler will call
// the Unreserve method for all enabled ReservePlugins.
Reserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
// Unreserve is called by the scheduling framework when a reserved pod was
// rejected, an error occurred during reservation of subsequent plugins, or
// in a later phase. The Unreserve method implementation must be idempotent
// and may be called by the scheduler even if the corresponding Reserve
// method for the same plugin was not called.
Unreserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string)
}
// PreBindPlugin is an interface that must be implemented by "PreBind" plugins.
// These plugins are called before a pod being scheduled.
type PreBindPlugin interface {
Plugin
// PreBind is called before binding a pod. All prebind plugins must return
// success or the pod will be rejected and won't be sent for binding.
PreBind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
}
// PostBindPlugin is an interface that must be implemented by "PostBind" plugins.
// These plugins are called after a pod is successfully bound to a node.
type PostBindPlugin interface {
Plugin
// PostBind is called after a pod is successfully bound. These plugins are
// informational. A common application of this extension point is for cleaning
// up. If a plugin needs to clean-up its state after a pod is scheduled and
// bound, PostBind is the extension point that it should register.
PostBind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string)
}
// PermitPlugin is an interface that must be implemented by "Permit" plugins.
// These plugins are called before a pod is bound to a node.
type PermitPlugin interface {
Plugin
// Permit is called before binding a pod (and before prebind plugins). Permit
// plugins are used to prevent or delay the binding of a Pod. A permit plugin
// must return success or wait with timeout duration, or the pod will be rejected.
// The pod will also be rejected if the wait timeout or the pod is rejected while
// waiting. Note that if the plugin returns "wait", the framework will wait only
// after running the remaining plugins given that no other plugin rejects the pod.
Permit(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (*Status, time.Duration)
}
// BindPlugin is an interface that must be implemented by "Bind" plugins. Bind
// plugins are used to bind a pod to a Node.
type BindPlugin interface {
Plugin
// Bind plugins will not be called until all pre-bind plugins have completed. Each
// bind plugin is called in the configured order. A bind plugin may choose whether
// or not to handle the given Pod. If a bind plugin chooses to handle a Pod, the
// remaining bind plugins are skipped. When a bind plugin does not handle a pod,
// it must return Skip in its Status code. If a bind plugin returns an Error, the
// pod is rejected and will not be bound.
Bind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
}
對于調度框架插件的啟用或者禁用,我們可以使用安裝集群時的 KubeSchedulerConfiguration 資源對象來進行配置。下面的例子中的配置啟用了一個實現了 reserve 和 preBind 擴展點的插件,并且禁用了另外一個插件,同時為插件 foo 提供了一些配置信息:
apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
profiles:
  - plugins:
      reserve:
        enabled:
          - name: foo
          - name: bar
        disabled:
          - name: baz
      preBind:
        enabled:
          - name: foo
        disabled:
          - name: baz
    pluginConfig:
      - name: foo
        args: >
          arbitrary content that the foo plugin can parse
Extensions are called in the following order:
- if no extension is configured for an extension point, the scheduling framework uses the extensions of the default plugins
- if extensions are configured and enabled for an extension point, the framework calls the default plugins' extensions first and then the configured ones
- the default plugins' extensions are always called first, and then the extensions for that point are called one by one in the order in which they are enabled in KubeSchedulerConfiguration
- you can disable a default plugin's extension and then re-enable it at some position in the enabled list, which changes the order in which the default plugin's extension is called
Suppose a default plugin foo implements the reserve extension point and we want to add a plugin bar that should be called before foo; then we should disable foo first and then enable bar and foo in that order. An example configuration looks like this:
apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
profiles:
  - plugins:
      reserve:
        enabled:
          - name: bar
          - name: foo
        disabled:
          - name: foo
There are a few example plugins under pkg/scheduler/framework/plugins/examples in the source tree that we can use as references for our own implementation.
Example
Implementing a scheduling framework plugin is really not hard: we only need to implement the relevant extension points and then register the plugin with the scheduler. Below is where the default scheduler registers plugins during initialization:
// pkg/scheduler/algorithmprovider/registry.go
func NewRegistry() Registry {
	return Registry{
		// FactoryMap:
		// New plugins are registered here.
		// example:
		// {
		//	stateful_plugin.Name: stateful.NewStatefulMultipointExample,
		//	fooplugin.Name: fooplugin.New,
		// }
	}
}
As you can see, no plugins are registered there by default, so for the scheduler to recognize our plugin code we need to build a scheduler of our own. Of course we do not have to implement it from scratch: we can simply call the default scheduler and register our plugin in the NewRegistry() function above. In the kube-scheduler source file kubernetes/cmd/kube-scheduler/app/server.go there is an entry function NewSchedulerCommand whose parameters are a list of values of type Option, and an Option is exactly the definition of a plugin configuration:
// Option configures a framework.Registry.
type Option func(framework.Registry) error
// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
......
}
So we can simply call this function as our program's entry point and pass in our own plugins as arguments. The same file also provides a function named WithPlugin for creating an Option instance:
func WithPlugin(name string, factory runtime.PluginFactory) Option {
	return func(registry runtime.Registry) error {
		return registry.Register(name, factory)
	}
}
So our final entry point looks like this:
package main

import (
	"math/rand"
	"os"
	"time"

	"k8s.io/component-base/cli"
	"k8s.io/kubernetes/cmd/kube-scheduler/app"

	// Ensure scheme package is initialized.
	_ "simple-scheduler/pkg/scheduler/apis/config/schema"
	"simple-scheduler/pkg/scheduler/framework/plugins"
)

func main() {
	rand.Seed(time.Now().UTC().UnixNano())
	command := app.NewSchedulerCommand(
		app.WithPlugin(plugins.Name, plugins.New))
	code := cli.Run(command)
	os.Exit(code)
}
Here app.WithPlugin(plugins.Name, plugins.New) is the plugin we are about to implement. From the signature of WithPlugin you can also see that plugins.New must be a value of type framework.PluginFactory, and PluginFactory is defined as a function:
type PluginFactory = func(configuration runtime.Object, f framework.Handle) (framework.Plugin, error)
So plugins.New is just such a function. Inside it we can read the plugin's configuration data and then run whatever logic we need. The plugin implementation is shown below; here we simply read some data and print logs. If you have real requirements you can process the data you obtain accordingly. We only implement the PreFilter, Filter, and PreBind extension points here; the others can be extended in exactly the same way:
package plugins

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"

	"simple-scheduler/pkg/scheduler/apis/config"
	"simple-scheduler/pkg/scheduler/apis/config/validation"
)

const Name = "sample-plugin"

type Sample struct {
	args   *config.SampleArgs
	handle framework.Handle
}

func (s *Sample) Name() string {
	return Name
}

func (s *Sample) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
	klog.V(3).Infof("prefilter pod: %v", pod.Name)
	return nil, nil
}

func (s *Sample) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	klog.V(3).Infof("filter pod: %v, node: %v", pod.Name, nodeInfo.Node().Name)
	return framework.NewStatus(framework.Success, "")
}

func (s *Sample) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
	if nodeInfo, err := s.handle.SnapshotSharedLister().NodeInfos().Get(nodeName); err != nil {
		return framework.NewStatus(framework.Error, fmt.Sprintf("prebind get node: %s info error: %s", nodeName, err.Error()))
	} else {
		klog.V(3).Infof("prebind node info: %+v", nodeInfo.Node())
		return framework.NewStatus(framework.Success, "")
	}
}

func New(fpArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
	args, ok := fpArgs.(*config.SampleArgs)
	if !ok {
		return nil, fmt.Errorf("got args of type %T, want *SampleArgs", fpArgs)
	}
	if err := validation.ValidateSamplePluginArgs(*args); err != nil {
		return nil, err
	}
	return &Sample{
		args:   args,
		handle: fh,
	}, nil
}
The complete code is available in the repository https://github.com/cnych/sample-scheduler-framework.
A parameter type for the scheduler plugin is also defined here:
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type SampleArgs struct {
	metav1.TypeMeta

	FavoriteColor  string `json:"favorite_color,omitempty"`
	FavoriteNumber int    `json:"favorite_number,omitempty"`
	ThanksTo       string `json:"thanks_to,omitempty"`
}
Older versions provided a framework.DecodeInto function that converted the arguments passed in directly, but in newer versions the argument must be a runtime.Object, so the type has to implement the corresponding deep-copy methods. That is why we added the +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object annotation on the struct; the hack/update-codegen.sh script in the Kubernetes source tree can then generate the deep-copy methods automatically.
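For reference, what the generated code needs to provide is roughly the following hand-written sketch; the actual zz_generated.deepcopy.go file produced by deepcopy-gen may differ in details:
// Hand-written sketch of the deep-copy methods that deepcopy-gen produces
// for SampleArgs (runtime here is k8s.io/apimachinery/pkg/runtime).
package config

import "k8s.io/apimachinery/pkg/runtime"

func (in *SampleArgs) DeepCopyInto(out *SampleArgs) {
	*out = *in
	out.TypeMeta = in.TypeMeta
}

func (in *SampleArgs) DeepCopy() *SampleArgs {
	if in == nil {
		return nil
	}
	out := new(SampleArgs)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyObject is the method that makes SampleArgs satisfy runtime.Object.
func (in *SampleArgs) DeepCopyObject() runtime.Object {
	if c := in.DeepCopy(); c != nil {
		return c
	}
	return nil
}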
Similarly, in the register.go file we need to add SampleArgs to the call to AddKnownTypes. Also note that in main.go we import the schema defined here, which initializes the scheme with all of the configuration types we introduced under pkg/apis. A minimal sketch of that registration follows below.
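This is only a sketch of what such a register.go could look like, assuming the plugin args are registered under the kube-scheduler configuration API group; the actual file in the sample repository may differ:
// pkg/scheduler/apis/config/register.go (sketch)
package config

import (
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// SchemeGroupVersion reuses the kube-scheduler configuration API group (internal version).
var SchemeGroupVersion = schema.GroupVersion{Group: "kubescheduler.config.k8s.io", Version: runtime.APIVersionInternal}

var (
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	AddToScheme   = SchemeBuilder.AddToScheme
)

func addKnownTypes(scheme *runtime.Scheme) error {
	// SampleArgs must be known to the scheme so that the pluginConfig args
	// in KubeSchedulerConfiguration can be decoded into it.
	scheme.AddKnownTypes(SchemeGroupVersion, &SampleArgs{})
	return nil
}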
Once the implementation is finished, build it and package it into an image; we can then deploy it like an ordinary application with a Deployment controller. Because the scheduler needs to read various resource objects in the cluster, it of course needs RBAC permissions. The scheduler itself is again configured through the --config flag with a KubeSchedulerConfiguration resource, in which plugins enables or disables the plugins we implemented and pluginConfig passes parameter values to the plugin:
# sample-scheduler.yaml
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: sample-scheduler-clusterrole
rules:
  - apiGroups:
      - ""
    resources:
      - endpoints
      - events
    verbs:
      - create
      - get
      - update
  - apiGroups:
      - ""
    resources:
      - nodes
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - pods
    verbs:
      - delete
      - get
      - list
      - watch
      - update
  - apiGroups:
      - ""
    resources:
      - bindings
      - pods/binding
    verbs:
      - create
  - apiGroups:
      - ""
    resources:
      - pods/status
    verbs:
      - patch
      - update
  - apiGroups:
      - ""
    resources:
      - replicationcontrollers
      - services
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - apps
      - extensions
    resources:
      - replicasets
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - apps
    resources:
      - statefulsets
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - policy
    resources:
      - poddisruptionbudgets
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - persistentvolumeclaims
      - persistentvolumes
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - configmaps
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - "storage.k8s.io"
    resources:
      - storageclasses
      - csinodes
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - "coordination.k8s.io"
    resources:
      - leases
    verbs:
      - create
      - get
      - list
      - update
  - apiGroups:
      - "events.k8s.io"
    resources:
      - events
    verbs:
      - create
      - patch
      - update
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: sample-scheduler-sa
  namespace: kube-system
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: sample-scheduler-clusterrolebinding
  namespace: kube-system
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: sample-scheduler-clusterrole
subjects:
  - kind: ServiceAccount
    name: sample-scheduler-sa
    namespace: kube-system
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: scheduler-config
  namespace: kube-system
data:
  scheduler-config.yaml: |
    apiVersion: kubescheduler.config.k8s.io/v1
    kind: KubeSchedulerConfiguration
    leaderElection:
      leaderElect: true
      leaseDuration: 15s
      renewDeadline: 10s
      resourceLock: endpointsleases
      resourceName: sample-scheduler
      resourceNamespace: kube-system
      retryPeriod: 2s
    profiles:
      - schedulerName: sample-scheduler
        plugins:
          preFilter:
            enabled:
              - name: "sample-plugin"
          filter:
            enabled:
              - name: "sample-plugin"
        pluginConfig:
          - name: sample-plugin
            args: # runtime.Object
              favorColor: "#326CE5"
              favorNumber: 7
              thanksTo: "Kubernetes"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sample-scheduler
  namespace: kube-system
  labels:
    component: sample-scheduler
spec:
  selector:
    matchLabels:
      component: sample-scheduler
  template:
    metadata:
      labels:
        component: sample-scheduler
    spec:
      serviceAccountName: sample-scheduler-sa
      priorityClassName: system-cluster-critical
      volumes:
        - name: scheduler-config
          configMap:
            name: scheduler-config
      containers:
        - name: scheduler
          image: cnych/sample-scheduler:v0.26.4
          imagePullPolicy: IfNotPresent
          command:
            - sample-scheduler
            - --config=/etc/kubernetes/scheduler-config.yaml
            - --v=3
          volumeMounts:
            - name: scheduler-config
              mountPath: /etc/kubernetes
Apply the manifests above and we have a scheduler named sample-scheduler running. Next, let's deploy an application that is scheduled by it:
# test-scheduler.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: test-scheduler
spec:
  selector:
    matchLabels:
      app: test-scheduler
  template:
    metadata:
      labels:
        app: test-scheduler
    spec:
      schedulerName: sample-scheduler # use our custom scheduler; if unset, the default-scheduler is used
      containers:
        - image: nginx:1.7.9
          imagePullPolicy: IfNotPresent
          name: nginx
          ports:
            - containerPort: 80
Note that we explicitly set the schedulerName field here to sample-scheduler, the name of the custom scheduler we defined above.
Create this resource object, and once it is created, check the logs of our custom scheduler:
$ kubectl get pods -n kube-system -l component=sample-scheduler
NAME                               READY   STATUS    RESTARTS   AGE
sample-scheduler-896658cd7-k7vcl   1/1     Running   0          57s
$ kubectl logs -f sample-scheduler-896658cd7-k7vcl -n kube-system
I0114 09:14:18.878613 1 eventhandlers.go:173] add event for unscheduled pod default/test-scheduler-6486fd49fc-zjhcx
I0114 09:14:18.878670 1 scheduler.go:464] Attempting to schedule pod: default/test-scheduler-6486fd49fc-zjhcx
I0114 09:14:18.878706 1 sample.go:77] "Start PreFilter Pod" pod="test-scheduler-6486fd49fc-zjhcx"
I0114 09:14:18.878802 1 sample.go:93] "Start Filter Pod" pod="test-scheduler-6486fd49fc-zjhcx" node="node2" preFilterState=&{Resource:{MilliCPU:0 Memory:0 EphemeralStorage:0 AllowedPodNumber:0 ScalarResources:map[]}}
I0114 09:14:18.878835 1 sample.go:93] "Start Filter Pod" pod="test-scheduler-6486fd49fc-zjhcx" node="node1" preFilterState=&{Resource:{MilliCPU:0 Memory:0 EphemeralStorage:0 AllowedPodNumber:0 ScalarResources:map[]}}
I0114 09:14:18.879043 1 default_binder.go:51] Attempting to bind default/test-scheduler-6486fd49fc-zjhcx to node1
I0114 09:14:18.886360 1 scheduler.go:609] "Successfully bound pod to node" pod="default/test-scheduler-6486fd49fc-zjhcx" node="node1" evaluatedNodes=3 feasibleNodes=2
I0114 09:14:18.887426 1 eventhandlers.go:205] delete event for unscheduled pod default/test-scheduler-6486fd49fc-zjhcx
I0114 09:14:18.887475 1 eventhandlers.go:225] add event for scheduled pod default/test-scheduler-6486fd49fc-zjhcx
As soon as the Pod is created, the matching log lines show up in our custom scheduler, including logs from each of the extension points we implemented, which proves the example works. We can also verify it by checking the Pod's schedulerName:
$ kubectl get pods
NAME                              READY   STATUS    RESTARTS   AGE
test-scheduler-6486fd49fc-zjhcx   1/1     Running   0          35s
$ kubectl get pod test-scheduler-6486fd49fc-zjhcx -o yaml
......
restartPolicy: Always
schedulerName: sample-scheduler
securityContext: {}
serviceAccount: default
......
Since Kubernetes v1.17, the built-in predicates and priorities of the Scheduler Framework have all been turned into plugins, so understanding and mastering the scheduling framework is the way to go when extending the scheduler.