从源码理解kubelet pleg

2024-01-04  本文已影响0人  wwq2020

pleg相关代码

pkg/kubelet/pleg/generic.go中

构建pleg
// NewGenericPLEG builds a generic PLEG that periodically relists pods
// from the container runtime and publishes lifecycle events on
// eventChannel.
func NewGenericPLEG(runtime kubecontainer.Runtime, eventChannel chan *PodLifecycleEvent,
    relistDuration *RelistDuration, cache kubecontainer.Cache,
    clock clock.Clock) PodLifecycleEventGenerator {
    pleg := &GenericPLEG{
        runtime:        runtime,
        eventChannel:   eventChannel,
        relistDuration: relistDuration,
        cache:          cache,
        clock:          clock,
        // podRecords holds the old/current pod snapshots that relists
        // diff against to derive events.
        podRecords: make(podRecords),
    }
    return pleg
}

// Start launches the relist loop in a background goroutine.
// It is idempotent: calling Start on an already-running PLEG is a no-op.
func (g *GenericPLEG) Start() {
    g.runningMu.Lock()
    defer g.runningMu.Unlock()
    if g.isRunning {
        return
    }
    g.isRunning = true
    g.stopCh = make(chan struct{})
    // Relist every RelistPeriod until stopCh is closed.
    go wait.Until(g.Relist, g.relistDuration.RelistPeriod, g.stopCh)
}

Relist: 从cri获取pod状态,据此计算pod事件并更新cache
func (g *GenericPLEG) Relist() {
...
从cri获取pod
    podList, err := g.runtime.GetPods(ctx, true)
    if err != nil {
        klog.ErrorS(err, "GenericPLEG: Unable to retrieve pods")
        return
    }
...
更新relisttime
    g.updateRelistTime(timestamp)

...
计算pod event
    eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
    for pid := range g.podRecords {
        oldPod := g.podRecords.getOld(pid)
        pod := g.podRecords.getCurrent(pid)
        // Get all containers in the old and the new pod.
        allContainers := getContainersFromPods(oldPod, pod)
        for _, container := range allContainers {
            events := computeEvents(oldPod, pod, &container.ID)
            for _, e := range events {
                updateEvents(eventsByPodID, e)
            }
        }
    }
...
    for pid, events := range eventsByPodID {
...
更新缓存
                if err, _ := g.updateCache(ctx, pod, pid); err != nil {
...
发送pod事件给kubelet sync循环
        for i := range events {
            // Filter out events that are not reliable and no other components use yet.
            if events[i].Type == ContainerChanged {
                continue
            }
            select {
            case g.eventChannel <- events[i]:
            default:
                metrics.PLEGDiscardEvents.Inc()
                klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
            }
...
}
...

}


更新cache
// updateCache fetches the latest status of pod from the container
// runtime (CRI) and stores it in the PLEG cache under the pod's ID.
func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, pid types.UID) (error, bool) {
...
    // Ask the runtime for the pod's current status; err is cached
    // alongside the status below rather than short-circuiting here.
    status, err := g.runtime.GetPodStatus(ctx, pod.ID, pod.Name, pod.Namespace)
...
    // NOTE(review): the bool from cache.Set presumably indicates whether
    // the cached entry changed — confirm against kubecontainer.Cache.
    return err, g.cache.Set(pod.ID, status, err, timestamp)

}


是否健康(上次relist的时间到现在是否超过RelistThreshold,也就是3m)
// Healthy reports whether the PLEG has relisted recently enough. It
// fails when no relist has completed yet, or when the time since the
// last successful relist exceeds RelistThreshold.
func (g *GenericPLEG) Healthy() (bool, error) {
    lastRelist := g.getRelistTime()
    if lastRelist.IsZero() {
        return false, fmt.Errorf("pleg has yet to be successful")
    }
    // Expose as metric so you can alert on `time()-pleg_last_seen_seconds > nn`
    metrics.PLEGLastSeen.Set(float64(lastRelist.Unix()))
    if sinceLast := g.clock.Since(lastRelist); sinceLast > g.relistDuration.RelistThreshold {
        return false, fmt.Errorf("pleg was last seen active %v ago; threshold is %v", sinceLast, g.relistDuration.RelistThreshold)
    }
    return true, nil
}



更新本次relist的时间
// updateRelistTime atomically records when the current relist ran;
// Healthy reads this value to judge whether the PLEG has gone stale.
func (g *GenericPLEG) updateRelistTime(timestamp time.Time) {
    g.relistTime.Store(timestamp)
}

kubelet其余

    plegChannelCapacity = 1000

func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
    kubeDeps *Dependencies,
    crOptions *config.ContainerRuntimeOptions,
    hostname string,
    hostnameOverridden bool,
    nodeName types.NodeName,
    nodeIPs []net.IP,
    providerID string,
    cloudProvider string,
    certDirectory string,
    rootDirectory string,
    imageCredentialProviderConfigFile string,
    imageCredentialProviderBinDir string,
    registerNode bool,
    registerWithTaints []v1.Taint,
    allowedUnsafeSysctls []string,
    experimentalMounterPath string,
    kernelMemcgNotification bool,
    experimentalNodeAllocatableIgnoreEvictionThreshold bool,
    minimumGCAge metav1.Duration,
    maxPerPodContainerCount int32,
    maxContainerCount int32,
    registerSchedulable bool,
    keepTerminatedPodVolumes bool,
    nodeLabels map[string]string,
    nodeStatusMaxImages int32,
    seccompDefault bool,
) (*Kubelet, error) {
...
    eventChannel := make(chan *pleg.PodLifecycleEvent, plegChannelCapacity)

...
        genericRelistDuration := &pleg.RelistDuration{
            RelistPeriod:    genericPlegRelistPeriod,
            RelistThreshold: genericPlegRelistThreshold,
        }
        klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
...
    klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)

    klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
    
...
}

sync 循环
func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
...
    plegCh := kl.pleg.Watch()
...
        if err := kl.runtimeState.runtimeErrors(); err != nil {
            klog.ErrorS(err, "Skipping pod synchronization")
            // exponential backoff
            time.Sleep(duration)
            duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
            continue
        }
...
        if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
...
}

从各种事件源获取事件,执行sync操作
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
...
    case e := <-plegCh:
        if isSyncPodWorthy(e) {
            // PLEG event for a pod; sync it.
            if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
                klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
                handler.HandlePodSyncs([]*v1.Pod{pod})
            } else {
                // If the pod no longer exists, ignore the event.
                klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
            }
        }

        if e.Type == pleg.ContainerDied {
            if containerID, ok := e.Data.(string); ok {
                kl.cleanUpContainersInPod(e.ID, containerID)
            }
        }
...
}

pkg/kubelet/runtime.go中

// runtimeErrors aggregates every condition that currently makes the
// container runtime unusable: a runtime sync that never happened or is
// stale, any failing registered health check (e.g. PLEG), and a sticky
// runtime error. It returns nil when none apply.
func (s *runtimeState) runtimeErrors() error {
    s.RLock()
    defer s.RUnlock()
    var errs []error
    switch {
    case s.lastBaseRuntimeSync.IsZero():
        errs = append(errs, errors.New("container runtime status check may not have completed yet"))
    case !s.lastBaseRuntimeSync.Add(s.baseRuntimeSyncThreshold).After(time.Now()):
        // Last sync is older than the allowed threshold.
        errs = append(errs, errors.New("container runtime is down"))
    }
    for _, check := range s.healthChecks {
        if healthy, err := check.fn(); !healthy {
            errs = append(errs, fmt.Errorf("%s is not healthy: %v", check.name, err))
        }
    }
    if s.runtimeError != nil {
        errs = append(errs, s.runtimeError)
    }
    return utilerrors.NewAggregate(errs)
}

pkg/kubelet/kubelet_node_status.go中

pleg not healthy会导致node not ready
func (kl *Kubelet) defaultNodeStatusFuncs() []func(context.Context, *v1.Node) error {
    setters = append(setters,
        nodestatus.NodeAddress(kl.nodeIPs, kl.nodeIPValidator, kl.hostname, kl.hostnameOverridden, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
        nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore, kl.GetCachedMachineInfo, kl.containerManager.GetCapacity,
            kl.containerManager.GetDevicePluginResourceCapacity, kl.containerManager.GetNodeAllocatableReservation, kl.recordEvent, kl.supportLocalStorageCapacityIsolation()),
        nodestatus.VersionInfo(kl.cadvisor.VersionInfo, kl.containerRuntime.Type, kl.containerRuntime.Version),
        nodestatus.DaemonEndpoints(kl.daemonEndpoints),
        nodestatus.Images(kl.nodeStatusMaxImages, kl.imageManager.GetImageList),
        nodestatus.GoRuntime(),
    )
}

相关指标

kubelet_pleg_relist_duration_seconds pleg relist耗时(单位秒)
kubelet_pleg_discard_events pleg丢弃的事件数,可能sync loop处理过慢或事件过多,超过channel cap(1000)
kubelet_pleg_relist_interval_seconds relist间隔(单位秒)
kubelet_pleg_last_seen_seconds pleg上次活跃的时间戳(单位秒)
上一篇 下一篇

猜你喜欢

热点阅读