kube-scheduler源码学习与定制-资源维护篇
背景
项目中使用了内核的Isolcpus功能隔离CPU核心,然后通过扩展调度器和修改kubelet,实现了特定pod对隔离CPU的独享绑定使用。
但是最近发现一个设计之初没考虑到的大问题:
我们使用注解来标记pod对隔离cpu的需求,但是数量还是沿用了k8s默认的QOS体系。即扩展调度器通过注解确定pod是不是要使用隔离核心,然后通过pod的request字段来确定需要几个隔离核。
但是由于这类pod不再使用普通的cpu,但是却写了cpu request,就会导致调度器在维护资源状态时,重复扣除了两次cpu:一次是扩展的隔离核cpu资源,一次是非隔离的普通cpu。最终会导致node上的普通cpu核心被浪费。
解决思路
为了解决这个问题,有以下几个思路可以考虑
- 方案1:按照标准化方式,把隔离核转成扩展资源。该方案类似于k8s中对gpu资源的管理。
- 弊端1:使用这种方案后,cpu request就不能再写了,使用隔离核的pod就会变成最低优先级的Besteffort类pod,可能会影响pod的抢占、基于qos的驱逐等策略。
- 弊端2:需要推翻现有方案,重新实现,从项目管理上性价比比较低
- 方案2:基于现有方案,把所有隔离核信息全部转移到注解里,不再依赖cpu request获取隔离核需求数量。
- 弊端:同上。这个方案改造成本较小。
- 方案3:基于现有方案,一条路走到黑,修改默认调度器中对node资源状态管理的代码,使得使用了隔离核的pod的cpu request不会被重复扣减。
- 该方案优点:不需要推翻现有方案,性价比高
- 弊端:扩展调度器引用的原始的k8s scheduler核心代码需要改为引用修改过的scheduler代码,以后版本升级可能会留下坑
- 综合考虑,准备采用方案3进行尝试。
k8s调度器资源管理代码分析
启动时的资源扣减
和很多控制器一样,调度器启动时也使用client-go的eventHandler模式,为pod注册新增、修改、删除事件处理函数。
// pkg/scheduler/eventhandlers.go:358
func addAllEventHandlers(
sched *Scheduler,
informerFactory informers.SharedInformerFactory,
) {
...
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: sched.addPodToCache,
UpdateFunc: sched.updatePodInCache,
DeleteFunc: sched.deletePodFromCache,
},
...
}
跟踪sched.addPodToCache函数,可以看到如下代码
// pkg/scheduler/eventhandlers.go:215
func (sched *Scheduler) addPodToCache(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", obj)
return
}
klog.V(3).InfoS("Add event for scheduled pod", "pod", klog.KObj(pod))
if err := sched.SchedulerCache.AddPod(pod); err != nil {
klog.ErrorS(err, "Scheduler cache AddPod failed", "pod", klog.KObj(pod))
}
sched.SchedulingQueue.AssignedPodAdded(pod)
}
和资源管理相关的就是调用sched.SchedulerCache.AddPod(pod)
,将pod加到cache里。所以我们继续看看cache.AddPod函数的实现。
- 判断podState中是否已经存在pod,如果存在了,且assumePod的信息里也有该pod,则更新pod最新的信息。至于assumePod是做什么的,后面看调度过程会看到。
- 如果是全新的pod,直接加入cache即可
// pkg/scheduler/internal/cache/cache.go:471
func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
switch {
case ok && cache.assumedPods.Has(key):
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
// The pod was added to a different node than it was assumed to.
klog.Warningf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
// Clean this up.
if err = cache.removePod(currState.pod); err != nil {
klog.Errorf("removing pod error: %v", err)
}
cache.addPod(pod)
}
delete(cache.assumedPods, key)
cache.podStates[key].deadline = nil
cache.podStates[key].pod = pod
case !ok:
// Pod was expired. We should add it back.
cache.addPod(pod)
ps := &podState{
pod: pod,
}
cache.podStates[key] = ps
default:
return fmt.Errorf("pod %v was already in added state", key)
}
return nil
}
上面函数其实是对cache.addPod()的一层包装,还没有触及pod资源管理的灵魂,所以我们继续看cache.addPod()的实现
// pkg/scheduler/internal/cache/cache.go:431
func (cache *schedulerCache) addPod(pod *v1.Pod) {
n, ok := cache.nodes[pod.Spec.NodeName]
if !ok {
n = newNodeInfoListItem(framework.NewNodeInfo())
cache.nodes[pod.Spec.NodeName] = n
}
n.info.AddPod(pod)
cache.moveNodeInfoToHead(pod.Spec.NodeName)
}
在这里我们终于看到pod和node产生了关联,也就意味着pod的资源消耗可能要被记到node上了。上面函数从cache的node中,找到pod所在的node,然后调用n.info.AddPod(pod)进行了注册。
所以接下来我们看n.info.AddPod(pod)的实现
// pkg/scheduler/framework/types.go:679
func (n *NodeInfo) AddPod(pod *v1.Pod) {
n.AddPodInfo(NewPodInfo(pod))
}
//pkg/scheduler/framework/types.go:651
func (n *NodeInfo) AddPodInfo(podInfo *PodInfo) {
res, non0CPU, non0Mem := calculateResource(podInfo.Pod)
n.Requested.MilliCPU += res.MilliCPU
n.Requested.Memory += res.Memory
n.Requested.EphemeralStorage += res.EphemeralStorage
if n.Requested.ScalarResources == nil && len(res.ScalarResources) > 0 {
n.Requested.ScalarResources = map[v1.ResourceName]int64{}
}
for rName, rQuant := range res.ScalarResources {
n.Requested.ScalarResources[rName] += rQuant
}
n.NonZeroRequested.MilliCPU += non0CPU
n.NonZeroRequested.Memory += non0Mem
n.Pods = append(n.Pods, podInfo)
if podWithAffinity(podInfo.Pod) {
n.PodsWithAffinity = append(n.PodsWithAffinity, podInfo)
}
if podWithRequiredAntiAffinity(podInfo.Pod) {
n.PodsWithRequiredAntiAffinity = append(n.PodsWithRequiredAntiAffinity, podInfo)
}
// Consume ports when pods added.
n.updateUsedPorts(podInfo.Pod, true)
n.Generation = nextGeneration()
}
在上面的func (n *NodeInfo) AddPodInfo(podInfo *PodInfo) {}
中,我们终于看到了pod的资源是如何被注册到node里的。
调度结果确定后的资源扣减
除了第一次启动时调度器需要把所有已分配的资源从node上扣除,在每次有新pod完成调度时,也应该将其资源从node上扣除。
所以我们需要跟踪代码,看看这个阶段的扣除是如何完成的。
调度框架的核心流程是一个叫scheduleOne
的函数:
// pkg/scheduler/scheduler.go:441
func (sched *Scheduler) scheduleOne(ctx context.Context) {
...
// 这里我们可以看到调度的结果已经出现了,即调度阶段已经结束,bind阶段还未开始
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, fwk, state, pod)
...
// 后面紧接着可以看到调用了一个叫assume的函数
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
}
这个夹在schedule和bind阶段中间的assume函数,主要作用就是将单线程运行的调度阶段得出的pod调度初步结果进行保存,完成资源的扣减,这样后续的bind阶段就可以并发进行,不会发生资源重复分配的问题。
看assume函数的实现,可以看到熟悉的身影,和启动时类似,仍然是调用了cache包的方法,只不过名字叫AssumePod
// pkg/scheduler/scheduler.go:373
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
// Optimistically assume that the binding will succeed and send it to apiserver
// in the background.
// If the binding fails, scheduler will release resources allocated to assumed pod
// immediately.
assumed.Spec.NodeName = host
if err := sched.SchedulerCache.AssumePod(assumed); err != nil {
klog.ErrorS(err, "scheduler cache AssumePod failed")
return err
}
...
}
继续看AssumePod函数的实现,终于我们又看到了和启动时一样的函数调用,即cache.addPod()函数。
// pkg/scheduler/internal/cache/cache.go:356
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
if _, ok := cache.podStates[key]; ok {
return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
}
cache.addPod(pod)
ps := &podState{
pod: pod,
}
cache.podStates[key] = ps
cache.assumedPods.Insert(key)
return nil
}
所以从以上代码的逻辑可以看到,每个pod调度阶段完成后,就会将其加入cache中,对应的node上的资源就会进行扣减。
方案3实现
经过上面代码的阅读,我们已经发现了调度器的cache是如何对pod的资源在对应的node做资源扣减的。因此只需要在上面的func (n *NodeInfo) AddPodInfo(podInfo *PodInfo) {}
中,对使用了隔离核的pod注解进行区别对待,不做cpu扣减即可。
但是还有一个需要注意的事项是,既然有资源的扣减,那么pod被删除时肯定还得有资源的归还。我们推测大概率还是在NodeInfo这个结构体的方法上实现的。查看其所有方法,可以看到func (n *NodeInfo) RemovePod(pod *v1.Pod) error {}
方法,里面实现了对资源的归还。所以也需要同时修改该方法。
参考
本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。