kubelet eviction manager代码走读
一、Eviction Manager初始化
Kubelet Eviction Manager主要业务代码在 pkg/kubelet/eviction
中。kubelet在实例化时,调用eviction.NewManager()
,实例化了eviction manager对象。
1.1 NewMainKubelet()
// pkg/kubelet/kubelet.go
// NewMainKubelet (excerpt) shows where the kubelet constructs its eviction
// manager. Code that is not relevant to eviction is elided with "...".
func NewMainKubelet(...){
	...
	// setup eviction manager
	// eviction.NewManager returns two values: the manager itself and a pod
	// admit handler.
	evictionManager, evictionAdmitHandler := eviction.NewManager(
		klet.resourceAnalyzer,
		evictionConfig,
		killPodNow(klet.podWorkers, kubeDeps.Recorder),
		klet.podManager.GetMirrorPodByPod,
		klet.imageManager,
		klet.containerGC,
		kubeDeps.Recorder,
		nodeRef,
		// A trailing comma is required here because the closing parenthesis
		// sits on its own line.
		klet.clock,
	)
	// Keep the manager on the kubelet and register the admit handler so it is
	// consulted through klet.admitHandlers.
	klet.evictionManager = evictionManager
	klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
	...
}
// pkg/kubelet/eviction/eviction_manager.go: 107
// NewManager builds a managerImpl from the supplied collaborators and returns
// it twice: once as the Manager and once as the lifecycle.PodAdmitHandler.
// Both return values are the same pointer.
func NewManager(
	summaryProvider stats.SummaryProvider,
	config Config,
	killPodFunc KillPodFunc,
	mirrorPodFunc MirrorPodFunc,
	imageGC ImageGC,
	containerGC ContainerGC,
	recorder record.EventRecorder,
	nodeRef *v1.ObjectReference,
	clock clock.Clock,
) (Manager, lifecycle.PodAdmitHandler) {
	m := new(managerImpl)

	// Collaborators and configuration handed in by the caller.
	m.summaryProvider = summaryProvider
	m.config = config
	m.killPodFunc = killPodFunc
	m.mirrorPodFunc = mirrorPodFunc
	m.imageGC = imageGC
	m.containerGC = containerGC
	m.recorder = recorder
	m.nodeRef = nodeRef
	m.clock = clock

	// Tracking state starts out empty.
	m.nodeConditionsLastObservedAt = nodeConditionsObservedAt{}
	m.thresholdsFirstObservedAt = thresholdsObservedAt{}
	m.thresholdNotifiers = []ThresholdNotifier{}

	// nil means "not determined yet"; synchronize fills it in on first use.
	m.dedicatedImageFs = nil

	return m, m
}
1.2 kubelet.Run()
kubelet执行Run()方法开始工作时,启动了goroutine,每5s执行一次 updateRuntimeUp()
。在 updateRuntimeUp()
中,待确认runtime启动成功后,会调用 initializeRuntimeDependentModules()
完成runtime依赖模块的初始化工作。
// pkg/kubelet/kubelet.go: 1430
// Run (excerpt) shows only the line relevant to the eviction manager start-up
// path; the rest of the body is elided with "...".
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
...
// Run kl.updateRuntimeUp periodically (5 second period) in its own goroutine.
// wait.NeverStop is passed as the stop channel.
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
...
}
// pkg/kubelet/kubelet.go: 2199
// updateRuntimeUp (excerpt) shows only the hand-off to
// initializeRuntimeDependentModules; the rest of the body is elided with "...".
func (kl *Kubelet) updateRuntimeUp() {
...
// NOTE(review): oneTimeInitializer is presumably a sync.Once, so the
// initializer would run at most once - confirm against the Kubelet struct.
kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
...
}
// pkg/kubelet/kubelet.go: 1386
// initializeRuntimeDependentModules (excerpt) shows the call that starts the
// eviction manager. Code that is not relevant to eviction is elided with "...".
func (kl *Kubelet) initializeRuntimeDependentModules() {
	...
	// Arguments map onto managerImpl.Start's parameters, in order:
	// diskInfoProvider, podFunc, podCleanedUpFunc, monitoringInterval.
	kl.evictionManager.Start(
		kl.StatsProvider,
		kl.GetActivePods,
		kl.podResourcesAreReclaimed,
		// A trailing comma is required here because the closing parenthesis
		// sits on its own line.
		evictionMonitoringPeriod,
	)
	...
}
initializeRuntimeDependentModules()
最先初始化了cAdvisor模块,对于节点的stats数据,都来源于cAdvisor的接口。至此,Eviction Manager正式启动工作了。
二、Eviction Manager定义
2.1 type managerImpl struct
在分析核心逻辑之前,先看下对象的结构体定义,关键字段的作用在注释做了解释。
// pkg/kubelet/eviction/eviction_manager.go: 60
// managerImpl is the eviction manager's state. NewManager returns the same
// *managerImpl both as the Manager and as the lifecycle.PodAdmitHandler.
type managerImpl struct {
// used to track time
clock clock.Clock
/*
Eviction manager configuration, including:
PressureTransitionPeriod (--eviction-pressure-transition-period)
MaxPodGracePeriodSeconds (--eviction-max-pod-grace-period)
Thresholds (--eviction-hard, --eviction-soft)
KernelMemcgNotification (--experimental-kernel-memcg-notification)
*/
config Config
// Function used to kill a pod when evicting it; the kubelet passes the
// result of killPodNow when it calls NewManager.
killPodFunc KillPodFunc
// Function that looks up the mirror pod of a static pod.
mirrorPodFunc MirrorPodFunc
// Image garbage collector; per the original note it deletes unused images to
// reclaim disk space when the node is under DiskPressure.
imageGC ImageGC
// the interface that knows how to do container gc
containerGC ContainerGC
// protects access to internal state
sync.RWMutex
// The set of node conditions currently in effect.
nodeConditions []v1.NodeConditionType
// Records when each node condition was last observed.
nodeConditionsLastObservedAt nodeConditionsObservedAt
// nodeRef is a reference to the node
nodeRef *v1.ObjectReference
// used to record events about the node
recorder record.EventRecorder
// Provides the latest stats summary for the node and all pods on it
// (i.e. NodeStats and []PodStats, per the original note).
summaryProvider stats.SummaryProvider
// Records when each threshold was first observed.
thresholdsFirstObservedAt thresholdsObservedAt
// Thresholds that have been met but not yet resolved, including those still
// waiting out their grace period (per the original note).
thresholdsMet []evictionapi.Threshold
// Per-signal ranking function used to order pods when choosing one to evict.
signalToRankFunc map[evictionapi.Signal]rankFunc
// Per-signal functions invoked to reclaim node-level resources.
signalToNodeReclaimFuncs map[evictionapi.Signal]nodeReclaimFuncs
// The eviction signal observations from the previous synchronize pass; used
// to tell whether stats have been updated since then.
lastObservations signalObservations
// dedicatedImageFs indicates if imagefs is on a separate device from the rootfs
dedicatedImageFs *bool
// The set of memory threshold notifiers.
thresholdNotifiers []ThresholdNotifier
// The last time the thresholdNotifiers had their thresholds refreshed.
thresholdsLastUpdated time.Time
}
2.2 m.Admit()
eviction.NewManager()
的代码很简单,就是赋值。此方法返回了2个值,一个是Manager接口类型的evictionManager,一个是lifecycle.PodAdmitHandler接口类型的evictionAdmitHandler。这两个返回值指向的是同一个managerImpl实例(代码中是 return manager, manager),只是以不同的接口类型暴露。evictionAdmitHandler用于kubelet创建Pod前进行准入检查,满足条件后才会继续创建Pod,通过Admit(attrs *lifecycle.PodAdmitAttributes)
方法来检查,代码如下:
// pkg/kubelet/eviction/eviction_manager.go: 137
// Admit decides whether a pod may be admitted given the node conditions the
// eviction manager currently tracks.
//
// The pod is admitted when the node has no conditions, when the pod is
// critical, or when memory pressure is the only condition and the pod is
// either not BestEffort or tolerates the memory-pressure NoSchedule taint.
// In every other case it is rejected with Reason and a message listing the
// node conditions.
func (m *managerImpl) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
	m.RLock()
	defer m.RUnlock()

	admitted := lifecycle.PodAdmitResult{Admit: true}

	// Nothing is under pressure, or the pod is critical: let it through.
	if len(m.nodeConditions) == 0 || kubelettypes.IsCriticalPod(attrs.Pod) {
		return admitted
	}

	// Conditions other than memory pressure reject all pods, so the relaxed
	// rules below apply only when memory pressure is the sole condition.
	if hasNodeCondition(m.nodeConditions, v1.NodeMemoryPressure) && len(m.nodeConditions) == 1 {
		if v1qos.GetPodQOS(attrs.Pod) != v1.PodQOSBestEffort {
			return admitted
		}
		// A BestEffort pod is admitted only if it tolerates the memory
		// pressure taint; tolerating other taints (e.g. DiskPressure) does
		// not help.
		memoryPressureTaint := v1.Taint{
			Key:    v1.TaintNodeMemoryPressure,
			Effect: v1.TaintEffectNoSchedule,
		}
		if v1helper.TolerationsTolerateTaint(attrs.Pod.Spec.Tolerations, &memoryPressureTaint) {
			return admitted
		}
	}

	// Reached for BestEffort pods under memory pressure without a matching
	// toleration, and for any pod when another condition is present.
	klog.Warningf("Failed to admit pod %s - node has conditions: %v", format.Pod(attrs.Pod), m.nodeConditions)
	rejected := lifecycle.PodAdmitResult{
		Admit:   false,
		Reason:  Reason,
		Message: fmt.Sprintf(nodeConditionMessageFmt, m.nodeConditions),
	}
	return rejected
}
上述pod admit逻辑是kubelet侧的准入检查。除此之外,eviction manager还会间接影响kube-scheduler的调度结果:Kubelet会定期将Node Condition上报给kube-apiserver并存于etcd。kube-scheduler watch到Node Condition Pressure之后,会根据以下策略,阻止更多Pods Bind到该Node。
Node Condition | Scheduler Behavior |
---|---|
MemoryPressure | No new BestEffort pods are scheduled to the node. |
DiskPressure | No new pods are scheduled to the node. |
三、Eviction Manager启动
3.1 m.Start()
在初始化小节,已经分析到 kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.podResourcesAreReclaimed, evictionMonitoringPeriod)
方法,下面看Start究竟做了什么事情:
// pkg/kubelet/eviction/eviction_manager.go: 177
func (m *managerImpl) Start(
diskInfoProvider DiskInfoProvider,
podFunc ActivePodsFunc,
podCleanedUpFunc PodCleanedUpFunc,
monitoringInterval time.Duration) {
thresholdHandler := func(message string) {
klog.Infof(message)
m.synchronize(diskInfoProvider, podFunc)
}
/*
kubelet入参:--experimental-kernel-memcg-notification = true
为内存相关的驱逐信号,单独起个notifier,触发threshold,第一时间开始回收
*/
if m.config.KernelMemcgNotification {
for _, threshold := range m.config.Thresholds {
if threshold.Signal == evictionapi.SignalMemoryAvailable ||
threshold.Signal == evictionapi.SignalAllocatableMemoryAvailable {
notifier, err := NewMemoryThresholdNotifier(
threshold,
m.config.PodCgroupRoot,
&CgroupNotifierFactory{},
thresholdHandler
)
if err != nil {
klog.Warningf("eviction manager: failed to create memory threshold notifier: %v", err)
} else {
go notifier.Start()
m.thresholdNotifiers = append(m.thresholdNotifiers, notifier)
}
}
}
}
// 开启常规循环检查,每10秒执行一次
go func() {
for {
if evictedPods := m.synchronize(diskInfoProvider, podFunc); evictedPods != nil {
klog.Infof("eviction manager: pods %s evicted, waiting for pod to be cleaned up", format.Pods(evictedPods))
m.waitForPodsCleanup(podCleanedUpFunc, evictedPods)
} else {
time.Sleep(monitoringInterval)
}
}
}()
}
可以看到上面代码中有一个if模块和一个go func模块。
if模块的判断条件,来自kubelet的参数--experimental-kernel-memcg-notification
,若配置为true,并且thresholds配置了内存相关的驱逐信号,则通过NewMemoryThresholdNotifier()
创建notifier并调用notifier.Start()启动它,当内存使用达到门槛时,会第一时间通知kubelet,并触发evictionManager.synchronize()
进行资源回收的流程。这样提高了eviction的实时性。
go func模块是常规模块,每10秒执行一次 evictionManager.synchronize()
,下面看下驱逐的业务逻辑函数synchronize()主要做了哪些事情。
3.2 m.synchronize()
// pkg/kubelet/eviction/eviction_manager.go: 231
// synchronize runs one pass of the eviction control loop: it gathers stats,
// works out which thresholds are met, updates the manager's recorded state,
// tries node-level reclamation, and finally evicts at most one pod.
// It returns the evicted pod(s), or nil when nothing was evicted.
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
// if we have nothing to do, just return
thresholds := m.config.Thresholds
if len(thresholds) == 0 && !utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
return nil
}
klog.V(3).Infof("eviction manager: synchronize housekeeping")
// build the ranking functions (if not yet known)
// TODO: have a function in cadvisor that lets us know if global housekeeping has completed
if m.dedicatedImageFs == nil {
// Note: despite its name, ok holds the error returned by HasDedicatedImageFs.
hasImageFs, ok := diskInfoProvider.HasDedicatedImageFs()
if ok != nil {
return nil
}
m.dedicatedImageFs = &hasImageFs
// Build the per-signal ranking functions used to order pods for eviction.
m.signalToRankFunc = buildSignalToRankFunc(hasImageFs)
// Build the per-signal node-level reclaim functions (per the original note,
// e.g. imagefs.available maps to deleting unused containers and images).
m.signalToNodeReclaimFuncs = buildSignalToNodeReclaimFuncs(m.imageGC, m.containerGC, hasImageFs)
}
// Fetch the currently active pods for later use.
activePods := podFunc()
updateStats := true
// Fetch the stats summary for the node and its pods.
summary, err := m.summaryProvider.Get(updateStats)
if err != nil {
klog.Errorf("eviction manager: failed to get summary stats: %v", err)
return nil
}
// Refresh the notifiers' thresholds from the new summary, at most once per
// notifierRefreshInterval.
if m.clock.Since(m.thresholdsLastUpdated) > notifierRefreshInterval {
m.thresholdsLastUpdated = m.clock.Now()
for _, notifier := range m.thresholdNotifiers {
if err := notifier.UpdateThreshold(summary); err != nil {
klog.Warningf("eviction manager: failed to update %s: %v", notifier.Description(), err)
}
}
}
/*
From the summary, derive the per-signal observations and statsFunc.
Each observation carries the time it was taken (used further down); per the
original note it also holds the resource's available amount and capacity.
statsFunc is used later when ranking pods and building the eviction message.
*/
observations, statsFunc := makeSignalObservations(summary)
debugLogObservations("observations", observations)
// Compute the set of thresholds currently met, ignoring grace periods.
thresholds = thresholdsMet(thresholds, observations, false)
debugLogThresholdsWithObservation("thresholds - ignoring grace period", thresholds, observations)
// determine the set of thresholds previously met that have not yet satisfied the associated min-reclaim
if len(m.thresholdsMet) > 0 {
// Previously recorded thresholds that are still unresolved.
thresholdsNotYetResolved := thresholdsMet(m.thresholdsMet, observations, true)
// Merge them with the thresholds met in this pass.
thresholds = mergeThresholds(thresholds, thresholdsNotYetResolved)
}
debugLogThresholdsWithObservation("thresholds - reclaim not satisfied", thresholds, observations)
// track when a threshold was first observed
now := m.clock.Now()
/*
Compute the new first-observed times from the merged thresholds and the
previously stored m.thresholdsFirstObservedAt.
The local variable shadows the package-level function of the same name.
*/
thresholdsFirstObservedAt := thresholdsFirstObservedAt(thresholds, m.thresholdsFirstObservedAt, now)
// Translate the met thresholds into node conditions.
nodeConditions := nodeConditions(thresholds)
if len(nodeConditions) > 0 {
klog.V(3).Infof("eviction manager: node conditions - observed: %v", nodeConditions)
}
// track when a node condition was last observed
nodeConditionsLastObservedAt := nodeConditionsLastObservedAt(nodeConditions, m.nodeConditionsLastObservedAt, now)
// Recompute the node conditions from the last-observed times and
// PressureTransitionPeriod (logged below as "transition period not met").
nodeConditions = nodeConditionsObservedSince(nodeConditionsLastObservedAt, m.config.PressureTransitionPeriod, now)
if len(nodeConditions) > 0 {
klog.V(3).Infof("eviction manager: node conditions - transition period not met: %v", nodeConditions)
}
// Keep only the thresholds whose grace period has been satisfied.
thresholds = thresholdsMetGracePeriod(thresholdsFirstObservedAt, now)
debugLogThresholdsWithObservation("thresholds - grace periods satisfied", thresholds, observations)
// Publish the newly computed state on the manager under the write lock.
m.Lock()
m.nodeConditions = nodeConditions
m.thresholdsFirstObservedAt = thresholdsFirstObservedAt
m.nodeConditionsLastObservedAt = nodeConditionsLastObservedAt
m.thresholdsMet = thresholds
// Narrow thresholds using the current observations and the previous pass's
// observations (logged below as "updated stats").
thresholds = thresholdsUpdatedStats(thresholds, observations, m.lastObservations)
debugLogThresholdsWithObservation("thresholds - updated stats", thresholds, observations)
m.lastObservations = observations
m.Unlock()
// evict pods if there is a resource usage violation from local volume temporary storage
// If eviction happens in localStorageEviction function, skip the rest of eviction action
if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
if evictedPods := m.localStorageEviction(summary, activePods); len(evictedPods) > 0 {
return evictedPods
}
}
if len(thresholds) == 0 {
klog.V(3).Infof("eviction manager: no resources are starved")
return nil
}
// Sort the thresholds by eviction priority.
sort.Sort(byEvictionPriority(thresholds))
thresholdToReclaim, resourceToReclaim, foundAny := getReclaimableThreshold(thresholds)
if !foundAny {
return nil
}
klog.Warningf("eviction manager: attempting to reclaim %v", resourceToReclaim)
// record an event about the resources we are now attempting to reclaim via eviction
m.recorder.Eventf(m.nodeRef, v1.EventTypeWarning, "EvictionThresholdMet", "Attempting to reclaim %s", resourceToReclaim)
// Try node-level reclamation for the chosen threshold first; if that is
// enough, no pod needs to be evicted.
if m.reclaimNodeLevelResources(thresholdToReclaim.Signal, resourceToReclaim) {
klog.Infof("eviction manager: able to reduce %v pressure without evicting pods.", resourceToReclaim)
return nil
}
klog.Infof("eviction manager: must evict pod(s) to reclaim %v", resourceToReclaim)
// Look up the ranking function registered for the chosen threshold's signal.
rank, ok := m.signalToRankFunc[thresholdToReclaim.Signal]
if !ok {
klog.Errorf("eviction manager: no ranking function for signal %s", thresholdToReclaim.Signal)
return nil
}
// the only candidates viable for eviction are those pods that had anything running.
if len(activePods) == 0 {
klog.Errorf("eviction manager: eviction thresholds have been met, but no pods are active to evict")
return nil
}
// Order activePods by eviction priority (rank sorts the slice in place, as
// the log line below prints the same slice).
rank(activePods, statsFunc)
klog.Infof("eviction manager: pods ranked for eviction: %s", format.Pods(activePods))
// record age of metrics for met thresholds that we are using for evictions.
for _, t := range thresholds {
timeObserved := observations[t.Signal].time
if !timeObserved.IsZero() {
metrics.EvictionStatsAge.WithLabelValues(string(t.Signal)).Observe(metrics.SinceInSeconds(timeObserved.Time))
}
}
// we kill at most a single pod during each eviction interval
for i := range activePods {
pod := activePods[i]
// Grace period stays 0 for a hard threshold; otherwise the configured
// MaxPodGracePeriodSeconds is used.
gracePeriodOverride := int64(0)
if !isHardEvictionThreshold(thresholdToReclaim) {
gracePeriodOverride = m.config.MaxPodGracePeriodSeconds
}
message, annotations := evictionMessage(resourceToReclaim, pod, statsFunc)
// Kill the pod; on success stop after this single eviction, on failure move
// on to the next candidate.
if m.evictPod(pod, gracePeriodOverride, message, annotations) {
metrics.Evictions.WithLabelValues(string(thresholdToReclaim.Signal)).Inc()
return []*v1.Pod{pod}
}
}
klog.Infof("eviction manager: unable to evict any pods from the node")
return nil
}
代码很工整,注释也相对详细,主要流程如下:
- 注册方法集合buildSignalToRankFunc和buildSignalToNodeReclaimFuncs:一个是驱逐信号对应的排序方法;另一个是注册各个驱逐信号对应的节点资源回收方法。
- 获取节点当前概况summary:即nodeStats和[]podStats;
- 获取当前的observations和statsFunc:observations是map[evictionapi.Signal]signalObservation类型,signalObservation保存了某一时间戳,资源的容量和可用量;statsFunc是后面对pod排序用的;
- 计算得出当前触发阈值的thresholds:threshold记录了一整个eviction配置,包括驱逐信号,阈值,gracePeriod,最小回收量;
- 合并thresholds:筛选出已经记录的但还没解决的thresholds,然后与上一步计算的thresholds合并;
- 计算得到thresholdsFirstObservedAt:遍历合并后的thresholds,对于m.thresholdsFirstObservedAt中已经存在的threshold,保留其首次观察时间;对于新出现的threshold,记录为当前时间;
- 转换得到nodeConditions:threshold和nodeCondition存在对应关系;
- 计算得到nodeConditionsLastObservedAt,遍历nodeConditions,对于m.nodeConditionsLastObservedAt已经存在的condition,更新观察时间;
- 筛选nodeConditions:在PressureTransitionPeriod内,node condition仍然是true的;
- 过滤出当前时间满足优雅删除的thresholds(i.e > GracePeriod );
- 更新对象参数:nodeConditions、thresholdsFirstObservedAt,nodeConditionsLastObservedAt、thresholds和observations,保存到对象中;
- 筛选thresholds:遍历thresholds,只保留本轮observations相对lastObservations有更新(或lastObservations中不存在对应记录)的threshold;
- 最终按照驱逐的优先级给thresholds排序;
- 取优先级最高的threshold,取名thresholdToReclaim,按照此驱逐配置回收节点资源,如果回收完成后满足 thresholdValue + evictionMinimumReclaim,则流程结束,否则驱逐pod;
- 给待驱逐的pod排序:排序方法从注册的资源排序方法中取出,加上StatsFunc,对activePods排序;
- 遍历待驱逐的pods,如果kill某个pod失败,则会跳过这个pod,再按顺序挑下一个pod进行kill。只要某个pod kill成功,就返回结束,也就是说这个流程中,最多只会kill一个Pod。
上面的过程中,大部分都是按顺序业务操作,目的就是决定到底按照哪个threshold,执行回收策略。整个流程中有2个地方很关键,一个是回收节点资源(reclaimNodeLevelResources()),一个是驱逐user pod (m.evictPod() -> m.killPodFunc())。
3.3 m.reclaimNodeLevelResources()
回收节点资源相对易懂,把先前注册好的回收方法取出,挨个执行;结束后检查下节点summary是否在阈值以下了。
// pkg/kubelet/eviction/eviction_manager.go: 420
// reclaimNodeLevelResources runs every node-level reclaim function registered
// for signalToReclaim and then re-measures the node.
//
// It returns true only when at least one reclaim function exists and, after
// running them all, none of the configured thresholds is met any more
// (grace periods ignored). It returns false when no reclaim function is
// registered, when fresh stats cannot be fetched, or when some threshold is
// still met. resourceToReclaim is used for logging only.
func (m *managerImpl) reclaimNodeLevelResources(
	signalToReclaim evictionapi.Signal,
	resourceToReclaim v1.ResourceName) bool {
	reclaimers := m.signalToNodeReclaimFuncs[signalToReclaim]
	if len(reclaimers) == 0 {
		// Nothing to run, so node-level reclaim cannot have relieved pressure.
		return false
	}

	// Run every reclaimer; a failure is logged but does not stop the others.
	for _, reclaim := range reclaimers {
		err := reclaim()
		if err != nil {
			klog.Warningf("eviction manager: unexpected error when attempting to reduce %v pressure: %v", resourceToReclaim, err)
		}
	}

	// Re-read the stats to see whether reclamation was sufficient.
	refreshed, err := m.summaryProvider.Get(true)
	if err != nil {
		klog.Errorf("eviction manager: failed to get summary stats after resource reclaim: %v", err)
		return false
	}

	// Only the observations are needed here; the stats function is unused.
	observed, _ := makeSignalObservations(refreshed)
	debugLogObservations("observations after resource reclaim", observed)

	// Thresholds still met, independent of grace period.
	stillMet := thresholdsMet(m.config.Thresholds, observed, false)
	debugLogThresholdsWithObservation("thresholds after resource reclaim - ignoring grace period", stillMet, observed)

	return len(stillMet) == 0
}
3.4 m.evictPod()
m.evictPod()调用的killPodFunc,这个方法在eviction.NewManager()时赋值为pkg/kubelet/pod_workers.go
的killPodNow()的返回值。调用链:m.evictPod()
=> m.killPodFunc() = killPodNow()的返回值
=> podWorkers.UpdatePod()
=> podWorkers.managePodLoop()
=> podWorkers.syncPodFn() = kubelet.syncPod()
。最终就是调用kubelet的syncPod()方法,把podPhase=Failed更新进去。
// pkg/kubelet/eviction/eviction_manager.go: 565
func (m *managerImpl) evictPod(
pod *v1.Pod,
gracePeriodOverride int64,
evictMsg string, annotations
map[string]string) bool {
...
err := m.killPodFunc(pod, status, &gracePeriodOverride)
...
}
// pkg/kubelet/pod_workers.go: 292
// killPodNow (excerpt) returns the eviction.KillPodFunc that the eviction
// manager calls to kill a pod. Code not relevant to the walkthrough is elided
// with "...".
func killPodNow(podWorkers PodWorkers, recorder record.EventRecorder) eviction.KillPodFunc {
return func(pod *v1.Pod, status v1.PodStatus, gracePeriodOverride *int64) error {
...
// Hand the pod to the pod workers as a SyncPodKill update.
podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodKill,
// Forward the completion error on ch (ch and response are declared in the
// elided code).
OnCompleteFunc: func(err error) {
ch <- response{err: err}
},
KillPodOptions: &KillPodOptions{
// Always report the status supplied by the caller; p and podStatus are ignored.
PodStatusFunc: func(p *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodStatus {
return status
},
PodTerminationGracePeriodSecondsOverride: gracePeriodOverride,
},
})
...
}
}
// pkg/kubelet/pod_workers.go: 200
// UpdatePod (excerpt) shows only the goroutine that runs managePodLoop; the
// rest of the body, including where podUpdates comes from, is elided with "...".
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
...
go func() {
// runtime.HandleCrash is deferred so it runs when this goroutine exits.
defer runtime.HandleCrash()
p.managePodLoop(podUpdates)
}()
...
}
// pkg/kubelet/pod_workers.go: 158
// managePodLoop (excerpt) receives updates from podUpdates and calls
// p.syncPodFn for each one. Code not relevant to the walkthrough is elided
// with "...".
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
for update := range podUpdates {
...
// Pass the update through to syncPodFn; status is computed in the elided code.
err = p.syncPodFn(syncPodOptions{
mirrorPod: update.MirrorPod,
pod: update.Pod,
podStatus: status,
killPodOptions: update.KillPodOptions,
updateType: update.UpdateType,
})
...
}
...
}
// pkg/kubelet/kubelet.go: 1482
// syncPod (body elided with "...") is the kubelet method that the walkthrough
// identifies as the end of the eviction kill call chain.
func (kl *Kubelet) syncPod(o syncPodOptions) error {
...
}