configmap更新后volume更新问题

2022-01-05  本文已影响0人  wwq2020

简单总结

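kubelet对于成功sync的pod会重新添加回工作队列,延迟为wait.Jitter(SyncFrequency, workerResyncIntervalJitterFactor),即SyncFrequency+SyncFrequency*workerResyncIntervalJitterFactor*随机小数(0-1)(SyncFrequency默认1分钟;抖动系数workerResyncIntervalJitterFactor在上游pod_workers.go中为0.5)
kubelet每隔1秒会获取需要同步的pod,分发到podworker
desiredStateOfWorldPopulator会定期获取之前没有处理过的pod进行处理(0.1秒)
podworker执行syncPod时,会通过volumeManager.WaitForAttachAndMount调用ReprocessPod,把desiredStateOfWorldPopulator中的pod设置为未处理过

也就是说,configmap更新后,最长需要等待约SyncFrequency+SyncFrequency*workerResyncIntervalJitterFactor*随机小数(0-1)+1秒+0.1秒,volume中的内容才会被重新处理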

相关代码

pkg/kubelet/apis/config/v1beta1/zz_generated.defaults.go中

// RegisterDefaults installs the defaulting function for
// *v1beta1.KubeletConfiguration on scheme. The registered function type-asserts
// its argument and forwards it to SetObjectDefaults_KubeletConfiguration.
// It always returns nil.
func RegisterDefaults(scheme *runtime.Scheme) error {
    applyDefaults := func(obj interface{}) {
        cfg := obj.(*v1beta1.KubeletConfiguration)
        SetObjectDefaults_KubeletConfiguration(cfg)
    }
    scheme.AddTypeDefaultingFunc(&v1beta1.KubeletConfiguration{}, applyDefaults)
    return nil
}

// SetObjectDefaults_KubeletConfiguration applies SetDefaults_KubeletConfiguration
// to in, then defaults the Limits resource list of every ReservedMemory entry
// in place.
func SetObjectDefaults_KubeletConfiguration(in *v1beta1.KubeletConfiguration) {
    SetDefaults_KubeletConfiguration(in)
    for idx := range in.ReservedMemory {
        // Take the address of the slice element so the defaults are written
        // back into in rather than into a copy.
        v1.SetDefaults_ResourceList(&in.ReservedMemory[idx].Limits)
    }
}

pkg/kubelet/apis/config/v1beta1/defaults.go中

// addDefaultingFuncs registers the defaulting functions on scheme by
// delegating to RegisterDefaults and returning its error unchanged.
func addDefaultingFuncs(scheme *kruntime.Scheme) error {
    return RegisterDefaults(scheme)
}

// SetDefaults_KubeletConfiguration fills in default values on obj.
//
// A field is only overwritten when it still holds its "unset" value: nil for
// pointer fields, the empty string, zero, or zeroDuration. Two exceptions are
// visible below: PodPidsLimit is also reset to -1 when it is negative, and
// obj.Logging is always passed through RecommendedLoggingConfiguration.
//
// Of interest for pod resync timing: SyncFrequency defaults to one minute.
func SetDefaults_KubeletConfiguration(obj *kubeletconfigv1beta1.KubeletConfiguration) {
    if obj.EnableServer == nil {
        obj.EnableServer = utilpointer.BoolPtr(true)
    }
    // SyncFrequency is the value later copied into the kubelet's resyncInterval.
    if obj.SyncFrequency == zeroDuration {
        obj.SyncFrequency = metav1.Duration{Duration: 1 * time.Minute}
    }
    if obj.FileCheckFrequency == zeroDuration {
        obj.FileCheckFrequency = metav1.Duration{Duration: 20 * time.Second}
    }
    if obj.HTTPCheckFrequency == zeroDuration {
        obj.HTTPCheckFrequency = metav1.Duration{Duration: 20 * time.Second}
    }
    if obj.Address == "" {
        obj.Address = "0.0.0.0"
    }
    if obj.Port == 0 {
        obj.Port = ports.KubeletPort
    }
    if obj.Authentication.Anonymous.Enabled == nil {
        obj.Authentication.Anonymous.Enabled = utilpointer.BoolPtr(false)
    }
    if obj.Authentication.Webhook.Enabled == nil {
        obj.Authentication.Webhook.Enabled = utilpointer.BoolPtr(true)
    }
    if obj.Authentication.Webhook.CacheTTL == zeroDuration {
        obj.Authentication.Webhook.CacheTTL = metav1.Duration{Duration: 2 * time.Minute}
    }
    if obj.Authorization.Mode == "" {
        obj.Authorization.Mode = kubeletconfigv1beta1.KubeletAuthorizationModeWebhook
    }
    if obj.Authorization.Webhook.CacheAuthorizedTTL == zeroDuration {
        obj.Authorization.Webhook.CacheAuthorizedTTL = metav1.Duration{Duration: 5 * time.Minute}
    }
    if obj.Authorization.Webhook.CacheUnauthorizedTTL == zeroDuration {
        obj.Authorization.Webhook.CacheUnauthorizedTTL = metav1.Duration{Duration: 30 * time.Second}
    }
    if obj.RegistryPullQPS == nil {
        obj.RegistryPullQPS = utilpointer.Int32Ptr(5)
    }
    if obj.RegistryBurst == 0 {
        obj.RegistryBurst = 10
    }
    if obj.EventRecordQPS == nil {
        obj.EventRecordQPS = utilpointer.Int32Ptr(5)
    }
    if obj.EventBurst == 0 {
        obj.EventBurst = 10
    }
    if obj.EnableDebuggingHandlers == nil {
        obj.EnableDebuggingHandlers = utilpointer.BoolPtr(true)
    }
    if obj.HealthzPort == nil {
        obj.HealthzPort = utilpointer.Int32Ptr(10248)
    }
    if obj.HealthzBindAddress == "" {
        obj.HealthzBindAddress = "127.0.0.1"
    }
    if obj.OOMScoreAdj == nil {
        obj.OOMScoreAdj = utilpointer.Int32Ptr(int32(qos.KubeletOOMScoreAdj))
    }
    if obj.StreamingConnectionIdleTimeout == zeroDuration {
        obj.StreamingConnectionIdleTimeout = metav1.Duration{Duration: 4 * time.Hour}
    }
    if obj.NodeStatusReportFrequency == zeroDuration {
        // For backward compatibility, NodeStatusReportFrequency's default value is
        // set to NodeStatusUpdateFrequency if NodeStatusUpdateFrequency is set
        // explicitly.
        if obj.NodeStatusUpdateFrequency == zeroDuration {
            obj.NodeStatusReportFrequency = metav1.Duration{Duration: 5 * time.Minute}
        } else {
            obj.NodeStatusReportFrequency = obj.NodeStatusUpdateFrequency
        }
    }
    // Order matters: this must come after the NodeStatusReportFrequency block
    // above, which needs to see whether NodeStatusUpdateFrequency was still unset.
    if obj.NodeStatusUpdateFrequency == zeroDuration {
        obj.NodeStatusUpdateFrequency = metav1.Duration{Duration: 10 * time.Second}
    }
    if obj.NodeLeaseDurationSeconds == 0 {
        obj.NodeLeaseDurationSeconds = 40
    }
    if obj.ImageMinimumGCAge == zeroDuration {
        obj.ImageMinimumGCAge = metav1.Duration{Duration: 2 * time.Minute}
    }
    if obj.ImageGCHighThresholdPercent == nil {
        // default is below docker's default dm.min_free_space of 90%
        obj.ImageGCHighThresholdPercent = utilpointer.Int32Ptr(85)
    }
    if obj.ImageGCLowThresholdPercent == nil {
        obj.ImageGCLowThresholdPercent = utilpointer.Int32Ptr(80)
    }
    if obj.VolumeStatsAggPeriod == zeroDuration {
        obj.VolumeStatsAggPeriod = metav1.Duration{Duration: time.Minute}
    }
    if obj.CgroupsPerQOS == nil {
        obj.CgroupsPerQOS = utilpointer.BoolPtr(true)
    }
    if obj.CgroupDriver == "" {
        obj.CgroupDriver = "cgroupfs"
    }
    if obj.CPUManagerPolicy == "" {
        obj.CPUManagerPolicy = "none"
    }
    if obj.CPUManagerReconcilePeriod == zeroDuration {
        // Keep the same as default NodeStatusUpdateFrequency
        obj.CPUManagerReconcilePeriod = metav1.Duration{Duration: 10 * time.Second}
    }
    if obj.MemoryManagerPolicy == "" {
        obj.MemoryManagerPolicy = kubeletconfigv1beta1.NoneMemoryManagerPolicy
    }
    if obj.TopologyManagerPolicy == "" {
        obj.TopologyManagerPolicy = kubeletconfigv1beta1.NoneTopologyManagerPolicy
    }
    if obj.TopologyManagerScope == "" {
        obj.TopologyManagerScope = kubeletconfigv1beta1.ContainerTopologyManagerScope
    }
    if obj.RuntimeRequestTimeout == zeroDuration {
        obj.RuntimeRequestTimeout = metav1.Duration{Duration: 2 * time.Minute}
    }
    if obj.HairpinMode == "" {
        obj.HairpinMode = kubeletconfigv1beta1.PromiscuousBridge
    }
    if obj.MaxPods == 0 {
        obj.MaxPods = 110
    }
    // default nil or negative value to -1 (implies node allocatable pid limit)
    if obj.PodPidsLimit == nil || *obj.PodPidsLimit < int64(0) {
        obj.PodPidsLimit = utilpointer.Int64(-1)
    }

    if obj.ResolverConfig == nil {
        obj.ResolverConfig = utilpointer.String(kubetypes.ResolvConfDefault)
    }
    if obj.CPUCFSQuota == nil {
        obj.CPUCFSQuota = utilpointer.BoolPtr(true)
    }
    if obj.CPUCFSQuotaPeriod == nil {
        obj.CPUCFSQuotaPeriod = &metav1.Duration{Duration: 100 * time.Millisecond}
    }
    if obj.NodeStatusMaxImages == nil {
        obj.NodeStatusMaxImages = utilpointer.Int32Ptr(50)
    }
    if obj.MaxOpenFiles == 0 {
        obj.MaxOpenFiles = 1000000
    }
    if obj.ContentType == "" {
        obj.ContentType = "application/vnd.kubernetes.protobuf"
    }
    if obj.KubeAPIQPS == nil {
        obj.KubeAPIQPS = utilpointer.Int32Ptr(5)
    }
    if obj.KubeAPIBurst == 0 {
        obj.KubeAPIBurst = 10
    }
    if obj.SerializeImagePulls == nil {
        obj.SerializeImagePulls = utilpointer.BoolPtr(true)
    }
    if obj.EvictionHard == nil {
        obj.EvictionHard = DefaultEvictionHard
    }
    if obj.EvictionPressureTransitionPeriod == zeroDuration {
        obj.EvictionPressureTransitionPeriod = metav1.Duration{Duration: 5 * time.Minute}
    }
    if obj.EnableControllerAttachDetach == nil {
        obj.EnableControllerAttachDetach = utilpointer.BoolPtr(true)
    }
    if obj.MakeIPTablesUtilChains == nil {
        obj.MakeIPTablesUtilChains = utilpointer.BoolPtr(true)
    }
    if obj.IPTablesMasqueradeBit == nil {
        obj.IPTablesMasqueradeBit = utilpointer.Int32Ptr(DefaultIPTablesMasqueradeBit)
    }
    if obj.IPTablesDropBit == nil {
        obj.IPTablesDropBit = utilpointer.Int32Ptr(DefaultIPTablesDropBit)
    }
    if obj.FailSwapOn == nil {
        obj.FailSwapOn = utilpointer.BoolPtr(true)
    }
    if obj.ContainerLogMaxSize == "" {
        obj.ContainerLogMaxSize = "10Mi"
    }
    if obj.ContainerLogMaxFiles == nil {
        obj.ContainerLogMaxFiles = utilpointer.Int32Ptr(5)
    }
    // The change-detection strategy for ConfigMaps and Secrets defaults to the
    // watch-based one.
    if obj.ConfigMapAndSecretChangeDetectionStrategy == "" {
        obj.ConfigMapAndSecretChangeDetectionStrategy = kubeletconfigv1beta1.WatchChangeDetectionStrategy
    }
    if obj.EnforceNodeAllocatable == nil {
        obj.EnforceNodeAllocatable = DefaultNodeAllocatableEnforcement
    }
    if obj.VolumePluginDir == "" {
        obj.VolumePluginDir = DefaultVolumePluginDir
    }
    // Use the Default LoggingConfiguration option
    componentbaseconfigv1alpha1.RecommendedLoggingConfiguration(&obj.Logging)
    if obj.EnableSystemLogHandler == nil {
        obj.EnableSystemLogHandler = utilpointer.BoolPtr(true)
    }
    if obj.EnableProfilingHandler == nil {
        obj.EnableProfilingHandler = utilpointer.BoolPtr(true)
    }
    if obj.EnableDebugFlagsHandler == nil {
        obj.EnableDebugFlagsHandler = utilpointer.BoolPtr(true)
    }
    if obj.SeccompDefault == nil {
        obj.SeccompDefault = utilpointer.BoolPtr(false)
    }
    if obj.MemoryThrottlingFactor == nil {
        obj.MemoryThrottlingFactor = utilpointer.Float64Ptr(DefaultMemoryThrottlingFactor)
    }
}

pkg/kubelet/apis/config/v1beta1/register.go中

// init registers addDefaultingFuncs with localSchemeBuilder at package load
// time, so the defaulting functions become part of this package's scheme setup.
func init() {
    localSchemeBuilder.Register(addDefaultingFuncs)
}

pkg/kubelet/apis/config/scheme/scheme.go中

// NewSchemeAndCodecs builds a fresh runtime.Scheme, registers the internal and
// the v1beta1 kubelet configuration types on it (in that order), and returns
// the scheme together with a codec factory created from it and mutators.
// If either registration fails, that error is returned and both results are nil.
func NewSchemeAndCodecs(mutators ...serializer.CodecFactoryOptionsMutator) (*runtime.Scheme, *serializer.CodecFactory, error) {
    scheme := runtime.NewScheme()

    registrations := []func(*runtime.Scheme) error{
        kubeletconfig.AddToScheme,
        kubeletconfigv1beta1.AddToScheme,
    }
    for _, register := range registrations {
        if err := register(scheme); err != nil {
            return nil, nil, err
        }
    }

    codecFactory := serializer.NewCodecFactory(scheme, mutators...)
    return scheme, &codecFactory, nil
}

cmd/kubelet/app/options/options.go中

// AddKubeletConfigFlags registers kubelet configuration flags whose values are
// stored in c. This excerpt shows only the --sync-frequency flag; the flag set
// fs is set up in the elided lines.
func AddKubeletConfigFlags(mainfs *pflag.FlagSet, c *kubeletconfig.KubeletConfiguration) {
    ...
    // --sync-frequency writes into c.SyncFrequency.Duration; the value already
    // present in c is used as the flag's default.
    fs.DurationVar(&c.SyncFrequency.Duration, "sync-frequency", c.SyncFrequency.Duration, "Max period between synchronizing running containers and config")
    ...
}

// NewKubeletConfiguration returns an internal KubeletConfiguration populated
// with defaults. It defaults an empty v1beta1 object through the kubelet
// scheme, converts it to the internal type, and finally applies the legacy
// defaults. An error from building the scheme or from the conversion is
// returned as-is with a nil configuration.
func NewKubeletConfiguration() (*kubeletconfig.KubeletConfiguration, error) {
    scheme, _, err := kubeletscheme.NewSchemeAndCodecs()
    if err != nil {
        return nil, err
    }

    // Defaulting is registered for the versioned type, so start from that.
    external := &v1beta1.KubeletConfiguration{}
    scheme.Default(external)

    internal := &kubeletconfig.KubeletConfiguration{}
    err = scheme.Convert(external, internal, nil)
    if err != nil {
        return nil, err
    }

    applyLegacyDefaults(internal)
    return internal, nil
}

cmd/kubelet/app/server.go中

// NewKubeletCommand builds the kubelet's cobra command (heavily elided excerpt).
// The lines shown create the default KubeletConfiguration, call Run, and
// register the configuration flags against that KubeletConfiguration.
//
// NOTE(review): the elisions hide the real control flow. Presumably the flags
// are registered while the command is being built and Run is invoked later
// from the command's run callback, not in the textual order shown here —
// confirm against upstream.
func NewKubeletCommand() *cobra.Command {
    ...
    // Start from a fully defaulted configuration.
    kubeletConfig, err := options.NewKubeletConfiguration()
    ...
    if err := Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate); err != nil {
        ...
    }
    ...
    // Flags such as --sync-frequency are bound to kubeletConfig.
    options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)
}

// Run applies the logging options from s, logs the kubelet version, performs
// OS-specific initialisation and then hands over to run. Failures from the OS
// initialisation or from run are wrapped with a short prefix and returned.
func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
    // Logging must be configured before the first log line is written.
    (&logs.Options{Config: s.Logging}).Apply()

    // To help debugging, immediately log version
    klog.InfoS("Kubelet version", "kubeletVersion", version.Get())

    osErr := initForOS(s.KubeletFlags.WindowsService, s.KubeletFlags.WindowsPriorityClass)
    if osErr != nil {
        return fmt.Errorf("failed OS init: %w", osErr)
    }

    runErr := run(ctx, s, kubeDeps, featureGate)
    if runErr != nil {
        return fmt.Errorf("failed to run Kubelet: %w", runErr)
    }
    return nil
}

// run is the internal entry point called by Run (elided excerpt). The part
// shown starts the kubelet through RunKubelet, passing s.RunOnce, and returns
// RunKubelet's error unchanged.
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {

    ...
    if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
        return err
    }
    ...
}

// RunKubelet creates and initialises the kubelet (elided excerpt). The part
// shown forwards the server's configuration and flag values to
// createAndInitKubelet.
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
    ...
    // The KubeletConfiguration passed by pointer here is the object that
    // carries SyncFrequency down to NewMainKubelet.
    k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
        kubeDeps,
        &kubeServer.ContainerRuntimeOptions,
        kubeServer.ContainerRuntime,
        hostname,
        hostnameOverridden,
        nodeName,
        nodeIPs,
        kubeServer.ProviderID,
        kubeServer.CloudProvider,
        kubeServer.CertDirectory,
        kubeServer.RootDirectory,
        kubeServer.ImageCredentialProviderConfigFile,
        kubeServer.ImageCredentialProviderBinDir,
        kubeServer.RegisterNode,
        kubeServer.RegisterWithTaints,
        kubeServer.AllowedUnsafeSysctls,
        kubeServer.ExperimentalMounterPath,
        kubeServer.KernelMemcgNotification,
        kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
        kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
        kubeServer.MinimumGCAge,
        kubeServer.MaxPerPodContainerCount,
        kubeServer.MaxContainerCount,
        kubeServer.MasterServiceNamespace,
        kubeServer.RegisterSchedulable,
        kubeServer.KeepTerminatedPodVolumes,
        kubeServer.NodeLabels,
        kubeServer.NodeStatusMaxImages,
        // Seccomp default is enabled if either the flag or the config enables it.
        kubeServer.KubeletFlags.SeccompDefault || kubeServer.KubeletConfiguration.SeccompDefault,
    )
    ...
}
// createAndInitKubelet constructs the kubelet (elided excerpt). The part shown
// passes every argument through, in the same order, to kubelet.NewMainKubelet
// and stores the result in the named return values k and err.
func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
    kubeDeps *kubelet.Dependencies,
    crOptions *config.ContainerRuntimeOptions,
    containerRuntime string,
    hostname string,
    hostnameOverridden bool,
    nodeName types.NodeName,
    nodeIPs []net.IP,
    providerID string,
    cloudProvider string,
    certDirectory string,
    rootDirectory string,
    imageCredentialProviderConfigFile string,
    imageCredentialProviderBinDir string,
    registerNode bool,
    registerWithTaints []api.Taint,
    allowedUnsafeSysctls []string,
    experimentalMounterPath string,
    kernelMemcgNotification bool,
    experimentalCheckNodeCapabilitiesBeforeMount bool,
    experimentalNodeAllocatableIgnoreEvictionThreshold bool,
    minimumGCAge metav1.Duration,
    maxPerPodContainerCount int32,
    maxContainerCount int32,
    masterServiceNamespace string,
    registerSchedulable bool,
    keepTerminatedPodVolumes bool,
    nodeLabels map[string]string,
    nodeStatusMaxImages int32,
    seccompDefault bool,
) (k kubelet.Bootstrap, err error) {
    ...
    // Straight pass-through of all parameters to the kubelet constructor.
    k, err = kubelet.NewMainKubelet(kubeCfg,
        kubeDeps,
        crOptions,
        containerRuntime,
        hostname,
        hostnameOverridden,
        nodeName,
        nodeIPs,
        providerID,
        cloudProvider,
        certDirectory,
        rootDirectory,
        imageCredentialProviderConfigFile,
        imageCredentialProviderBinDir,
        registerNode,
        registerWithTaints,
        allowedUnsafeSysctls,
        experimentalMounterPath,
        kernelMemcgNotification,
        experimentalCheckNodeCapabilitiesBeforeMount,
        experimentalNodeAllocatableIgnoreEvictionThreshold,
        minimumGCAge,
        maxPerPodContainerCount,
        maxContainerCount,
        masterServiceNamespace,
        registerSchedulable,
        keepTerminatedPodVolumes,
        nodeLabels,
        nodeStatusMaxImages,
        seccompDefault,
    )
    ...
}

pkg/kubelet/kubelet.go中

// NewMainKubelet constructs the Kubelet (heavily elided excerpt). The parts
// shown are the ones relevant to periodic pod resync and volume handling:
// the resync interval, the active-deadline sync handler, the pod workers and
// the volume manager.
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
    kubeDeps *Dependencies,
    crOptions *config.ContainerRuntimeOptions,
    containerRuntime string,
    hostname string,
    hostnameOverridden bool,
    nodeName types.NodeName,
    nodeIPs []net.IP,
    providerID string,
    cloudProvider string,
    certDirectory string,
    rootDirectory string,
    imageCredentialProviderConfigFile string,
    imageCredentialProviderBinDir string,
    registerNode bool,
    registerWithTaints []api.Taint,
    allowedUnsafeSysctls []string,
    experimentalMounterPath string,
    kernelMemcgNotification bool,
    experimentalCheckNodeCapabilitiesBeforeMount bool,
    experimentalNodeAllocatableIgnoreEvictionThreshold bool,
    minimumGCAge metav1.Duration,
    maxPerPodContainerCount int32,
    maxContainerCount int32,
    masterServiceNamespace string,
    registerSchedulable bool,
    keepTerminatedPodVolumes bool,
    nodeLabels map[string]string,
    nodeStatusMaxImages int32,
    seccompDefault bool,
) (*Kubelet, error) {
    ...
    klet := &Kubelet{
        ...
        // The configured SyncFrequency becomes the kubelet's resync interval.
        resyncInterval:                          kubeCfg.SyncFrequency.Duration,
        ...
    }
    ...
    // The active-deadline handler is registered as a pod sync loop handler, so
    // getPodsToSync consults its ShouldSync for pods that are not yet due.
    activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
    if err != nil {
        return nil, err
    }
    klet.AddPodSyncLoopHandler(activeDeadlineHandler)
    ...
    // Pod workers share the kubelet's work queue and receive the resync interval.
    klet.podWorkers = newPodWorkers(
        klet.syncPod,
        klet.syncTerminatingPod,
        klet.syncTerminatedPod,

        kubeDeps.Recorder,
        klet.workQueue,
        klet.resyncInterval,
        backOffPeriod,
        klet.podCache,
    )
    ...
    // The volume manager is created with the pod manager and the pod workers.
    klet.volumeManager = volumemanager.NewVolumeManager(
        kubeCfg.EnableControllerAttachDetach,
        nodeName,
        klet.podManager,
        klet.podWorkers,
        klet.kubeClient,
        klet.volumePluginMgr,
        klet.containerRuntime,
        kubeDeps.Mounter,
        kubeDeps.HostUtil,
        klet.getPodsDir(),
        kubeDeps.Recorder,
        experimentalCheckNodeCapabilitiesBeforeMount,
        keepTerminatedPodVolumes,
        volumepathhandler.NewBlockVolumePathHandler())
}

// syncPod synchronises a single pod (elided excerpt). The part shown calls
// volumeManager.WaitForAttachAndMount for the pod; error handling is elided.
func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
    ...
    // WaitForAttachAndMount also asks the populator to reprocess this pod.
    if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
        ...
    }
    ...
}

// Run starts the kubelet (elided excerpt). The part shown launches the volume
// manager in its own goroutine with a stop channel that never fires
// (wait.NeverStop), then enters syncLoop with the kubelet itself as handler.
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    ...
    go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
    ...
    kl.syncLoop(updates, kl)
}

// syncLoop is the kubelet's main loop (elided excerpt). It repeatedly calls
// syncLoopIteration and leaves the loop as soon as that returns false.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    ...
    // Ticks once per second; each tick lets syncLoopIteration look for pods
    // that are due for a sync.
    syncTicker := time.NewTicker(time.Second)
    ...
    for {
        ...
        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
        ...
    }
}

// syncLoopIteration waits for one event and dispatches it to handler. This
// excerpt shows only the periodic sync tick; configCh, housekeepingCh and
// plegCh are accepted but not read here.
//
// On a tick from syncCh it collects the pods that are due via getPodsToSync
// and, if there are any, passes them to handler.HandlePodSyncs.
//
// It returns true to tell the caller to keep looping; syncLoop stops when
// this function returns false.
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case <-syncCh:
        podsToSync := kl.getPodsToSync()
        if len(podsToSync) == 0 {
            // Nothing is due; break leaves the select, not the caller's loop.
            break
        }
        handler.HandlePodSyncs(podsToSync)
    }
    // A function with a result must end in a terminating statement. The tick
    // case never asks the loop to stop, so report "continue".
    return true
}

// HandlePodSyncs dispatches a SyncPodSync work item for every pod in pods.
// All items share one start timestamp, taken once before the loop.
func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
    dispatchedAt := kl.clock.Now()
    for idx := range pods {
        current := pods[idx]
        // The lookup's second result is ignored; whatever mirror pod value is
        // returned is forwarded as-is.
        mirror, _ := kl.podManager.GetMirrorPodByPod(current)
        kl.dispatchWork(current, kubetypes.SyncPodSync, mirror, dispatchedAt)
    }
}

// dispatchWork hands the pod to the pod workers as an update of the given
// sync type. For newly created pods (SyncPodCreate) it additionally records
// the pod's container count in the ContainersPerPodCount metric.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    // Run the sync in an async worker.
    work := UpdatePodOptions{
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
        StartTime:  start,
    }
    kl.podWorkers.UpdatePod(work)

    if syncType != kubetypes.SyncPodCreate {
        return
    }
    // Note the number of containers for new pods.
    containerCount := len(pod.Spec.Containers)
    metrics.ContainersPerPodCount.Observe(float64(containerCount))
}

// getPodsToSync returns the pods that should be synced now. A pod qualifies
// when its UID was returned by the work queue, or otherwise when at least one
// registered pod sync loop handler reports ShouldSync for it. Each pod appears
// at most once, in the order returned by the pod manager.
func (kl *Kubelet) getPodsToSync() []*v1.Pod {
    allPods := kl.podManager.GetPods()

    // UIDs whose queued work is ready.
    dueUIDs := sets.NewString()
    for _, uid := range kl.workQueue.GetWork() {
        dueUIDs.Insert(string(uid))
    }

    // Reports whether any handler wants the pod synced; stops at the first yes.
    requestedByHandler := func(pod *v1.Pod) bool {
        for _, loopHandler := range kl.PodSyncLoopHandlers {
            if loopHandler.ShouldSync(pod) {
                return true
            }
        }
        return false
    }

    var podsToSync []*v1.Pod
    for _, pod := range allPods {
        // Handlers are only consulted when the pod is not already due.
        if dueUIDs.Has(string(pod.UID)) || requestedByHandler(pod) {
            podsToSync = append(podsToSync, pod)
        }
    }
    return podsToSync
}

pkg/kubelet/active_deadline.go中

// newActiveDeadlineHandler builds an activeDeadlineHandler from its three
// collaborators. All of them are required: if any is nil, an error listing
// the three values is returned instead of a handler.
func newActiveDeadlineHandler(
    podStatusProvider status.PodStatusProvider,
    recorder record.EventRecorder,
    clock clock.Clock,
) (*activeDeadlineHandler, error) {
    // check for all required fields
    switch {
    case clock == nil, podStatusProvider == nil, recorder == nil:
        return nil, fmt.Errorf("required arguments must not be nil: %v, %v, %v", clock, podStatusProvider, recorder)
    }

    handler := &activeDeadlineHandler{
        clock:             clock,
        podStatusProvider: podStatusProvider,
        recorder:          recorder,
    }
    return handler, nil
}

// ShouldSync reports whether pod should be synced now, which for this handler
// means the pod has exceeded its active deadline (see pastActiveDeadline).
func (m *activeDeadlineHandler) ShouldSync(pod *v1.Pod) bool {
    return m.pastActiveDeadline(pod)
}

// pastActiveDeadline reports whether pod has been running for at least
// Spec.ActiveDeadlineSeconds. It returns false when no deadline is set or
// when no start time is known. The start time comes from the status provider
// if it knows the pod, and from pod.Status otherwise.
func (m *activeDeadlineHandler) pastActiveDeadline(pod *v1.Pod) bool {
    deadlineSeconds := pod.Spec.ActiveDeadlineSeconds
    if deadlineSeconds == nil {
        // no active deadline was specified
        return false
    }

    // Prefer the latest known status; fall back to the status on the pod object.
    latest, found := m.podStatusProvider.GetPodStatus(pod.UID)
    if !found {
        latest = pod.Status
    }
    if latest.StartTime.IsZero() {
        // the pod has not started, so there is nothing to measure
        return false
    }

    limit := time.Duration(*deadlineSeconds) * time.Second
    elapsed := m.clock.Since(latest.StartTime.Time)
    return elapsed >= limit
}
// NOTE(review): the article gives no file heading for the podWorkers methods
// below; presumably they come from pkg/kubelet/pod_workers.go rather than
// active_deadline.go — confirm against upstream.
//
// UpdatePod accepts a pod update for processing (elided excerpt). The part
// shown runs managePodLoop on the pod's update channel.
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
    ...
    p.managePodLoop(podUpdates)
    ...
}

// managePodLoop processes updates for one pod (elided excerpt). The part shown
// invokes the configured sync function for an update and then reports the
// outcome, including any error, to completeWork.
func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
    ...
    err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status)
    ...
    p.completeWork(pod, err)
    ...
}

// completeWork puts the pod back on the work queue after a sync (elided
// excerpt). The line shown re-enqueues the pod's UID with a delay of
// wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor).
// NOTE(review): the elided lines presumably choose other delays depending on
// syncErr — confirm against upstream.
func (p *podWorkers) completeWork(pod *v1.Pod, syncErr error) {
    ...
    p.workQueue.Enqueue(pod.UID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
    ...
}

pkg/kubelet/volumemanager/volume_manager.go中

const(
    ...
    // desiredStateOfWorldPopulatorLoopSleepPeriod (100ms) is the loop sleep
    // period handed to the desired-state-of-world populator by NewVolumeManager.
    desiredStateOfWorldPopulatorLoopSleepPeriod = 100 * time.Millisecond
    ...
)

// NewVolumeManager constructs the kubelet's VolumeManager (elided excerpt).
// The part shown creates the desired-state-of-world populator, giving it
// desiredStateOfWorldPopulatorLoopSleepPeriod as its loop sleep period along
// with the pod manager, the pod state provider and both state-of-world caches.
func NewVolumeManager(
    controllerAttachDetachEnabled bool,
    nodeName k8stypes.NodeName,
    podManager pod.Manager,
    podStateProvider podStateProvider,
    kubeClient clientset.Interface,
    volumePluginMgr *volume.VolumePluginMgr,
    kubeContainerRuntime container.Runtime,
    mounter mount.Interface,
    hostutil hostutil.HostUtils,
    kubeletPodsDir string,
    recorder record.EventRecorder,
    checkNodeCapabilitiesBeforeMount bool,
    keepTerminatedPodVolumes bool,
    blockVolumePathHandler volumepathhandler.BlockVolumePathHandler) VolumeManager {
    ...
    vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
        kubeClient,
        desiredStateOfWorldPopulatorLoopSleepPeriod,
        desiredStateOfWorldPopulatorGetPodStatusRetryDuration,
        podManager,
        podStateProvider,
        vm.desiredStateOfWorld,
        vm.actualStateOfWorld,
        kubeContainerRuntime,
        keepTerminatedPodVolumes,
        csiMigratedPluginManager,
        intreeToCSITranslator,
        volumePluginMgr)
    ...
}

// Run starts the volume manager (elided excerpt). The part shown launches the
// desired-state-of-world populator in its own goroutine, passing on
// sourcesReady and stopCh.
func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
    ...
    go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
    ...
}

// WaitForAttachAndMount blocks until every volume the pod expects is mounted,
// polling at podAttachAndMountRetryInterval for up to podAttachAndMountTimeout.
//
// It returns nil for a nil pod or a pod without expected volumes. Before
// polling it asks the populator to reprocess the pod. If polling fails but no
// volume is left unmounted, nil is returned; otherwise the error lists the
// unmounted and unattached volumes followed by the polling error.
func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
    if pod == nil {
        return nil
    }

    expectedVolumes := getExpectedVolumes(pod)
    if len(expectedVolumes) == 0 {
        // No volumes to verify
        return nil
    }

    klog.V(3).InfoS("Waiting for volumes to attach and mount for pod", "pod", klog.KObj(pod))
    podName := util.GetUniquePodName(pod)

    // Some pods expect to have Setup called over and over again to update.
    // Remount plugins for which this is true. (Atomically updating volumes,
    // like Downward API, depend on this to update the contents of the volume).
    vm.desiredStateOfWorldPopulator.ReprocessPod(podName)

    allMounted := vm.verifyVolumesMountedFunc(podName, expectedVolumes)
    waitErr := wait.PollImmediate(podAttachAndMountRetryInterval, podAttachAndMountTimeout, allMounted)
    if waitErr == nil {
        klog.V(3).InfoS("All volumes are attached and mounted for pod", "pod", klog.KObj(pod))
        return nil
    }

    stillUnmounted := vm.getUnmountedVolumes(podName, expectedVolumes)
    // Also get unattached volumes for error message
    stillUnattached := vm.getUnattachedVolumes(expectedVolumes)

    // Polling failed, yet nothing is unmounted now: treat it as success.
    if len(stillUnmounted) == 0 {
        return nil
    }

    return fmt.Errorf(
        "unmounted volumes=%v, unattached volumes=%v: %s",
        stillUnmounted,
        stillUnattached,
        waitErr)
}

// verifyVolumesMountedFunc returns the polling condition used while waiting
// for a pod's volumes. Each evaluation first pops the pod's recorded errors
// from the desired state of world: if there are any, it stops the poll with
// an error joining them by "; ". Otherwise it is done exactly when none of the
// expected volumes is unmounted.
func (vm *volumeManager) verifyVolumesMountedFunc(podName types.UniquePodName, expectedVolumes []string) wait.ConditionFunc {
    condition := func() (bool, error) {
        podErrs := vm.desiredStateOfWorld.PopPodErrors(podName)
        if len(podErrs) != 0 {
            return true, errors.New(strings.Join(podErrs, "; "))
        }
        pending := vm.getUnmountedVolumes(podName, expectedVolumes)
        return len(pending) == 0, nil
    }
    return condition
}

// getUnmountedVolumes returns the entries of expectedVolumes that are not
// currently mounted for podName. It collects the OuterVolumeSpecName of every
// volume the actual state of world reports as mounted for the pod and lets
// filterUnmountedVolumes compute the difference.
func (vm *volumeManager) getUnmountedVolumes(podName types.UniquePodName, expectedVolumes []string) []string {
    mountedNames := sets.NewString()
    mountedForPod := vm.actualStateOfWorld.GetMountedVolumesForPod(podName)
    for _, vol := range mountedForPod {
        mountedNames.Insert(vol.OuterVolumeSpecName)
    }
    unmounted := filterUnmountedVolumes(mountedNames, expectedVolumes)
    return unmounted
}

pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go中

// Run starts the populator (elided excerpt). The part shown calls
// populatorLoop repeatedly via wait.Until, with dswp.loopSleepDuration as the
// period, until stopCh is closed.
func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
    ...
    wait.Until(dswp.populatorLoop, dswp.loopSleepDuration, stopCh)
}
// populatorLoop is one iteration of the populator (elided excerpt). The part
// shown calls findAndAddNewPods.
func (dswp *desiredStateOfWorldPopulator) populatorLoop() {
    ...
    dswp.findAndAddNewPods()
    ...
}

// findAndAddNewPods processes pod volumes (elided excerpt). The part shown
// calls processPodVolumes for a pod; how pods are enumerated is elided.
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
    ...
    dswp.processPodVolumes(pod, mountedVolumesForPod, processedVolumesForFSResize)
    ...
}

// processPodVolumes handles the volumes of one pod (elided excerpt). The part
// shown returns early when the pod is already marked as processed, so a pod's
// volumes are only handled again after its processed flag has been cleared.
func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
    pod *v1.Pod,
    mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume,
    processedVolumesForFSResize sets.String) {
    ...
    // Skip pods whose processed flag is still set.
    if dswp.podPreviouslyProcessed(uniquePodName) {
        return
    }
    ...
}

// podPreviouslyProcessed reports the processed flag stored for podName. The
// lookup is done under the read lock; a pod without an entry yields false.
func (dswp *desiredStateOfWorldPopulator) podPreviouslyProcessed(
    podName volumetypes.UniquePodName) bool {
    dswp.pods.RLock()
    alreadyProcessed := dswp.pods.processedPods[podName]
    dswp.pods.RUnlock()

    return alreadyProcessed
}

// markPodProcessingFailed sets the processed flag for podName to false while
// holding the write lock, so podPreviouslyProcessed reports false for it
// afterwards.
func (dswp *desiredStateOfWorldPopulator) markPodProcessingFailed(
    podName volumetypes.UniquePodName) {
    dswp.pods.Lock()
    defer dswp.pods.Unlock()

    dswp.pods.processedPods[podName] = false
}

// ReprocessPod clears the processed flag for podName by delegating to
// markPodProcessingFailed, which makes processPodVolumes handle the pod's
// volumes again instead of returning early.
func (dswp *desiredStateOfWorldPopulator) ReprocessPod(
    podName volumetypes.UniquePodName) {
    dswp.markPodProcessingFailed(podName)
}
上一篇下一篇

猜你喜欢

热点阅读