目标

  1. 划分系统的和用户的CPU池:操作系统、二进制、集群内的平台自有组件使用普通CPU池里的核心。用户从平台部署的业务容器使用另外一个单独的CPU池里的核心。二者强隔离
  2. 用户业务CPU全部绑定核心,基于cpuset特性实现
  3. 绑定核心时尽量单numa

当前流程

流程分析以k8s 1.21版本为例

  1. cpu只有一个单独的池子,无隔离池
  2. k8s的调度器感知不到node的numa拓扑结构,调度时不会参考到node的numa
  3. 然而kubelet可以感知到numa,可以根据配置,在pod调度到node上以后,完成不同的操作:判定绑定/不绑定cpu、pod是否能在node上运行等。后续代码分析将重点分析这些逻辑的实现

kubelet topology manager核心流程代码分析

构造topology manager

kubelet的一个核心功能就是对接node上的容器运行时,ContainerManager就是kubelet中主要负责管理容器的接口,该接口的定义位于pkg/kubelet/cm/container_manager.go:46,根据node的操作系统,有不同的结构体实现这个接口。我们主要关心linux操作系统下的实现:

// pkg/kubelet/cm/container_manager_linux.go:214
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {
  ...
  if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
        cm.topologyManager, err = topologymanager.NewManager(
            machineInfo.Topology,
            nodeConfig.ExperimentalTopologyManagerPolicy,
            nodeConfig.ExperimentalTopologyManagerScope,
        )

        if err != nil {
            return nil, err
        }

        klog.InfoS("Initializing Topology Manager", "policy", nodeConfig.ExperimentalTopologyManagerPolicy, "scope", nodeConfig.ExperimentalTopologyManagerScope)
    } else {
        cm.topologyManager = topologymanager.NewFakeManager()
    }
  ...
}

kubelet启动后,就会调用NewContainerManager方法,构造一个container manager。container manager中我们主要关注topology manager。上述代码根据featureGate的开启与否,为container manager组装相应的topology manager。接下来关注topology manager的构造函数。

// pkg/kubelet/cm/topologymanager/topology_manager.go:119
func NewManager(topology []cadvisorapi.Node, topologyPolicyName string, topologyScopeName string) (Manager, error) {
  ...
  var policy Policy
    switch topologyPolicyName {

    case PolicyNone:
        policy = NewNonePolicy()

    case PolicyBestEffort:
        policy = NewBestEffortPolicy(numaNodes)

    case PolicyRestricted:
        policy = NewRestrictedPolicy(numaNodes)

    case PolicySingleNumaNode:
        policy = NewSingleNumaNodePolicy(numaNodes)

  var scope Scope
    switch topologyScopeName {

    case containerTopologyScope:
        scope = NewContainerScope(policy)

    case podTopologyScope:
        scope = NewPodScope(policy)

    default:
        return nil, fmt.Errorf("unknown scope: \"%s\"", topologyScopeName)
    }

    manager := &manager{
        scope: scope,
    }
  ...
}

该构造函数在组装topology manager结构体时,主要逻辑如下:

  1. 从入参topology中,拿到所有可用的numa节点
  2. 根据参数,用1中的cpu核心实例化相应的policy(policy的具体含义可以看参考博客或文档)
  3. 根据参数,用2中的policy实例化相应的scope(scope主要决定计算资源量时以容器为单位还是以pod为单位)
  4. 最后用3中的policy实例化topology manager

这个实例化的过程,一方面是典型的链式结构:numa nodes -> policy -> scope -> manager;另一方面使用了策略模式,普遍面向接口实现,方便根据配置的不同组装不同的组件,方便扩展和维护。

AddHintProvider

topology manger实例化好了以后,我们需要围绕其可能的操作,进一步了解其逻辑。观察topology manager接口的细节:

// pkg/kubelet/cm/topologymanager/topology_manager.go:42
type Manager interface {
    // PodAdmitHandler is implemented by Manager
    lifecycle.PodAdmitHandler
    // AddHintProvider adds a hint provider to manager to indicate the hint provider
    // wants to be consulted with when making topology hints
    AddHintProvider(HintProvider)
    // AddContainer adds pod to Manager for tracking
    AddContainer(pod *v1.Pod, container *v1.Container, containerID string)
    // RemoveContainer removes pod from Manager tracking
    RemoveContainer(containerID string) error
    // Store is the interface for storing pod topology hints
    Store
}

查看AddHintProvider方法相关的代码,可以看到在构造完topology manager后,会根据配置情况,为topology manager添加各种子manager,比如cpu、内存和其他硬件的manager,这些子manager都被定义为HintProvider,即可以为拓扑管理提供具体细节的结构体。topology manager会根据所以hint provider对pod提供的拓扑建议,进行汇总,从而得出pod在node上的numa绑定和运行结果。

image-20230130162722486

container缓存

继续看topology manager接口的AddContainer和RemoveContainer方法,这两个方法从字面意义比较容易理解,即添加和删除容器,其实就是缓存的维护。实际操作由topology manager里包裹的scope完成,缓存为scope结构体的一个map字段。

func (m *manager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) {
    m.scope.AddContainer(pod, container, containerID)
}

func (m *manager) RemoveContainer(containerID string) error {
    return m.scope.RemoveContainer(containerID)
}

// 
func (s *scope) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) {
    s.mutex.Lock()
    defer s.mutex.Unlock()

    s.podMap.Add(string(pod.UID), container.Name, containerID)
}
...

在看这块代码时,可以发现实现时的一个小技巧。Scope接口有6个方法,containerScope和podScope作为接口的实现者,这6个方法的大多数都是一样的,仅有少数方法有不同的逻辑,并且两个实现者的字段完全一样。

所以这里的实现方法是建立一个公共的struct scope,它包含了所有实现者的公共字段,并实现了公共方法,containerScope和podScope通过组合scope结构体来实现继承,然后仅需要各自实现Admit方法即可,最大程度减少了代码的重复。

Admit

接下来来到topology manager发挥核心作用的时候了。当一个pod被kube-scheduler调度到节点上后,kubelet开始接管该pod的生命周期管理,此时会有一系列的组件对该pod做准入判定,topology manager就是这些准入判定组件之一。

// pkg/kubelet/cm/topologymanager/topology_manager.go:186
func (m *manager) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
    klog.InfoS("Topology Admit Handler")
    pod := attrs.Pod

    return m.scope.Admit(pod)
}

从上面代码可以看到Admit操作被移交给了topology manager持有的scope处理,而这也是scope在策略模式下主要的不同实现产生区别的地方。

从Admit的输入输出流来看,Admit方法的输入为一个pod,输出要求为一个bool值,判定pod的资源需求是否满足node的numa拓扑。

因此对于containerScope,其对于一个pod的判定逻辑是遍历每个容器,逐个容器判定资源需求是否满足numa拓扑,有任何一个不满足的就会判定pod不符合numa拓扑要求;而对于podScope,其对于一个pod的判定逻辑是将所有容器的资源需求累加,再统一判定需求是否满足numa拓扑。

podScopeAdmit

以pod范围的Admit逻辑为例,对每个pod,会获取该pod在该node上的provider hints(对于provider hints相关的数据结构,可以参考链接里的介绍,本文暂不对具体的数据结构和算法做介绍),然后依据不同的policy,对所有hints进行合并,获取最优的hint,最优的hint就是pod在node上的numa分配方式。得到最优的分配方式后,还需要实际调用各个hint provider的方法,通知其实际动手分配资源,完成状态的维护。

// pkg/kubelet/cm/topologymanager/scope_pod.go:45
func (s *podScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult {
  ...
  // 计算最优分配方式
    bestHint, admit := s.calculateAffinity(pod)
  ...
    for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
        klog.InfoS("Topology Affinity", "bestHint", bestHint, "pod", klog.KObj(pod), "containerName", container.Name)
    // 存储分配方式
        s.setTopologyHints(string(pod.UID), container.Name, bestHint)

    // 实际分配资源
        err := s.allocateAlignedResources(pod, &container)
        if err != nil {
            return unexpectedAdmissionError(err)
        }
    }
    return admitPod()
}

具体看如何计算最优分配方式:

// pkg/kubelet/cm/topologymanager/scope_pod.go:81
func (s *podScope) calculateAffinity(pod *v1.Pod) (TopologyHint, bool) {
    providersHints := s.accumulateProvidersHints(pod)
    bestHint, admit := s.policy.Merge(providersHints)
    klog.InfoS("PodTopologyHint", "bestHint", bestHint)
    return bestHint, admit
}

上面的方法里调用了accumulateProvidersHints方法来收集pod对于所有hint provider的numa拓扑情况,这个方法比较简单,就是遍历之前使用AddHintProvider方法添加过的所有hint provider,依次调用它们的GetPodTopologyHints方法,再组合成一个slice返回。

// pkg/kubelet/cm/topologymanager/scope_pod.go:69
func (s *podScope) accumulateProvidersHints(pod *v1.Pod) []map[string][]TopologyHint {
    var providersHints []map[string][]TopologyHint

    for _, provider := range s.hintProviders {
        // Get the TopologyHints for a Pod from a provider.
        hints := provider.GetPodTopologyHints(pod)
        providersHints = append(providersHints, hints)
        klog.InfoS("TopologyHints", "hints", hints, "pod", klog.KObj(pod))
    }
    return providersHints
}

merge policy

在上面的calculateAffinity方法中,获取到所有的providersHints后,需要依据不同的策略对这些不同manager提供的拓扑建议进行最终的合并,得到pod的最优numa拓扑选择。

这里就是policy接口的Merge方法被调用的时候。

由于本文暂不深入到hint相关的数据结构及算法,所以对于每种策略的合并细节就先不展开,只需要了解每种策略的合并原则即可(参考引文)。

GetPodTopologyHints&Allocate

以上我们了解了topology manager的整体工作流程,可以发现topology manager是一个总领全局,负责汇总的角色。接下来想要了解具体的资源分配和numa的关系,就需要深入到hint provider逻辑中,比如topology manager调用了其他manager(也就是hint provider)的GetPodTopologyHints和Allocate接口。所以接下来分析cpu manager的核心实现。

kubelet cpu manager核心代码流程分析

构造cpu manager

在上面topology manager的初始化完成后,AddHintProvider调用前,需要对cpu manager进行初始化。其构造函数如下

// pkg/kubelet/cm/cpumanager/cpu_manager.go:143
func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
    var topo *topology.CPUTopology
    var policy Policy

    switch policyName(cpuPolicyName) {

    case PolicyNone:
        policy = NewNonePolicy()

    case PolicyStatic:
        var err error
        topo, err = topology.Discover(machineInfo)
        if err != nil {
            return nil, err
        }
        klog.InfoS("Detected CPU topology", "topology", topo)

        reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU]
    ...
    policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity)
    ...
}

构造函数中最重要的是指定policy,policy如果为none,则不会绑定cpu核心,如果为static,则会对核心进行绑定,且绑定结果不会变化。另一个重要的参数是cpu的拓扑结构,该数据被存储在了监控相关的结构体*cadvisorapi.MachineInfo中。其他参数与核心绑定关系不大。

由于我们关心的是核心绑定,所以与之相关的policy:static policy是重点关注的。所以static policy的构造函数也值得关注:

// pkg/kubelet/cm/cpumanager/policy_static.go:90
func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store) (Policy, error) {
...
}

该构造函数接受的topology.CPUTopology结构体是一个比较直观地反应node cpu numa拓扑的数据结构,可以方便地从中获取cpu的核心细节,包括numa node、core数量等。同时cpu manager支持保留部分cpu或者指定部分cpu不给pod分配。最后一个参数在构造时看不出用处,在cpu manager构造时,传入了topology manager的实例作为该参数的值,并被policy持有。

GetTopologyHints

cpu manager和topology manger类似,也是采用了策略模式,将实际的行为执行逻辑分散在了不同的policy下,所以实际调用cpu manager的GetTopologyHints方法时,会传导到policy的GetTopologyHints方法。我们以static policy的GetTopologyHints做为重点学习。

另外,由于topology manager有不同的scope,所以对应到GetTopologyHints,有两种函数签名,一种仅接受pod,另一种同时接受container。

// pkg/kubelet/cm/cpumanager/policy_static.go:338
func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
  ...
  if allocated, exists := s.GetCPUSet(string(pod.UID), container.Name); exists {
            ...
        }
}

从函数签名上看,入参里的pod和返回值的TopologyHint都很好理解。其函数体内主要是实现了最优选择的算法,本文也暂时略过。重点关注state参数:state是持久化相关的核心变量,在上面的函数中,会先查询持久化状态中的cpu分配情况,再根据情况做不同处理。在该函数中入参中的state.State传入的是cpu manager持有的state,但是在上面的构造函数中,并未看到有为cpu manager的state初始化实例。

进一步搜索可以看到,cpu manager构造以后,还有一个Start函数,在其中为cpu manager初始化了state。

// pkg/kubelet/cm/cpumanager/cpu_manager.go:198
func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
    ...
  stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, cpuManagerStateFileName, m.policy.Name(), m.containerMap)
    if err != nil {
        klog.ErrorS(err, "Could not initialize checkpoint manager, please drain node and remove policy state file")
        return err
    }
    m.state = stateImpl
  ...
}

查看state.State接口,可以看到k8s里有两种存储实现,一种是上面代码里cpu manager用到的StateCheckpoint,另一种是基于内存的StateMemory。

Allocate

和GetTopologyHints一样,Allocate的实际操作也下放到了policy。所以我们仍然以static policy的Allocate做为学习目标。

func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
  ...
}

函数内的逻辑主要是维护state。

方案设计

  1. 内核参数预留一部分核心,这部分核心kubelet和调度器都感知不到,无法直接使用

  2. 为了能让预留核心能够被使用,需要修改kubelet,实现读取预留核心,并支持对预留核心进行容器cpu set

  3. 此时出现问题:

    1. 默认调度器感知到的cpu是非保留核,无法针对保留核做出决策。(甚至做出错误的决策,因为比较cpu需求的时候用的池子不对)

      # 默认scheduler的调度模块
      filter:
        enabled:
            - name: NodeUnschedulable
                weight: 0
            - name: NodeName
                weight: 0
            - name: TaintToleration
                weight: 0
            - name: NodeAffinity
                weight: 0
            - name: NodePorts
                weight: 0
            - name: NodeResourcesFit
                weight: 0
    2. kubelet只知道自己的预留核心使用情况,不知道其他节点的,所以需要调度器全局调度,优选节点。从全局的角度尽量让更多的CPU需求绑定到单NUMA上。

  4. 因此方案思路:

    1. 内核预留核心
    2. kubelet收集内核预留核信息,维护在node注解中
    3. 实现一个自定义scheduler,该scheduler维护一个中心化的包含所有节点的cpu拓扑信息(基于topology manager和cpu manager进行简化定制)
    4. 启动时读取node注解,将node拓扑加载到topology manager和cpu manager中
    5. 调度pod时,在判定cpu是否足够时,改变cpu池子(filter扩展点)
    6. 调度结果确定后,topo manager先调用Admit方法校验cpu拓扑是否满足需求,校验通过后,将最优的核心分配情况通过注解保存到pod,注意此时还不能从manager state中实际扣除资源,因为调度成功到pod真正跑起来还有一段距离(reserve扩展点)
    7. kubelet根据pod注解中的cpu分配情况,进行cpuset
    8. 加入controller,同步状态
      1. pod被删除时,释放manager中的相关状态,并进行资源回收
      2. pod成功跑起来后,维护manager中的状态,标记资源分配

参考

k8s资源拓扑感知——资源分配 | Edwardesire