volcano Job初探

2019-07-22  本文已影响0人  陈先生_9e91

valcano

参考:

job-api

Error Handling

// Event is the type of Event related to the Job
type Event string

const (
    // AllEvents means all event
    AllEvents             Event = "*"
    // PodFailedEvent is triggered if Pod was failed
    PodFailedEvent        Event = "PodFailed"
    // PodEvictedEvent is triggered if Pod was deleted
    PodEvictedEvent       Event = "PodEvicted"
    // These below are several events can lead to job 'Unknown'
    // 1. Task Unschedulable, this is triggered when part of
    //    pods can't be scheduled while some are already running in gang-scheduling case.
    JobUnknownEvent Event = "Unknown"

    // OutOfSyncEvent is triggered if Pod/Job were updated
    OutOfSyncEvent Event = "OutOfSync"
    // CommandIssuedEvent is triggered if a command is raised by user
    CommandIssuedEvent Event = "CommandIssued"
    // TaskCompletedEvent is triggered if the 'Replicas' amount of pods in one task are succeed
    TaskCompletedEvent Event = "TaskCompleted"
)

// Action is the type of event handling 
type Action string

const (
    // AbortJobAction if this action is set, the whole job will be aborted:
    // all Pod of Job will be evicted, and no Pod will be recreated
    AbortJobAction Action = "AbortJob"
    // RestartJobAction if this action is set, the whole job will be restarted
    RestartJobAction Action = "RestartJob"
    // TerminateJobAction if this action is set, the whole job wil be terminated
    // and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated.
    TerminateJobAction Action = "TerminateJob"
    // CompleteJobAction if this action is set, the unfinished pods will be killed, job completed.
    CompleteJobAction Action = "CompleteJob"

    // ResumeJobAction is the action to resume an aborted job.
    ResumeJobAction Action = "ResumeJob"
    // SyncJobAction is the action to sync Job/Pod status.
    SyncJobAction Action = "SyncJob"
)

// LifecyclePolicy specifies the lifecycle and error handling of task and job.
type LifecyclePolicy struct {
    Event  Event  `json:"event,omitempty" protobuf:"bytes,1,opt,name=event"`
    Action Action `json:"action,omitempty" protobuf:"bytes,2,opt,name=action"`
    Timeout *metav1.Duration `json:"timeout,omitempty" protobuf:"bytes,3,opt,name=timeout"`
}

通过LifecyclePolicy对不同Job event做针对性处理。例如分布式训练,一个task失败,可以让整个Job重新运行。

apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: tf-job
spec:
  # If any event here, restart the whole job.
  policies:
  - event: *
    action: RestartJob
  tasks:
  - name: "ps"
    replicas: 1
    template:
      spec:
        containers:
        - name: ps
          image: ps-img
  - name: "worker"
    replicas: 5
    template:  
      spec: 
        containers:
        - name: worker
          image: worker-img
  ...

还可以针对每个task指定action

Gang-schduling

这部分完全就是kube-batch的设计。spec.minAvailable表示结对调度pod的数量。spec.minAvailable默认值是spec.tasks.replicas的总和。如果spec.minAvailable > sumspec.tasks.replicas,就无法创建Job。如果spec.minAvailable < sumspec.tasks.replicas,就根据task优先级创建或者随机创建。

task优先级

apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: spark-job
spec:
  minAvailable: 3
  tasks:
  - name: "driver"
    replicas: 1
    template:
      spec:
        priorityClass: "master-pri" // 高优先级
        containers:
        - name: driver
          image: driver-img
  - name: "executor"
    replicas: 5
    template:
      spec: 
        containers:
        - name: executor
          image: executor-img

调度器只能保证优先调度Job中高优的pods

Job plugins

  plugins:
    ssh: []
    env: []
    svc: []

目前提供3种插件,帮助用户运行AI job

func generateHost(job *vkv1.Job) map[string]string {
   data := make(map[string]string, len(job.Spec.Tasks))

   for _, ts := range job.Spec.Tasks {
      hosts := make([]string, 0, ts.Replicas)

      for i := 0; i < int(ts.Replicas); i++ {
         hostName := ts.Template.Spec.Hostname
         subdomain := ts.Template.Spec.Subdomain
         if len(hostName) == 0 {
            hostName = vkhelpers.MakePodName(job.Name, ts.Name, i)
         }
         if len(subdomain) == 0 {
            subdomain = job.Name
         }
         hosts = append(hosts, hostName+"."+subdomain)
         if len(ts.Template.Spec.Hostname) != 0 {
            break
         }
      }

      key := fmt.Sprintf(ConfigMapTaskHostFmt, ts.Name)
      data[key] = strings.Join(hosts, "\n")
   }

   return data
}
上一篇 下一篇

猜你喜欢

热点阅读