
NodeController Source Code Analysis

2020-01-10  田飞雨

In earlier versions there was only a single NodeController. In v1.16 it has been split into NodeIpamController and NodeLifecycleController; this article mainly covers NodeLifecycleController.

What NodeLifecycleController does

NodeLifecycleController's main job is to periodically monitor node status and, based on each node's conditions, either add the corresponding taints to the node or directly evict the pods running on it.

The role of taints

Before digging into the NodeLifecycleController source it is worth reviewing what taints do, because a large part of what NodeLifecycleController does ultimately shows up as taints on nodes.

A taint's effect (Effect) can be one of three values:

- NoSchedule: new pods that do not tolerate the taint are not scheduled onto the node; pods already running on the node are unaffected;
- PreferNoSchedule: a soft version of NoSchedule, the scheduler tries to avoid placing non-tolerating pods on the node but may still do so;
- NoExecute: non-tolerating pods are not scheduled onto the node, and pods already running on the node that do not tolerate the taint are evicted.
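
A quick, illustrative Go sketch of the three effects (the taint key example.com/demo is made up for this example; the types and constants come from k8s.io/api/core/v1):

import (
    v1 "k8s.io/api/core/v1"
)

// Illustration only: one taint per possible effect.
var demoTaints = []v1.Taint{
    // New pods that do not tolerate the taint are not scheduled onto the node.
    {Key: "example.com/demo", Value: "true", Effect: v1.TaintEffectNoSchedule},
    // Soft NoSchedule: the scheduler tries to avoid the node but may still use it.
    {Key: "example.com/demo", Value: "true", Effect: v1.TaintEffectPreferNoSchedule},
    // Non-tolerating pods are not scheduled, and running ones are evicted.
    {Key: "example.com/demo", Value: "true", Effect: v1.TaintEffectNoExecute},
}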

Feature gates used in NodeLifecycleController

NodeLifecycleController relies on several feature gates; the ones relevant here are:

- TaintBasedEvictions: when enabled, node problems are expressed as NoExecute taints (node.kubernetes.io/not-ready, node.kubernetes.io/unreachable) and pods are evicted by the taint manager, instead of being deleted directly by the controller;
- TaintNodesByCondition: when enabled, NoSchedule taints are added to nodes according to their conditions (memory/disk/PID pressure, network unavailable, unschedulable, etc.);
- NodeLease: when enabled, each kubelet renews a Lease object that the controller also treats as a heartbeat when judging node health.

NodeLifecycleController source code analysis

Kubernetes version: v1.16

startNodeLifecycleController

As usual, we start from NodeLifecycleController's start-up function, startNodeLifecycleController. It mainly calls lifecyclecontroller.NewNodeLifecycleController to initialize the lifecycleController, passing in a number of component parameters plus the TaintBasedEvictions and TaintNodesByCondition feature gates, and then calls lifecycleController.Run to start it. As the code shows, NodeLifecycleController watches four kinds of objects: leases, pods, nodes and daemonSets.

The defaults of the parameters passed in at start-up are (the standard kube-controller-manager flag defaults, listed here for reference):

- NodeMonitorPeriod (--node-monitor-period): 5s
- NodeStartupGracePeriod (--node-startup-grace-period): 1m0s
- NodeMonitorGracePeriod (--node-monitor-grace-period): 40s
- PodEvictionTimeout (--pod-eviction-timeout): 5m0s
- NodeEvictionRate (--node-eviction-rate): 0.1
- SecondaryNodeEvictionRate (--secondary-node-eviction-rate): 0.01
- LargeClusterSizeThreshold (--large-cluster-size-threshold): 50
- UnhealthyZoneThreshold (--unhealthy-zone-threshold): 0.55
- EnableTaintManager (--enable-taint-manager): true

k8s.io/kubernetes/cmd/kube-controller-manager/app/core.go:163

func startNodeLifecycleController(ctx ControllerContext) (http.Handler, bool, error) {
    lifecycleController, err := lifecyclecontroller.NewNodeLifecycleController(
        ctx.InformerFactory.Coordination().V1beta1().Leases(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.InformerFactory.Core().V1().Nodes(),
        ctx.InformerFactory.Apps().V1().DaemonSets(),
        ctx.ClientBuilder.ClientOrDie("node-controller"),
        ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
        ctx.ComponentConfig.NodeLifecycleController.NodeStartupGracePeriod.Duration,
        ctx.ComponentConfig.NodeLifecycleController.NodeMonitorGracePeriod.Duration,
        ctx.ComponentConfig.NodeLifecycleController.PodEvictionTimeout.Duration,
        ctx.ComponentConfig.NodeLifecycleController.NodeEvictionRate,
        ctx.ComponentConfig.NodeLifecycleController.SecondaryNodeEvictionRate,
        ctx.ComponentConfig.NodeLifecycleController.LargeClusterSizeThreshold,
        ctx.ComponentConfig.NodeLifecycleController.UnhealthyZoneThreshold,
        ctx.ComponentConfig.NodeLifecycleController.EnableTaintManager,
        utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions),
        utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition),
    )
    if err != nil {
        return nil, true, err
    }
    go lifecycleController.Run(ctx.Stop)
    return nil, true, nil
}

NewNodeLifecycleController

Before walking through the code, it helps to explain what some fields of the NodeLifecycleController object mean; its struct is shown below:

type Controller struct {
    taintManager *scheduler.NoExecuteTaintManager

    podInformerSynced cache.InformerSynced
    kubeClient        clientset.Interface

    now func() metav1.Time

    // compute the per-zone node eviction rates
    enterPartialDisruptionFunc func(nodeNum int) float32
    enterFullDisruptionFunc    func(nodeNum int) float32

    // compute the state of a zone
    computeZoneStateFunc       func(nodeConditions []*v1.NodeCondition) (int, ZoneState)

    // the set of nodes the NodeController has observed
    knownNodeSet map[string]*v1.Node
    // the most recently observed health data for each node
    nodeHealthMap map[string]*nodeHealthData

    evictorLock sync.Mutex

    // per-zone queues of nodes whose pods need to be evicted
    zonePodEvictor map[string]*scheduler.RateLimitedTimedQueue

    // per-zone queues of nodes that need NoExecute taints
    zoneNoExecuteTainter map[string]*scheduler.RateLimitedTimedQueue

    // the state of each zone the nodes are divided into
    zoneStates map[string]ZoneState

    daemonSetStore          appsv1listers.DaemonSetLister
    daemonSetInformerSynced cache.InformerSynced
    leaseLister         coordlisters.LeaseLister
    leaseInformerSynced cache.InformerSynced
    nodeLister          corelisters.NodeLister
    nodeInformerSynced  cache.InformerSynced

    getPodsAssignedToNode func(nodeName string) ([]v1.Pod, error)

    recorder record.EventRecorder

    // parameters specified when kube-controller-manager starts
    nodeMonitorPeriod time.Duration
    nodeStartupGracePeriod time.Duration
    nodeMonitorGracePeriod time.Duration
    podEvictionTimeout          time.Duration
    evictionLimiterQPS          float32
    secondaryEvictionLimiterQPS float32
    largeClusterThreshold       int32
    unhealthyZoneThreshold      float32

    // feature gates that are enabled by default at start-up
    runTaintManager bool
    useTaintBasedEvictions bool
    taintNodeByCondition bool

    nodeUpdateQueue workqueue.Interface
}

The main logic of NewNodeLifecycleController is:

- initialize the Controller object;
- register the functions used to compute node eviction rates and zone state;
- register EventHandlers on podInformer that feed pod changes into nc.taintManager.PodUpdated, and add an indexer keyed by pod.Spec.NodeName so pods can be looked up by node;
- if the taint manager is enabled, initialize taintManager and register EventHandlers on nodeInformer that feed node changes into nc.taintManager.NodeUpdated;
- register nodeInformer EventHandlers for NodeLifecycleController itself, which enqueue node names into nc.nodeUpdateQueue;
- set up the lease, node and daemonSet listers, taking the NodeLease feature gate into account.

As this shows, both taintManager and NodeLifecycleController watch node changes and process them separately.

k8s.io/kubernetes/pkg/controller/nodelifecycle/node_lifecycle_controller.go:268

func NewNodeLifecycleController(......) (*Controller, error) {
    ......

    // 1. initialize the controller object
    nc := &Controller{
        ......
    }
        
    ......
        
    // 2. register the functions that compute node eviction rates and zone state
    nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
    nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
    nc.computeZoneStateFunc = nc.ComputeZoneState

    // 3. register EventHandlers on podInformer; observed pods are handed to nc.taintManager.PodUpdated
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            pod := obj.(*v1.Pod)
            if nc.taintManager != nil {
                nc.taintManager.PodUpdated(nil, pod)
            }
        },
        UpdateFunc: func(prev, obj interface{}) {
            prevPod := prev.(*v1.Pod)
            newPod := obj.(*v1.Pod)
            if nc.taintManager != nil {
                nc.taintManager.PodUpdated(prevPod, newPod)
            }
        },
        DeleteFunc: func(obj interface{}) {
            pod, isPod := obj.(*v1.Pod)
            if !isPod {
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
                if !ok {
                    return
                }
                pod, ok = deletedState.Obj.(*v1.Pod)
                if !ok {
                    return
                }
            }
            if nc.taintManager != nil {
                nc.taintManager.PodUpdated(pod, nil)
            }
        },
    })
    nc.podInformerSynced = podInformer.Informer().HasSynced
    podInformer.Informer().AddIndexers(cache.Indexers{
        nodeNameKeyIndex: func(obj interface{}) ([]string, error) {
            pod, ok := obj.(*v1.Pod)
            if !ok {
                return []string{}, nil
            }
            if len(pod.Spec.NodeName) == 0 {
                return []string{}, nil
            }
            return []string{pod.Spec.NodeName}, nil
        },
    })

    podIndexer := podInformer.Informer().GetIndexer()
    nc.getPodsAssignedToNode = func(nodeName string) ([]v1.Pod, error) {
        objs, err := podIndexer.ByIndex(nodeNameKeyIndex, nodeName)
        if err != nil {
            return nil, err
        }
        pods := make([]v1.Pod, 0, len(objs))
        for _, obj := range objs {
            pod, ok := obj.(*v1.Pod)
            if !ok {
                continue
            }
            pods = append(pods, *pod)
        }
        return pods, nil
    }
    
    // 4. initialize the TaintManager and register EventHandlers on nodeInformer;
    //    observed nodes are handed to nc.taintManager.NodeUpdated
    if nc.runTaintManager {
        podLister := podInformer.Lister()
        podGetter := func(name, namespace string) (*v1.Pod, error) { return podLister.Pods(namespace).Get(name) }
        nodeLister := nodeInformer.Lister()
        nodeGetter := func(name string) (*v1.Node, error) { return nodeLister.Get(name) }
        
        // 5. initialize the taintManager
        nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter, nc.getPodsAssignedToNode)
        nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
                nc.taintManager.NodeUpdated(nil, node)
                return nil
            }),
            UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
                nc.taintManager.NodeUpdated(oldNode, newNode)
                return nil
            }),
            DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
                nc.taintManager.NodeUpdated(node, nil)
                return nil
            }),
        })
    }
    
    // 6. register nodeInformer EventHandlers for NodeLifecycleController itself
    nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
            nc.nodeUpdateQueue.Add(node.Name)
            return nil
        }),
        UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
            nc.nodeUpdateQueue.Add(newNode.Name)
            return nil
        }),
    })

    ......
        
    // 7. check whether the NodeLease feature gate is enabled
    nc.leaseLister = leaseInformer.Lister()
    if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
        nc.leaseInformerSynced = leaseInformer.Informer().HasSynced
    } else {
        nc.leaseInformerSynced = func() bool { return true }
    }

    nc.nodeLister = nodeInformer.Lister()
    nc.nodeInformerSynced = nodeInformer.Informer().HasSynced

    nc.daemonSetStore = daemonSetInformer.Lister()
    nc.daemonSetInformerSynced = daemonSetInformer.Informer().HasSynced

    return nc, nil
}

Run

Run is NodeLifecycleController's entry point. It starts several goroutines that carry out the controller's work; the main logic is:

k8s.io/kubernetes/pkg/controller/nodelifecycle/node_lifecycle_controller.go:455

func (nc *Controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    defer klog.Infof("Shutting down node controller")

    if !cache.WaitForNamedCacheSync("taint", stopCh, nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
        return
    }

    // 1. start the taintManager
    if nc.runTaintManager {
        go nc.taintManager.Run(stopCh)
    }

    defer nc.nodeUpdateQueue.ShutDown()

    // 2. run nc.doNodeProcessingPassWorker
    for i := 0; i < scheduler.UpdateWorkerSize; i++ {
        go wait.Until(nc.doNodeProcessingPassWorker, time.Second, stopCh)
    }

    // 3. run different eviction logic depending on whether TaintBasedEvictions is enabled
    if nc.useTaintBasedEvictions {
        go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, stopCh)
    } else {
        go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, stopCh)
    }

    // 4. run nc.monitorNodeHealth
    go wait.Until(func() {
        if err := nc.monitorNodeHealth(); err != nil {
            klog.Errorf("Error monitoring node health: %v", err)
        }
    }, nc.nodeMonitorPeriod, stopCh)

    <-stopCh
}

Run mainly calls five methods to do its core work:

- nc.taintManager.Run: evicts pods that do not tolerate the NoExecute taints on their node;
- nc.doNodeProcessingPassWorker: adds the appropriate NoSchedule taints and labels to nodes taken from nodeUpdateQueue;
- nc.doNoExecuteTaintingPass: when TaintBasedEvictions is enabled, adds NoExecute taints to the nodes queued in zoneNoExecuteTainter;
- nc.doEvictionPass: when TaintBasedEvictions is disabled, evicts the pods on the nodes queued in zonePodEvictor;
- nc.monitorNodeHealth: monitors node status and feeds the two queues above.

Each of these five methods is analysed in detail below.

nc.taintManager.Run

nc.taintManager.Run runs when the component is started with --enable-taint-manager=true (the default). Its job is to evict the pods on a node that do not tolerate the node's taints. If the feature is disabled, pods already scheduled onto the node keep running there, and only newly created pods have to tolerate the node's taints in order to be scheduled onto it.

The main logic is:

k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler/taint_manager.go:185

func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
    for i := 0; i < UpdateWorkerSize; i++ {
        tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
        tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
    }

    go func(stopCh <-chan struct{}) {
        for {
            item, shutdown := tc.nodeUpdateQueue.Get()
            if shutdown {
                break
            }
            nodeUpdate := item.(nodeUpdateItem)
            hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
            select {
            case <-stopCh:
                tc.nodeUpdateQueue.Done(item)
                return
            case tc.nodeUpdateChannels[hash] <- nodeUpdate:
            }
        }
    }(stopCh)
    
    go func(stopCh <-chan struct{}) {
        for {
            item, shutdown := tc.podUpdateQueue.Get()
            if shutdown {
                break
            }
            podUpdate := item.(podUpdateItem)
            hash := hash(podUpdate.nodeName, UpdateWorkerSize)
            select {
            case <-stopCh:
                tc.podUpdateQueue.Done(item)
                return
            case tc.podUpdateChannels[hash] <- podUpdate:
            }
        }
    }(stopCh)

    wg := sync.WaitGroup{}
    wg.Add(UpdateWorkerSize)
    for i := 0; i < UpdateWorkerSize; i++ {
        go tc.worker(i, wg.Done, stopCh)
    }
    wg.Wait()
}

tc.worker

tc.worker's main job is to call tc.handleNodeUpdate and tc.handlePodUpdate to process the items in the tc.nodeUpdateChannels and tc.podUpdateChannels channels, giving priority to nodeUpdateChannels.

k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler/taint_manager.go:243

func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan struct{}) {
    defer done()

    for {
        select {
        case <-stopCh:
            return
        case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
            tc.handleNodeUpdate(nodeUpdate)
            tc.nodeUpdateQueue.Done(nodeUpdate)
        case podUpdate := <-tc.podUpdateChannels[worker]:

        // drain nodeUpdateChannels first
        priority:
            for {
                select {
                case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
                    tc.handleNodeUpdate(nodeUpdate)
                    tc.nodeUpdateQueue.Done(nodeUpdate)
                default:
                    break priority
                }
            }
            tc.handlePodUpdate(podUpdate)
            tc.podUpdateQueue.Done(podUpdate)
        }
    }
}
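
The labeled break priority above is a small Go idiom: before handling a pod update, the worker first drains any pending node updates. A standalone sketch of the same pattern (the channel names here are illustrative, not taken from the controller):

package main

import "fmt"

func main() {
    nodeCh := make(chan string, 2)
    podCh := make(chan string, 1)
    nodeCh <- "node-1"
    nodeCh <- "node-2"
    podCh <- "pod-a"

    podUpdate := <-podCh
    // Drain the higher-priority channel before handling the pod update.
priority:
    for {
        select {
        case n := <-nodeCh:
            fmt.Println("handled node update:", n)
        default:
            break priority
        }
    }
    fmt.Println("handled pod update:", podUpdate)
}
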
tc.handleNodeUpdate

The main logic of tc.handleNodeUpdate is:

k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler/taint_manager.go:417

func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate nodeUpdateItem) {
    node, err := tc.getNode(nodeUpdate.nodeName)
    if err != nil {
        ......
    }
    // 1. get the node's NoExecute taints
    taints := getNoExecuteTaints(node.Spec.Taints)
    func() {
        tc.taintedNodesLock.Lock()
        defer tc.taintedNodesLock.Unlock()
        if len(taints) == 0 {
            delete(tc.taintedNodes, node.Name)
        } else {
            tc.taintedNodes[node.Name] = taints
        }
    }()
    
    // 2. get all pods on the node
    pods, err := tc.getPodsAssignedToNode(node.Name)
    if err != nil {
        klog.Errorf(err.Error())
        return
    }
    if len(pods) == 0 {
        return
    }

    // 3. if there are no taints, cancel all pending evictions
    if len(taints) == 0 {
        for i := range pods {
            tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
        }
        return
    }

    now := time.Now()
    for i := range pods {
        pod := &pods[i]
        podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
        // 4. handle each pod with tc.processPodOnNode
        tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
    }
}

tc.handlePodUpdate

The main logic is:

k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler/taint_manager.go:377

func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate podUpdateItem) {
    pod, err := tc.getPod(podUpdate.podName, podUpdate.podNamespace)
    if err != nil {
        ......
    }

    if pod.Spec.NodeName != podUpdate.nodeName {
        return
    }

    podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}

    nodeName := pod.Spec.NodeName
    if nodeName == "" {
        return
    }
    taints, ok := func() ([]v1.Taint, bool) {
        tc.taintedNodesLock.Lock()
        defer tc.taintedNodesLock.Unlock()
        taints, ok := tc.taintedNodes[nodeName]
        return taints, ok
    }()

    if !ok {
        return
    }
    // handle the pod with tc.processPodOnNode
    tc.processPodOnNode(podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())
}

tc.processPodOnNode

Both tc.handlePodUpdate and tc.handleNodeUpdate end up calling tc.processPodOnNode to check whether a pod tolerates the node's taints. tc.processPodOnNode first checks whether the pod's tolerations match all of the node's NoExecute taints. If they do not, the pod is added to the taintEvictionQueue right away and subsequently deleted. If they do, it looks at the smallest tolerationSeconds among the matching tolerations: if no toleration sets a time, the pod tolerates the taints forever and nothing more is done; otherwise the pod is put into the taintEvictionQueue's delayed queue, and once the minimum toleration time elapses the pod is evicted.

Normally, once a taint with effect NoExecute is added to a node, any pod that does not tolerate it is evicted immediately, while pods that do tolerate it are never evicted. However, if a pod's NoExecute toleration also sets the optional tolerationSeconds field, that value is how long the pod may keep running on the node after the taint is added. For example:

tolerations:
- key: "key1"
  operator: "Equal"
  value: "value1"
  effect: "NoExecute"
  tolerationSeconds: 3600

This means that if the pod is running and a matching taint is added to its node, the pod keeps running for another 3600 seconds and is then evicted. If the taint is removed before that, the pod is not evicted.

k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler/taint_manager.go:339

func (tc *NoExecuteTaintManager) processPodOnNode(......) {
    if len(taints) == 0 {
        tc.cancelWorkWithEvent(podNamespacedName)
    }
    // 1. check whether the pod's tolerations match all taints
    allTolerated, usedTolerations := v1helper.GetMatchingTolerations(taints, tolerations)
    if !allTolerated {
        tc.cancelWorkWithEvent(podNamespacedName)
        tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())
        return
    }
    
    // 2. get the minimum toleration time
    minTolerationTime := getMinTolerationTime(usedTolerations)
    if minTolerationTime < 0 {
        return
    }
    
    // 3. if there is a minimum toleration time, add the pod to the delayed queue
    startTime := now
    triggerTime := startTime.Add(minTolerationTime)
    scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String())
    if scheduledEviction != nil {
        startTime = scheduledEviction.CreatedAt
        if startTime.Add(minTolerationTime).Before(triggerTime) {
            return
        }
        tc.cancelWorkWithEvent(podNamespacedName)
    }
    tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}
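
For completeness, a hedged sketch of what getMinTolerationTime does (reconstructed here for illustration; check the source for the exact implementation): it returns -1 when none of the matching tolerations sets tolerationSeconds (the pod tolerates the taints forever), and otherwise the smallest non-negative tolerationSeconds as a duration:

import (
    "math"
    "time"

    v1 "k8s.io/api/core/v1"
)

// getMinTolerationTimeSketch mirrors the behaviour described above.
func getMinTolerationTimeSketch(tolerations []v1.Toleration) time.Duration {
    if len(tolerations) == 0 {
        return 0
    }
    minSeconds := int64(math.MaxInt64)
    for i := range tolerations {
        if tolerations[i].TolerationSeconds == nil {
            continue // this toleration does not limit the time
        }
        s := *tolerations[i].TolerationSeconds
        if s <= 0 {
            return 0 // evict immediately
        }
        if s < minSeconds {
            minSeconds = s
        }
    }
    if minSeconds == int64(math.MaxInt64) {
        return -1 // no tolerationSeconds set anywhere: tolerate forever
    }
    return time.Duration(minSeconds) * time.Second
}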

nc.doNodeProcessingPassWorker

When NodeLifecycleController's nodeInformer sees a node change, the node is added to nodeUpdateQueue. nc.doNodeProcessingPassWorker processes the nodes in nodeUpdateQueue, adding the appropriate NoSchedule taints and labels to each of them. Its main logic is:

k8s.io/kubernetes/pkg/controller/nodelifecycle/node_lifecycle_controller.go:502

func (nc *Controller) doNodeProcessingPassWorker() {
    for {
        obj, shutdown := nc.nodeUpdateQueue.Get()
        if shutdown {
            return
        }
        nodeName := obj.(string)
        if nc.taintNodeByCondition {
            if err := nc.doNoScheduleTaintingPass(nodeName); err != nil {
                ......
            }
        }

        if err := nc.reconcileNodeLabels(nodeName); err != nil {
            ......
        }
        nc.nodeUpdateQueue.Done(nodeName)
    }
}

func (nc *Controller) doNoScheduleTaintingPass(nodeName string) error {
    // 1. get the node object
    node, err := nc.nodeLister.Get(nodeName)
    if err != nil {
        ......
    }
        
    // 2. if the node has a matching condition, add the corresponding taint
    var taints []v1.Taint
    for _, condition := range node.Status.Conditions {
        if taintMap, found := nodeConditionToTaintKeyStatusMap[condition.Type]; found {
            if taintKey, found := taintMap[condition.Status]; found {
                taints = append(taints, v1.Taint{
                    Key:    taintKey,
                    Effect: v1.TaintEffectNoSchedule,
                })
            }
        }
    }
    
    // 3. add a taint if the node is Unschedulable
    if node.Spec.Unschedulable {
        taints = append(taints, v1.Taint{
            Key:    schedulerapi.TaintNodeUnschedulable,
            Effect: v1.TaintEffectNoSchedule,
        })
    }

    nodeTaints := taintutils.TaintSetFilter(node.Spec.Taints, func(t *v1.Taint) bool {
        if t.Effect != v1.TaintEffectNoSchedule {
            return false
        }
        if t.Key == schedulerapi.TaintNodeUnschedulable {
            return true
        }
        _, found := taintKeyToNodeConditionMap[t.Key]
        return found
    })
    
    // 4. diff the node's existing taints against the desired ones to get taintsToAdd and taintsToDel
    taintsToAdd, taintsToDel := taintutils.TaintSetDiff(taints, nodeTaints)
    if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
        return nil
    }
    
    // 5. update the node's taints
    if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
        return fmt.Errorf("failed to swap taints of node %+v", node)
    }
    return nil
}
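
The nodeConditionToTaintKeyStatusMap used in step 2 maps node conditions to NoSchedule taint keys. Approximately (reconstructed here for reference; the exact table lives in node_lifecycle_controller.go), it contains:

// Approximate contents of the condition -> taint key mapping (v1.16-era keys).
var nodeConditionToTaintKeyStatusMapSketch = map[v1.NodeConditionType]map[v1.ConditionStatus]string{
    v1.NodeReady: {
        v1.ConditionFalse:   "node.kubernetes.io/not-ready",
        v1.ConditionUnknown: "node.kubernetes.io/unreachable",
    },
    v1.NodeMemoryPressure: {
        v1.ConditionTrue: "node.kubernetes.io/memory-pressure",
    },
    v1.NodeDiskPressure: {
        v1.ConditionTrue: "node.kubernetes.io/disk-pressure",
    },
    v1.NodeNetworkUnavailable: {
        v1.ConditionTrue: "node.kubernetes.io/network-unavailable",
    },
    v1.NodePIDPressure: {
        v1.ConditionTrue: "node.kubernetes.io/pid-pressure",
    },
}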

nc.doNoExecuteTaintingPass

When the TaintBasedEvictions feature is enabled, nodes that nc.monitorNodeHealth detects as unhealthy are added to the nc.zoneNoExecuteTainter queue. nc.doNoExecuteTaintingPass drains that queue at a limited rate, adding to each node the taint that matches its current NodeCondition; once a node carries such a taint, the taintManager evicts the pods on it. Note that the rate limiting is applied when adding the taints, to avoid evicting too many pods at once; the eviction of the pods on an already-tainted node is not rate limited.

The main logic of nc.doNoExecuteTaintingPass is:

k8s.io/kubernetes/pkg/controller/nodelifecycle/node_lifecycle_controller.go:582

func (nc *Controller) doNoExecuteTaintingPass() {
    nc.evictorLock.Lock()
    defer nc.evictorLock.Unlock()
    for k := range nc.zoneNoExecuteTainter {
        nc.zoneNoExecuteTainter[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
            // 1. get the node object
            node, err := nc.nodeLister.Get(value.Value)
            if apierrors.IsNotFound(err) {
                return true, 0
            } else if err != nil {
                return false, 50 * time.Millisecond
            }
            
            // 2. get the node's NodeReady condition
            _, condition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
            taintToAdd := v1.Taint{}
            oppositeTaint := v1.Taint{}
            
            // 3. examine the condition status and choose the taint to add
            switch condition.Status {
            case v1.ConditionFalse:
                taintToAdd = *NotReadyTaintTemplate
                oppositeTaint = *UnreachableTaintTemplate
            case v1.ConditionUnknown:
                taintToAdd = *UnreachableTaintTemplate
                oppositeTaint = *NotReadyTaintTemplate
            default:
                return true, 0
            }
            
            // 4. update the node's taints
            result := nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node)
            if result {
                zone := utilnode.GetZoneKey(node)
                evictionsNumber.WithLabelValues(zone).Inc()
            }

            return result, 0
        })
    }
}
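
The NotReadyTaintTemplate and UnreachableTaintTemplate swapped onto the node above are the two NoExecute taints that drive taint-based eviction; in essence they look like this (sketch for reference):

// The two NoExecute taint templates applied by doNoExecuteTaintingPass (sketch).
var (
    unreachableTaint = &v1.Taint{
        Key:    "node.kubernetes.io/unreachable",
        Effect: v1.TaintEffectNoExecute,
    }
    notReadyTaint = &v1.Taint{
        Key:    "node.kubernetes.io/not-ready",
        Effect: v1.TaintEffectNoExecute,
    }
)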

nc.doEvictionPass

If the TaintBasedEvictions feature is not enabled, nodes that nc.monitorNodeHealth detects as unhealthy are added to the nc.zonePodEvictor queue instead, and nc.doEvictionPass evicts the pods running on those nodes.

The main logic of nc.doEvictionPass is:

k8s.io/kubernetes/pkg/controller/nodelifecycle/node_lifecycle_controller.go:626

func (nc *Controller) doEvictionPass() {
    nc.evictorLock.Lock()
    defer nc.evictorLock.Unlock()
    for k := range nc.zonePodEvictor {
        nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
            node, err := nc.nodeLister.Get(value.Value)
            ......
            nodeUID, _ := value.UID.(string)
            remaining, err := nodeutil.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
                return false, 0
            }
            ......

            if node != nil {
                zone := utilnode.GetZoneKey(node)
                evictionsNumber.WithLabelValues(zone).Inc()
            }

            return true, 0
        })
    }
}

nc.monitorNodeHealth

As shown above, whether or not TaintBasedEvictions is enabled, nodes that need to be tainted or have their pods evicted are placed into the zoneNoExecuteTainter or zonePodEvictor queues, and nc.monitorNodeHealth is the producer for both queues. Its main job is to continuously monitor node status: when a node becomes unhealthy it updates the node's taints and the status of the pods on the node, or evicts those pods directly. It also divides all nodes in the cluster into zoneStates and assigns each zone an eviction rate.

The main logic of nc.monitorNodeHealth is:

k8s.io/kubernetes/pkg/controller/nodelifecycle/node_lifecycle_controller.go:664

func (nc *Controller) monitorNodeHealth() error {
    // 1. list all nodes from nodeLister
    nodes, err := nc.nodeLister.List(labels.Everything())
    if err != nil {
        return err
    }
    
    // 2. classify nodes into three groups based on the controller's knownNodeSet
    added, deleted, newZoneRepresentatives := nc.classifyNodes(nodes)
    
    // 3. add a corresponding zone for nodes that do not have one yet
    for i := range newZoneRepresentatives {
        nc.addPodEvictorForNewZone(newZoneRepresentatives[i])
    }

    // 4. add newly observed nodes to knownNodeSet and initialize them
    for i := range added {
        ......
        nc.knownNodeSet[added[i].Name] = added[i]
        nc.addPodEvictorForNewZone(added[i])
        if nc.useTaintBasedEvictions {
            nc.markNodeAsReachable(added[i])
        } else {
            nc.cancelPodEviction(added[i])
        }
    }
    
    // 5. remove the nodes in the deleted list from knownNodeSet
    for i := range deleted {
        ......
        delete(nc.knownNodeSet, deleted[i].Name)
    }

    zoneToNodeConditions := map[string][]*v1.NodeCondition{}
    for i := range nodes {
        var gracePeriod time.Duration
        var observedReadyCondition v1.NodeCondition
        var currentReadyCondition *v1.NodeCondition
        node := nodes[i].DeepCopy()
        
        // 6. get the node's gracePeriod, observedReadyCondition and currentReadyCondition
        if err := wait.PollImmediate(retrySleepTime, retrySleepTime*scheduler.NodeHealthUpdateRetry, func() (bool, error) {
            gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeHealth(node)
            if err == nil {
                return true, nil
            }
            name := node.Name
            node, err = nc.kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
            if err != nil {
                return false, err
            }
            return false, nil
        }); err != nil {
            ......
        }
        
        // 7. if the node is not excluded from disruption checks, add it to zoneToNodeConditions
        if !isNodeExcludedFromDisruptionChecks(node) {
            zoneToNodeConditions[utilnode.GetZoneKey(node)] = append(zoneToNodeConditions[utilnode.GetZoneKey(node)], currentReadyCondition)
        }

        decisionTimestamp := nc.now()
        
        // 8. add different taints to the node based on observedReadyCondition
        if currentReadyCondition != nil {
            switch observedReadyCondition.Status {
            
            case v1.ConditionFalse:
                // 9. in the False state, add the NotReady taint
                if nc.useTaintBasedEvictions {
                    if taintutils.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
                        taintToAdd := *NotReadyTaintTemplate
                        if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{UnreachableTaintTemplate}, node) {
                            ......
                        }
                    } else if nc.markNodeForTainting(node) {
                        ......
                    }
                // 10. or, once podEvictionTimeout has passed, evict the pods on the node directly
                } else {
                    if decisionTimestamp.After(nc.nodeHealthMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
                        if nc.evictPods(node) {
                            ......
                        }
                    }
                }
            case v1.ConditionUnknown:
                // 11. in the Unknown state, add the Unreachable taint
                if nc.useTaintBasedEvictions {
                    if taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
                        taintToAdd := *UnreachableTaintTemplate
                        if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{NotReadyTaintTemplate}, node) {
                            ......
                        }
                    } else if nc.markNodeForTainting(node) {
                        ......
                    }
                } else {
                    if decisionTimestamp.After(nc.nodeHealthMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout)) {
                        if nc.evictPods(node) {
                            ......
                        }
                    }
                }
            case v1.ConditionTrue:
                // 12. in the True state, remove the Unreachable and NotReady taints
                if nc.useTaintBasedEvictions {
                    removed, err := nc.markNodeAsReachable(node)
                    if err != nil {
                        ......
                    }
                // 13. otherwise remove the node from the pod eviction queue
                } else {
                    if nc.cancelPodEviction(node) {
                        ......
                    }
                }
            }

            // 14. when ReadyCondition goes from true to false, mark all pods on the node NotReady
            if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
                nodeutil.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady")
                if err = nodeutil.MarkAllPodsNotReady(nc.kubeClient, node); err != nil {
                    utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
                }
            }
        }
    }
    // 15. handle disruptions
    nc.handleDisruption(zoneToNodeConditions, nodes)

    return nil
}
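
The zone a node belongs to (utilnode.GetZoneKey in the code above) is derived from the node's failure-domain labels; a simplified sketch (the label keys shown are the pre-topology.kubernetes.io names used around v1.16, and the real helper joins region and zone with a special separator):

// getZoneKeySketch shows roughly how nodes are grouped into zones.
func getZoneKeySketch(node *v1.Node) string {
    region := node.Labels["failure-domain.beta.kubernetes.io/region"]
    zone := node.Labels["failure-domain.beta.kubernetes.io/zone"]
    if region == "" && zone == "" {
        return ""
    }
    // Combining region and zone keeps nodes from different zones of the same
    // region in different buckets.
    return region + ":" + zone
}
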
nc.tryUpdateNodeHealth

nc.tryUpdateNodeHealth updates the node's entry in nc.nodeHealthMap based on the node status just retrieved. nc.nodeHealthMap holds the most recently observed state of each node, and it is used to decide whether a node has fallen into the Unknown state.

The main logic of nc.tryUpdateNodeHealth is:

k8s.io/kubernetes/pkg/controller/nodelifecycle/node_lifecycle_controller.go:851

func (nc *Controller) tryUpdateNodeHealth(node *v1.Node) (time.Duration, v1.NodeCondition, *v1.NodeCondition, error) {
    var gracePeriod time.Duration
    var observedReadyCondition v1.NodeCondition
    _, currentReadyCondition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
    
    // 1. if currentReadyCondition is nil, fake an observedReadyCondition
    if currentReadyCondition == nil {
        observedReadyCondition = v1.NodeCondition{
            Type:               v1.NodeReady,
            Status:             v1.ConditionUnknown,
            LastHeartbeatTime:  node.CreationTimestamp,
            LastTransitionTime: node.CreationTimestamp,
        }
        gracePeriod = nc.nodeStartupGracePeriod
        if _, found := nc.nodeHealthMap[node.Name]; found {
            nc.nodeHealthMap[node.Name].status = &node.Status
        } else {
            nc.nodeHealthMap[node.Name] = &nodeHealthData{
                status:                   &node.Status,
                probeTimestamp:           node.CreationTimestamp,
                readyTransitionTimestamp: node.CreationTimestamp,
            }
        }
    } else {
        observedReadyCondition = *currentReadyCondition
        gracePeriod = nc.nodeMonitorGracePeriod
    }
    
    // 2. savedNodeHealth holds the node's most recently recorded state
    savedNodeHealth, found := nc.nodeHealthMap[node.Name]

    var savedCondition *v1.NodeCondition
    var savedLease *coordv1beta1.Lease
    if found {
        _, savedCondition = nodeutil.GetNodeCondition(savedNodeHealth.status, v1.NodeReady)
        savedLease = savedNodeHealth.lease
    }
    
    // 3. update savedNodeHealth based on savedCondition and currentReadyCondition
    if !found {
        savedNodeHealth = &nodeHealthData{
            status:                   &node.Status,
            probeTimestamp:           nc.now(),
            readyTransitionTimestamp: nc.now(),
        }
    } else if savedCondition == nil && currentReadyCondition != nil {
        savedNodeHealth = &nodeHealthData{
            status:                   &node.Status,
            probeTimestamp:           nc.now(),
            readyTransitionTimestamp: nc.now(),
        }
    } else if savedCondition != nil && currentReadyCondition == nil {
        savedNodeHealth = &nodeHealthData{
            status:                   &node.Status,
            probeTimestamp:           nc.now(),
            readyTransitionTimestamp: nc.now(),
        }
    } else if savedCondition != nil && currentReadyCondition != nil && savedCondition.LastHeartbeatTime != currentReadyCondition.LastHeartbeatTime {
        var transitionTime metav1.Time
        if savedCondition.LastTransitionTime != currentReadyCondition.LastTransitionTime {
            transitionTime = nc.now()
        } else {
            transitionTime = savedNodeHealth.readyTransitionTimestamp
        }
        
        savedNodeHealth = &nodeHealthData{
            status:                   &node.Status,
            probeTimestamp:           nc.now(),
            readyTransitionTimestamp: transitionTime,
        }
    }
    
    // 4. check whether the NodeLease feature is enabled
    var observedLease *coordv1beta1.Lease
    if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
        observedLease, _ = nc.leaseLister.Leases(v1.NamespaceNodeLease).Get(node.Name)
        if observedLease != nil && (savedLease == nil || savedLease.Spec.RenewTime.Before(observedLease.Spec.RenewTime)) {
            savedNodeHealth.lease = observedLease
            savedNodeHealth.probeTimestamp = nc.now()
        }
    }
    nc.nodeHealthMap[node.Name] = savedNodeHealth

    // 5. check whether the node has gone longer than gracePeriod without reporting status
    if nc.now().After(savedNodeHealth.probeTimestamp.Add(gracePeriod)) {
        nodeConditionTypes := []v1.NodeConditionType{
            v1.NodeReady,
            v1.NodeMemoryPressure,
            v1.NodeDiskPressure,
            v1.NodePIDPressure,
        }
        nowTimestamp := nc.now()
        
        // 6. if it has, set all of its conditions to Unknown
        for _, nodeConditionType := range nodeConditionTypes {
            _, currentCondition := nodeutil.GetNodeCondition(&node.Status, nodeConditionType)
            if currentCondition == nil {
                node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
                    Type:               nodeConditionType,
                    Status:             v1.ConditionUnknown,
                    Reason:             "NodeStatusNeverUpdated",
                    Message:            "Kubelet never posted node status.",
                    LastHeartbeatTime:  node.CreationTimestamp,
                    LastTransitionTime: nowTimestamp,
                })
            } else {
                if currentCondition.Status != v1.ConditionUnknown {
                    currentCondition.Status = v1.ConditionUnknown
                    currentCondition.Reason = "NodeStatusUnknown"
                    currentCondition.Message = "Kubelet stopped posting node status."
                    currentCondition.LastTransitionTime = nowTimestamp
                }
            }
        }

        // 7. push the node's latest status to the apiserver and update nodeHealthMap
        _, currentReadyCondition = nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
        if !apiequality.Semantic.DeepEqual(currentReadyCondition, &observedReadyCondition) {
            if _, err := nc.kubeClient.CoreV1().Nodes().UpdateStatus(node); err != nil {
                return gracePeriod, observedReadyCondition, currentReadyCondition, err
            }
            nc.nodeHealthMap[node.Name] = &nodeHealthData{
                status:                   &node.Status,
                probeTimestamp:           nc.nodeHealthMap[node.Name].probeTimestamp,
                readyTransitionTimestamp: nc.now(),
                lease:                    observedLease,
            }
            return gracePeriod, observedReadyCondition, currentReadyCondition, nil
        }
    }

    return gracePeriod, observedReadyCondition, currentReadyCondition, nil
}

nc.handleDisruption

monitorNodeHealth assigns every node to a zone and records a zoneState for it. The purpose of nc.handleDisruption is to set a different eviction rate per zone when several unhealthy nodes show up in different zones of the cluster.

The main logic of nc.handleDisruption is:

k8s.io/kubernetes/pkg/controller/nodelifecycle/node_lifecycle_controller.go:1017

func (nc *Controller) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
    newZoneStates := map[string]ZoneState{}
    allAreFullyDisrupted := true
    
    // 1. check whether all zones are currently in the FullDisruption state
    for k, v := range zoneToNodeConditions {
        zoneSize.WithLabelValues(k).Set(float64(len(v)))
        // 2. compute the zone's state and its number of unhealthy nodes
        unhealthy, newState := nc.computeZoneStateFunc(v)
        zoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
        unhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
        if newState != stateFullDisruption {
            allAreFullyDisrupted = false
        }
        newZoneStates[k] = newState
        if _, had := nc.zoneStates[k]; !had {
            nc.zoneStates[k] = stateInitial
        }
    }

    // 3. check whether all zones were in the FullDisruption state at the previous observation
    allWasFullyDisrupted := true
    for k, v := range nc.zoneStates {
        if _, have := zoneToNodeConditions[k]; !have {
            zoneSize.WithLabelValues(k).Set(0)
            zoneHealth.WithLabelValues(k).Set(100)
            unhealthyNodes.WithLabelValues(k).Set(0)
            delete(nc.zoneStates, k)
            continue
        }
        if v != stateFullDisruption {
            allWasFullyDisrupted = false
            break
        }
    }
    // 4. if at least one of the two observations is not fully disrupted
    if !allAreFullyDisrupted || !allWasFullyDisrupted {
        // 5. if allAreFullyDisrupted is true here, allWasFullyDisrupted must be false,
        //    i.e. the cluster has just switched from not-FullyDisrupted to FullyDisrupted
        if allAreFullyDisrupted {
            for i := range nodes {
                if nc.useTaintBasedEvictions {
                    _, err := nc.markNodeAsReachable(nodes[i])
                    if err != nil {
                        klog.Errorf("Failed to remove taints from Node %v", nodes[i].Name)
                    }
                } else {
                    nc.cancelPodEviction(nodes[i])
                }
            }
            
            for k := range nc.zoneStates {
                if nc.useTaintBasedEvictions {
                    nc.zoneNoExecuteTainter[k].SwapLimiter(0)
                } else {
                    nc.zonePodEvictor[k].SwapLimiter(0)
                }
            }
            for k := range nc.zoneStates {
                nc.zoneStates[k] = stateFullDisruption
            }
            return
        }
        // 6. if allWasFullyDisrupted is true here, allAreFullyDisrupted must be false,
        //    i.e. the cluster has just switched from FullyDisrupted back to not-FullyDisrupted
        if allWasFullyDisrupted {
            now := nc.now()
            for i := range nodes {
                v := nc.nodeHealthMap[nodes[i].Name]
                v.probeTimestamp = now
                v.readyTransitionTimestamp = now
                nc.nodeHealthMap[nodes[i].Name] = v
            }

            for k := range nc.zoneStates {
                nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k])
                nc.zoneStates[k] = newZoneStates[k]
            }
            return
        }

        // 7. set each zone's eviction rate according to its zoneState
        for k, v := range nc.zoneStates {
            newState := newZoneStates[k]
            if v == newState {
                continue
            }

            nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState)
            nc.zoneStates[k] = newState
        }
    }
}

nc.computeZoneStateFunc

nc.computeZoneStateFunc computes a zone's state. It counts the notReady nodes in each zone and classifies the zone into one of three states:

- fullDisruption: every node in the zone is notReady;
- partialDisruption: more than two nodes are notReady and the notReady ratio is at least unhealthyZoneThreshold;
- normal: any other case.

k8s.io/kubernetes/pkg/controller/nodelifecycle/node_lifecycle_controller.go:1262

func (nc *Controller) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition) (int, ZoneState) {
    readyNodes := 0
    notReadyNodes := 0
    for i := range nodeReadyConditions {
        if nodeReadyConditions[i] != nil && nodeReadyConditions[i].Status == v1.ConditionTrue {
            readyNodes++
        } else {
            notReadyNodes++
        }
    }
    switch {
    case readyNodes == 0 && notReadyNodes > 0:
        return notReadyNodes, stateFullDisruption
    case notReadyNodes > 2 && float32(notReadyNodes)/float32(notReadyNodes+readyNodes) >= nc.unhealthyZoneThreshold:
        return notReadyNodes, statePartialDisruption
    default:
        return notReadyNodes, stateNormal
    }
}

nc.setLimiterInZone

nc.setLimiterInZone sets a zone's eviction rate according to its zoneState:

- stateNormal: the rate is evictionLimiterQPS;
- statePartialDisruption: the rate comes from enterPartialDisruptionFunc (the reduced, secondary rate; see the sketch after the code below);
- stateFullDisruption: the rate comes from enterFullDisruptionFunc (back to the normal rate).

k8s.io/kubernetes/pkg/controller/nodelifecycle/node_lifecycle_controller.go:1115

func (nc *Controller) setLimiterInZone(zone string, zoneSize int, state ZoneState) {
    switch state {
    case stateNormal:
        if nc.useTaintBasedEvictions {
            nc.zoneNoExecuteTainter[zone].SwapLimiter(nc.evictionLimiterQPS)
        } else {
            nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
        }
    case statePartialDisruption:
        if nc.useTaintBasedEvictions {
            nc.zoneNoExecuteTainter[zone].SwapLimiter(
                nc.enterPartialDisruptionFunc(zoneSize))
        } else {
            nc.zonePodEvictor[zone].SwapLimiter(
                nc.enterPartialDisruptionFunc(zoneSize))
        }
    case stateFullDisruption:
        if nc.useTaintBasedEvictions {
            nc.zoneNoExecuteTainter[zone].SwapLimiter(
                nc.enterFullDisruptionFunc(zoneSize))
        } else {
            nc.zonePodEvictor[zone].SwapLimiter(
                nc.enterFullDisruptionFunc(zoneSize))
        }
    }
}
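
The enterFullDisruptionFunc and enterPartialDisruptionFunc used above are the HealthyQPSFunc and ReducedQPSFunc registered in NewNodeLifecycleController. Roughly (a sketch; verify against the source), they behave like this:

// fullDisruption: fall back to the normal eviction rate (sketch of HealthyQPSFunc).
func (nc *Controller) healthyQPSFuncSketch(nodeNum int) float32 {
    return nc.evictionLimiterQPS
}

// partialDisruption: large zones drop to the secondary (slower) rate, and zones
// at or below largeClusterThreshold stop evicting entirely (sketch of ReducedQPSFunc).
func (nc *Controller) reducedQPSFuncSketch(nodeNum int) float32 {
    if int32(nodeNum) > nc.largeClusterThreshold {
        return nc.secondaryEvictionLimiterQPS
    }
    return 0
}
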
Summary

The main flow inside monitorNodeHealth is shown below:

                               monitorNodeHealth
                                      |
                                      |
                            useTaintBasedEvictions
                                      |
                                      |
               ---------------------------------------------
           yes |                                           | no
               |                                           |
               v                                           v
        markNodeForTainting                             evictPods
               |                                           |
               |                                           |
               v                                           v
       zoneNoExecuteTainter                         zonePodEvictor
    (RateLimitedTimedQueue)                     (RateLimitedTimedQueue)
               |                                           |
               |                                           |
               |                                           |
               v                                           v
       doNoExecuteTaintingPass                       doEvictionPass
           (consumer)                                 (consumer)

The interaction between the three core pieces of NodeLifecycleController is shown below:

                            monitorNodeHealth
                                    |
                                    |
                                    | add NoExecute taints to nodes
                                    |
                                    |
                                    v       add NoSchedule
                 watch nodeList           taints to nodes
     taintManager   ------>     APIServer  <-----------  nc.doNodeProcessingPassWorker
           |
           |
           |
           v
    evict pods that do not
    tolerate the node's taints

That concludes the walkthrough of NodeLifecycleController's core code.

Conclusion

This article analysed the design and implementation of NodeLifecycleController. Its main job is to monitor node status and evict the pods on a node when the node becomes unhealthy. Its behaviour is tied to other components: node status is reported by the kubelet, and once an unhealthy node has been tainted, the scheduler takes the taints into account when placing pods. To keep problems such as network partitions from triggering mass evictions, NodeLifecycleController divides nodes into zones and gives each zone its own eviction rate; taints are effectively added in a rate-limited way, which in some situations prevents large numbers of pods from being evicted at once.

In addition, NodeLifecycleController exposes several metrics, including zoneHealth, zoneSize, unhealthyNodes and evictionsNumber, so users can observe the state of the nodes in the cluster.

References:

https://kubernetes.io/zh/docs/concepts/configuration/taint-and-toleration/

https://kubernetes.io/docs/reference/command-line-tools-reference/feature-gates/
