kube-controller-manager: the ttl-after-finished controller

2023-06-07  wwq2020

Purpose

When a Job is added, the controller enqueues it if spec.ttlSecondsAfterFinished is set and the Job has already finished (its Complete or Failed condition is True).
When a Job is updated, the same check applies: a finished Job with ttlSecondsAfterFinished set is enqueued for cleanup.
Once the TTL has elapsed after the finish time, the controller deletes the Job with foreground cascading deletion, so its dependent Pods are removed as well.
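
For context, a finished Job only becomes a cleanup candidate if spec.ttlSecondsAfterFinished is set on it. Below is a minimal example (not from the controller source) of building such a Job with the client-go API types; the name, image, and 100-second TTL are arbitrary illustration values.

package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Delete the Job (and, via cascading deletion, its Pods) 100 seconds
	// after it reaches the Complete or Failed condition.
	ttl := int32(100)

	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{Name: "pi", Namespace: "default"},
		Spec: batchv1.JobSpec{
			TTLSecondsAfterFinished: &ttl,
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyNever,
					Containers: []corev1.Container{{
						Name:    "pi",
						Image:   "perl:5.34.0",
						Command: []string{"perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"},
					}},
				},
			},
		},
	}

	fmt.Printf("ttlSecondsAfterFinished=%d\n", *job.Spec.TTLSecondsAfterFinished)
}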


Relevant code

Setup code

cmd/kube-controller-manager/app/controllermanager.go
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
    ...
    controllers["ttl-after-finished"] = startTTLAfterFinishedController
    ...
    return controllers
}

cmd/kube-controller-manager/app/core.go

func startTTLAfterFinishedController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
    go ttlafterfinished.New(
        controllerContext.InformerFactory.Batch().V1().Jobs(),
        controllerContext.ClientBuilder.ClientOrDie("ttl-after-finished-controller"),
    ).Run(ctx, int(controllerContext.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs))
    return nil, true, nil
}

pkg/controller/ttlafterfinished/ttlafterfinished_controller.go

func New(jobInformer batchinformers.JobInformer, client clientset.Interface) *Controller {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartStructuredLogging(0)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

    if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
        ratelimiter.RegisterMetricAndTrackRateLimiterUsage("ttl_after_finished_controller", client.CoreV1().RESTClient().GetRateLimiter())
    }

    metrics.Register()

    tc := &Controller{
        client:   client,
        recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ttl-after-finished-controller"}),
        queue:    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttl_jobs_to_delete"),
    }

    jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    tc.addJob,
        UpdateFunc: tc.updateJob,
    })

    tc.jLister = jobInformer.Lister()
    tc.jListerSynced = jobInformer.Informer().HasSynced

    tc.clock = clock.RealClock{}

    return tc
}

// Run starts the workers to clean up Jobs.
func (tc *Controller) Run(ctx context.Context, workers int) {
    defer utilruntime.HandleCrash()
    defer tc.queue.ShutDown()

    klog.Infof("Starting TTL after finished controller")
    defer klog.Infof("Shutting down TTL after finished controller")

    if !cache.WaitForNamedCacheSync("TTL after finished", ctx.Done(), tc.jListerSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.UntilWithContext(ctx, tc.worker, time.Second)
    }

    <-ctx.Done()
}
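
Run only waits for the informer cache to sync and then starts the worker goroutines; each worker drains the rate-limited queue and hands every key to processJob (shown in the next section). The loop below is a sketch of that standard workqueue pattern, not a verbatim copy of the controller's worker/processNextWorkItem; in particular the error handling here is a simplified stand-in.

func (tc *Controller) worker(ctx context.Context) {
	for tc.processNextWorkItem(ctx) {
	}
}

func (tc *Controller) processNextWorkItem(ctx context.Context) bool {
	key, quit := tc.queue.Get()
	if quit {
		return false
	}
	defer tc.queue.Done(key)

	// processJob does the real work; on error the key is re-queued with
	// rate limiting, on success its rate-limit history is cleared.
	if err := tc.processJob(ctx, key.(string)); err != nil {
		utilruntime.HandleError(fmt.Errorf("error cleaning up Job %v, requeuing: %v", key, err))
		tc.queue.AddRateLimited(key)
	} else {
		tc.queue.Forget(key)
	}

	return true
}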

Main code

pkg/controller/ttlafterfinished/ttlafterfinished_controller.go

func (tc *Controller) addJob(obj interface{}) {
    job := obj.(*batch.Job)
    klog.V(4).Infof("Adding job %s/%s", job.Namespace, job.Name)

    if job.DeletionTimestamp == nil && needsCleanup(job) {
        tc.enqueue(job)
    }
}

func (tc *Controller) updateJob(old, cur interface{}) {
    job := cur.(*batch.Job)
    klog.V(4).Infof("Updating job %s/%s", job.Namespace, job.Name)

    if job.DeletionTimestamp == nil && needsCleanup(job) {
        tc.enqueue(job)
    }
}
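
addJob and updateJob depend on two helpers that this post does not reproduce: needsCleanup, which decides whether a Job is a TTL-cleanup candidate, and enqueue, which puts the Job's namespace/name key on the workqueue. The sketch below captures their intent; the real code delegates the finished check to a job util helper and uses the controller key function, so details may differ.

// needsCleanup returns true if the Job has a TTL set and has already
// finished, i.e. its Complete or Failed condition is True.
func needsCleanup(j *batch.Job) bool {
	if j.Spec.TTLSecondsAfterFinished == nil {
		return false
	}
	for _, c := range j.Status.Conditions {
		if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
			return true
		}
	}
	return false
}

// enqueue adds the Job's namespace/name key to the workqueue.
func (tc *Controller) enqueue(job *batch.Job) {
	key, err := cache.MetaNamespaceKeyFunc(job)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for Job %#v: %v", job, err))
		return
	}
	tc.queue.Add(key)
}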


func (tc *Controller) processJob(ctx context.Context, key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }

    klog.V(4).Infof("Checking if Job %s/%s is ready for cleanup", namespace, name)
    // Ignore the Jobs that are already deleted or being deleted, or the ones that don't need clean up.
    job, err := tc.jLister.Jobs(namespace).Get(name)
    if errors.IsNotFound(err) {
        return nil
    }
    if err != nil {
        return err
    }

    if expiredAt, err := tc.processTTL(job); err != nil {
        return err
    } else if expiredAt == nil {
        return nil
    }

    // The Job's TTL is assumed to have expired, but the Job TTL might be stale.
    // Before deleting the Job, do a final sanity check.
    // If TTL is modified before we do this check, we cannot be sure if the TTL truly expires.
    // The latest Job may have a different UID, but it's fine because the checks will be run again.
    fresh, err := tc.client.BatchV1().Jobs(namespace).Get(ctx, name, metav1.GetOptions{})
    if errors.IsNotFound(err) {
        return nil
    }
    if err != nil {
        return err
    }
    // Use the latest Job TTL to see if the TTL truly expires.
    expiredAt, err := tc.processTTL(fresh)
    if err != nil {
        return err
    } else if expiredAt == nil {
        return nil
    }
    // Cascade deletes the Jobs if TTL truly expires.
    policy := metav1.DeletePropagationForeground
    options := metav1.DeleteOptions{
        PropagationPolicy: &policy,
        Preconditions:     &metav1.Preconditions{UID: &fresh.UID},
    }
    klog.V(4).Infof("Cleaning up Job %s/%s", namespace, name)
    if err := tc.client.BatchV1().Jobs(fresh.Namespace).Delete(ctx, fresh.Name, options); err != nil {
        return err
    }
    metrics.JobDeletionDurationSeconds.Observe(time.Since(*expiredAt).Seconds())
    return nil
}
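
processJob leans on processTTL, which is not shown above. Conceptually it computes the remaining time between now and (finish time + ttlSecondsAfterFinished): if the TTL has already expired it returns the expiration time so the caller proceeds with deletion, otherwise it re-queues the Job to be checked again once the TTL is expected to expire. The sketch below follows that description; timeLeft and enqueueAfter are the (assumed) helpers the real implementation factors this into.

func (tc *Controller) processTTL(job *batch.Job) (expiredAt *time.Time, err error) {
	// Jobs being deleted, without a TTL, or not yet finished are ignored.
	if job.DeletionTimestamp != nil || !needsCleanup(job) {
		return nil, nil
	}

	now := tc.clock.Now()
	remaining, expireAt, err := timeLeft(job, &now)
	if err != nil {
		return nil, err
	}

	// TTL has already expired: tell the caller to delete the Job.
	if *remaining <= 0 {
		return expireAt, nil
	}

	// Not expired yet: check again when the TTL is expected to expire.
	tc.enqueueAfter(job, *remaining)
	return nil, nil
}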