A Deep Dive into K8S Jobs
2018-09-19
陈先生_9e91
Jobs are typically used for batch workloads, i.e. containers that run to completion. If the container exits normally (exit code == 0), the Job ends up Completed.
Guessing at the implementation
The Job controller presumably needs to list-watch both Jobs and Pods: when it sees a new Job created, it creates the corresponding Pods; when a Pod fails, it keeps creating replacements until the number of failures reaches backoffLimit ("Specifies the number of retries before marking this job failed. Defaults to 6"), at which point it marks the Job as Failed.
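To make those fields concrete, here is a minimal Job object built with the client-go API types. This is an illustrative sketch, not code from the controller: the name, image, and command are the classic pi example from the Kubernetes docs, and the int32Ptr helper is just a local convenience.

package main

import (
    "fmt"

    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func int32Ptr(i int32) *int32 { return &i }

func main() {
    job := &batchv1.Job{
        ObjectMeta: metav1.ObjectMeta{Name: "pi"},
        Spec: batchv1.JobSpec{
            Parallelism:  int32Ptr(2), // up to 2 Pods run at the same time
            Completions:  int32Ptr(4), // 4 successful Pods complete the Job
            BackoffLimit: int32Ptr(6), // retries before the Job is marked Failed (default 6)
            Template: corev1.PodTemplateSpec{
                Spec: corev1.PodSpec{
                    RestartPolicy: corev1.RestartPolicyNever,
                    Containers: []corev1.Container{{
                        Name:    "pi",
                        Image:   "perl",
                        Command: []string{"perl", "-Mbignum=bpi", "-wle", "print bpi(100)"},
                    }},
                },
            },
        },
    }
    // A Pod created from this template that exits with code 0 counts toward
    // .status.succeeded; a non-zero exit counts toward .status.failed.
    fmt.Println(job.Name, *job.Spec.BackoffLimit)
}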
Code
The Kubernetes codebase is very much idiomatic Go: it is full of producer-consumer patterns built from goroutines and channels. If you are not used to that style it can be hard to follow; 郝大's《Go语言并发编程实战》(Go Concurrency in Action) is a good primer.
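Before diving into the controller, here is that bare pattern in isolation: one goroutine consumes keys from a channel while the caller produces them. This is only a toy sketch of the goroutine-and-channel style; the real controller uses a workqueue rather than a raw channel.

package main

import (
    "fmt"
    "sync"
)

func main() {
    queue := make(chan string, 10) // stands in for the controller's workqueue
    var wg sync.WaitGroup

    // consumer: the "worker" that syncs each key
    wg.Add(1)
    go func() {
        defer wg.Done()
        for key := range queue {
            fmt.Println("sync", key)
        }
    }()

    // producer: the "event handler" that enqueues namespace/name keys
    for _, key := range []string{"default/job-a", "default/job-b"} {
        queue <- key
    }
    close(queue)
    wg.Wait()
}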
k8s.io/kubernetes/pkg/controller/job/job_controller.go
// Entry point: wires the Job and Pod informers into the controller.
func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *JobController {
    // Informers were introduced in the earlier list-watch write-up.
    // The JobController list-watches both Jobs and Pods, which matches the guess above.
    // (construction of jm and other wiring elided)
    jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            jm.enqueueController(obj, true)
        },
        UpdateFunc: jm.updateJob,
        DeleteFunc: func(obj interface{}) {
            jm.enqueueController(obj, true)
        },
    })
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    jm.addPod,
        UpdateFunc: jm.updatePod,
        DeleteFunc: jm.deletePod,
    })
}
// Producer side: the event handlers turn the object into a namespace/name key
// and add it to the workqueue -- a classic producer-consumer setup.
func (jm *JobController) enqueueController(obj interface{}, immediate bool) {
    key, err := controller.KeyFunc(obj)
    // backoff is 0 when immediate is true, otherwise a delay derived from
    // how many times this key has already been requeued (computation elided)
    jm.queue.AddAfter(key, backoff)
}
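How is that backoff computed? Roughly: zero when immediate is true, otherwise an exponential delay based on the key's requeue count, capped at a maximum. The sketch below is my approximation of that logic; the constant names follow the real controller, but treat the exact values and formula as assumptions.

package main

import (
    "fmt"
    "time"
)

const (
    DefaultJobBackOff = 10 * time.Second  // assumed base delay
    MaxJobBackOff     = 360 * time.Second // assumed cap
)

// getBackoff approximates the requeue delay: it doubles with every requeue.
func getBackoff(requeues int) time.Duration {
    if requeues <= 0 {
        return 0
    }
    backoff := DefaultJobBackOff * time.Duration(1<<uint(requeues-1)) // 10s, 20s, 40s, ...
    if backoff > MaxJobBackOff {
        return MaxJobBackOff
    }
    return backoff
}

func main() {
    for i := 0; i < 8; i++ {
        fmt.Println(i, getBackoff(i))
    }
}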
// Consumer side: a worker pulls a key off the queue and hands it to syncHandler
// (i.e. syncJob); the Done/Forget/AddRateLimited bookkeeping is elided here.
func (jm *JobController) processNextWorkItem() bool {
    key, quit := jm.queue.Get()
    forget, err := jm.syncHandler(key.(string))
}
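The excerpt hides the bookkeeping around the queue: the full worker loop also calls Done when it finishes with a key, Forget on success (to reset the rate limiter), and AddRateLimited on error (to retry with backoff). Below is a self-contained sketch of that shape using client-go's workqueue package and a stand-in syncHandler.

package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

// syncHandler stands in for JobController.syncJob: it returns whether the key's
// rate-limiter history can be forgotten, plus an error to trigger a retry.
func syncHandler(key string) (bool, error) {
    fmt.Println("syncing", key)
    return true, nil
}

func main() {
    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
    queue.Add("default/job-a")

    key, quit := queue.Get()
    if quit {
        return
    }
    defer queue.Done(key) // always mark the key as processed

    forget, err := syncHandler(key.(string))
    switch {
    case err != nil:
        queue.AddRateLimited(key) // retry later with exponential backoff
    case forget:
        queue.Forget(key) // reset the backoff counter for this key
    }
}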
// syncJob reconciles one Job so that its status matches the spec.
func (jm *JobController) syncJob(key string) (bool, error) {
    ns, name, err := cache.SplitMetaNamespaceKey(key)
    sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
    job := *sharedJob

    // if job was finished previously, we don't want to redo the termination
    if IsJobFinished(&job) {
        return true, nil
    }

    pods, err := jm.getPodsForJob(&job)
    activePods := controller.FilterActivePods(pods)
    active := int32(len(activePods))
    succeeded, failed := getStatus(pods)

    // jobFailed becomes true once the retry count exceeds spec.backoffLimit
    // or the Job runs past activeDeadlineSeconds (computation elided)
    if jobFailed {
        // terminal failure: tear down any Pods that are still running
        jm.deleteJobPods(&job, activePods, errCh)
    } else {
        // otherwise reconcile the number of running Pods
        active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
    }
}
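Two helpers do the heavy lifting here: getStatus counts terminal Pods by phase, and IsJobFinished checks whether the Job already carries a Complete or Failed condition. The sketch below reconstructs them from the surrounding logic; the signatures match my reading of the source, but take them as approximations rather than verbatim copies.

package main

import (
    "fmt"

    batch "k8s.io/api/batch/v1"
    v1 "k8s.io/api/core/v1"
)

// getStatus counts how many of the Job's Pods have already succeeded or failed.
func getStatus(pods []*v1.Pod) (succeeded, failed int32) {
    for _, p := range pods {
        switch p.Status.Phase {
        case v1.PodSucceeded:
            succeeded++
        case v1.PodFailed:
            failed++
        }
    }
    return succeeded, failed
}

// IsJobFinished reports whether the Job already has a terminal condition,
// so syncJob can skip Jobs that are Complete or Failed.
func IsJobFinished(j *batch.Job) bool {
    for _, c := range j.Status.Conditions {
        if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
            return true
        }
    }
    return false
}

func main() {
    fmt.Println(IsJobFinished(&batch.Job{})) // false: no conditions yet
    s, f := getStatus(nil)
    fmt.Println(s, f) // 0 0
}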
// manageJob scales the Job's Pods up or down toward the desired parallelism.
func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
    active := int32(len(activePods))
    parallelism := *job.Spec.Parallelism

    if active > parallelism {
        diff := active - parallelism
        // "Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff
        for i := int32(0); i < diff; i++ {
            go func(ix int32) {
                jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job)
            }(i)
        }
    } else if active < parallelism {
        // wantActive is parallelism, capped by the remaining completions when
        // spec.completions is set (computation elided)
        diff := wantActive - active
        // "Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff
        // Pods are created in batches that double in size each round
        // (1, 2, 4, ...), much like TCP slow start / congestion control.
        for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {
            for i := int32(0); i < batchSize; i++ {
                go func() {
                    jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
                }()
            }
            // the real code waits for the batch to finish and then does diff -= batchSize
        }
    }
}
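To see the slow-start batching on its own, here is a self-contained sketch with a print statement standing in for CreatePodsWithControllerRef. It assumes an initial batch size of 1 and ignores error handling; the point is just the 1, 2, 4, ... doubling and the diff -= batchSize bookkeeping that the excerpt above elides.

package main

import (
    "fmt"
    "sync"
)

const slowStartInitialBatchSize = 1 // assumed initial batch size

func min32(a, b int32) int32 {
    if a < b {
        return a
    }
    return b
}

func main() {
    diff := int32(10) // pretend the Job needs 10 more Pods

    for batchSize := min32(diff, slowStartInitialBatchSize); diff > 0; batchSize = min32(2*batchSize, diff) {
        var wg sync.WaitGroup
        for i := int32(0); i < batchSize; i++ {
            wg.Add(1)
            go func(ix int32) {
                defer wg.Done()
                fmt.Println("create pod, batch index", ix) // stands in for podControl.CreatePodsWithControllerRef
            }(i)
        }
        wg.Wait()         // wait for the whole batch before doubling
        diff -= batchSize // 1 + 2 + 4 + 3 = 10 creations in this example
    }
}

The payoff of this design: if creations start failing (for example, a broken Pod template), the real controller stops after the current batch, so it wastes only a handful of API calls instead of attempting all diff of them.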