k8s 之 cronjob controller 源码简单分析
2020-07-20 本文已影响0人
wwq2020
简介
cronjob controller 监控 cronjob 的变化,然后创建相应的 pod
kubelet 监听 pod 变化,进行实际的a pod 操作
然后cronjob c
创建处
pkg/registry/batch/cronjob/storage/storage.go 中
func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, error) {
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &batch.CronJob{} },
NewListFunc: func() runtime.Object { return &batch.CronJobList{} },
DefaultQualifiedResource: batch.Resource("cronjobs"),
CreateStrategy: cronjob.Strategy,
UpdateStrategy: cronjob.Strategy,
DeleteStrategy: cronjob.Strategy,
TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
}
options := &generic.StoreOptions{RESTOptions: optsGetter}
if err := store.CompleteWithOptions(options); err != nil {
return nil, nil, err
}
statusStore := *store
statusStore.UpdateStrategy = cronjob.StatusStrategy
return &REST{store}, &StatusREST{store: &statusStore}, nil
}
controller
cmd/kube-controller-manager/app/batch.go 中
func startCronJobController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1beta1", Resource: "cronjobs"}] {
return nil, false, nil
}
cjc, err := cronjob.NewController(
ctx.ClientBuilder.ClientOrDie("cronjob-controller"),
)
if err != nil {
return nil, true, fmt.Errorf("error creating CronJob controller: %v", err)
}
go cjc.Run(ctx.Stop)
return nil, true, nil
}
pkg/controller/cronjob/cronjob_controller.go 中
func NewController(kubeClient clientset.Interface) (*Controller, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("cronjob_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
return nil, err
}
}
jm := &Controller{
kubeClient: kubeClient,
jobControl: realJobControl{KubeClient: kubeClient},
cjControl: &realCJControl{KubeClient: kubeClient},
podControl: &realPodControl{KubeClient: kubeClient},
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cronjob-controller"}),
}
return jm, nil
}
// Run starts the main goroutine responsible for watching and syncing jobs.
func (jm *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
klog.Infof("Starting CronJob Manager")
// Check things every 10 second.
go wait.Until(jm.syncAll, 10*time.Second, stopCh)
<-stopCh
klog.Infof("Shutting down CronJob Manager")
}
// syncAll lists all the CronJobs and Jobs and reconciles them.
func (jm *Controller) syncAll() {
// List children (Jobs) before parents (CronJob).
// This guarantees that if we see any Job that got orphaned by the GC orphan finalizer,
// we must also see that the parent CronJob has non-nil DeletionTimestamp (see #42639).
// Note that this only works because we are NOT using any caches here.
jobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
return jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(context.TODO(), opts)
}
js := make([]batchv1.Job, 0)
err := pager.New(pager.SimplePageFunc(jobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
jobTmp, ok := object.(*batchv1.Job)
if !ok {
return fmt.Errorf("expected type *batchv1.Job, got type %T", jobTmp)
}
js = append(js, *jobTmp)
return nil
})
if err != nil {
utilruntime.HandleError(fmt.Errorf("Failed to extract job list: %v", err))
return
}
klog.V(4).Infof("Found %d jobs", len(js))
cronJobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
return jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(context.TODO(), opts)
}
jobsByCj := groupJobsByParent(js)
klog.V(4).Infof("Found %d groups", len(jobsByCj))
err = pager.New(pager.SimplePageFunc(cronJobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
cj, ok := object.(*batchv1beta1.CronJob)
if !ok {
return fmt.Errorf("expected type *batchv1beta1.CronJob, got type %T", cj)
}
syncOne(cj, jobsByCj[cj.UID], time.Now(), jm.jobControl, jm.cjControl, jm.recorder)
cleanupFinishedJobs(cj, jobsByCj[cj.UID], jm.jobControl, jm.cjControl, jm.recorder)
return nil
})
if err != nil {
utilruntime.HandleError(fmt.Errorf("Failed to extract cronJobs list: %v", err))
return
}
}