A Deep Dive into K8S Jobs

2018-09-19  陈先生_9e91


Jobs are typically used for batch workloads, i.e. containers that run to completion rather than serve forever. If the container exits normally (exit code == 0), the Job is marked Completed.

Implementation hypothesis

The Job controller presumably needs to list-watch Jobs & Pods: when a new Job appears, it creates the corresponding Pods; when a Pod fails, it creates replacements until the failure count reaches backoffLimit ("Specifies the number of retries before marking this job failed. Defaults to 6"), at which point the Job is marked Failed.
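
For concreteness, here is a minimal sketch of a Job object built with the k8s.io/api types, showing where parallelism and backoffLimit live. The name "pi", the image, and all values are illustrative, not taken from the controller source:

package demo

import (
    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// int32Ptr is a local helper; the relevant API fields are pointers.
func int32Ptr(i int32) *int32 { return &i }

// newDemoJob builds an example Job: run up to 2 Pods in parallel,
// retry failed Pods at most 6 times before marking the Job Failed.
func newDemoJob() *batchv1.Job {
    return &batchv1.Job{
        ObjectMeta: metav1.ObjectMeta{Name: "pi"},
        Spec: batchv1.JobSpec{
            Parallelism:  int32Ptr(2),
            BackoffLimit: int32Ptr(6), // the default documented above
            Template: corev1.PodTemplateSpec{
                Spec: corev1.PodSpec{
                    RestartPolicy: corev1.RestartPolicyNever, // Jobs require Never or OnFailure
                    Containers: []corev1.Container{{
                        Name:    "pi",
                        Image:   "perl",
                        Command: []string{"perl", "-Mbignum=bpi", "-wle", "print bpi(100)"},
                    }},
                },
            },
        },
    }
}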

Code

The K8S codebase is written in very idiomatic Go, full of producer-consumer patterns built from goroutines & channels. If you are not comfortable with those primitives the code can be hard to follow; 郝大's《Go语言并发编程实战》 is a good primer.
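
As a warm-up, a self-contained toy of that goroutine & channel producer-consumer shape (my own illustration, not K8s code):

package main

import (
    "fmt"
    "sync"
)

func main() {
    work := make(chan string, 8) // buffered channel acting as the work queue
    var wg sync.WaitGroup

    // consumers: a fixed pool of workers draining the channel
    for w := 0; w < 3; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for key := range work { // loop ends when the channel is closed
                fmt.Println("processing", key)
            }
        }()
    }

    // producer: enqueue keys, then close the channel to signal completion
    for i := 0; i < 5; i++ {
        work <- fmt.Sprintf("job-%d", i)
    }
    close(work)
    wg.Wait()
}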

k8s.io\kubernetes\pkg\controller\job\job_controller.go

// Entry point
func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *JobController {
    // Informers were covered earlier in the list-watch write-up
    // The JobController list-watches both Jobs & Pods, matching the hypothesis above
    jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            jm.enqueueController(obj, true)
        },
        UpdateFunc: jm.updateJob,
        DeleteFunc: func(obj interface{}) {
            jm.enqueueController(obj, true)
        },
    })
    
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    jm.addPod,
        UpdateFunc: jm.updatePod,
        DeleteFunc: jm.deletePod,
    })
}
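
For comparison, wiring the same handlers by hand with client-go's SharedInformerFactory looks roughly like this. A hedged sketch: it assumes in-cluster config and only logs events instead of enqueueing them:

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"
)

func main() {
    cfg, err := rest.InClusterConfig() // assumption: running inside a cluster
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(cfg)

    // the factory provides shared, cached list-watch informers
    factory := informers.NewSharedInformerFactory(client, 30*time.Second)
    jobInformer := factory.Batch().V1().Jobs().Informer()
    jobInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { fmt.Println("job added") },
        UpdateFunc: func(oldObj, newObj interface{}) { fmt.Println("job updated") },
        DeleteFunc: func(obj interface{}) { fmt.Println("job deleted") },
    })

    stop := make(chan struct{})
    defer close(stop)
    factory.Start(stop)
    factory.WaitForCacheSync(stop)
    select {} // block forever; a real controller would run workers here
}
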
// Push the object's key onto the workqueue; the producer side of a classic producer-consumer setup
func (jm *JobController) enqueueController(obj interface{}, immediate bool) {
    key, err := controller.KeyFunc(obj)
    // error handling elided; backoff is 0 when immediate is true,
    // otherwise an exponential per-key delay
    jm.queue.AddAfter(key, backoff)
}

// Consumer side: the worker loop pulls keys off the queue
func (jm *JobController) processNextWorkItem() bool {
    key, quit := jm.queue.Get()
    // (Done/Forget bookkeeping elided)
    forget, err := jm.syncHandler(key.(string))
}
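
jm.queue is client-go's rate-limited workqueue. A standalone sketch of the same produce/consume loop, including the Done/Forget bookkeeping the excerpt above elides (my own illustration):

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

    // producer: enqueue a key, optionally after a backoff delay
    queue.AddAfter("default/my-job", 0)

    // consumer: mirrors processNextWorkItem
    done := make(chan struct{})
    go func() {
        defer close(done)
        for {
            key, quit := queue.Get()
            if quit {
                return
            }
            fmt.Println("sync", key) // stands in for syncHandler(key)
            queue.Forget(key)        // success: reset the rate-limit backoff
            queue.Done(key)          // mark this key as processed
        }
    }()

    time.Sleep(time.Second)
    queue.ShutDown()
    <-done
}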

// Sync one Job, driving its actual state toward the desired state
func (jm *JobController) syncJob(key string) (bool, error) {
    ns, name, err := cache.SplitMetaNamespaceKey(key)
    sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
    job := *sharedJob

    // if job was finished previously, we don't want to redo the termination
    if IsJobFinished(&job) {
        return true, nil
    }
    
    pods, err := jm.getPodsForJob(&job)
    
    activePods := controller.FilterActivePods(pods)
    active := int32(len(activePods))
    succeeded, failed := getStatus(pods)
    
    // jobFailed (computation elided): the failure count exceeded
    // backoffLimit, or the Job ran past activeDeadlineSeconds
    if jobFailed {
        jm.deleteJobPods(&job, activePods, errCh)
    } else {
        active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
    }
}
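
The IsJobFinished check above is small enough to restate: a Job is finished once its status carries a Complete or Failed condition whose status is True. A sketch consistent with the upstream helper in pkg/controller/job:

package job

import (
    batch "k8s.io/api/batch/v1"
    v1 "k8s.io/api/core/v1"
)

func IsJobFinished(j *batch.Job) bool {
    for _, c := range j.Status.Conditions {
        // a terminal condition must be present AND true
        if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
            return true
        }
    }
    return false
}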

// Reconcile the number of running Pods toward the Job's parallelism
func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
    active := int32(len(activePods))
    parallelism := *job.Spec.Parallelism
    
    if active > parallelism {
        diff := active - parallelism
        
        // "Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff
        for i := int32(0); i < diff; i++ {
            go func(ix int32) {
                jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job)
            }(i)
        }
    } else if active < parallelism {
        // wantActive (computation elided) is derived from Parallelism
        // and any remaining Completions
        diff := wantActive - active
        
        // "Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff
        // Create the Pods in batches.
        // Like TCP slow start / congestion control: batch sizes of 1, 2, 4, ...
        for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {
            for i := int32(0); i < batchSize; i++ {
                go func() {
                    jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
                }()
            }
            // wait for the batch and collect errors (elided), then shrink diff
            diff -= batchSize
        }
    }
}
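
To see the slow-start loop in isolation, here is a simplified, runnable version. The name slowStartBatch is mine and the real controller's early exit on errors is left out; only the idea and SlowStartInitialBatchSize (= 1) come from pkg/controller:

package main

import "fmt"

// slowStartBatch calls fn count times, in batches of 1, 2, 4, ...,
// so a systemic failure (e.g. a bad pod template) wastes few attempts.
// The real controller also waits on each batch and stops early on
// errors; that part is elided here.
func slowStartBatch(count, initialBatchSize int, fn func()) {
    remaining := count
    for batchSize := min(remaining, initialBatchSize); remaining > 0; batchSize = min(2*batchSize, remaining) {
        for i := 0; i < batchSize; i++ {
            fn()
        }
        remaining -= batchSize
    }
}

func min(a, b int) int {
    if a < b {
        return a
    }
    return b
}

func main() {
    // creates 10 "pods" in batches of 1, 2, 4, 3
    slowStartBatch(10, 1, func() { fmt.Println("create pod") })
}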