An In-Depth Analysis of kube-batch (1): The Startup Process

2018-10-13  陈先生_9e91

An in-depth look at kube-batch from the source code.

code

init: registration and initialization

kube-batch\pkg\scheduler\factory.go

func init() {
    // Plugins for Jobs
    framework.RegisterPluginBuilder("drf", drf.New)
    framework.RegisterPluginBuilder("gang", gang.New)
    framework.RegisterPluginBuilder("predicates", predicates.New)
    framework.RegisterPluginBuilder("priority", priority.New)

    // Plugins for Queues
    framework.RegisterPluginBuilder("proportion", proportion.New)

    // Actions
    framework.RegisterAction(reclaim.New())
    framework.RegisterAction(allocate.New())
    framework.RegisterAction(preempt.New())
}

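Five plugins and three actions are registered here, so the whole scheduling process presumably relies on these components. Which of them are actually used depends on the configuration file.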
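The registration pattern above can be illustrated with a minimal, self-contained sketch. It is not kube-batch's implementation: the Plugin interface is pared down, and namedPlugin, GetPluginBuilder and RegisteredPlugins are hypothetical helpers written only for this example. The idea is simply that init() fills a name-to-builder map, and the configuration later selects entries from it by name.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Plugin is a simplified stand-in for the framework's plugin type.
type Plugin interface {
	Name() string
}

// PluginBuilder creates a fresh plugin instance.
type PluginBuilder func() Plugin

var (
	mu             sync.Mutex
	pluginBuilders = map[string]PluginBuilder{}
)

// RegisterPluginBuilder stores a builder under a name so it can be looked up
// later, once the configuration says which plugins to enable.
func RegisterPluginBuilder(name string, pb PluginBuilder) {
	mu.Lock()
	defer mu.Unlock()
	pluginBuilders[name] = pb
}

// GetPluginBuilder returns the builder registered under name, if any.
func GetPluginBuilder(name string) (PluginBuilder, bool) {
	mu.Lock()
	defer mu.Unlock()
	pb, ok := pluginBuilders[name]
	return pb, ok
}

// RegisteredPlugins returns the registered names in sorted order.
func RegisteredPlugins() []string {
	mu.Lock()
	defer mu.Unlock()
	names := make([]string, 0, len(pluginBuilders))
	for name := range pluginBuilders {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

// namedPlugin is a hypothetical plugin that only knows its own name.
type namedPlugin struct{ name string }

func (p namedPlugin) Name() string { return p.name }

// init runs before main, mirroring how the article's init() fills the registry.
func init() {
	for _, name := range []string{"drf", "gang", "predicates", "priority", "proportion"} {
		name := name // capture a per-iteration copy for the closure
		RegisterPluginBuilder(name, func() Plugin { return namedPlugin{name: name} })
	}
}

func main() {
	fmt.Println(RegisteredPlugins())
	if pb, ok := GetPluginBuilder("gang"); ok {
		fmt.Println(pb().Name())
	}
}
```

Registering builders rather than instances means a fresh plugin can be created whenever one is needed, which fits a scheduler that opens a new session on every cycle.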

kube-batch\pkg\scheduler\util.go

var defaultSchedulerConf = map[string]string{
    "actions":                   "reclaim, allocate, preempt",
    "plugins":                   "gang, priority, drf, predicates, proportion",
    "plugin.gang.jobready":      "true",
    "plugin.gang.joborder":      "true",
    "plugin.gang.preemptable":   "true",
    "plugin.priority.joborder":  "true",
    "plugin.priority.taskorder": "true",
    "plugin.drf.preemptable":    "true",
    "plugin.drf.joborder":       "true",
}

By default, all of these components are enabled.
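To make the shape of this configuration concrete, here is a small sketch of how such a map could be interpreted. The helpers splitList and pluginOptions are hypothetical and are not taken from kube-batch's loadSchedulerConf; the sketch only assumes that "actions" and "plugins" are comma-separated, ordered lists and that "plugin.<name>.<option>" keys toggle individual plugin callbacks.

```go
package main

import (
	"fmt"
	"strings"
)

// splitList splits a comma-separated value such as "reclaim, allocate"
// into trimmed, non-empty names, preserving their order.
func splitList(value string) []string {
	var names []string
	for _, part := range strings.Split(value, ",") {
		if name := strings.TrimSpace(part); name != "" {
			names = append(names, name)
		}
	}
	return names
}

// pluginOptions collects the "plugin.<name>.<option>" keys for one plugin
// and records, per option, whether its value is "true".
func pluginOptions(conf map[string]string, plugin string) map[string]bool {
	prefix := "plugin." + plugin + "."
	opts := map[string]bool{}
	for key, value := range conf {
		if strings.HasPrefix(key, prefix) {
			opts[strings.TrimPrefix(key, prefix)] = value == "true"
		}
	}
	return opts
}

func main() {
	conf := map[string]string{
		"actions":                 "reclaim, allocate, preempt",
		"plugins":                 "gang, priority, drf, predicates, proportion",
		"plugin.gang.jobready":    "true",
		"plugin.gang.joborder":    "true",
		"plugin.gang.preemptable": "true",
	}

	fmt.Println(splitList(conf["actions"]))
	fmt.Println(splitList(conf["plugins"]))
	fmt.Println(pluginOptions(conf, "gang"))
}
```

The order of the "actions" list matters in this reading, since the actions are later executed one after another in exactly that order.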

kube-batch\pkg\scheduler\scheduler.go

func (pc *Scheduler) Run(stopCh <-chan struct{}) {
    // Start cache for policy.
    go pc.cache.Run(stopCh)

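    // Load configuration of scheduler
    conf := defaultSchedulerConf
    if len(pc.schedulerConf) != 0 {
        // The excerpt never declared err; the fallback to the defaults
        // below is an assumed way of handling a failed load.
        loaded, err := pc.cache.LoadSchedulerConf(pc.schedulerConf)
        if err != nil {
            glog.Errorf("Failed to load scheduler conf %q, using defaults: %v", pc.schedulerConf, err)
        } else {
            conf = loaded
        }
    }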

    pc.actions, pc.pluginArgs = loadSchedulerConf(conf)

    go wait.Until(pc.runOnce, 1*time.Second, stopCh)
}

func (pc *Scheduler) runOnce() {
    glog.V(4).Infof("Start scheduling ...")
    defer glog.V(4).Infof("End scheduling ...")

    ssn := framework.OpenSession(pc.cache, pc.pluginArgs)
    defer framework.CloseSession(ssn)

    for _, action := range pc.actions {
        action.Execute(ssn)
    }
}

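The overall scheduling flow, as shown above:

  1. Start the cache, which watches several REST resources
  2. Load the configuration file
  3. Run a scheduling cycle once per second, opening a session each time
  4. Execute the actions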

Each of these steps is covered below.

cache

kube-batch\pkg\scheduler\cache\interface.go

// Cache collects pods/nodes/queues information
// and provides information snapshot
type Cache interface {
    // Run start informer
    Run(stopCh <-chan struct{})

    // Snapshot deep copy overall cache information into snapshot
    Snapshot() *api.ClusterInfo

    // SchedulerConf return the property of scheduler configuration
    LoadSchedulerConf(path string) (map[string]string, error)

    // WaitForCacheSync waits for all cache synced
    WaitForCacheSync(stopCh <-chan struct{}) bool

    // Bind binds Task to the target host.
    // TODO(jinzhej): clean up expire Tasks.
    Bind(task *api.TaskInfo, hostname string) error

    // Evict evicts the task to release resources.
    Evict(task *api.TaskInfo, reason string) error

    // Backoff puts job in backlog for a while.
    Backoff(job *api.JobInfo, event arbcorev1.Event, reason string) error
}

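The cache module is responsible for two things. Judging from the interface comment above, these are collecting pod/node/queue information and providing a snapshot of that information.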

Implementation

kube-batch\pkg\scheduler\cache\cache.go

func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
    go sc.pdbInformer.Informer().Run(stopCh)
    go sc.podInformer.Informer().Run(stopCh)
    go sc.nodeInformer.Informer().Run(stopCh)
    go sc.podGroupInformer.Informer().Run(stopCh)

    if sc.namespaceAsQueue {
        go sc.nsInformer.Informer().Run(stopCh)
    } else {
        go sc.queueInformer.Informer().Run(stopCh)
    }

    // Re-sync error tasks.
    go sc.resync()

    // Cleanup jobs.
    go sc.cleanupJobs()
}

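Here we can see that kube-batch caches PDB/Pod/Node/PodGroup objects, plus either Namespaces or Queues depending on namespaceAsQueue. PodGroup is a CRD defined by kube-batch and is the core of its batch scheduling.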

kube-batch\config\crds\scheduling_v1alpha1_podgroup.yaml

apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: podgroups.scheduling.incubator.k8s.io
spec:
  group: scheduling.incubator.k8s.io
  names:
    kind: PodGroup
    plural: podgroups
  scope: Namespaced
  validation:
    openAPIV3Schema:
      properties:
        apiVersion:
          type: string
        kind:
          type: string
        metadata:
          type: object
        spec:
          properties:
            minMember:
              format: int32
              type: integer
          type: object
        status:
          properties:
            succeeded:
              format: int32
              type: integer
            failed:
              format: int32
              type: integer
            running:
              format: int32
              type: integer
          type: object
      type: object
  version: v1alpha1

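Clearly, the field to focus on is spec.minMember: it is the number of tasks, that is, Pods, that a PodGroup job needs. The implementation logic is analyzed in a later post on the cache.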
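As a rough illustration of why minMember matters for batch (gang) scheduling, the sketch below treats a job as ready only once at least minMember of its tasks hold resources. It is a simplified model, not the gang plugin's code: Job, TaskStatus, ReadyTaskNum and Ready are hypothetical names chosen for this example.

```go
package main

import "fmt"

// TaskStatus is a simplified task state for this sketch.
type TaskStatus int

const (
	Pending TaskStatus = iota
	Allocated
	Running
)

// Job is a hypothetical, pared-down stand-in for a PodGroup-backed job.
type Job struct {
	Name      string
	MinMember int32 // mirrors spec.minMember of the PodGroup
	Tasks     map[string]TaskStatus
}

// ReadyTaskNum counts the tasks that already hold resources.
func (j *Job) ReadyTaskNum() int32 {
	var n int32
	for _, status := range j.Tasks {
		if status == Allocated || status == Running {
			n++
		}
	}
	return n
}

// Ready reports whether at least MinMember tasks hold resources.
func (j *Job) Ready() bool {
	return j.ReadyTaskNum() >= j.MinMember
}

func main() {
	job := &Job{
		Name:      "tf-training",
		MinMember: 3,
		Tasks: map[string]TaskStatus{
			"worker-0": Allocated,
			"worker-1": Allocated,
			"worker-2": Pending,
		},
	}
	// Two of the three required tasks are placed: the gang is incomplete.
	fmt.Println(job.Ready())

	// Once the third task is placed, the whole group can be dispatched.
	job.Tasks["worker-2"] = Allocated
	fmt.Println(job.Ready())
}
```

The point of an all-or-nothing check like this is to avoid starting part of a job that cannot make progress until its remaining Pods are also scheduled.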

session

kube-batch\pkg\scheduler\framework\framework.go

func OpenSession(cache cache.Cache, args []*PluginArgs) *Session {
    ssn := openSession(cache)

    for _, plugin := range ssn.plugins {
        plugin.OnSessionOpen(ssn)
    }

    return ssn
}

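func openSession(cache cache.Cache) *Session {
    ssn := &Session{
        UID:        uuid.NewUUID(),
        cache:      cache,
        JobIndex:   map[api.JobID]*api.JobInfo{},
        NodeIndex:  map[string]*api.NodeInfo{},
        QueueIndex: map[api.QueueID]*api.QueueInfo{},
    }

    // Take a snapshot of the cache for this scheduling cycle.
    snapshot := cache.Snapshot()
    // ... (elided in this excerpt; presumably the snapshot fills the indexes above)

    return ssn
}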

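Here we can see that every scheduling session first dumps a snapshot of the cache and then runs the previously registered plugins. The plugin details are analyzed in a later post on plugins.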
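The snapshot-per-session idea can be shown with a short sketch. Everything here is a simplified assumption made for illustration: Cache, NodeInfo and countingPlugin are hypothetical types, and OnSessionClose is assumed as the counterpart of the OnSessionOpen hook seen above. The point is that a session works on a deep copy, so later cache updates or session-local changes do not affect each other.

```go
package main

import (
	"fmt"
	"sync"
)

// NodeInfo is a pared-down node record for this sketch.
type NodeInfo struct {
	Name    string
	IdleCPU int
}

// Cache holds the live cluster view, guarded by a mutex.
type Cache struct {
	mu    sync.Mutex
	nodes map[string]*NodeInfo
}

func NewCache() *Cache {
	return &Cache{nodes: map[string]*NodeInfo{}}
}

// SetNode adds or replaces a node in the live view.
func (c *Cache) SetNode(name string, idleCPU int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.nodes[name] = &NodeInfo{Name: name, IdleCPU: idleCPU}
}

// Snapshot deep-copies the nodes so callers can modify the result freely.
func (c *Cache) Snapshot() map[string]*NodeInfo {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := make(map[string]*NodeInfo, len(c.nodes))
	for name, node := range c.nodes {
		copied := *node
		out[name] = &copied
	}
	return out
}

// Plugin receives callbacks when a session opens and closes.
type Plugin interface {
	OnSessionOpen(ssn *Session)
	OnSessionClose(ssn *Session)
}

// Session is the per-cycle view that plugins and actions operate on.
type Session struct {
	Nodes   map[string]*NodeInfo
	plugins []Plugin
}

// OpenSession snapshots the cache, then lets each plugin hook into the session.
func OpenSession(c *Cache, plugins []Plugin) *Session {
	ssn := &Session{Nodes: c.Snapshot(), plugins: plugins}
	for _, p := range plugins {
		p.OnSessionOpen(ssn)
	}
	return ssn
}

// CloseSession notifies each plugin that the session is over.
func CloseSession(ssn *Session) {
	for _, p := range ssn.plugins {
		p.OnSessionClose(ssn)
	}
}

// countingPlugin is a hypothetical plugin that counts its callbacks.
type countingPlugin struct{ opened, closed int }

func (p *countingPlugin) OnSessionOpen(*Session)  { p.opened++ }
func (p *countingPlugin) OnSessionClose(*Session) { p.closed++ }

func main() {
	cache := NewCache()
	cache.SetNode("node-1", 4)

	plugin := &countingPlugin{}
	ssn := OpenSession(cache, []Plugin{plugin})
	defer CloseSession(ssn)

	// Changes inside the session stay in the session's copy.
	ssn.Nodes["node-1"].IdleCPU = 0
	// Changes to the cache after the snapshot are invisible to the session.
	cache.SetNode("node-2", 8)

	fmt.Println(len(ssn.Nodes), cache.Snapshot()["node-1"].IdleCPU, plugin.opened)
}
```

Working on a copy lets actions try out tentative allocations within a cycle without holding the cache lock or corrupting the live view.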

action

kube-batch\pkg\scheduler\scheduler.go

func (pc *Scheduler) runOnce() {
    glog.V(4).Infof("Start scheduling ...")
    defer glog.V(4).Infof("End scheduling ...")

    ssn := framework.OpenSession(pc.cache, pc.pluginArgs)
    defer framework.CloseSession(ssn)

    if glog.V(3) {
        glog.V(3).Infof("%v", ssn)
    }

    for _, action := range pc.actions {
        action.Execute(ssn)
    }
}

Finally, the previously registered actions are executed in order.
