背景

项目中使用了内核的Isolcpus功能隔离CPU核心,然后通过扩展调度器和修改kubelet,实现了特定pod对隔离CPU的独享绑定使用。

但是最近发现一个设计之初没考虑到的大问题:

我们使用注解来标记pod对隔离cpu的需求,但是数量还是沿用了k8s默认的QOS体系。即扩展调度器通过注解确定pod是不是要使用隔离核心,然后通过pod的request字段来确定需要几个隔离核。

但是由于这类pod不再使用普通的cpu,但是却写了cpu request,就会导致调度器在维护资源状态时,重复扣除了两次cpu:一次是扩展的隔离核cpu资源,一次是非隔离的普通cpu。最终会导致node上的普通cpu核心被浪费。

解决思路

为了解决这个问题,有以下几个思路可以考虑

  1. 方案1:按照标准化方式,把隔离核转成扩展资源。该方案类似于k8s中对gpu资源的管理。
    • 弊端1:使用这种方案后,cpu request就不能再写了,使用隔离核的pod就会变成最低优先级的Besteffort类pod,可能会影响pod的抢占、基于qos的驱逐等策略。
    • 弊端2:需要推翻现有方案,重新实现,从项目管理上性价比比较低
  2. 方案2:基于现有方案,把所有隔离核信息全部转移到注解里,不再依赖cpu request获取隔离核需求数量。
    • 弊端:同上。这个方案改造成本较小。
  3. 方案3:基于现有方案,一条路走到黑,修改默认调度器中对node资源状态管理的代码,使得使用了隔离核的pod的cpu request不会被重复扣减。
    • 该方案优点:不需要推翻现有方案,性价比高
    • 弊端:扩展调度器引用的原始的k8s scheduler核心代码需要改为引用修改过的scheduler代码,以后版本升级可能会留下坑
  4. 综合考虑,准备采用方案3进行尝试。

k8s调度器资源管理代码分析

启动时的资源扣减

和很多控制器一样,调度器启动时也使用client-go的eventHandler模式,为pod注册新增、修改、删除事件处理函数。

// pkg/scheduler/eventhandlers.go:358
func addAllEventHandlers(
    sched *Scheduler,
    informerFactory informers.SharedInformerFactory,
) {
  ...
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    sched.addPodToCache,
                UpdateFunc: sched.updatePodInCache,
                DeleteFunc: sched.deletePodFromCache,
            },
  ...
}

跟踪sched.addPodToCache函数,可以看到如下代码

// pkg/scheduler/eventhandlers.go:215
func (sched *Scheduler) addPodToCache(obj interface{}) {
    pod, ok := obj.(*v1.Pod)
    if !ok {
        klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", obj)
        return
    }
    klog.V(3).InfoS("Add event for scheduled pod", "pod", klog.KObj(pod))

    if err := sched.SchedulerCache.AddPod(pod); err != nil {
        klog.ErrorS(err, "Scheduler cache AddPod failed", "pod", klog.KObj(pod))
    }

    sched.SchedulingQueue.AssignedPodAdded(pod)
}

和资源管理相关的就是调用sched.SchedulerCache.AddPod(pod),将pod加到cache里。所以我们继续看看cache.AddPod函数的实现。

  1. 判断podState中是否已经存在pod,如果存在了,且assumePod的信息里也有该pod,则更新pod最新的信息。至于assumePod是做什么的,后面看调度过程会看到。
  2. 如果是全新的pod,直接加入cache即可
// pkg/scheduler/internal/cache/cache.go:471
func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
    key, err := framework.GetPodKey(pod)
    if err != nil {
        return err
    }

    cache.mu.Lock()
    defer cache.mu.Unlock()

    currState, ok := cache.podStates[key]
    switch {
    case ok && cache.assumedPods.Has(key):
        if currState.pod.Spec.NodeName != pod.Spec.NodeName {
            // The pod was added to a different node than it was assumed to.
            klog.Warningf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
            // Clean this up.
            if err = cache.removePod(currState.pod); err != nil {
                klog.Errorf("removing pod error: %v", err)
            }
            cache.addPod(pod)
        }
        delete(cache.assumedPods, key)
        cache.podStates[key].deadline = nil
        cache.podStates[key].pod = pod
    case !ok:
        // Pod was expired. We should add it back.
        cache.addPod(pod)
        ps := &podState{
            pod: pod,
        }
        cache.podStates[key] = ps
    default:
        return fmt.Errorf("pod %v was already in added state", key)
    }
    return nil
}

上面函数其实是对cache.addPod()的一层包装,还没有触及pod资源管理的灵魂,所以我们继续看cache.addPod()的实现

// pkg/scheduler/internal/cache/cache.go:431
func (cache *schedulerCache) addPod(pod *v1.Pod) {
    n, ok := cache.nodes[pod.Spec.NodeName]
    if !ok {
        n = newNodeInfoListItem(framework.NewNodeInfo())
        cache.nodes[pod.Spec.NodeName] = n
    }
    n.info.AddPod(pod)
    cache.moveNodeInfoToHead(pod.Spec.NodeName)
}

在这里我们终于看到pod和node产生了关联,也就意味着pod的资源消耗可能要被记到node上了。上面函数从cache的node中,找到pod所在的node,然后调用n.info.AddPod(pod)进行了注册。

所以接下来我们看n.info.AddPod(pod)的实现

// pkg/scheduler/framework/types.go:679
func (n *NodeInfo) AddPod(pod *v1.Pod) {
    n.AddPodInfo(NewPodInfo(pod))
}

//pkg/scheduler/framework/types.go:651
func (n *NodeInfo) AddPodInfo(podInfo *PodInfo) {
    res, non0CPU, non0Mem := calculateResource(podInfo.Pod)
    n.Requested.MilliCPU += res.MilliCPU
    n.Requested.Memory += res.Memory
    n.Requested.EphemeralStorage += res.EphemeralStorage
    if n.Requested.ScalarResources == nil && len(res.ScalarResources) > 0 {
        n.Requested.ScalarResources = map[v1.ResourceName]int64{}
    }
    for rName, rQuant := range res.ScalarResources {
        n.Requested.ScalarResources[rName] += rQuant
    }
    n.NonZeroRequested.MilliCPU += non0CPU
    n.NonZeroRequested.Memory += non0Mem
    n.Pods = append(n.Pods, podInfo)
    if podWithAffinity(podInfo.Pod) {
        n.PodsWithAffinity = append(n.PodsWithAffinity, podInfo)
    }
    if podWithRequiredAntiAffinity(podInfo.Pod) {
        n.PodsWithRequiredAntiAffinity = append(n.PodsWithRequiredAntiAffinity, podInfo)
    }

    // Consume ports when pods added.
    n.updateUsedPorts(podInfo.Pod, true)

    n.Generation = nextGeneration()
}

在上面的func (n *NodeInfo) AddPodInfo(podInfo *PodInfo) {}中,我们终于看到了pod的资源是如何被注册到node里的。

调度结果确定后的资源扣减

除了第一次启动时调度器需要把所有已分配的资源从node上扣除,在每次有新pod完成调度时,也应该将其资源从node上扣除。

所以我们需要跟踪代码,看看这个阶段的扣除是如何完成的。

调度框架的核心流程是一个叫scheduleOne的函数:

// pkg/scheduler/scheduler.go:441
func (sched *Scheduler) scheduleOne(ctx context.Context) {
  ...
  // 这里我们可以看到调度的结果已经出现了,即调度阶段已经结束,bind阶段还未开始
  scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, fwk, state, pod)
  ...
  // 后面紧接着可以看到调用了一个叫assume的函数
  err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
}

这个夹在schedule和bind阶段中间的assume函数,主要作用就是将单线程运行的调度阶段得出的pod调度初步结果进行保存,完成资源的扣减,这样后续的bind阶段就可以并发进行,不会发生资源重复分配的问题。

看assume函数的实现,可以看到熟悉的身影,和启动时类似,仍然是调用了cache包的方法,只不过名字叫AssumePod

// pkg/scheduler/scheduler.go:373
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
    // Optimistically assume that the binding will succeed and send it to apiserver
    // in the background.
    // If the binding fails, scheduler will release resources allocated to assumed pod
    // immediately.
    assumed.Spec.NodeName = host

    if err := sched.SchedulerCache.AssumePod(assumed); err != nil {
        klog.ErrorS(err, "scheduler cache AssumePod failed")
        return err
    }
    ...
}

继续看AssumePod函数的实现,终于我们又看到了和启动时一样的函数调用,即cache.addPod()函数。

// pkg/scheduler/internal/cache/cache.go:356
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
    key, err := framework.GetPodKey(pod)
    if err != nil {
        return err
    }

    cache.mu.Lock()
    defer cache.mu.Unlock()
    if _, ok := cache.podStates[key]; ok {
        return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
    }

    cache.addPod(pod)
    ps := &podState{
        pod: pod,
    }
    cache.podStates[key] = ps
    cache.assumedPods.Insert(key)
    return nil
}

所以从以上代码的逻辑可以看到,每个pod调度阶段完成后,就会将其加入cache中,对应的node上的资源就会进行扣减。

方案3实现

经过上面代码的阅读,我们已经发现了调度器的cache是如何对pod的资源在对应的node做资源扣减的。因此只需要在上面的func (n *NodeInfo) AddPodInfo(podInfo *PodInfo) {}中,对使用了隔离核的pod注解进行区别对待,不做cpu扣减即可。

但是还有一个需要注意的事项是,既然有资源的扣减,那么pod被删除时肯定还得有资源的归还。我们推测大概率还是在NodeInfo这个结构体的方法上实现的。查看其所有方法,可以看到func (n *NodeInfo) RemovePod(pod *v1.Pod) error {}方法,里面实现了对资源的归还。所以也需要同时修改该方法。

参考

文章目录