How to Gracefully Perform Rolling Updates of Pods

2021-09-26  wwq2020

Background

When a rolling update is triggered, the old Pods are deleted through the apiserver's delete API; at that point the apiserver sets the Pod's DeletionTimestamp rather than removing the object right away.
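From a client's point of view this two-step behavior is observable: the delete call returns while the Pod is still readable, now carrying the timestamp. A minimal sketch with client-go (the kubeconfig path, namespace and Pod name are placeholders):

package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
    if err != nil {
        panic(err)
    }
    cs := kubernetes.NewForConfigOrDie(cfg)

    // The DELETE request does not remove the object yet: for Pods the
    // default grace period is 30s, so the apiserver only stamps
    // metadata.deletionTimestamp / deletionGracePeriodSeconds.
    if err := cs.CoreV1().Pods("default").Delete(context.TODO(), "demo", metav1.DeleteOptions{}); err != nil {
        panic(err)
    }

    // During the grace period the Pod is still readable and carries the
    // timestamp set by the apiserver.
    if pod, err := cs.CoreV1().Pods("default").Get(context.TODO(), "demo", metav1.GetOptions{}); err == nil {
        fmt.Println("deletionTimestamp:", pod.DeletionTimestamp)
    }
}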
The relevant apiserver code is shown below.

k8s.io/apiserver/pkg/storage/etcd3/store.go

func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
    versioner := APIObjectVersioner{}
    result := &store{
        client:        c,
        codec:         codec,
        versioner:     versioner,
        transformer:   transformer,
        pagingEnabled: pagingEnabled,
        // for compatibility with etcd2 impl.
        // no-op for default prefix of '/registry'.
        // keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
        pathPrefix:   path.Join("/", prefix),
        watcher:      newWatcher(c, codec, newFunc, versioner, transformer),
        leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
    }
    return result
}

func (s *store) Delete(
    ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions,
    validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
    v, err := conversion.EnforcePtr(out)
    if err != nil {
        return fmt.Errorf("unable to convert output object to pointer: %v", err)
    }
    key = path.Join(s.pathPrefix, key)
    return s.conditionalDelete(ctx, key, out, v, preconditions, validateDeletion, cachedExistingObject)
}

func (s *store) conditionalDelete(
    ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions,
    validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
    getCurrentState := func() (*objState, error) {
        startTime := time.Now()
        getResp, err := s.client.KV.Get(ctx, key)
        metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
        if err != nil {
            return nil, err
        }
        return s.getState(getResp, key, v, false)
    }

    var origState *objState
    var err error
    var origStateIsCurrent bool
    if cachedExistingObject != nil {
        origState, err = s.getStateFromObject(cachedExistingObject)
    } else {
        origState, err = getCurrentState()
        origStateIsCurrent = true
    }
    if err != nil {
        return err
    }

    for {
        if preconditions != nil {
            if err := preconditions.Check(key, origState.obj); err != nil {
                if origStateIsCurrent {
                    return err
                }

                // It's possible we're working with stale data.
                // Remember the revision of the potentially stale data and the resulting update error
                cachedRev := origState.rev
                cachedUpdateErr := err

                // Actually fetch
                origState, err = getCurrentState()
                if err != nil {
                    return err
                }
                origStateIsCurrent = true

                // it turns out our cached data was not stale, return the error
                if cachedRev == origState.rev {
                    return cachedUpdateErr
                }

                // Retry
                continue
            }
        }
        if err := validateDeletion(ctx, origState.obj); err != nil {
            if origStateIsCurrent {
                return err
            }

            // It's possible we're working with stale data.
            // Remember the revision of the potentially stale data and the resulting update error
            cachedRev := origState.rev
            cachedUpdateErr := err

            // Actually fetch
            origState, err = getCurrentState()
            if err != nil {
                return err
            }
            origStateIsCurrent = true

            // it turns out our cached data was not stale, return the error
            if cachedRev == origState.rev {
                return cachedUpdateErr
            }

            // Retry
            continue
        }

        startTime := time.Now()
        txnResp, err := s.client.KV.Txn(ctx).If(
            clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
        ).Then(
            clientv3.OpDelete(key),
        ).Else(
            clientv3.OpGet(key),
        ).Commit()
        metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
        if err != nil {
            return err
        }
        if !txnResp.Succeeded {
            getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
            klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
            origState, err = s.getState(getResp, key, v, false)
            if err != nil {
                return err
            }
            origStateIsCurrent = true
            continue
        }
        return decode(s.codec, s.versioner, origState.data, out, origState.rev)
    }
}
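The heart of conditionalDelete is the optimistic-concurrency transaction at the bottom: delete the key only if its ModRevision still matches the revision we last read, otherwise retry against fresh data. A standalone sketch of the same pattern against a bare etcd v3 client (the endpoint and key are placeholders):

package main

import (
    "context"
    "fmt"
    "time"

    clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379"}, // placeholder endpoint
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        panic(err)
    }
    defer cli.Close()

    key := "/registry/pods/default/demo" // placeholder key

    for {
        getResp, err := cli.Get(context.TODO(), key)
        if err != nil {
            panic(err)
        }
        if len(getResp.Kvs) == 0 {
            return // already gone
        }
        rev := getResp.Kvs[0].ModRevision

        // Delete only if nobody has modified the key since we read it.
        // (The real store piggybacks a Get in the Else branch to save a
        // round trip; here we simply re-read at the top of the loop.)
        txnResp, err := cli.Txn(context.TODO()).If(
            clientv3.Compare(clientv3.ModRevision(key), "=", rev),
        ).Then(
            clientv3.OpDelete(key),
        ).Commit()
        if err != nil {
            panic(err)
        }
        if txnResp.Succeeded {
            fmt.Println("deleted; last seen revision:", rev)
            return
        }
    }
}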

k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go

func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
    stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval)
    if err != nil {
        return nil, nil, err
    }

    client, err := newETCD3Client(c.Transport)
    if err != nil {
        stopCompactor()
        return nil, nil, err
    }

    stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client, c.DBMetricPollInterval)
    if err != nil {
        return nil, nil, err
    }

    var once sync.Once
    destroyFunc := func() {
        // we know that storage destroy funcs are called multiple times (due to reuse in subresources).
        // Hence, we only destroy once.
        // TODO: fix duplicated storage destroy calls higher level
        once.Do(func() {
            stopCompactor()
            stopDBSizeMonitor()
            client.Close()
        })
    }
    transformer := c.Transformer
    if transformer == nil {
        transformer = value.IdentityTransformer
    }
    return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
}

k8s.io/apiserver/pkg/storage/cacher/cacher.go

func NewCacherFromConfig(config Config) (*Cacher, error) {
    ...
    cacher := &Cacher{
        ready:          newReady(),
        storage:        config.Storage,
        objectType:     objType,
        versioner:      config.Versioner,
        newFunc:        config.NewFunc,
        indexedTrigger: indexedTrigger,
        watcherIdx:     0,
        watchers: indexedWatchers{
            allWatchers:   make(map[int]*cacheWatcher),
            valueWatchers: make(map[string]watchersMap),
        },
        // TODO: Figure out the correct value for the buffer size.
        incoming:              make(chan watchCacheEvent, 100),
        dispatchTimeoutBudget: newTimeBudget(),
        // We need to (potentially) stop both:
        // - wait.Until go-routine
        // - reflector.ListAndWatch
        // and there are no guarantees on the order that they will stop.
        // So we will be simply closing the channel, and synchronizing on the WaitGroup.
        stopCh:           stopCh,
        clock:            config.Clock,
        timer:            time.NewTimer(time.Duration(0)),
        bookmarkWatchers: newTimeBucketWatchers(config.Clock, defaultBookmarkFrequency),
    }

    ...
    return cacher, nil
}

func (c *Cacher) Delete(
    ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions,
    validateDeletion storage.ValidateObjectFunc, _ runtime.Object) error {
    // Ignore the suggestion and try to pass down the current version of the object
    // read from cache.
    if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
        klog.Errorf("GetByKey returned error: %v", err)
    } else if exists {
        // DeepCopy the object since we modify resource version when serializing the
        // current object.
        currObj := elem.(*storeElement).Object.DeepCopyObject()
        return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, currObj)
    }
    // If we couldn't get the object, fallback to no-suggestion.
    return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, nil)
}

k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go

func StorageWithCacher() generic.StorageDecorator {
    return func(
        storageConfig *storagebackend.ConfigForResource,
        resourcePrefix string,
        keyFunc func(obj runtime.Object) (string, error),
        newFunc func() runtime.Object,
        newListFunc func() runtime.Object,
        getAttrsFunc storage.AttrFunc,
        triggerFuncs storage.IndexerFuncs,
        indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {

        s, d, err := generic.NewRawStorage(storageConfig, newFunc)
        if err != nil {
            return s, d, err
        }
        if klog.V(5).Enabled() {
            klog.InfoS("Storage caching is enabled", objectTypeToArgs(newFunc())...)
        }

        cacherConfig := cacherstorage.Config{
            Storage:        s,
            Versioner:      etcd3.APIObjectVersioner{},
            ResourcePrefix: resourcePrefix,
            KeyFunc:        keyFunc,
            NewFunc:        newFunc,
            NewListFunc:    newListFunc,
            GetAttrsFunc:   getAttrsFunc,
            IndexerFuncs:   triggerFuncs,
            Indexers:       indexers,
            Codec:          storageConfig.Codec,
        }
        cacher, err := cacherstorage.NewCacherFromConfig(cacherConfig)
        if err != nil {
            return nil, func() {}, err
        }
        destroyFunc := func() {
            cacher.Stop()
            d()
        }

        // TODO : Remove RegisterStorageCleanup below when PR
        // https://github.com/kubernetes/kubernetes/pull/50690
        // merges as that shuts down storage properly
        RegisterStorageCleanup(destroyFunc)

        return cacher, destroyFunc, nil
    }
}

k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go

func Create(c storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
    switch c.Type {
    case storagebackend.StorageTypeETCD2:
        return nil, nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
    case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
        return newETCD3Storage(c, newFunc)
    default:
        return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
    }
}

k8s.io/apiserver/pkg/registry/generic/storage_decorator.go

func UndecoratedStorage(
    config *storagebackend.ConfigForResource,
    resourcePrefix string,
    keyFunc func(obj runtime.Object) (string, error),
    newFunc func() runtime.Object,
    newListFunc func() runtime.Object,
    getAttrsFunc storage.AttrFunc,
    trigger storage.IndexerFuncs,
    indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
    return NewRawStorage(config, newFunc)
}


func NewRawStorage(config *storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, factory.DestroyFunc, error) {
    return factory.Create(*config, newFunc)
}

k8s.io/apiserver/pkg/server/options/etcd.go

func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
    if err := s.addEtcdHealthEndpoint(c); err != nil {
        return err
    }

    // use the StorageObjectCountTracker interface instance from server.Config
    s.StorageConfig.StorageObjectCountTracker = c.StorageObjectCountTracker

    c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
    return nil
}

func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
    storageConfig, err := f.StorageFactory.NewConfig(resource)
    if err != nil {
        return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
    }

    ret := generic.RESTOptions{
        StorageConfig:             storageConfig,
        Decorator:                 generic.UndecoratedStorage,
        DeleteCollectionWorkers:   f.Options.DeleteCollectionWorkers,
        EnableGarbageCollection:   f.Options.EnableGarbageCollection,
        ResourcePrefix:            f.StorageFactory.ResourcePrefix(resource),
        CountMetricPollPeriod:     f.Options.StorageConfig.CountMetricPollPeriod,
        StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker,
    }
    if f.Options.EnableWatchCache {
        sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
        if err != nil {
            return generic.RESTOptions{}, err
        }
        size, ok := sizes[resource]
        if ok && size > 0 {
            klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource)
        }
        if ok && size <= 0 {
            ret.Decorator = generic.UndecoratedStorage
        } else {
            ret.Decorator = genericregistry.StorageWithCacher()
        }
    }

    return ret, nil
}

cmd/kube-apiserver/app/server.go

func buildGenericConfig(
    s *options.ServerRunOptions,
    proxyTransport *http.Transport,
) (
    genericConfig *genericapiserver.Config,
    versionedInformers clientgoinformers.SharedInformerFactory,
    serviceResolver aggregatorapiserver.ServiceResolver,
    pluginInitializers []admission.PluginInitializer,
    admissionPostStartHook genericapiserver.PostStartHookFunc,
    storageFactory *serverstorage.DefaultStorageFactory,
    lastErr error,
) {
    ...
    genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
    if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
        return
    }
    ...
}

func CreateKubeAPIServerConfig(s completedServerRunOptions) (
    ...
    genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
    if err != nil {
        return nil, nil, nil, err
    }
    ...
    config := &controlplane.Config{
        GenericConfig: genericConfig,
    ...
}

func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
    kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
    if err != nil {
        return nil, err
    }
    ...
    apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
        serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))
    if err != nil {
        return nil, err
    }
    ...
    kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
    if err != nil {
        return nil, err
    }
    ...
}

func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
    kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
    if err != nil {
        return nil, err
    }

    return kubeAPIServer, nil
}

pkg/controlplane/instance.go

func (c *Config) Complete() CompletedConfig {
    cfg := completedConfig{
        c.GenericConfig.Complete(c.ExtraConfig.VersionedInformers),
        &c.ExtraConfig,
    }

    serviceIPRange, apiServerServiceIP, err := ServiceIPRange(cfg.ExtraConfig.ServiceIPRange)
    if err != nil {
        klog.Fatalf("Error determining service IP ranges: %v", err)
    }
    if cfg.ExtraConfig.ServiceIPRange.IP == nil {
        cfg.ExtraConfig.ServiceIPRange = serviceIPRange
    }
    if cfg.ExtraConfig.APIServerServiceIP == nil {
        cfg.ExtraConfig.APIServerServiceIP = apiServerServiceIP
    }

    discoveryAddresses := discovery.DefaultAddresses{DefaultAddress: cfg.GenericConfig.ExternalAddress}
    discoveryAddresses.CIDRRules = append(discoveryAddresses.CIDRRules,
        discovery.CIDRRule{IPRange: cfg.ExtraConfig.ServiceIPRange, Address: net.JoinHostPort(cfg.ExtraConfig.APIServerServiceIP.String(), strconv.Itoa(cfg.ExtraConfig.APIServerServicePort))})
    cfg.GenericConfig.DiscoveryAddresses = discoveryAddresses

    if cfg.ExtraConfig.ServiceNodePortRange.Size == 0 {
        // TODO: Currently no way to specify an empty range (do we need to allow this?)
        // We should probably allow this for clouds that don't require NodePort to do load-balancing (GCE)
        // but then that breaks the strict nestedness of ServiceType.
        // Review post-v1
        cfg.ExtraConfig.ServiceNodePortRange = kubeoptions.DefaultServiceNodePortRange
        klog.Infof("Node port range unspecified. Defaulting to %v.", cfg.ExtraConfig.ServiceNodePortRange)
    }

    if cfg.ExtraConfig.EndpointReconcilerConfig.Interval == 0 {
        cfg.ExtraConfig.EndpointReconcilerConfig.Interval = DefaultEndpointReconcilerInterval
    }

    if cfg.ExtraConfig.MasterEndpointReconcileTTL == 0 {
        cfg.ExtraConfig.MasterEndpointReconcileTTL = DefaultEndpointReconcilerTTL
    }

    if cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler == nil {
        cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler = c.createEndpointReconciler()
    }

    return CompletedConfig{&cfg}
}

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
    ...
    if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
        legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
            StorageFactory:              c.ExtraConfig.StorageFactory,
            ProxyTransport:              c.ExtraConfig.ProxyTransport,
            KubeletClientConfig:         c.ExtraConfig.KubeletClientConfig,
            EventTTL:                    c.ExtraConfig.EventTTL,
            ServiceIPRange:              c.ExtraConfig.ServiceIPRange,
            SecondaryServiceIPRange:     c.ExtraConfig.SecondaryServiceIPRange,
            ServiceNodePortRange:        c.ExtraConfig.ServiceNodePortRange,
            LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
            ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
            ExtendExpiration:            c.ExtraConfig.ExtendExpiration,
            ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
            APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
        }
        if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
            return nil, err
        }
    }

    restStorageProviders := []RESTStorageProvider{
        apiserverinternalrest.StorageProvider{},
        authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
        authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
        autoscalingrest.RESTStorageProvider{},
        batchrest.RESTStorageProvider{},
        certificatesrest.RESTStorageProvider{},
        coordinationrest.RESTStorageProvider{},
        discoveryrest.StorageProvider{},
        networkingrest.RESTStorageProvider{},
        noderest.RESTStorageProvider{},
        policyrest.RESTStorageProvider{},
        rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
        schedulingrest.RESTStorageProvider{},
        storagerest.RESTStorageProvider{},
        flowcontrolrest.RESTStorageProvider{},
        // keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
        // See https://github.com/kubernetes/kubernetes/issues/42392
        appsrest.StorageProvider{},
        admissionregistrationrest.RESTStorageProvider{},
        eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
    }
    if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
        return nil, err
    }
    ...
}

func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
    legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
    if err != nil {
        return fmt.Errorf("error building core storage: %v", err)
    }

    controllerName := "bootstrap-controller"
    coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
    bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
    m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
    m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)

    if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
        return fmt.Errorf("error in registering group versions: %v", err)
    }
    return nil
}

func (m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
    apiGroupsInfo := []*genericapiserver.APIGroupInfo{}
    ...
    for _, restStorageBuilder := range restStorageProviders {
        ...
        apiGroupInfo, enabled, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
        if err != nil {
            return fmt.Errorf("problem initializing API group %q : %v", groupName, err)
        }
        ...
        apiGroupsInfo = append(apiGroupsInfo, &apiGroupInfo)
    }
    if err := m.GenericAPIServer.InstallAPIGroups(apiGroupsInfo...); err != nil {
        return fmt.Errorf("error in registering group versions: %v", err)
    }
    ...
}

pkg/registry/core/rest/storage_core.go

func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
    ...
    podStorage, err := podstore.NewStorage(
        restOptionsGetter,
        nodeStorage.KubeletConnectionInfo,
        c.ProxyTransport,
        podDisruptionClient,
    )
    ...
    restStorageMap := map[string]rest.Storage{
        "pods":             podStorage.Pod,
    ...
}

pkg/registry/core/pod/storage/storage.go

func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) (PodStorage, error) {
    store := &genericregistry.Store{
        NewFunc:                  func() runtime.Object { return &api.Pod{} },
        NewListFunc:              func() runtime.Object { return &api.PodList{} },
        PredicateFunc:            registrypod.MatchPod,
        DefaultQualifiedResource: api.Resource("pods"),

        CreateStrategy:      registrypod.Strategy,
        UpdateStrategy:      registrypod.Strategy,
        DeleteStrategy:      registrypod.Strategy,
        ResetFieldsStrategy: registrypod.Strategy,
        ReturnDeletedObject: true,

        TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
    }
    options := &generic.StoreOptions{
        RESTOptions: optsGetter,
        AttrFunc:    registrypod.GetAttrs,
        TriggerFunc: map[string]storage.IndexerFunc{"spec.nodeName": registrypod.NodeNameTriggerFunc},
        Indexers:    registrypod.Indexers(),
    }
    if err := store.CompleteWithOptions(options); err != nil {
        return PodStorage{}, err
    }
    ...
    return PodStorage{
        Pod:                 &REST{store, proxyTransport},
        Binding:             &BindingREST{store: store},
        LegacyBinding:       &LegacyBindingREST{bindingREST},
        Eviction:            newEvictionStorage(store, podDisruptionBudgetClient),
        Status:              &StatusREST{store: &statusStore},
        EphemeralContainers: &EphemeralContainersREST{store: &ephemeralContainersStore},
        Log:                 &podrest.LogREST{Store: store, KubeletConn: k},
        Proxy:               &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
        Exec:                &podrest.ExecREST{Store: store, KubeletConn: k},
        Attach:              &podrest.AttachREST{Store: store, KubeletConn: k},
        PortForward:         &podrest.PortForwardREST{Store: store, KubeletConn: k},
    }, nil
}

k8s.io/apiserver/pkg/registry/generic/registry/store.go

func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
    ...
    if e.Storage.Storage == nil {
        e.Storage.Codec = opts.StorageConfig.Codec
        var err error
        e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
            opts.StorageConfig,
            prefix,
            keyFunc,
            e.NewFunc,
            e.NewListFunc,
            attrFunc,
            options.TriggerFunc,
            options.Indexers,
        )
        if err != nil {
            return err
        }
        e.StorageVersioner = opts.StorageConfig.EncodeVersioner

        if opts.CountMetricPollPeriod > 0 {
            stopFunc := e.startObservingCount(opts.CountMetricPollPeriod, opts.StorageObjectCountTracker)
            previousDestroy := e.DestroyFunc
            e.DestroyFunc = func() {
                stopFunc()
                if previousDestroy != nil {
                    previousDestroy()
                }
            }
        }
    }
    ...
}
func (e *Store) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
    ...
    graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, obj, options)
    if err != nil {
        return nil, false, err
    }
    ...
    if err := e.Storage.Delete(ctx, key, out, &preconditions, storage.ValidateObjectFunc(deleteValidation), dryrun.IsDryRun(options.DryRun), nil); err != nil {
        // Please refer to the place where we set ignoreNotFound for the reason
        // why we ignore the NotFound error .
        if storage.IsNotFound(err) && ignoreNotFound && lastExisting != nil {
            // The lastExisting object may not be the last state of the object
            // before its deletion, but it's the best approximation.
            out, err := e.finalizeDelete(ctx, lastExisting, true, options)
            return out, true, err
        }
        return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name)
    }
    ...
}

k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go

func (s *DryRunnableStorage) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, deleteValidation storage.ValidateObjectFunc, dryRun bool, cachedExistingObject runtime.Object) error {
    if dryRun {
        if err := s.Storage.Get(ctx, key, storage.GetOptions{}, out); err != nil {
            return err
        }
        if err := preconditions.Check(key, out); err != nil {
            return err
        }
        return deleteValidation(ctx, out)
    }
    return s.Storage.Delete(ctx, key, out, preconditions, deleteValidation, cachedExistingObject)
}
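On the client side this dry-run branch is reached by setting DryRun in the delete options, which exercises preconditions and deletion validation without touching storage. A sketch (namespace and name are placeholders):

package sketch

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// dryRunDelete runs the full server-side validation path without deleting.
func dryRunDelete(cs kubernetes.Interface, namespace, name string) error {
    return cs.CoreV1().Pods(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{
        DryRun: []string{metav1.DryRunAll},
    })
}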

k8s.io/apiserver/pkg/registry/rest/delete.go

func BeforeDelete(strategy RESTDeleteStrategy, ctx context.Context, obj runtime.Object, options *metav1.DeleteOptions) (graceful, gracefulPending bool, err error) {
    ...
    if objectMeta.GetDeletionTimestamp() != nil {
        // if we are already being deleted, we may only shorten the deletion grace period
        // this means the object was gracefully deleted previously but deletionGracePeriodSeconds was not set,
        // so we force deletion immediately
        // IMPORTANT:
        // The deletion operation happens in two phases.
        // 1. Update to set DeletionGracePeriodSeconds and DeletionTimestamp
        // 2. Delete the object from storage.
        // If the update succeeds, but the delete fails (network error, internal storage error, etc.),
        // a resource was previously left in a state that was non-recoverable.  We
        // check if the existing stored resource has a grace period as 0 and if so
        // attempt to delete immediately in order to recover from this scenario.
        if objectMeta.GetDeletionGracePeriodSeconds() == nil || *objectMeta.GetDeletionGracePeriodSeconds() == 0 {
            return false, false, nil
        }
    ...
    objectMeta.SetDeletionTimestamp(&now)
    objectMeta.SetDeletionGracePeriodSeconds(options.GracePeriodSeconds)
    ...
}
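The takeaway is that graceful deletion is two-phased: the first DELETE only stamps DeletionTimestamp and DeletionGracePeriodSeconds (the registry then performs an update, not a storage delete), and only a later delete that finds the timestamp already set with a zero or missing grace period falls through to the actual storage delete. A condensed sketch of that decision (decideImmediate is a hypothetical helper; the ObjectMeta accessors are real):

package sketch

import (
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// decideImmediate condenses the BeforeDelete logic above: it reports
// whether storage may delete the object right now.
func decideImmediate(objectMeta metav1.Object, graceSeconds int64) bool {
    if objectMeta.GetDeletionTimestamp() != nil {
        // Already marked for deletion: a missing or zero grace period
        // means the storage delete may proceed.
        gp := objectMeta.GetDeletionGracePeriodSeconds()
        return gp == nil || *gp == 0
    }
    // Phase 1: mark the object instead of deleting it; the caller turns
    // this into an update and reports the deletion as "graceful".
    now := metav1.NewTime(time.Now().Add(time.Duration(graceSeconds) * time.Second))
    objectMeta.SetDeletionTimestamp(&now)
    objectMeta.SetDeletionGracePeriodSeconds(&graceSeconds)
    return false
}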

k8s.io/apiserver/pkg/server/genericapiserver.go

func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
    if !s.legacyAPIGroupPrefixes.Has(apiPrefix) {
        return fmt.Errorf("%q is not in the allowed legacy API prefixes: %v", apiPrefix, s.legacyAPIGroupPrefixes.List())
    }

    openAPIModels, err := s.getOpenAPIModels(apiPrefix, apiGroupInfo)
    if err != nil {
        return fmt.Errorf("unable to get openapi models: %v", err)
    }

    if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {
        return err
    }

    // Install the version handler.
    // Add a handler at /<apiPrefix> to enumerate the supported api versions.
    s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix).WebService())

    return nil
}

func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
    ...
    for _, apiGroupInfo := range apiGroupInfos {
        if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
            return fmt.Errorf("unable to install api resources: %v", err)
        }
    ...
}

func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
    ...
    apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
    ...
    r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
    ...
}

func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupVersion schema.GroupVersion, apiPrefix string) *genericapi.APIGroupVersion {
    storage := make(map[string]rest.Storage)
    for k, v := range apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version] {
        storage[strings.ToLower(k)] = v
    }
    version := s.newAPIGroupVersion(apiGroupInfo, groupVersion)
    version.Root = apiPrefix
    version.Storage = storage
    return version
}

k8s.io/apiserver/pkg/endpoints/groupversion.go

func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) {
    ...
    installer := &APIInstaller{
        group:             g,
        prefix:            prefix,
        minRequestTimeout: g.MinRequestTimeout,
    }

    apiResources, resourceInfos, ws, registrationErrors := installer.Install()
    ...
}

k8s.io/apiserver/pkg/endpoints/installer.go

func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {

    ...
    apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
    ...
}

func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
    ...
    case "DELETE": // Delete a resource.
        ...
        handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, restfulDeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit))
        handler = utilwarning.AddWarningsHandler(handler, warnings)
        route := ws.DELETE(action.Path).To(handler).
                Doc(doc).
                Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
                Operation("delete"+namespaced+kind+strings.Title(subresource)+operationSuffix).
                Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
                Writes(deleteReturnType).
                Returns(http.StatusOK, "OK", deleteReturnType).
                Returns(http.StatusAccepted, "Accepted", deleteReturnType)
        ...
    ...
}

func restfulDeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
    return func(req *restful.Request, res *restful.Response) {
        handlers.DeleteResource(r, allowsOptions, &scope, admit)(res.ResponseWriter, req.Request)
    }
}

func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope *RequestScope, admit admission.Interface) http.HandlerFunc {
    ...
    result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
            obj, deleted, err := r.Delete(ctx, name, rest.AdmissionToValidateObjectDeleteFunc(admit, staticAdmissionAttrs, scope), options)
            wasDeleted = deleted
            return obj, err
        })
    ...
}

When the kubelet receives the Pod update event and sees that DeletionTimestamp is set, it runs its deletion logic: it first executes the preStop hook, then stops the containers, and finally calls the apiserver with GracePeriodSeconds set to 0 to signal that the Pod can now be removed, at which point the apiserver performs the actual delete.
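That final call is, in effect, a delete with a zero grace period; a minimal client-go sketch of this second-phase delete (namespace and name are placeholders):

package sketch

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// finalDelete signals that the grace period is over: with
// GracePeriodSeconds=0, BeforeDelete lets the storage delete proceed.
func finalDelete(cs kubernetes.Interface, namespace, name string) error {
    zero := int64(0)
    return cs.CoreV1().Pods(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{
        GracePeriodSeconds: &zero,
    })
}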

Problem

The parties that act on a Pod deletion here are the endpoint controller and the kubelet (which performs the actual teardown), so the Endpoints update and the Pod deletion run in parallel. It is therefore possible that the Pod is already gone while the Endpoints object still lists it, which breaks kube-proxy and the ingress controller. And even once the Endpoints object has been updated, kube-proxy and the ingress controller may only receive and finish processing that event after the Pod has been deleted. Either way, traffic can be lost.

Solutions

Option 1
Add a preStop hook that simply sleeps for a few seconds (e.g. 5), as sketched below. This only reduces the probability of the problem; it does not fix the root cause.
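A sketch of such a container spec, expressed with the k8s.io/api types (the name and image are placeholders; in older k8s.io/api releases the handler type is corev1.Handler rather than corev1.LifecycleHandler):

package sketch

import corev1 "k8s.io/api/core/v1"

// sleepingContainer delays SIGTERM by 5 seconds via preStop, giving the
// endpoint/ingress layers a head start on removing this Pod.
func sleepingContainer() corev1.Container {
    return corev1.Container{
        Name:  "app",        // placeholder
        Image: "app:latest", // placeholder
        Lifecycle: &corev1.Lifecycle{
            PreStop: &corev1.LifecycleHandler{
                Exec: &corev1.ExecAction{Command: []string{"sleep", "5"}},
            },
        },
    }
}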
Option 2
Use an admission webhook to perform cleanup when a Pod is being deleted, for example telling the gateway to stop forwarding traffic to this Pod; see the sketch below.
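A minimal sketch of such a validating webhook in Go (the /validate path, port, cert paths and drainFromGateway are all placeholders; a real deployment also needs a ValidatingWebhookConfiguration targeting Pod DELETE operations):

package main

import (
    "encoding/json"
    "net/http"

    admissionv1 "k8s.io/api/admission/v1"
)

// drainFromGateway stands in for the real cleanup, e.g. telling the
// gateway to stop routing traffic to this Pod before deletion proceeds.
func drainFromGateway(namespace, name string) {}

func handleAdmission(w http.ResponseWriter, r *http.Request) {
    var review admissionv1.AdmissionReview
    if err := json.NewDecoder(r.Body).Decode(&review); err != nil || review.Request == nil {
        http.Error(w, "bad admission review", http.StatusBadRequest)
        return
    }
    req := review.Request
    if req.Operation == admissionv1.Delete && req.Kind.Kind == "Pod" {
        drainFromGateway(req.Namespace, req.Name)
    }
    // Always admit; the webhook is only used as a pre-delete hook here.
    review.Response = &admissionv1.AdmissionResponse{UID: req.UID, Allowed: true}
    _ = json.NewEncoder(w).Encode(review)
}

func main() {
    http.HandleFunc("/validate", handleAdmission)
    // Admission webhooks must be served over TLS.
    _ = http.ListenAndServeTLS(":8443", "tls.crt", "tls.key", nil)
}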
Option 3
Use the Downward API to inject the Pod IP into an environment variable, and have a preStop hook notify the gateway that this Pod IP is about to go offline; see the sketch below.
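A sketch of the container spec (the image, gateway URL and its /offline endpoint are placeholders):

package sketch

import corev1 "k8s.io/api/core/v1"

// gatewayAwareContainer injects the Pod IP via the Downward API and has
// preStop tell a (hypothetical) gateway to take that IP out of rotation
// before the container is stopped.
func gatewayAwareContainer() corev1.Container {
    return corev1.Container{
        Name:  "app",        // placeholder
        Image: "app:latest", // placeholder
        Env: []corev1.EnvVar{{
            Name: "POD_IP",
            ValueFrom: &corev1.EnvVarSource{
                FieldRef: &corev1.ObjectFieldSelector{FieldPath: "status.podIP"},
            },
        }},
        Lifecycle: &corev1.Lifecycle{
            PreStop: &corev1.LifecycleHandler{
                Exec: &corev1.ExecAction{
                    Command: []string{"sh", "-c", "curl -s \"http://gateway.internal/offline?ip=${POD_IP}\""},
                },
            },
        },
    }
}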
