[k8s source code analysis][kube-scheduler] scheduler
1. Introduction
If you repost this article, please credit the original source and respect the author's work!
This article analyzes cmd/kube-scheduler, pkg/scheduler/scheduler.go, pkg/scheduler/factory/factory.go, and related directories and files. The two most important types here are configFactory (factory.go) and Scheduler (scheduler.go).
Source code: https://github.com/nicktming/kubernetes
Branch: tming-v1.13 (based on v1.13)
2. Flow chart
(Figure: run_1.png)
3. Code walkthrough
Next, let's see from the code how kube-scheduler starts up. To keep the article short, code that is irrelevant or does not affect understanding is omitted from the listings.
3.1 cmd/kube-scheduler
// cmd/kube-scheduler/scheduler.go
func main() {
...
command := app.NewSchedulerCommand()
...
if err := command.Execute(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}
The NewSchedulerCommand() call takes us into cmd/kube-scheduler/app/server.go.
// cmd/kube-scheduler/app/server.go
// NewSchedulerCommand creates a *cobra.Command object with default parameters
func NewSchedulerCommand() *cobra.Command {
opts, err := options.NewOptions()
if err != nil {
klog.Fatalf("unable to initialize command options: %v", err)
}
cmd := &cobra.Command{
Use: "kube-scheduler",
...
Run: func(cmd *cobra.Command, args []string) {
if err := runCommand(cmd, args, opts); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
}
...
return cmd
}
cobra is simply a command-line framework; you can read more on its official site, so it is not introduced in depth here. A minimal usage sketch follows.
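As a rough, self-contained illustration (not kube-scheduler code; the command name and the --name flag are invented for the example), here is a minimal cobra command with the same shape as NewSchedulerCommand: a Use name, a Run function, and a final Execute():
// a minimal, hypothetical cobra command
package main

import (
    "fmt"
    "os"

    "github.com/spf13/cobra"
)

func main() {
    cmd := &cobra.Command{
        Use:   "hello", // name shown in usage/help output
        Short: "prints a greeting",
        Run: func(cmd *cobra.Command, args []string) {
            name, _ := cmd.Flags().GetString("name")
            fmt.Printf("hello %s\n", name)
        },
    }
    // flags are declared on the command, just like Options.Flags() does for kube-scheduler
    cmd.Flags().String("name", "world", "who to greet")
    if err := cmd.Execute(); err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }
}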
The main thing to pay attention to is opts, err := options.NewOptions(), because it generates a set of default attributes. Two important ones:
DefaultProvider: the name of the default scheduler.
LeaderElection: its attribute is set to true, meaning kube-scheduler enables high availability; a separate post will cover this.
Additionally, if the kube-scheduler command is given a --config file to set up a custom scheduler, it is parsed by Flags in cmd/kube-scheduler/app/options/options.go (a tiny pflag sketch follows the listing).
// cmd/kube-scheduler/app/options/options.go
// Flags returns flags for a specific scheduler by section name
func (o *Options) Flags() (nfs apiserverflag.NamedFlagSets) {
fs := nfs.FlagSet("misc")
fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file. Flags override values in this file.")
fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
o.SecureServing.AddFlags(nfs.FlagSet("secure serving"))
o.CombinedInsecureServing.AddFlags(nfs.FlagSet("insecure serving"))
o.Authentication.AddFlags(nfs.FlagSet("authentication"))
o.Authorization.AddFlags(nfs.FlagSet("authorization"))
o.Deprecated.AddFlags(nfs.FlagSet("deprecated"), &o.ComponentConfig)
leaderelectionconfig.BindFlags(&o.ComponentConfig.LeaderElection.LeaderElectionConfiguration, nfs.FlagSet("leader election"))
utilfeature.DefaultFeatureGate.AddFlag(nfs.FlagSet("feature gate"))
return nfs
}
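These flags are ordinary pflag flags grouped into named flag sets. As a minimal, self-contained sketch of the underlying mechanism (the flag-set name mirrors "misc" above; the file path is an invented example value, not a kube-scheduler default):
// how a flag like --config is declared and read with pflag (illustrative only)
package main

import (
    "fmt"

    "github.com/spf13/pflag"
)

func main() {
    var configFile string
    fs := pflag.NewFlagSet("misc", pflag.ExitOnError)
    fs.StringVar(&configFile, "config", "", "The path to the configuration file.")
    // in kube-scheduler, cobra feeds in os.Args; here we parse a fixed slice
    if err := fs.Parse([]string{"--config", "/etc/kubernetes/scheduler.yaml"}); err != nil {
        panic(err)
    }
    fmt.Println("config file:", configFile)
}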
Back in the NewSchedulerCommand method above, opts is complete, so runCommand is called.
// cmd/kube-scheduler/app/server.go
// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
...
// validate the fields of opts
if errs := opts.Validate(); len(errs) > 0 {
fmt.Fprintf(os.Stderr, "%v\n", utilerrors.NewAggregate(errs))
os.Exit(1)
}
// if requested, persist opts.ComponentConfig to a file
if len(opts.WriteConfigTo) > 0 {
if err := options.WriteConfigFile(opts.WriteConfigTo, &opts.ComponentConfig); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
klog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo)
}
// build a scheduler config object from opts
c, err := opts.Config()
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
stopCh := make(chan struct{})
// Get the completed config
// derive a completed config from the scheduler config
cc := c.Complete()
// see which features are enabled
algorithmprovider.ApplyFeatureGates()
// register the configuration with configz under "componentconfig"
if cz, err := configz.New("componentconfig"); err == nil {
cz.Set(cc.ComponentConfig)
} else {
return fmt.Errorf("unable to register configz: %s", err)
}
// everything above exists just to obtain a completed config
return Run(cc, stopCh)
}
This whole sequence of operations exists just to obtain a completed config, which is then passed to Run. One place that deserves attention here is opts.Config().
// cmd/kube-scheduler/app/options/options.go
func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
// if kube-scheduler was not given --config, take the defaults (o.ComponentConfig)
if len(o.ConfigFile) == 0 {
c.ComponentConfig = o.ComponentConfig
// only apply deprecated flags if no config file is loaded (this is the old behaviour).
if err := o.Deprecated.ApplyTo(&c.ComponentConfig); err != nil {
return err
}
if err := o.CombinedInsecureServing.ApplyTo(c, &c.ComponentConfig); err != nil {
return err
}
} else {
// if kube-scheduler was given --config, read the configuration from that file
cfg, err := loadConfigFromFile(o.ConfigFile)
if err != nil {
return err
}
// use the loaded config file only, with the exception of --address and --port. This means that
// none of the deprecated flags in o.Deprecated are taken into consideration. This is the old
// behaviour of the flags we have to keep.
c.ComponentConfig = *cfg
if err := o.CombinedInsecureServing.ApplyToFromLoadedConfig(c, &c.ComponentConfig); err != nil {
return err
}
}
...
return nil
}
func (o *Options) Config() (*schedulerappconfig.Config, error) {
if o.SecureServing != nil {
if err := o.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{net.ParseIP("127.0.0.1")}); err != nil {
return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
}
}
c := &schedulerappconfig.Config{}
if err := o.ApplyTo(c); err != nil {
return nil, err
}
// Prepare kube clients.
// create clients that can call the api-server
client, leaderElectionClient, eventClient, err := createClients(c.ComponentConfig.ClientConnection, o.Master, c.ComponentConfig.LeaderElection.RenewDeadline.Duration)
if err != nil {
return nil, err
}
...
// Set up leader election if enabled.
var leaderElectionConfig *leaderelection.LeaderElectionConfig
// the default is true, so unless the user sets it to false this step runs
// in other words, kube-scheduler supports high availability by default
if c.ComponentConfig.LeaderElection.LeaderElect {
leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, leaderElectionClient, recorder)
if err != nil {
return nil, err
}
}
c.Client = client
c.InformerFactory = informers.NewSharedInformerFactory(client, 0)
c.PodInformer = factory.NewPodInformer(client, 0)
c.EventClient = eventClient
c.Recorder = recorder
c.Broadcaster = eventBroadcaster
c.LeaderElection = leaderElectionConfig
return c, nil
}
ApplyTo: mainly handles whether a config file was provided; if so, the configuration is read from that file.
Config: mainly creates the client for talking to the api-server, plus the leaderElectionConfig that supports kube-scheduler high availability. A minimal client-creation sketch follows.
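To give a feel for what creating such a client involves, here is a minimal out-of-cluster client-go sketch. It is an illustration, not the actual createClients code, and the kubeconfig path is a hypothetical example:
// building a clientset that can call the api-server (illustrative only)
package main

import (
    "fmt"

    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // assumption: running outside the cluster, with a kubeconfig on disk
    restCfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
    if err != nil {
        panic(err)
    }
    client, err := kubernetes.NewForConfig(restCfg)
    if err != nil {
        panic(err)
    }
    fmt.Printf("clientset ready: %T\n", client)
}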
Next, return to runCommand in cmd/kube-scheduler/app/server.go, which then executes the Run(cc, stopCh) method. Run is the real core method, so we analyze it in pieces, starting with how the Scheduler object in pkg/scheduler/scheduler.go is created.
func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
...
// Create the scheduler.
// build the Scheduler object of pkg/scheduler/scheduler.go
sched, err := scheduler.New(cc.Client,
cc.InformerFactory.Core().V1().Nodes(),
cc.PodInformer,
cc.InformerFactory.Core().V1().PersistentVolumes(),
cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
cc.InformerFactory.Core().V1().ReplicationControllers(),
cc.InformerFactory.Apps().V1().ReplicaSets(),
cc.InformerFactory.Apps().V1().StatefulSets(),
cc.InformerFactory.Core().V1().Services(),
cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
storageClassInformer,
cc.Recorder,
cc.ComponentConfig.AlgorithmSource,
stopCh,
scheduler.WithName(cc.ComponentConfig.SchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
scheduler.WithEquivalenceClassCacheEnabled(cc.ComponentConfig.EnableContentionProfiling),
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
...
}
You can see that the whole preceding sequence of operations was there to produce the configuration Scheduler needs, including cc.Client, cc.ComponentConfig.AlgorithmSource, and so on.
3.2 pkg/scheduler/scheduler.go
You can see the whole Scheduler struct has a single field: the Config struct from pkg/scheduler/factory/factory.go.
type Scheduler struct {
config *factory.Config
}
Let's look at the New method.
func New(client clientset.Interface,
nodeInformer coreinformers.NodeInformer,
podInformer coreinformers.PodInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
replicationControllerInformer coreinformers.ReplicationControllerInformer,
replicaSetInformer appsinformers.ReplicaSetInformer,
statefulSetInformer appsinformers.StatefulSetInformer,
serviceInformer coreinformers.ServiceInformer,
pdbInformer policyinformers.PodDisruptionBudgetInformer,
storageClassInformer storageinformers.StorageClassInformer,
recorder record.EventRecorder,
schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
stopCh <-chan struct{},
opts ...func(o *schedulerOptions)) (*Scheduler, error) {
/**
scheduler.WithName(cc.ComponentConfig.SchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
scheduler.WithEquivalenceClassCacheEnabled(cc.ComponentConfig.EnableContentionProfiling),
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
*/
options := defaultSchedulerOptions
for _, opt := range opts {
opt(&options)
}
// Set up the configurator which can create schedulers from configs.
// build the factory package's configFactory
configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: options.schedulerName,
Client: client,
NodeInformer: nodeInformer,
PodInformer: podInformer,
PvInformer: pvInformer,
PvcInformer: pvcInformer,
ReplicationControllerInformer: replicationControllerInformer,
ReplicaSetInformer: replicaSetInformer,
StatefulSetInformer: statefulSetInformer,
ServiceInformer: serviceInformer,
PdbInformer: pdbInformer,
StorageClassInformer: storageClassInformer,
HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
EnableEquivalenceClassCache: options.enableEquivalenceClassCache,
DisablePreemption: options.disablePreemption,
PercentageOfNodesToScore: options.percentageOfNodesToScore,
BindTimeoutSeconds: options.bindTimeoutSeconds,
})
var config *factory.Config
source := schedulerAlgorithmSource
switch {
case source.Provider != nil:
// the default scheduler takes this branch: *source.Provider = DefaultProvider
// Create the config from a named algorithm provider.
sc, err := configurator.CreateFromProvider(*source.Provider)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
}
config = sc
case source.Policy != nil:
// a custom (policy-based) scheduler takes this branch
// Create the config from a user specified policy source.
policy := &schedulerapi.Policy{}
switch {
case source.Policy.File != nil:
if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
return nil, err
}
case source.Policy.ConfigMap != nil:
if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
return nil, err
}
}
sc, err := configurator.CreateFromConfig(*policy)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
}
config = sc
default:
return nil, fmt.Errorf("unsupported algorithm source: %v", source)
}
// Additional tweaks to the config produced by the configurator.
config.Recorder = recorder
config.DisablePreemption = options.disablePreemption
config.StopEverything = stopCh
// Create the scheduler.
sched := NewFromConfig(config)
return sched, nil
}
1. Build options from the opts functions passed in. The defaults are the fields below; anything that needs changing is changed through the opts functions (see the functional-options sketch after this listing).
var defaultSchedulerOptions = schedulerOptions{
schedulerName: v1.DefaultSchedulerName,
hardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceClassCache: false,
disablePreemption: false,
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
bindTimeoutSeconds: BindTimeoutSeconds,
}
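The opts mechanism above is the classic functional-options pattern. Here is a self-contained sketch in the same shape as scheduler.WithName (the options struct is trimmed to two fields and the bodies are illustrative, not the exact source):
// the functional-options pattern used by scheduler.New (illustrative only)
package main

import "fmt"

type schedulerOptions struct {
    schedulerName     string
    disablePreemption bool
}

// WithName mirrors scheduler.WithName: it returns a function that mutates the options struct.
func WithName(name string) func(o *schedulerOptions) {
    return func(o *schedulerOptions) { o.schedulerName = name }
}

func main() {
    options := schedulerOptions{schedulerName: "default-scheduler"} // the defaults
    for _, opt := range []func(o *schedulerOptions){WithName("my-scheduler")} {
        opt(&options) // each option overrides one default
    }
    fmt.Printf("%+v\n", options)
}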
2. Build the factory package's configFactory object from the arguments, named configurator here; it is studied in detail in a later section.
3. Depending on source, choose how to produce the Scheduler object's Config. Here we only discuss the default scheduler; custom schedulers get a dedicated post. So execution enters sc, err := configurator.CreateFromProvider(*source.Provider) to generate the required Config.
// pkg/scheduler/factory/factory.go
// Creates a scheduler from the name of a registered algorithm provider.
func (c *configFactory) CreateFromProvider(providerName string) (*Config, error) {
klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
provider, err := GetAlgorithmProvider(providerName)
if err != nil {
return nil, err
}
return c.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys, []algorithm.SchedulerExtender{})
}
GetAlgorithmProvider(providerName), where providerName = "DefaultProvider", was already introduced in [k8s源码分析][kube-scheduler]scheduler/algorithmprovider之注册default-scheduler; it returns the keys of all of the default scheduler's predicate and priority functions.
Then configFactory's CreateFromKeys generates the Config that Scheduler needs from all the predicate functions, priority functions, and extenders (empty here). CreateFromKeys itself is explained later together with configFactory.
4. Build the Scheduler object sched from the Config.
// pkg/scheduler/scheduler.go
// NewFromConfig returns a new scheduler using the provided Config.
func NewFromConfig(config *factory.Config) *Scheduler {
metrics.Register()
return &Scheduler{
config: config,
}
}
3.3 pkg/scheduler/factory/factory.go
This section analyzes the configurator := factory.NewConfigFactory part mentioned in 3.2 pkg/scheduler/scheduler.go.
3.3.1 configFactory
Below is the definition of the configFactory struct.
type configFactory struct {
// client for talking to the api-server
client clientset.Interface
// queue for pods that need scheduling
// holds the pods waiting to be scheduled
podQueue internalqueue.SchedulingQueue
// a means to list all known scheduled pods.
// can list all pods that have already been scheduled
scheduledPodLister corelisters.PodLister
// a means to list all known scheduled pods and pods assumed to have been scheduled.
// can list all scheduled pods plus the assumed pods
podLister algorithm.PodLister
// a means to list all nodes
nodeLister corelisters.NodeLister
// a means to list all PersistentVolumes
pVLister corelisters.PersistentVolumeLister
// a means to list all PersistentVolumeClaims
pVCLister corelisters.PersistentVolumeClaimLister
// a means to list all services
serviceLister corelisters.ServiceLister
// a means to list all controllers
controllerLister corelisters.ReplicationControllerLister
// a means to list all replicasets
replicaSetLister appslisters.ReplicaSetLister
// a means to list all statefulsets
statefulSetLister appslisters.StatefulSetLister
// a means to list all PodDisruptionBudgets
pdbLister policylisters.PodDisruptionBudgetLister
// a means to list all StorageClasses
storageClassLister storagelisters.StorageClassLister
// Close this to stop all reflectors
StopEverything <-chan struct{}
scheduledPodsHasSynced cache.InformerSynced
schedulerCache schedulerinternalcache.Cache
// SchedulerName of a scheduler is used to select which pods will be
// processed by this scheduler, based on pods's "spec.schedulerName".
// the scheduler's name, default-scheduler by default
schedulerName string
// RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule
// corresponding to every RequiredDuringScheduling affinity rule.
// HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100.
hardPodAffinitySymmetricWeight int32
// Equivalence class cache
// equivalence class cache that speeds up the predicate phase
equivalencePodCache *equivalence.Cache
// Enable equivalence class cache
enableEquivalenceClassCache bool
// Handles volume binding decisions
volumeBinder *volumebinder.VolumeBinder
// Always check all predicates even if the middle of one predicate fails.
alwaysCheckAllPredicates bool
// Disable pod preemption or not.
// whether pod preemption is disabled
disablePreemption bool
// percentageOfNodesToScore specifies percentage of all nodes to score in each scheduling cycle.
percentageOfNodesToScore int32
}
The NewConfigFactory method:
func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
stopEverything := args.StopCh
if stopEverything == nil {
stopEverything = wait.NeverStop
}
schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)
// storageClassInformer is only enabled through VolumeScheduling feature gate
var storageClassLister storagelisters.StorageClassLister
if args.StorageClassInformer != nil {
storageClassLister = args.StorageClassInformer.Lister()
}
c := &configFactory{
client: args.Client,
podLister: schedulerCache,
podQueue: internalqueue.NewSchedulingQueue(stopEverything),
nodeLister: args.NodeInformer.Lister(),
pVLister: args.PvInformer.Lister(),
pVCLister: args.PvcInformer.Lister(),
serviceLister: args.ServiceInformer.Lister(),
controllerLister: args.ReplicationControllerInformer.Lister(),
replicaSetLister: args.ReplicaSetInformer.Lister(),
statefulSetLister: args.StatefulSetInformer.Lister(),
pdbLister: args.PdbInformer.Lister(),
storageClassLister: storageClassLister,
schedulerCache: schedulerCache,
StopEverything: stopEverything,
schedulerName: args.SchedulerName,
hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
enableEquivalenceClassCache: args.EnableEquivalenceClassCache,
disablePreemption: args.DisablePreemption,
percentageOfNodesToScore: args.PercentageOfNodesToScore,
}
c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
// scheduled pod cache
args.PodInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return assignedPod(t)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
return assignedPod(pod)
}
runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
return false
default:
runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: c.addPodToCache,
UpdateFunc: c.updatePodInCache,
DeleteFunc: c.deletePodFromCache,
},
},
)
...
c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}
...
ch := make(chan os.Signal, 1)
signal.Notify(ch, compareSignal)
go func() {
for {
select {
case <-c.StopEverything:
c.podQueue.Close()
return
case <-ch:
debugger.Comparer.Compare()
debugger.Dumper.DumpAll()
}
}
}()
return c
}
The main points to note here are:
1. schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything) instantiates a schedulerCache. Its concrete implementation and structure were already analyzed in [k8s源码分析][kube-scheduler]scheduler/internal/cache之node_tree和cache; here we mainly look at where it gets used.
2. Notice that configFactory's podLister and schedulerCache fields use the same schedulerCache object. Since podLister is defined as listing all scheduled pods plus the assumed pods, backing it with schedulerCache is easy to understand.
3. configFactory's scheduledPodLister is defined as listing all pods that have already been scheduled, while args.PodInformer.Lister() returns all pods. Informers will get dedicated posts in the client-go series, so no details here. Clearly assignedPodLister simply wraps args.PodInformer.Lister() with a filter that keeps only the pods already assigned to a node.
c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}
type assignedPodLister struct {
corelisters.PodLister
}
// List lists all Pods in the indexer for a given namespace.
func (l assignedPodLister) List(selector labels.Selector) ([]*v1.Pod, error) {
list, err := l.PodLister.List(selector)
if err != nil {
return nil, err
}
filtered := make([]*v1.Pod, 0, len(list))
for _, pod := range list {
// keep only the pods that have already been scheduled
if len(pod.Spec.NodeName) > 0 {
filtered = append(filtered, pod)
}
}
return filtered, nil
}
// List lists all Pods in the indexer for a given namespace.
func (l assignedPodLister) Pods(namespace string) corelisters.PodNamespaceLister {
return assignedPodNamespaceLister{l.PodLister.Pods(namespace)}
}
4. podQueue is defined as holding the pods that still need scheduling, hence internalqueue.NewSchedulingQueue(stopEverything). The scheduling_queue was analyzed in detail in [k8s源码分析][kube-scheduler]scheduler/internal/queue之优先队列scheduling_queue(1) and [k8s源码分析][kube-scheduler]scheduler/internal/queue之优先队列scheduling_queue(2).
5. The various informers get their EventHandler processing logic attached, including podInformer, serviceInformer, NodeInformer, PvInformer, PvcInformer, StorageClassInformer, and so on. That part is analyzed in the next topic; a minimal registration sketch is shown below.
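As a preview of that mechanism, here is a minimal client-go sketch that registers pod event handlers on a shared informer factory via the same AddEventHandler call used in NewConfigFactory above. The kubeconfig path is hypothetical and the handlers only print:
// registering pod event handlers on a shared informer (illustrative only)
package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    restCfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") // hypothetical path
    if err != nil {
        panic(err)
    }
    factory := informers.NewSharedInformerFactory(kubernetes.NewForConfigOrDie(restCfg), 30*time.Second)
    podInformer := factory.Core().V1().Pods().Informer()
    podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { fmt.Println("pod added") },
        UpdateFunc: func(oldObj, newObj interface{}) { fmt.Println("pod updated") },
        DeleteFunc: func(obj interface{}) { fmt.Println("pod deleted") },
    })
    stopCh := make(chan struct{})
    factory.Start(stopCh)          // start all registered informers
    factory.WaitForCacheSync(stopCh)
    select {} // block forever; the handlers fire as pods change
}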
3.3.2 Config
type Config struct {
// It is expected that changes made via SchedulerCache will be observed
// by NodeLister and Algorithm.
// a schedulerCache, i.e. configFactory's schedulerCache
SchedulerCache schedulerinternalcache.Cache
// Ecache is used for optimistically invalid affected cache items after
// successfully binding a pod
// configFactory's equivalencePodCache
Ecache *equivalence.Cache
// Lister that returns all Nodes
NodeLister algorithm.NodeLister
// the algorithm used for scheduling
Algorithm algorithm.ScheduleAlgorithm
// the Bind method
GetBinder func(pod *v1.Pod) Binder
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
PodConditionUpdater PodConditionUpdater
// PodPreemptor is used to evict pods and update pod annotations.
// the preemptor
PodPreemptor PodPreemptor
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
// fetches the next pod that needs scheduling
// blocks until one is available if there are none
NextPod func() *v1.Pod
// WaitForCacheSync waits for scheduler cache to populate.
// It returns true if it was successful, false if the controller should shutdown.
WaitForCacheSync func() bool
// Error is called if there is an error. It is passed the pod in
// question, and the error
Error func(*v1.Pod, error)
// Recorder is the EventRecorder to use
Recorder record.EventRecorder
// Close this to shut down the scheduler.
StopEverything <-chan struct{}
VolumeBinder *volumebinder.VolumeBinder
DisablePreemption bool
// caches the pods waiting to be scheduled
SchedulingQueue internalqueue.SchedulingQueue
}
Worth noting here:
NextPod: a function; every pod that needs scheduling is held behind it and comes out one at a time to be scheduled.
Next, look at the CreateFromKeys of configFactory mentioned above. Based on the current configFactory and the supplied predicate, priority, and extender functions, this method generates a Config object as defined in factory.go.
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)
if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.GetHardPodAffinitySymmetricWeight())
}
// resolve all predicate functions from the given predicate keys
predicateFuncs, err := c.GetPredicates(predicateKeys)
if err != nil {
return nil, err
}
// resolve all priority functions from the given priority keys
priorityConfigs, err := c.GetPriorityFunctionConfigs(priorityKeys)
if err != nil {
return nil, err
}
// priorityMetaProducer is used when scoring nodes
priorityMetaProducer, err := c.GetPriorityMetadataProducer()
if err != nil {
return nil, err
}
// predicateMetaProducer is used when the predicates actually run
predicateMetaProducer, err := c.GetPredicateMetadataProducer()
if err != nil {
return nil, err
}
// check whether the equivalence class cache that speeds up predicates is enabled
// Init equivalence class cache
if c.enableEquivalenceClassCache {
c.equivalencePodCache = equivalence.NewCache(predicates.Ordering())
klog.Info("Created equivalence class cache")
}
// build the Algorithm (algorithm.ScheduleAlgorithm) that does the real scheduling computation
algo := core.NewGenericScheduler(
c.schedulerCache,
c.equivalencePodCache,
c.podQueue,
predicateFuncs,
predicateMetaProducer,
priorityConfigs,
priorityMetaProducer,
extenders,
c.volumeBinder,
c.pVCLister,
c.pdbLister,
c.alwaysCheckAllPredicates,
c.disablePreemption,
c.percentageOfNodesToScore,
)
podBackoff := util.CreateDefaultPodBackoff()
return &Config{
SchedulerCache: c.schedulerCache,
Ecache: c.equivalencePodCache,
// The scheduler only needs to consider schedulable nodes.
NodeLister: &nodeLister{c.nodeLister},
Algorithm: algo,
GetBinder: c.getBinderFunc(extenders),
PodConditionUpdater: &podConditionUpdater{c.client},
PodPreemptor: &podPreemptor{c.client},
WaitForCacheSync: func() bool {
return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
},
NextPod: func() *v1.Pod {
return c.getNextPod()
},
Error: c.MakeDefaultErrorFunc(podBackoff, c.podQueue),
StopEverything: c.StopEverything,
VolumeBinder: c.volumeBinder,
SchedulingQueue: c.podQueue,
}, nil
}
Points to note here:
1. The predicate and priority keys are resolved into their corresponding predicate and priority functions, and the registered priorityMetaProducer and predicateMetaProducer are obtained.
2. The algorithm.ScheduleAlgorithm interface that performs the real scheduling computation is created; the returned implementation is a genericScheduler object (pkg/scheduler/core/generic_scheduler.go).
// pkg/scheduler/algorithm/scheduler_interface.go
type ScheduleAlgorithm interface {
Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
// Preempt receives scheduling errors for a pod and tries to create room for
// the pod by preempting lower priority pods if possible.
// It returns the node where preemption happened, a list of preempted pods, a
// list of pods whose nominated node name should be removed, and error if any.
Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
// Predicates() returns a pointer to a map of predicate functions. This is
// exposed for testing.
Predicates() map[string]FitPredicate
// Prioritizers returns a slice of priority config. This is exposed for
// testing.
Prioritizers() []PriorityConfig
}
// pkg/scheduler/core/generic_scheduler.go
func NewGenericScheduler(
cache schedulerinternalcache.Cache,
eCache *equivalence.Cache,
podQueue internalqueue.SchedulingQueue,
predicates map[string]algorithm.FitPredicate,
predicateMetaProducer algorithm.PredicateMetadataProducer,
prioritizers []algorithm.PriorityConfig,
priorityMetaProducer algorithm.PriorityMetadataProducer,
extenders []algorithm.SchedulerExtender,
volumeBinder *volumebinder.VolumeBinder,
pvcLister corelisters.PersistentVolumeClaimLister,
pdbLister algorithm.PDBLister,
alwaysCheckAllPredicates bool,
disablePreemption bool,
percentageOfNodesToScore int32,
) algorithm.ScheduleAlgorithm {
return &genericScheduler{
cache: cache,
equivalenceCache: eCache,
schedulingQueue: podQueue,
predicates: predicates,
predicateMetaProducer: predicateMetaProducer,
prioritizers: prioritizers,
priorityMetaProducer: priorityMetaProducer,
extenders: extenders,
cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo),
volumeBinder: volumeBinder,
pvcLister: pvcLister,
pdbLister: pdbLister,
alwaysCheckAllPredicates: alwaysCheckAllPredicates,
disablePreemption: disablePreemption,
percentageOfNodesToScore: percentageOfNodesToScore,
}
}
3. GetBinder is created: getBinderFunc returns either an extender binder that is interested in the given pod, or the default binder.
// pkg/scheduler/factory/factory.go
func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) Binder {
var extenderBinder algorithm.SchedulerExtender
for i := range extenders {
if extenders[i].IsBinder() {
extenderBinder = extenders[i]
break
}
}
defaultBinder := &binder{c.client}
return func(pod *v1.Pod) Binder {
if extenderBinder != nil && extenderBinder.IsInterested(pod) {
return extenderBinder
}
return defaultBinder
}
}
4. PodConditionUpdater and PodPreemptor are created; both are clients (client) that talk to the api-server.
5. NextPod is the most central function here, because every pod that needs scheduling comes out of it.
NextPod: func() *v1.Pod {
return c.getNextPod()
}
func (c *configFactory) getNextPod() *v1.Pod {
pod, err := c.podQueue.Pop()
if err == nil {
klog.V(4).Infof("About to try and schedule pod %v/%v", pod.Namespace, pod.Name)
return pod
}
klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
return nil
}
You can see that every pod comes out of podQueue, so where pods enter podQueue becomes the important question. That involves the various informers mentioned above, so it is left to the next post.
3.4 Back to Run
The previous sections analyzed how the Run method in cmd/kube-scheduler/app/server.go (from 3.1) creates the Scheduler object of pkg/scheduler/scheduler.go. So what happens once that object is created? We need to return to the Run method in cmd/kube-scheduler/app/server.go.
// cmd/kube-scheduler/app/server.go
func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
...
// Create the scheduler.
// build the Scheduler object of pkg/scheduler/scheduler.go
sched, err := scheduler.New
...
// Start all informers.
go cc.PodInformer.Informer().Run(stopCh)
cc.InformerFactory.Start(stopCh)
// Wait for all caches to sync before scheduling.
cc.InformerFactory.WaitForCacheSync(stopCh)
controller.WaitForCacheSync("scheduler", stopCh, cc.PodInformer.Informer().HasSynced)
// Prepare a reusable runCommand function.
run := func(ctx context.Context) {
sched.Run()
<-ctx.Done()
}
ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
defer cancel()
go func() {
select {
case <-stopCh:
cancel()
case <-ctx.Done():
}
}()
// If leader election is enabled, runCommand via LeaderElector until done and exit.
// start leader election for high availability
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
// invoke the run function
OnStartedLeading: run,
OnStoppedLeading: func() {
utilruntime.HandleError(fmt.Errorf("lost master"))
},
}
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}
leaderElector.Run(ctx)
return fmt.Errorf("lost lease")
}
// Leader election is disabled, so runCommand inline until done.
run(ctx)
return fmt.Errorf("finished without leader elect")
}
1. All informers are started.
2. Since high availability is supported by default, sched.Run() is launched through the leader-election machinery.
Next, look at the sched.Run method.
func (sched *Scheduler) Run() {
if !sched.config.WaitForCacheSync() {
return
}
go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
func (sched *Scheduler) scheduleOne() {
pod := sched.config.NextPod()
...
}
You can see scheduleOne is executed with a period of 0 seconds, i.e. back to back; inside, scheduleOne calls sched.config.NextPod() to pop a pod from its podQueue and schedule it.
Put plainly, it keeps taking one pod at a time out of podQueue and scheduling it; if podQueue is empty, it blocks right there. A runnable sketch of these loop semantics follows.
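To make the period-0 point concrete, here is a runnable sketch of wait.Until where a buffered channel stands in for podQueue (an assumption for illustration; the real queue is a priority queue). With period 0, wait.Until re-runs the function as soon as it returns; the blocking happens inside the body, just as it does inside NextPod:
// wait.Until with period 0: rerun immediately; blocking happens in the loop body
package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stopCh := make(chan struct{})
    queue := make(chan string, 3) // stand-in for podQueue
    queue <- "pod-a"
    queue <- "pod-b"
    queue <- "pod-c"

    go wait.Until(func() {
        pod := <-queue // like NextPod: blocks until a pod is available
        fmt.Println("scheduling", pod)
    }, 0, stopCh)

    time.Sleep(100 * time.Millisecond)
    close(stopCh) // stop the loop
}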
4. Summary
Looking back over the whole process, we can see:
1. Parse the config file, or fall back to the defaults, to produce a completed config.
2. Start the pod-related informers, which watch changes in the cluster and, following the relevant rules, feed pods into a scheduling_queue, i.e. the podQueue.
3. Start an unbounded loop that keeps reading from podQueue to schedule pods.