k8s: the garbagecollector controller

2020-07-21  wwq2020

Introduction

The garbagecollector controller fetches the set of deletable resources from the apiserver, watches them for changes, builds a dependency graph from the objects' ownerReferences, and deletes resources whose owners no longer exist.
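
The graph is keyed on each object's metadata.ownerReferences. As a minimal sketch (not part of the original walkthrough; the Deployment/ConfigMap pairing and the names are purely illustrative), this is the kind of owner/dependent edge the graph builder records:

package example

import (
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// dependentConfigMap returns a ConfigMap that names the given Deployment as its
// controller owner. If the Deployment disappears, the garbage collector finds the
// dangling ownerReference in its dependency graph and deletes the ConfigMap.
func dependentConfigMap(owner *appsv1.Deployment) *corev1.ConfigMap {
    return &corev1.ConfigMap{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "example-dependent", // illustrative name
            Namespace: owner.Namespace,
            OwnerReferences: []metav1.OwnerReference{
                *metav1.NewControllerRef(owner, appsv1.SchemeGroupVersion.WithKind("Deployment")),
            },
        },
    }
}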

Source

In cmd/kube-controller-manager/app/core.go:

func startGarbageCollectorController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.ComponentConfig.GarbageCollectorController.EnableGarbageCollector {
        return nil, false, nil
    }

    gcClientset := ctx.ClientBuilder.ClientOrDie("generic-garbage-collector")

    config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector")
    metadataClient, err := metadata.NewForConfig(config)
    if err != nil {
        return nil, true, err
    }

    ignoredResources := make(map[schema.GroupResource]struct{})
    for _, r := range ctx.ComponentConfig.GarbageCollectorController.GCIgnoredResources {
        ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{}
    }
    garbageCollector, err := garbagecollector.NewGarbageCollector(
        metadataClient,
        ctx.RESTMapper,
        ignoredResources,
        ctx.ObjectOrMetadataInformerFactory,
        ctx.InformersStarted,
    )
    if err != nil {
        return nil, true, fmt.Errorf("failed to start the generic garbage collector: %v", err)
    }

    // Start the garbage collector.
    workers := int(ctx.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs)
    go garbageCollector.Run(workers, ctx.Stop)

    // Periodically refresh the RESTMapper with new discovery information and sync
    // the garbage collector.
    go garbageCollector.Sync(gcClientset.Discovery(), 30*time.Second, ctx.Stop)

    return garbagecollector.NewDebugHandler(garbageCollector), true, nil
}

In pkg/controller/garbagecollector/garbagecollector.go:

func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer gc.attemptToDelete.ShutDown()
    defer gc.attemptToOrphan.ShutDown()
    defer gc.dependencyGraphBuilder.graphChanges.ShutDown()

    klog.Infof("Starting garbage collector controller")
    defer klog.Infof("Shutting down garbage collector controller")

    go gc.dependencyGraphBuilder.Run(stopCh)

    if !cache.WaitForNamedCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) {
        return
    }

    klog.Infof("Garbage collector: all resource monitors have synced. Proceeding to collect garbage")

    // gc workers
    for i := 0; i < workers; i++ {
        go wait.Until(gc.runAttemptToDeleteWorker, 1*time.Second, stopCh)
        go wait.Until(gc.runAttemptToOrphanWorker, 1*time.Second, stopCh)
    }

    <-stopCh
}

func (gc *GarbageCollector) runAttemptToDeleteWorker() {
    for gc.attemptToDeleteWorker() {
    }
}

func (gc *GarbageCollector) attemptToDeleteWorker() bool {
    item, quit := gc.attemptToDelete.Get()
    gc.workerLock.RLock()
    defer gc.workerLock.RUnlock()
    if quit {
        return false
    }
    defer gc.attemptToDelete.Done(item)
    n, ok := item.(*node)
    if !ok {
        utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item))
        return true
    }
    err := gc.attemptToDeleteItem(n)
    if err != nil {
        if _, ok := err.(*restMappingError); ok {
            // There are at least two ways this can happen:
            // 1. The reference is to an object of a custom type that has not yet been
            //    recognized by gc.restMapper (this is a transient error).
            // 2. The reference is to an invalid group/version. We don't currently
            //    have a way to distinguish this from a valid type we will recognize
            //    after the next discovery sync.
            // For now, record the error and retry.
            klog.V(5).Infof("error syncing item %s: %v", n, err)
        } else {
            utilruntime.HandleError(fmt.Errorf("error syncing item %s: %v", n, err))
        }
        // retry if garbage collection of an object failed.
        gc.attemptToDelete.AddRateLimited(item)
    } else if !n.isObserved() {
        // requeue if item hasn't been observed via an informer event yet.
        // otherwise a virtual node for an item added AND removed during watch reestablishment can get stuck in the graph and never removed.
        // see https://issue.k8s.io/56121
        klog.V(5).Infof("item %s hasn't been observed via informer yet", n.identity)
        gc.attemptToDelete.AddRateLimited(item)
    }
    return true
}
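
For context on what this worker serves: deleting an owner with the Foreground propagation policy puts the foregroundDeletion finalizer on it, and the owner only goes away once the garbage collector has removed its blocking dependents. A minimal client-go sketch of such a request (assumptions: a configured kubernetes.Interface and client-go v0.18+ for the context-taking Delete signature; namespace and name are illustrative):

package example

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// deleteInForeground asks for foreground cascading deletion: the apiserver adds the
// foregroundDeletion finalizer to the Deployment, and the garbage collector only lets
// the Deployment disappear after its blocking dependents have been deleted.
func deleteInForeground(ctx context.Context, client kubernetes.Interface, namespace, name string) error {
    policy := metav1.DeletePropagationForeground
    if err := client.AppsV1().Deployments(namespace).Delete(ctx, name, metav1.DeleteOptions{
        PropagationPolicy: &policy,
    }); err != nil {
        return fmt.Errorf("foreground delete of %s/%s failed: %v", namespace, name, err)
    }
    return nil
}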

func (gc *GarbageCollector) runAttemptToOrphanWorker() {
    for gc.attemptToOrphanWorker() {
    }
}


func (gc *GarbageCollector) attemptToOrphanWorker() bool {
    item, quit := gc.attemptToOrphan.Get()
    gc.workerLock.RLock()
    defer gc.workerLock.RUnlock()
    if quit {
        return false
    }
    defer gc.attemptToOrphan.Done(item)
    owner, ok := item.(*node)
    if !ok {
        utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item))
        return true
    }
    // we don't need to lock each element, because they never get updated
    owner.dependentsLock.RLock()
    dependents := make([]*node, 0, len(owner.dependents))
    for dependent := range owner.dependents {
        dependents = append(dependents, dependent)
    }
    owner.dependentsLock.RUnlock()

    err := gc.orphanDependents(owner.identity, dependents)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("orphanDependents for %s failed with %v", owner.identity, err))
        gc.attemptToOrphan.AddRateLimited(item)
        return true
    }
    // update the owner, remove "orphaningFinalizer" from its finalizers list
    err = gc.removeFinalizer(owner, metav1.FinalizerOrphanDependents)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("removeOrphanFinalizer for %s failed with %v", owner.identity, err))
        gc.attemptToOrphan.AddRateLimited(item)
    }
    return true
}
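
The orphan path above is triggered by a delete request with the Orphan propagation policy, which leaves metav1.FinalizerOrphanDependents on the owner; this worker then clears the dependents' ownerReferences and strips that finalizer. A minimal client-go sketch of such a request (same assumptions as the previous example; names are illustrative):

package example

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// deleteAndOrphan deletes a Deployment but keeps its dependents (e.g. ReplicaSets):
// the apiserver records the orphan finalizer, the attemptToOrphan worker removes the
// ownerReferences from the dependents, and only then is the owner actually deleted.
func deleteAndOrphan(ctx context.Context, client kubernetes.Interface, namespace, name string) error {
    policy := metav1.DeletePropagationOrphan
    return client.AppsV1().Deployments(namespace).Delete(ctx, name, metav1.DeleteOptions{
        PropagationPolicy: &policy,
    })
}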

func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterface, period time.Duration, stopCh <-chan struct{}) {
    oldResources := make(map[schema.GroupVersionResource]struct{})
    wait.Until(func() {
        // Get the current resource list from discovery.
        newResources := GetDeletableResources(discoveryClient)

        // This can occur if there is an internal error in GetDeletableResources.
        if len(newResources) == 0 {
            klog.V(2).Infof("no resources reported by discovery, skipping garbage collector sync")
            return
        }

        // Decide whether discovery has reported a change.
        if reflect.DeepEqual(oldResources, newResources) {
            klog.V(5).Infof("no resource updates from discovery, skipping garbage collector sync")
            return
        }

        // Ensure workers are paused to avoid processing events before informers
        // have resynced.
        gc.workerLock.Lock()
        defer gc.workerLock.Unlock()

        // Once we get here, we should not unpause workers until we've successfully synced
        attempt := 0
        wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
            attempt++

            // On a reattempt, check if available resources have changed
            if attempt > 1 {
                newResources = GetDeletableResources(discoveryClient)
                if len(newResources) == 0 {
                    klog.V(2).Infof("no resources reported by discovery (attempt %d)", attempt)
                    return false, nil
                }
            }

            klog.V(2).Infof("syncing garbage collector with updated resources from discovery (attempt %d): %s", attempt, printDiff(oldResources, newResources))

            // Resetting the REST mapper will also invalidate the underlying discovery
            // client. This is a leaky abstraction and assumes behavior about the REST
            // mapper, but we'll deal with it for now.
            gc.restMapper.Reset()
            klog.V(4).Infof("reset restmapper")

            // Perform the monitor resync and wait for controllers to report cache sync.
            //
            // NOTE: It's possible that newResources will diverge from the resources
            // discovered by restMapper during the call to Reset, since they are
            // distinct discovery clients invalidated at different times. For example,
            // newResources may contain resources not returned in the restMapper's
            // discovery call if the resources appeared in-between the calls. In that
            // case, the restMapper will fail to map some of newResources until the next
            // attempt.
            if err := gc.resyncMonitors(newResources); err != nil {
                utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors (attempt %d): %v", attempt, err))
                return false, nil
            }
            klog.V(4).Infof("resynced monitors")

            // wait for caches to fill for a while (our sync period) before attempting to rediscover resources and retry syncing.
            // this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
            // informers keep attempting to sync in the background, so retrying doesn't interrupt them.
            // the call to resyncMonitors on the reattempt will no-op for resources that still exist.
            // note that workers stay paused until we successfully resync.
            if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(stopCh, period), gc.dependencyGraphBuilder.IsSynced) {
                utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync (attempt %d)", attempt))
                return false, nil
            }

            // success, break out of the loop
            return true, nil
        }, stopCh)

        // Finally, keep track of our new state. Do this after all preceding steps
        // have succeeded to ensure we'll retry on subsequent syncs if an error
        // occurred.
        oldResources = newResources
        klog.V(2).Infof("synced garbage collector")
    }, period, stopCh)
}
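
GetDeletableResources is not shown in this excerpt; judging from its use above, it asks discovery for the preferred resources and keeps only those that support the verbs the collector needs. A rough sketch of that idea (an approximation, not the upstream code verbatim; it assumes the discovery, schema and klog imports this file already uses):

// getDeletableResources lists the server's preferred resources and keeps those that
// support the delete, list and watch verbs, keyed by GroupVersionResource. Discovery
// errors are tolerated so a partially failing discovery still yields what was returned.
func getDeletableResources(discoveryClient discovery.ServerResourcesInterface) map[schema.GroupVersionResource]struct{} {
    preferred, err := discoveryClient.ServerPreferredResources()
    if err != nil {
        klog.Warningf("failed to discover some resource groups: %v", err)
    }
    deletable := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete", "list", "watch"}}, preferred)
    result := map[schema.GroupVersionResource]struct{}{}
    for _, rl := range deletable {
        gv, err := schema.ParseGroupVersion(rl.GroupVersion)
        if err != nil {
            continue
        }
        for _, r := range rl.APIResources {
            result[gv.WithResource(r.Name)] = struct{}{}
        }
    }
    return result
}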

func (gc *GarbageCollector) resyncMonitors(deletableResources map[schema.GroupVersionResource]struct{}) error {
    if err := gc.dependencyGraphBuilder.syncMonitors(deletableResources); err != nil {
        return err
    }
    gc.dependencyGraphBuilder.startMonitors()
    return nil
}

In pkg/controller/garbagecollector/graph_builder.go:

func (gb *GraphBuilder) Run(stopCh <-chan struct{}) {
    klog.Infof("GraphBuilder running")
    defer klog.Infof("GraphBuilder stopping")

    // Set up the stop channel.
    gb.monitorLock.Lock()
    gb.stopCh = stopCh
    gb.running = true
    gb.monitorLock.Unlock()

    // Start monitors and begin change processing until the stop channel is
    // closed.
    gb.startMonitors()
    wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh)

    // Stop any running monitors.
    gb.monitorLock.Lock()
    defer gb.monitorLock.Unlock()
    monitors := gb.monitors
    stopped := 0
    for _, monitor := range monitors {
        if monitor.stopCh != nil {
            stopped++
            close(monitor.stopCh)
        }
    }

    // reset monitors so that the graph builder can be safely re-run/synced.
    gb.monitors = nil
    klog.Infof("stopped %d of %d monitors", stopped, len(monitors))
}

func (gb *GraphBuilder) syncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
    gb.monitorLock.Lock()
    defer gb.monitorLock.Unlock()

    toRemove := gb.monitors
    if toRemove == nil {
        toRemove = monitors{}
    }
    current := monitors{}
    errs := []error{}
    kept := 0
    added := 0
    for resource := range resources {
        if _, ok := gb.ignoredResources[resource.GroupResource()]; ok {
            continue
        }
        if m, ok := toRemove[resource]; ok {
            current[resource] = m
            delete(toRemove, resource)
            kept++
            continue
        }
        kind, err := gb.restMapper.KindFor(resource)
        if err != nil {
            errs = append(errs, fmt.Errorf("couldn't look up resource %q: %v", resource, err))
            continue
        }
        c, s, err := gb.controllerFor(resource, kind)
        if err != nil {
            errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
            continue
        }
        current[resource] = &monitor{store: s, controller: c}
        added++
    }
    gb.monitors = current

    for _, monitor := range toRemove {
        if monitor.stopCh != nil {
            close(monitor.stopCh)
        }
    }

    klog.V(4).Infof("synced monitors; added %d, kept %d, removed %d", added, kept, len(toRemove))
    // NewAggregate returns nil if errs is 0-length
    return utilerrors.NewAggregate(errs)
}

func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
    handlers := cache.ResourceEventHandlerFuncs{
        // add the event to the dependencyGraphBuilder's graphChanges.
        AddFunc: func(obj interface{}) {
            event := &event{
                eventType: addEvent,
                obj:       obj,
                gvk:       kind,
            }
            gb.graphChanges.Add(event)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            // TODO: check if there are differences in the ownerRefs,
            // finalizers, and DeletionTimestamp; if not, ignore the update.
            event := &event{
                eventType: updateEvent,
                obj:       newObj,
                oldObj:    oldObj,
                gvk:       kind,
            }
            gb.graphChanges.Add(event)
        },
        DeleteFunc: func(obj interface{}) {
            // delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
            if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
                obj = deletedFinalStateUnknown.Obj
            }
            event := &event{
                eventType: deleteEvent,
                obj:       obj,
                gvk:       kind,
            }
            gb.graphChanges.Add(event)
        },
    }
    shared, err := gb.sharedInformers.ForResource(resource)
    if err != nil {
        klog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err)
        return nil, nil, err
    }
    klog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
    // need to clone because it's from a shared cache
    shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
    return shared.Informer().GetController(), shared.Informer().GetStore(), nil
}