k8s 之 cronjob controller 源码简单分析

2020-07-20  本文已影响0人  wwq2020

简介

cronjob controller 定期列出所有 cronjob 和 job,按调度时间为 cronjob 创建相应的 job,再由 job controller 创建相应的 pod
kubelet 监听 pod 变化,进行实际的 pod 操作
然后 cronjob controller 根据 job 的执行情况更新 cronjob 的状态

创建处

pkg/registry/batch/cronjob/storage/storage.go 中

// NewREST builds the storage for CronJob objects and for their status
// subresource.
//
// The status storage is a shallow copy of the completed main store whose
// UpdateStrategy is replaced by cronjob.StatusStrategy. An error is returned
// when the store cannot be completed from optsGetter.
func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, error) {
    tableGenerator := printers.NewTableGenerator().With(printersinternal.AddHandlers)

    cronJobStore := &genericregistry.Store{
        NewFunc:                  func() runtime.Object { return &batch.CronJob{} },
        NewListFunc:              func() runtime.Object { return &batch.CronJobList{} },
        DefaultQualifiedResource: batch.Resource("cronjobs"),

        CreateStrategy: cronjob.Strategy,
        UpdateStrategy: cronjob.Strategy,
        DeleteStrategy: cronjob.Strategy,

        TableConvertor: printerstorage.TableConvertor{TableGenerator: tableGenerator},
    }

    err := cronJobStore.CompleteWithOptions(&generic.StoreOptions{RESTOptions: optsGetter})
    if err != nil {
        return nil, nil, err
    }

    // Copy only after the store has been completed so the status store
    // starts from the same completed configuration.
    statusCopy := *cronJobStore
    statusCopy.UpdateStrategy = cronjob.StatusStrategy

    return &REST{cronJobStore}, &StatusREST{store: &statusCopy}, nil
}

controller

cmd/kube-controller-manager/app/batch.go 中

// startCronJobController creates the CronJob controller and runs it in its
// own goroutine until ctx.Stop is closed.
//
// The boolean result is false only when the batch/v1beta1 "cronjobs" resource
// is not marked available in ctx.AvailableResources; in that case nothing is
// started. The returned http.Handler is always nil.
func startCronJobController(ctx ControllerContext) (http.Handler, bool, error) {
    cronJobsResource := schema.GroupVersionResource{
        Group:    "batch",
        Version:  "v1beta1",
        Resource: "cronjobs",
    }
    if available := ctx.AvailableResources[cronJobsResource]; !available {
        return nil, false, nil
    }

    client := ctx.ClientBuilder.ClientOrDie("cronjob-controller")
    controller, err := cronjob.NewController(client)
    if err != nil {
        return nil, true, fmt.Errorf("error creating CronJob controller: %v", err)
    }

    go controller.Run(ctx.Stop)
    return nil, true, nil
}

pkg/controller/cronjob/cronjob_controller.go 中

// NewController wires up a CronJob controller around kubeClient.
//
// It starts an event broadcaster that logs events and records them through
// kubeClient, registers rate-limiter usage metrics under the name
// "cronjob_controller" when the client exposes a rate limiter, and returns a
// Controller whose job, cronjob and pod controls all use kubeClient. An error
// is returned only when the metric registration fails.
//
// NOTE(review): kubeClient is dereferenced for the event sink before the nil
// check further down, so a nil client cannot reach that check — confirm
// whether nil is meant to be supported.
func NewController(kubeClient clientset.Interface) (*Controller, error) {
    broadcaster := record.NewBroadcaster()
    broadcaster.StartStructuredLogging(0)
    broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

    if kubeClient != nil {
        if limiter := kubeClient.CoreV1().RESTClient().GetRateLimiter(); limiter != nil {
            if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("cronjob_controller", limiter); err != nil {
                return nil, err
            }
        }
    }

    return &Controller{
        kubeClient: kubeClient,
        jobControl: realJobControl{KubeClient: kubeClient},
        cjControl:  &realCJControl{KubeClient: kubeClient},
        podControl: &realPodControl{KubeClient: kubeClient},
        recorder:   broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cronjob-controller"}),
    }, nil
}

// Run launches the periodic reconciliation of CronJobs in a background
// goroutine and then blocks until stopCh is closed.
func (jm *Controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    // Period handed to wait.Until for the repeated syncAll calls.
    const resyncPeriod = 10 * time.Second

    klog.Infof("Starting CronJob Manager")
    go wait.Until(jm.syncAll, resyncPeriod, stopCh)

    // Stay here until a stop is requested so the shutdown message is logged
    // only at that point.
    <-stopCh
    klog.Infof("Shutting down CronJob Manager")
}

// syncAll lists all the CronJobs and Jobs and reconciles them.
//
// Jobs from every namespace are collected first and grouped by their parent
// via groupJobsByParent. Each CronJob is then visited page by page: syncOne
// is called with the Jobs grouped under the CronJob's UID and the current
// time, followed by cleanupFinishedJobs with the same Jobs. Listing failures
// are reported through utilruntime.HandleError and end the pass.
func (jm *Controller) syncAll() {
    // List children (Jobs) before parents (CronJob).
    // This guarantees that if we see any Job that got orphaned by the GC orphan finalizer,
    // we must also see that the parent CronJob has non-nil DeletionTimestamp (see #42639).
    // Note that this only works because we are NOT using any caches here.
    jobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
        return jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(context.TODO(), opts)
    }

    js := make([]batchv1.Job, 0)
    err := pager.New(pager.SimplePageFunc(jobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
        jobTmp, ok := object.(*batchv1.Job)
        if !ok {
            // Format the object that was actually received: after a failed
            // assertion jobTmp is a nil *batchv1.Job, so %T on it would
            // always print the expected type instead of the offending one.
            return fmt.Errorf("expected type *batchv1.Job, got type %T", object)
        }
        js = append(js, *jobTmp)
        return nil
    })
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Failed to extract job list: %v", err))
        return
    }

    klog.V(4).Infof("Found %d jobs", len(js))
    cronJobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
        return jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(context.TODO(), opts)
    }

    jobsByCj := groupJobsByParent(js)
    klog.V(4).Infof("Found %d groups", len(jobsByCj))
    err = pager.New(pager.SimplePageFunc(cronJobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
        cj, ok := object.(*batchv1beta1.CronJob)
        if !ok {
            // Same reasoning as above: cj is a typed nil here, so report
            // the dynamic type of object.
            return fmt.Errorf("expected type *batchv1beta1.CronJob, got type %T", object)
        }
        // Look the children up once; both steps receive the same slice.
        childJobs := jobsByCj[cj.UID]
        syncOne(cj, childJobs, time.Now(), jm.jobControl, jm.cjControl, jm.recorder)
        cleanupFinishedJobs(cj, childJobs, jm.jobControl, jm.cjControl, jm.recorder)
        return nil
    })
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Failed to extract cronJobs list: %v", err))
    }
}

上一篇下一篇

猜你喜欢

热点阅读