queueSort扩展点

概述

该扩展点需要完成的工作为:对两个pod的调度优先级进行比较

该扩展点有且只有一个插件实现,默认的实现插件为下面的PrioritySort插件。

PrioritySort

pkg/scheduler/framework/plugins/queuesort/priority_sort.go

// Less is the function used by the activeQ heap algorithm to sort pods.
// It sorts pods based on their priority. When priorities are equal, it uses
// PodQueueInfo.timestamp.
func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
    p1 := corev1helpers.PodPriority(pInfo1.Pod)
    p2 := corev1helpers.PodPriority(pInfo2.Pod)
    return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
}
  • 核心逻辑:通过Less方法比较两个pod的调度优先级,如果pod1的优先级比pod2优先级高,则返回true

  • 比较的依据是先比较podSpec里的priority,priority越高则优先级越高;如果priority相等,则时间早的排序靠前

  • podSpec里的priority字段在pod到达调度器之前由pod关联的PriorityClass对象解析而来

  • 一个调度器框架中,只有一个queueSort plugin可以被启用,代码中默认启用了第一个

装载过程

pkg/scheduler/factory.go

Configurator.create()

image-20210527180223057

数据的流动:config->profile->framework.Framework->QueueSortFunc()->framework.LessFunc

preFilter扩展点

概述

对于每一个调度框架下的plugin,处理该扩展点的大致套路如下:

  • 关键输入:cycleState,pod
    • cycleState为各个plugin的共用存储,其包装了一个map[string]StateData,可以通过Read、Write方法安全地读写
    • pod即为v1 api组的Pod结构体,可以从中取得pod的信息
  • 输出:正常情况返回nil,有错误的情况,通过framework.NewStatus()返回类似于Error、Unschedulable等状态
  • 核心操作:每个插件定义一个独有的key,以及一个独有的StateData,按照需求将需要后续使用的数据封装到StateData中,写入到cycleState这个map中

以下为不同插件对该扩展点的具体实现

NodeResourcesFit

pkg/scheduler/framework/plugins/noderesources/fit.go

func computePodResourceRequest(pod *v1.Pod) *preFilterState {
    result := &preFilterState{}
    for _, container := range pod.Spec.Containers {
        result.Add(container.Resources.Requests)
    }

    // take max_resource(sum_pod, any_init_container)
    for _, container := range pod.Spec.InitContainers {
        result.SetMaxResource(container.Resources.Requests)
    }

    // If Overhead is being utilized, add to the total requests for the pod
    if pod.Spec.Overhead != nil && utilfeature.DefaultFeatureGate.Enabled(features.PodOverhead) {
        result.Add(pod.Spec.Overhead)
    }

    return result
}
  • 核心逻辑:对于一个进入调度过程的pod,计算其整体的资源需求量,为后续流程做准备

  • 由于init container顺序启动,所以对于它们的同种资源需求取最大值;普通container的同种资源需求累加

  • 除了统计一个pod内所有容器的资源需求外,调度器还支持将pod本身除容器以外的额外资源开销纳入调度流程中,详情见Pod开销 。因此函数的最后会检查该特性门控是否开启,pod是否有overhead字段,如果满足条件,会把这类资源消耗也纳入统计中

NodePorts

pkg/scheduler/framework/plugins/nodeports/node_ports.go

// getContainerPorts returns the used host ports of Pods: if 'port' was used, a 'port:true' pair
// will be in the result; but it does not resolve port conflict.
func getContainerPorts(pods ...*v1.Pod) []*v1.ContainerPort {
    ports := []*v1.ContainerPort{}
    for _, pod := range pods {
        for j := range pod.Spec.Containers {
            container := &pod.Spec.Containers[j]
            for k := range container.Ports {
                ports = append(ports, &container.Ports[k])
            }
        }
    }
    return ports
}
  • 核心逻辑:遍历pod的所有容器的所有端口,将其展平后放入一个slice中

PodTopologySpread

pkg/scheduler/framework/plugins/podtopologyspread/filtering.go

该扩展点主要处理pod的拓扑域打散信息

  • 主要逻辑:
    • 首先对传入的pod,如果其podSpec里有topologySpreadConstraints字段,则会对pod的该字段进行过滤,只保留whenUnsatisfiable字段为DoNotSchedule的topologySpreadConstraints;如果podSpec没有topologySpreadConstraints字段,会基于该插件自带的defaultConstraints字段为pod构建topologySpreadConstraint。
    • 除了向preFilterState存入上一步处理后的topologySpreadConstraints切片,插件还需要考虑pod的NodeAffinity和NodeSelector字段,在拓扑域打散的过程中还需要满足pod对节点偏好的需求,因此该插件的preFilterState还包含了TpKeyToCriticalPaths、TpPairToMatchNum两个属性,此处具体细节待有需求了进一步研究。

InterPodAffinity

pkg/scheduler/framework/plugins/interpodaffinity/filtering.go

该扩展点主要处理pod的affinity和antifinity信息

  • 1.21新增了按照命名空间确定pod范围的相关功能,处于alpha阶段
  • 除新增功能需要的字段外,该插件的preFilterState主要包含了以下几个字段:
    • existingAntiAffinityCounts:遍历所有node上的所有pod,检查是否有PodAntiAffinity字段,如果有,且与待调度pod的AffinityTerm在拓扑域上匹配,则计数加1
    • affinityCounts和antiAffinityCounts:筛选所有节点已有pod中,与待调度pod的PodAffinity和PodAntiAffinity拓扑域和selector匹配的数量
    • 以上三个字段均为map,key为拓扑域,value为拓扑域下pod的计数值。
  • 该插件在prefilter阶段主要是从整个集群范围内筛选出与待调度pod在PodAffinity和PodAntiAffinity有关联的相关pod数量,为后面过滤阶段做准备,使得过滤阶段不再需要遍历集群所有的pod。

VolumeBinding

pkg/scheduler/framework/plugins/volumebinding/volume_binding.go

  • 核心逻辑:遍历podSpec的volume,判断是否用到了pvc,如果没有用到,则该扩展点没有意义,跳过;如果有pvc,则解析pvc的信息

  • 解析出的PVC状态分为三类:bound, tobind, unboundImmediate,其中的unboundImmediate状态属于非正常状态,如果存在这类pvc,会直接返回错误信息pod has unbound immediate PersistentVolumeClaims,并将pod重新放回调度队列

  • 最后会初始化一个map:podVolumesByNode,在filter扩展点阶段会用到

NodeAffinity

pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go

func GetRequiredNodeAffinity(pod *v1.Pod) RequiredNodeAffinity {
    var selector labels.Selector
    if len(pod.Spec.NodeSelector) > 0 {
        selector = labels.SelectorFromSet(pod.Spec.NodeSelector)
    }
    // Use LazyErrorNodeSelector for backwards compatibility of parsing errors.
    var affinity *LazyErrorNodeSelector
    if pod.Spec.Affinity != nil &&
        pod.Spec.Affinity.NodeAffinity != nil &&
        pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
        affinity = NewLazyErrorNodeSelector(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution)
    }
    return RequiredNodeAffinity{labelSelector: selector, nodeSelector: affinity}
}
  • 核心逻辑:解析podSpec里的nodeSelector字段和nodeAffinity字段,保存
  • 此处只处理了RequiredDuringSchedulingIgnoredDuringExecution类型的nodeAffinity
文章目录