
[k8s source code analysis][kube-scheduler]schedul

2019-10-13  nicktming

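1. Preface

Please credit the original source when reposting, out of respect for the author's work.

This article walks through cmd/kube-scheduler, pkg/scheduler/scheduler.go, and pkg/scheduler/factory/factory.go. The two most important types are configFactory (factory.go) and Scheduler (scheduler.go).
Source: https://github.com/nicktming/kubernetes
Branch: tming-v1.13 (based on v1.13)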

2. Flow chart

[Figure: run_1.png]

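3. Code walkthrough

This section follows the code to see how kube-scheduler starts up. To save space, code that is unrelated or not needed for understanding is elided from the listings.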

3.1 cmd/kube-scheduler

// cmd/kube-scheduler/scheduler.go

func main() {
    ...
    command := app.NewSchedulerCommand()
    ...
    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}

NewSchedulerCommand() takes us into cmd/kube-scheduler/app/server.go.

// cmd/kube-scheduler/app/server.go

// NewSchedulerCommand creates a *cobra.Command object with default parameters
func NewSchedulerCommand() *cobra.Command {
    opts, err := options.NewOptions()
    if err != nil {
        klog.Fatalf("unable to initialize command options: %v", err)
    }

    cmd := &cobra.Command{
        Use: "kube-scheduler",
        ...
        Run: func(cmd *cobra.Command, args []string) {
            if err := runCommand(cmd, args, opts); err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }
        },
    }
    ...
    return cmd
}

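cobra is a command-line library; see its official documentation for details, since it is not covered here.
The line to focus on is opts, err := options.NewOptions(), because it fills in the default values. Two of them matter most:
DefaultProvider: the algorithm source defaults to the provider named DefaultProvider, which is what the default scheduler uses.
LeaderElection: LeaderElect defaults to true, so kube-scheduler starts with high availability (leader election) enabled. A separate post covers this.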

In addition, if kube-scheduler is started with --config to set up a custom scheduler, that flag is parsed through Flags in cmd/kube-scheduler/app/options/options.go.

// cmd/kube-scheduler/app/options/options.go

// Flags returns flags for a specific scheduler by section name
func (o *Options) Flags() (nfs apiserverflag.NamedFlagSets) {
    fs := nfs.FlagSet("misc")
    fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file. Flags override values in this file.")
    fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
    fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")

    o.SecureServing.AddFlags(nfs.FlagSet("secure serving"))
    o.CombinedInsecureServing.AddFlags(nfs.FlagSet("insecure serving"))
    o.Authentication.AddFlags(nfs.FlagSet("authentication"))
    o.Authorization.AddFlags(nfs.FlagSet("authorization"))
    o.Deprecated.AddFlags(nfs.FlagSet("deprecated"), &o.ComponentConfig)

    leaderelectionconfig.BindFlags(&o.ComponentConfig.LeaderElection.LeaderElectionConfiguration, nfs.FlagSet("leader election"))
    utilfeature.DefaultFeatureGate.AddFlag(nfs.FlagSet("feature gate"))

    return nfs
}
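
As a rough illustration of the binding pattern used in Flags, here is a minimal sketch built on the standard library flag package. This is a swapped-in simplification: the real code uses named flag sets from the Kubernetes code base, and the options struct, newFlagSet, parseArgs, and the default address below are all hypothetical. The point it shows is that each field's current value doubles as the flag's default, so parsing only overrides what the user actually passes.

```go
package main

import (
	"flag"
	"fmt"
)

// options is a hypothetical, trimmed-down stand-in for the scheduler's Options struct.
type options struct {
	ConfigFile    string
	WriteConfigTo string
	Master        string
}

// newFlagSet binds each field to a flag; the field's current value is used as the default.
func newFlagSet(o *options) *flag.FlagSet {
	fs := flag.NewFlagSet("misc", flag.ContinueOnError)
	fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file.")
	fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
	fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server.")
	return fs
}

// parseArgs applies command-line arguments on top of the defaults already stored in o.
func parseArgs(o *options, args []string) error {
	return newFlagSet(o).Parse(args)
}

func main() {
	// The default address is illustrative only.
	o := &options{Master: "https://127.0.0.1:6443"}
	if err := parseArgs(o, []string{"--config", "/etc/scheduler.yaml"}); err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Printf("config=%q master=%q\n", o.ConfigFile, o.Master)
}
```

Fields that are not mentioned on the command line keep whatever value the options struct held before parsing.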

Back in NewSchedulerCommand: once opts is ready, the command's Run function calls runCommand.

// cmd/kube-scheduler/app/server.go

// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
    ...
    // Validate the fields of opts.
    if errs := opts.Validate(); len(errs) > 0 {
        fmt.Fprintf(os.Stderr, "%v\n", utilerrors.NewAggregate(errs))
        os.Exit(1)
    }
    // If requested, write opts.ComponentConfig to a file.
    if len(opts.WriteConfigTo) > 0 {
        if err := options.WriteConfigFile(opts.WriteConfigTo, &opts.ComponentConfig); err != nil {
            fmt.Fprintf(os.Stderr, "%v\n", err)
            os.Exit(1)
        }
        klog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo)
    }
    // Build a scheduler Config object from opts.
    c, err := opts.Config()
    if err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
    stopCh := make(chan struct{})
    // Get the completed config
    // Derive the completed config from the scheduler config.
    cc := c.Complete()
    // Apply the feature gates that are enabled.
    algorithmprovider.ApplyFeatureGates()
    // Register the configuration with configz under the name componentconfig.
    if cz, err := configz.New("componentconfig"); err == nil {
        cz.Set(cc.ComponentConfig)
    } else {
        return fmt.Errorf("unable to register configz: %s", err)
    }

    // All of the steps above exist to obtain a completed config.
    return Run(cc, stopCh)
}

All of this exists to obtain a completed config to hand to Run. The part worth a closer look is opts.Config().

// cmd/kube-scheduler/app/options/options.go

func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
    // If kube-scheduler was started without --config, use the default configuration (o.ComponentConfig).
    if len(o.ConfigFile) == 0 {
        c.ComponentConfig = o.ComponentConfig

        // only apply deprecated flags if no config file is loaded (this is the old behaviour).
        if err := o.Deprecated.ApplyTo(&c.ComponentConfig); err != nil {
            return err
        }
        if err := o.CombinedInsecureServing.ApplyTo(c, &c.ComponentConfig); err != nil {
            return err
        }
    } else {
        // If kube-scheduler was started with --config, load the configuration from that file.
        cfg, err := loadConfigFromFile(o.ConfigFile)
        if err != nil {
            return err
        }

        // use the loaded config file only, with the exception of --address and --port. This means that
        // none of the deprectated flags in o.Deprecated are taken into consideration. This is the old
        // behaviour of the flags we have to keep.
        c.ComponentConfig = *cfg

        if err := o.CombinedInsecureServing.ApplyToFromLoadedConfig(c, &c.ComponentConfig); err != nil {
            return err
        }
    }
    ...
    return nil
}

func (o *Options) Config() (*schedulerappconfig.Config, error) {
    if o.SecureServing != nil {
        if err := o.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{net.ParseIP("127.0.0.1")}); err != nil {
            return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
        }
    }

    c := &schedulerappconfig.Config{}
    if err := o.ApplyTo(c); err != nil {
        return nil, err
    }

    // Prepare kube clients.
    // Create the clients used to call the api-server.
    client, leaderElectionClient, eventClient, err := createClients(c.ComponentConfig.ClientConnection, o.Master, c.ComponentConfig.LeaderElection.RenewDeadline.Duration)
    if err != nil {
        return nil, err
    }
    ...
    // Set up leader election if enabled.
    var leaderElectionConfig *leaderelection.LeaderElectionConfig
    // LeaderElect defaults to true, so this branch runs unless the user sets it to false.
    // In other words, kube-scheduler supports high availability by default.
    if c.ComponentConfig.LeaderElection.LeaderElect {
        leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, leaderElectionClient, recorder)
        if err != nil {
            return nil, err
        }
    }

    c.Client = client
    c.InformerFactory = informers.NewSharedInformerFactory(client, 0)
    c.PodInformer = factory.NewPodInformer(client, 0)
    c.EventClient = eventClient
    c.Recorder = recorder
    c.Broadcaster = eventBroadcaster
    c.LeaderElection = leaderElectionConfig

    return c, nil
}

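ApplyTo: decides where the configuration comes from; if a config file is given, the configuration is read from that file.
Config: creates the clients that talk to the api-server, plus the leaderElectionConfig that supports kube-scheduler high availability.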
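
The branch in ApplyTo can be sketched in isolation as follows. This is a simplified illustration under stated assumptions, not the upstream code: componentConfig, loader, fakeLoad, and resolveConfig are hypothetical names, the deprecated-flag handling is left out, and the loader is injected so the example runs without a real file.

```go
package main

import (
	"errors"
	"fmt"
)

// componentConfig is a hypothetical, minimal stand-in for the scheduler's component config.
type componentConfig struct {
	SchedulerName string
	LeaderElect   bool
}

// loader reads a component config from a path; it is injected so the sketch needs no real file.
type loader func(path string) (*componentConfig, error)

// fakeLoad is a hypothetical loader: one path fails, every other path yields the same config.
func fakeLoad(path string) (*componentConfig, error) {
	if path == "missing.yaml" {
		return nil, errors.New("config file not found: " + path)
	}
	return &componentConfig{SchedulerName: "custom-scheduler", LeaderElect: false}, nil
}

// resolveConfig mirrors the branch in ApplyTo: without a config file the defaults are used,
// otherwise the loaded file replaces the defaults entirely.
func resolveConfig(configFile string, defaults componentConfig, load loader) (componentConfig, error) {
	if len(configFile) == 0 {
		return defaults, nil
	}
	cfg, err := load(configFile)
	if err != nil {
		return componentConfig{}, err
	}
	return *cfg, nil
}

func main() {
	defaults := componentConfig{SchedulerName: "default-scheduler", LeaderElect: true}

	fromDefaults, _ := resolveConfig("", defaults, fakeLoad)
	fmt.Printf("no --config:   %+v\n", fromDefaults)

	fromFile, _ := resolveConfig("scheduler.yaml", defaults, fakeLoad)
	fmt.Printf("with --config: %+v\n", fromFile)

	if _, err := resolveConfig("missing.yaml", defaults, fakeLoad); err != nil {
		fmt.Println("error:", err)
	}
}
```

Note that the loaded file is not merged with the defaults in this sketch; it replaces them, which matches the "use the loaded config file only" comment in the listing above.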

Now back to runCommand in cmd/kube-scheduler/app/server.go, which ends by calling Run(cc, stopCh). Run is the real core, so it is analyzed piece by piece. First, how it creates the Scheduler object defined in pkg/scheduler/scheduler.go.

func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
    ...
    // Create the scheduler.
    // Create the Scheduler object defined in pkg/scheduler/scheduler.go.
    sched, err := scheduler.New(cc.Client,
        cc.InformerFactory.Core().V1().Nodes(),
        cc.PodInformer,
        cc.InformerFactory.Core().V1().PersistentVolumes(),
        cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
        cc.InformerFactory.Core().V1().ReplicationControllers(),
        cc.InformerFactory.Apps().V1().ReplicaSets(),
        cc.InformerFactory.Apps().V1().StatefulSets(),
        cc.InformerFactory.Core().V1().Services(),
        cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
        storageClassInformer,
        cc.Recorder,
        cc.ComponentConfig.AlgorithmSource,
        stopCh,
        scheduler.WithName(cc.ComponentConfig.SchedulerName),
        scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
        scheduler.WithEquivalenceClassCacheEnabled(cc.ComponentConfig.EnableContentionProfiling),
        scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
        scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
        scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
    ...
}

Everything up to this point exists to produce the inputs that Scheduler needs, including cc.Client, cc.ComponentConfig.AlgorithmSource, and so on.

3.2 pkg/scheduler/scheduler.go

The Scheduler struct has a single field: the Config struct from pkg/scheduler/factory/factory.go.

type Scheduler struct {
    config *factory.Config
}

Now look at the New function.

func New(client clientset.Interface,
    nodeInformer coreinformers.NodeInformer,
    podInformer coreinformers.PodInformer,
    pvInformer coreinformers.PersistentVolumeInformer,
    pvcInformer coreinformers.PersistentVolumeClaimInformer,
    replicationControllerInformer coreinformers.ReplicationControllerInformer,
    replicaSetInformer appsinformers.ReplicaSetInformer,
    statefulSetInformer appsinformers.StatefulSetInformer,
    serviceInformer coreinformers.ServiceInformer,
    pdbInformer policyinformers.PodDisruptionBudgetInformer,
    storageClassInformer storageinformers.StorageClassInformer,
    recorder record.EventRecorder,
    schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
    stopCh <-chan struct{},
    opts ...func(o *schedulerOptions)) (*Scheduler, error) {

    /**
        scheduler.WithName(cc.ComponentConfig.SchedulerName),
        scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
        scheduler.WithEquivalenceClassCacheEnabled(cc.ComponentConfig.EnableContentionProfiling),
        scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
        scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
        scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
     */
    options := defaultSchedulerOptions
    for _, opt := range opts {
        opt(&options)
    }

    // Set up the configurator which can create schedulers from configs.
    // Create the configFactory from the factory package.
    configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
        SchedulerName:                  options.schedulerName,
        Client:                         client,
        NodeInformer:                   nodeInformer,
        PodInformer:                    podInformer,
        PvInformer:                     pvInformer,
        PvcInformer:                    pvcInformer,
        ReplicationControllerInformer:  replicationControllerInformer,
        ReplicaSetInformer:             replicaSetInformer,
        StatefulSetInformer:            statefulSetInformer,
        ServiceInformer:                serviceInformer,
        PdbInformer:                    pdbInformer,
        StorageClassInformer:           storageClassInformer,
        HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
        EnableEquivalenceClassCache:    options.enableEquivalenceClassCache,
        DisablePreemption:              options.disablePreemption,
        PercentageOfNodesToScore:       options.percentageOfNodesToScore,
        BindTimeoutSeconds:             options.bindTimeoutSeconds,
    })
    var config *factory.Config
    source := schedulerAlgorithmSource
    switch {
    case source.Provider != nil:
        // The default scheduler takes this branch: *source.Provider = DefaultProvider
        // Create the config from a named algorithm provider.
        sc, err := configurator.CreateFromProvider(*source.Provider)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
        }
        config = sc
    case source.Policy != nil:
        // A custom scheduler takes this branch.
        // Create the config from a user specified policy source.
        policy := &schedulerapi.Policy{}
        switch {
        case source.Policy.File != nil:
            if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
                return nil, err
            }
        case source.Policy.ConfigMap != nil:
            if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
                return nil, err
            }
        }
        sc, err := configurator.CreateFromConfig(*policy)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
        }
        config = sc
    default:
        return nil, fmt.Errorf("unsupported algorithm source: %v", source)
    }
    // Additional tweaks to the config produced by the configurator.
    config.Recorder = recorder
    config.DisablePreemption = options.disablePreemption
    config.StopEverything = stopCh
    // Create the scheduler.
    sched := NewFromConfig(config)
    return sched, nil
}

1. Build options by applying the opts functions that were passed in. The defaults are the ones listed below; changing any of them is done through an opts function.

var defaultSchedulerOptions = schedulerOptions{
    schedulerName:                  v1.DefaultSchedulerName,
    hardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
    enableEquivalenceClassCache:    false,
    disablePreemption:              false,
    percentageOfNodesToScore:       schedulerapi.DefaultPercentageOfNodesToScore,
    bindTimeoutSeconds:             BindTimeoutSeconds,
}

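2. Create a factory.configFactory from the arguments and name it configurator; a later section covers it in detail.
3. Choose how to build the Scheduler's Config based on source. Only the default scheduler is discussed here; custom schedulers get a dedicated post. Execution therefore enters sc, err := configurator.CreateFromProvider(*source.Provider) to produce the required Config.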

// pkg/scheduler/factory/factory.go

// Creates a scheduler from the name of a registered algorithm provider.
func (c *configFactory) CreateFromProvider(providerName string) (*Config, error) {
    klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
    provider, err := GetAlgorithmProvider(providerName)
    if err != nil {
        return nil, err
    }
    return c.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys, []algorithm.SchedulerExtender{})
}

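The post [k8s source code analysis][kube-scheduler] scheduler/algorithmprovider: registering the default-scheduler already covered GetAlgorithmProvider(providerName): with providerName="DefaultProvider" it returns the keys of all predicates and priorities of the default scheduler.

configFactory's CreateFromKeys then takes all the predicates, the priorities, and the extenders (empty here) and builds the Config that Scheduler needs. CreateFromKeys is explained later together with configFactory.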

4. Create the Scheduler object sched from the Config.

// pkg/scheduler/scheduler.go

// NewFromConfig returns a new scheduler using the provided Config.
func NewFromConfig(config *factory.Config) *Scheduler {
    metrics.Register()
    return &Scheduler{
        config: config,
    }
}
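
Step 1 above relies on what is commonly called the functional-options pattern. The following is a minimal, self-contained sketch of it. The struct is trimmed to three fields, the default values are illustrative, and buildOptions is a hypothetical helper that stands in for the first few lines of New.

```go
package main

import "fmt"

// schedulerOptions is a trimmed-down version of the options struct shown above.
type schedulerOptions struct {
	schedulerName      string
	disablePreemption  bool
	bindTimeoutSeconds int64
}

// defaultSchedulerOptions holds illustrative defaults; the real values come from the Kubernetes packages.
var defaultSchedulerOptions = schedulerOptions{
	schedulerName:      "default-scheduler",
	disablePreemption:  false,
	bindTimeoutSeconds: 100,
}

// WithName returns an option that overrides the scheduler name.
func WithName(name string) func(o *schedulerOptions) {
	return func(o *schedulerOptions) { o.schedulerName = name }
}

// WithPreemptionDisabled returns an option that toggles preemption.
func WithPreemptionDisabled(disabled bool) func(o *schedulerOptions) {
	return func(o *schedulerOptions) { o.disablePreemption = disabled }
}

// buildOptions (hypothetical helper) copies the defaults and applies each option in order.
func buildOptions(opts ...func(o *schedulerOptions)) schedulerOptions {
	options := defaultSchedulerOptions // struct copy, so the package-level defaults stay untouched
	for _, opt := range opts {
		opt(&options)
	}
	return options
}

func main() {
	fmt.Printf("%+v\n", buildOptions())
	fmt.Printf("%+v\n", buildOptions(WithName("my-scheduler"), WithPreemptionDisabled(true)))
}
```

The appeal of this design is that New can grow new tunables without changing its signature: callers that do not care about a setting simply do not pass an option for it.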

3.3 pkg/scheduler/factory/factory.go

This section analyzes the configurator := factory.NewConfigFactory call mentioned in 3.2 pkg/scheduler/scheduler.go.

3.3.1 configFactory

The configFactory struct is defined as follows.

type configFactory struct {
    // Client for talking to the api-server.
    client clientset.Interface
    // queue for pods that need scheduling
    // Holds the pods that still need to be scheduled.
    podQueue internalqueue.SchedulingQueue
    // a means to list all known scheduled pods.
    // Lists all pods that have already been scheduled.
    scheduledPodLister corelisters.PodLister
    // a means to list all known scheduled pods and pods assumed to have been scheduled.
    // Lists all scheduled pods as well as the assumed pods.
    podLister algorithm.PodLister
    // a means to list all nodes
    nodeLister corelisters.NodeLister
    // a means to list all PersistentVolumes
    pVLister corelisters.PersistentVolumeLister
    // a means to list all PersistentVolumeClaims
    pVCLister corelisters.PersistentVolumeClaimLister
    // a means to list all services
    serviceLister corelisters.ServiceLister
    // a means to list all controllers
    controllerLister corelisters.ReplicationControllerLister
    // a means to list all replicasets
    replicaSetLister appslisters.ReplicaSetLister
    // a means to list all statefulsets
    statefulSetLister appslisters.StatefulSetLister
    // a means to list all PodDisruptionBudgets
    pdbLister policylisters.PodDisruptionBudgetLister
    // a means to list all StorageClasses
    storageClassLister storagelisters.StorageClassLister
    // Close this to stop all reflectors
    StopEverything <-chan struct{}
    scheduledPodsHasSynced cache.InformerSynced
    schedulerCache schedulerinternalcache.Cache
    // SchedulerName of a scheduler is used to select which pods will be
    // processed by this scheduler, based on pods's "spec.schedulerName".
    // Name of the scheduler; defaults to default-scheduler.
    schedulerName string
    // RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule
    // corresponding to every RequiredDuringScheduling affinity rule.
    // HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100.
    hardPodAffinitySymmetricWeight int32
    // Equivalence class cache
    // The equivalence class cache that speeds up the predicate phase.
    equivalencePodCache *equivalence.Cache
    // Enable equivalence class cache
    enableEquivalenceClassCache bool
    // Handles volume binding decisions
    volumeBinder *volumebinder.VolumeBinder
    // Always check all predicates even if the middle of one predicate fails.
    alwaysCheckAllPredicates bool
    // Disable pod preemption or not.
    // Whether preemption is disabled.
    disablePreemption bool
    // percentageOfNodesToScore specifies percentage of all nodes to score in each scheduling cycle.
    percentageOfNodesToScore int32
}

The NewConfigFactory function

func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
    stopEverything := args.StopCh
    if stopEverything == nil {
        stopEverything = wait.NeverStop
    }
    schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)

    // storageClassInformer is only enabled through VolumeScheduling feature gate
    var storageClassLister storagelisters.StorageClassLister
    if args.StorageClassInformer != nil {
        storageClassLister = args.StorageClassInformer.Lister()
    }
    c := &configFactory{
        client:                         args.Client,
        podLister:                      schedulerCache,
        podQueue:                       internalqueue.NewSchedulingQueue(stopEverything),
        nodeLister:                     args.NodeInformer.Lister(),
        pVLister:                       args.PvInformer.Lister(),
        pVCLister:                      args.PvcInformer.Lister(),
        serviceLister:                  args.ServiceInformer.Lister(),
        controllerLister:               args.ReplicationControllerInformer.Lister(),
        replicaSetLister:               args.ReplicaSetInformer.Lister(),
        statefulSetLister:              args.StatefulSetInformer.Lister(),
        pdbLister:                      args.PdbInformer.Lister(),
        storageClassLister:             storageClassLister,
        schedulerCache:                 schedulerCache,
        StopEverything:                 stopEverything,
        schedulerName:                  args.SchedulerName,
        hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
        enableEquivalenceClassCache:    args.EnableEquivalenceClassCache,
        disablePreemption:              args.DisablePreemption,
        percentageOfNodesToScore:       args.PercentageOfNodesToScore,
    }

    c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
    // scheduled pod cache
    args.PodInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return assignedPod(t)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return assignedPod(pod)
                    }
                    runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
                    return false
                default:
                    runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    c.addPodToCache,
                UpdateFunc: c.updatePodInCache,
                DeleteFunc: c.deletePodFromCache,
            },
        },
    )
    ...
    c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}
    ...
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, compareSignal)

    go func() {
        for {
            select {
            case <-c.StopEverything:
                c.podQueue.Close()
                return
            case <-ch:
                debugger.Comparer.Compare()
                debugger.Dumper.DumpAll()
            }
        }
    }()

    return c
}

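Points worth noting:
1. schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything) instantiates a schedulerCache. Its structure and implementation were analyzed in [k8s source code analysis][kube-scheduler] scheduler/internal/cache: node_tree and cache; the focus here is on where it gets used.
2. configFactory's podLister and schedulerCache fields hold the same schedulerCache object. podLister is defined as a way to list all scheduled pods plus the assumed pods, which is exactly what schedulerCache tracks.
3. configFactory's scheduledPodLister is defined as a way to list all scheduled pods. args.PodInformer.Lister() returns all pods known to the informer (informers are covered in a dedicated client-go series and not detailed here), so assignedPodLister is a thin wrapper around args.PodInformer.Lister() that keeps only the pods that have already been scheduled.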

c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}
type assignedPodLister struct {
    corelisters.PodLister
}

// List lists all Pods in the indexer for a given namespace.
func (l assignedPodLister) List(selector labels.Selector) ([]*v1.Pod, error) {
    list, err := l.PodLister.List(selector)
    if err != nil {
        return nil, err
    }
    filtered := make([]*v1.Pod, 0, len(list))
    for _, pod := range list {
        // Keep only the pods that have already been scheduled.
        if len(pod.Spec.NodeName) > 0 {
            filtered = append(filtered, pod)
        }
    }
    return filtered, nil
}

// List lists all Pods in the indexer for a given namespace.
func (l assignedPodLister) Pods(namespace string) corelisters.PodNamespaceLister {
    return assignedPodNamespaceLister{l.PodLister.Pods(namespace)}
}

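4. podQueue is defined to hold the pods that still need scheduling, so it is created with internalqueue.NewSchedulingQueue(stopEverything). scheduling_queue was analyzed in detail in [k8s source code analysis][kube-scheduler] scheduler/internal/queue: the priority queue scheduling_queue (1) and [k8s source code analysis][kube-scheduler] scheduler/internal/queue: the priority queue scheduling_queue (2).
5. The various informers get their EventHandler logic attached, including podInformer, serviceInformer, NodeInformer, PvInformer, PvcInformer, StorageClassInformer, and others. That part is analyzed under the next topic.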
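
The wrapper idea from point 3 (embed a lister, override List to filter) can be sketched without any Kubernetes dependency. All types below (pod, podLister, staticLister, assignedLister) are hypothetical simplifications of the real ones; only the shape of the technique is the same.

```go
package main

import "fmt"

// pod is a hypothetical, minimal stand-in for *v1.Pod.
type pod struct {
	Name     string
	NodeName string // empty until the pod has been assigned to a node
}

// podLister is a simplified lister interface (the real one also takes a label selector).
type podLister interface {
	List() ([]*pod, error)
}

// staticLister returns a fixed slice; it plays the role of the informer-backed lister.
type staticLister struct {
	pods []*pod
}

func (s staticLister) List() ([]*pod, error) { return s.pods, nil }

// assignedLister embeds a podLister and overrides List to keep only assigned pods.
type assignedLister struct {
	podLister
}

// List delegates to the embedded lister and filters out pods without a node name.
func (l assignedLister) List() ([]*pod, error) {
	list, err := l.podLister.List()
	if err != nil {
		return nil, err
	}
	filtered := make([]*pod, 0, len(list))
	for _, p := range list {
		if len(p.NodeName) > 0 {
			filtered = append(filtered, p)
		}
	}
	return filtered, nil
}

func main() {
	all := staticLister{pods: []*pod{
		{Name: "a", NodeName: "node-1"},
		{Name: "b"},
		{Name: "c", NodeName: "node-2"},
	}}
	assigned, _ := assignedLister{all}.List()
	for _, p := range assigned {
		fmt.Println(p.Name, "on", p.NodeName)
	}
}
```

Because the wrapper embeds the interface, any method it does not override is forwarded to the underlying lister unchanged.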

3.3.2 Config
type Config struct {
    // It is expected that changes made via SchedulerCache will be observed
    // by NodeLister and Algorithm.
    // The schedulerCache; the same object as configFactory's schedulerCache.
    SchedulerCache schedulerinternalcache.Cache
    // Ecache is used for optimistically invalid affected cache items after
    // successfully binding a pod
    // The same object as configFactory's equivalencePodCache.
    Ecache     *equivalence.Cache
    // Lister that returns all nodes.
    NodeLister algorithm.NodeLister
    // The algorithm used for scheduling.
    Algorithm  algorithm.ScheduleAlgorithm
    // Returns the Binder to use for a given pod.
    GetBinder  func(pod *v1.Pod) Binder
    // PodConditionUpdater is used only in case of scheduling errors. If we succeed
    // with scheduling, PodScheduled condition will be updated in apiserver in /bind
    // handler so that binding and setting PodCondition it is atomic.
    PodConditionUpdater PodConditionUpdater
    // PodPreemptor is used to evict pods and update pod annotations.
    // The preemptor.
    PodPreemptor PodPreemptor
    // NextPod should be a function that blocks until the next pod
    // is available. We don't use a channel for this, because scheduling
    // a pod may take some amount of time and we don't want pods to get
    // stale while they sit in a channel.
    // Returns the next pod to schedule;
    // if there is none, it blocks until one arrives.
    NextPod func() *v1.Pod
    // WaitForCacheSync waits for scheduler cache to populate.
    // It returns true if it was successful, false if the controller should shutdown.
    WaitForCacheSync func() bool
    // Error is called if there is an error. It is passed the pod in
    // question, and the error
    Error func(*v1.Pod, error)
    // Recorder is the EventRecorder to use
    Recorder record.EventRecorder
    // Close this to shut down the scheduler.
    StopEverything <-chan struct{}
    VolumeBinder *volumebinder.VolumeBinder
    DisablePreemption bool
    // Queue of the pods waiting to be scheduled.
    SchedulingQueue internalqueue.SchedulingQueue
}

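Note the following:
NextPod: a function that returns the next pod to schedule. Pods waiting to be scheduled sit in the scheduling queue and come out through NextPod one at a time.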

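Next comes configFactory's CreateFromKeys mentioned above. Given the predicate keys, the priority keys, and the extenders, it uses the current configFactory to build a Config object (the one defined in factory.go).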

func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
    klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)

    if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
        return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.GetHardPodAffinitySymmetricWeight())
    }

    // Look up the predicate functions for the given predicate keys.
    predicateFuncs, err := c.GetPredicates(predicateKeys)
    if err != nil {
        return nil, err
    }

    // Look up the priority functions for the given priority keys.
    priorityConfigs, err := c.GetPriorityFunctionConfigs(priorityKeys)
    if err != nil {
        return nil, err
    }

    // priorityMetaProducer is used when scoring.
    priorityMetaProducer, err := c.GetPriorityMetadataProducer()
    if err != nil {
        return nil, err
    }
    // predicateMetaProducer is used when the predicates actually run.
    predicateMetaProducer, err := c.GetPredicateMetadataProducer()
    if err != nil {
        return nil, err
    }

    // Whether the equivalence class cache that speeds up predicates is enabled.
    // Init equivalence class cache
    if c.enableEquivalenceClassCache {
        c.equivalencePodCache = equivalence.NewCache(predicates.Ordering())
        klog.Info("Created equivalence class cache")
    }

    // Create the algorithm.ScheduleAlgorithm that performs the actual scheduling computation.
    algo := core.NewGenericScheduler(
        c.schedulerCache,
        c.equivalencePodCache,
        c.podQueue,
        predicateFuncs,
        predicateMetaProducer,
        priorityConfigs,
        priorityMetaProducer,
        extenders,
        c.volumeBinder,
        c.pVCLister,
        c.pdbLister,
        c.alwaysCheckAllPredicates,
        c.disablePreemption,
        c.percentageOfNodesToScore,
    )

    podBackoff := util.CreateDefaultPodBackoff()
    return &Config{
        SchedulerCache: c.schedulerCache,
        Ecache:         c.equivalencePodCache,
        // The scheduler only needs to consider schedulable nodes.
        NodeLister:          &nodeLister{c.nodeLister},
        Algorithm:           algo,
        GetBinder:           c.getBinderFunc(extenders),
        PodConditionUpdater: &podConditionUpdater{c.client},
        PodPreemptor:        &podPreemptor{c.client},
        WaitForCacheSync: func() bool {
            return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
        },
        NextPod: func() *v1.Pod {
            return c.getNextPod()
        },
        Error:           c.MakeDefaultErrorFunc(podBackoff, c.podQueue),
        StopEverything:  c.StopEverything,
        VolumeBinder:    c.volumeBinder,
        SchedulingQueue: c.podQueue,
    }, nil
}

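Points worth noting:
1. Resolve the predicate and priority keys into their functions, and fetch the registered priorityMetaProducer and predicateMetaProducer.
2. Create the algorithm.ScheduleAlgorithm that performs the actual scheduling computation. ScheduleAlgorithm is an interface; what gets returned is its implementation, a genericScheduler object (pkg/scheduler/core/generic_scheduler.go).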

// pkg/scheduler/algorithm/scheduler_interface.go

type ScheduleAlgorithm interface {
    Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
    // Preempt receives scheduling errors for a pod and tries to create room for
    // the pod by preempting lower priority pods if possible.
    // It returns the node where preemption happened, a list of preempted pods, a
    // list of pods whose nominated node name should be removed, and error if any.
    Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
    // Predicates() returns a pointer to a map of predicate functions. This is
    // exposed for testing.
    Predicates() map[string]FitPredicate
    // Prioritizers returns a slice of priority config. This is exposed for
    // testing.
    Prioritizers() []PriorityConfig
}

// pkg/scheduler/core/generic_scheduler.go 

func NewGenericScheduler(
    cache schedulerinternalcache.Cache,
    eCache *equivalence.Cache,
    podQueue internalqueue.SchedulingQueue,
    predicates map[string]algorithm.FitPredicate,
    predicateMetaProducer algorithm.PredicateMetadataProducer,
    prioritizers []algorithm.PriorityConfig,
    priorityMetaProducer algorithm.PriorityMetadataProducer,
    extenders []algorithm.SchedulerExtender,
    volumeBinder *volumebinder.VolumeBinder,
    pvcLister corelisters.PersistentVolumeClaimLister,
    pdbLister algorithm.PDBLister,
    alwaysCheckAllPredicates bool,
    disablePreemption bool,
    percentageOfNodesToScore int32,
) algorithm.ScheduleAlgorithm {
    return &genericScheduler{
        cache:                    cache,
        equivalenceCache:         eCache,
        schedulingQueue:          podQueue,
        predicates:               predicates,
        predicateMetaProducer:    predicateMetaProducer,
        prioritizers:             prioritizers,
        priorityMetaProducer:     priorityMetaProducer,
        extenders:                extenders,
        cachedNodeInfoMap:        make(map[string]*schedulercache.NodeInfo),
        volumeBinder:             volumeBinder,
        pvcLister:                pvcLister,
        pdbLister:                pdbLister,
        alwaysCheckAllPredicates: alwaysCheckAllPredicates,
        disablePreemption:        disablePreemption,
        percentageOfNodesToScore: percentageOfNodesToScore,
    }
}

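3. Create GetBinder. getBinderFunc returns a function that, for a given pod, picks the extender binder if there is one and it is interested in the pod, and the default binder otherwise.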

// pkg/scheduler/factory/factory.go 

func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) Binder {
    var extenderBinder algorithm.SchedulerExtender
    for i := range extenders {
        if extenders[i].IsBinder() {
            extenderBinder = extenders[i]
            break
        }
    }
    defaultBinder := &binder{c.client}
    return func(pod *v1.Pod) Binder {
        if extenderBinder != nil && extenderBinder.IsInterested(pod) {
            return extenderBinder
        }
        return defaultBinder
    }
}

4. Create PodConditionUpdater and PodPreemptor; both wrap the client that talks to the api-server.
5. NextPod is the most central function here, because every pod that needs scheduling comes out of it.

NextPod: func() *v1.Pod {
            return c.getNextPod()
        }
func (c *configFactory) getNextPod() *v1.Pod {
    pod, err := c.podQueue.Pop()
    if err == nil {
        klog.V(4).Infof("About to try and schedule pod %v/%v", pod.Namespace, pod.Name)
        return pod
    }
    klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
    return nil
}

Every pod comes out of podQueue, so where pods enter podQueue matters. That involves the informers mentioned above and is left for the next post.
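
To make the blocking behaviour behind NextPod concrete, here is a minimal sketch of a queue whose Pop blocks until an item is available or the queue is closed. It is a plain FIFO built on sync.Cond, which is a deliberate simplification of the real scheduling queue; blockingQueue, errQueueClosed, and the string items are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errQueueClosed is returned by Pop once the queue is closed and drained.
var errQueueClosed = errors.New("queue closed")

// blockingQueue is a hypothetical FIFO whose Pop blocks while the queue is empty.
type blockingQueue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  []string
	closed bool
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Add appends an item and wakes one waiting Pop call.
func (q *blockingQueue) Add(item string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return
	}
	q.items = append(q.items, item)
	q.cond.Signal()
}

// Pop blocks until an item is available, or returns an error once the queue is closed and empty.
func (q *blockingQueue) Pop() (string, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		if q.closed {
			return "", errQueueClosed
		}
		q.cond.Wait()
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item, nil
}

// Close marks the queue as closed and wakes every waiting Pop call.
func (q *blockingQueue) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	q.cond.Broadcast()
}

func main() {
	q := newBlockingQueue()
	done := make(chan struct{})
	go func() {
		defer close(done)
		item, err := q.Pop() // blocks until Add is called below
		fmt.Println("popped:", item, "err:", err)
	}()
	q.Add("default/nginx")
	<-done
	q.Close()
}
```

Closing the queue is what lets a consumer that is parked in Pop return during shutdown, which is the role c.podQueue.Close() plays in the NewConfigFactory listing.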

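3.4 Back to Run

Sections 3.2 and 3.3 showed how the Run function in cmd/kube-scheduler/app/server.go (introduced in 3.1) creates the Scheduler object defined in pkg/scheduler/scheduler.go. What happens once that object exists? To find out, return to Run in cmd/kube-scheduler/app/server.go.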

// cmd/kube-scheduler/app/server.go

func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
    ...
    // Create the scheduler.
    // Create the Scheduler object defined in pkg/scheduler/scheduler.go.
    sched, err := scheduler.New
    ...
    // Start all informers.
    go cc.PodInformer.Informer().Run(stopCh)
    cc.InformerFactory.Start(stopCh)

    // Wait for all caches to sync before scheduling.
    cc.InformerFactory.WaitForCacheSync(stopCh)
    controller.WaitForCacheSync("scheduler", stopCh, cc.PodInformer.Informer().HasSynced)

    // Prepare a reusable runCommand function.
    run := func(ctx context.Context) {
        sched.Run()
        <-ctx.Done()
    }

    ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
    defer cancel()

    go func() {
        select {
        case <-stopCh:
            cancel()
        case <-ctx.Done():
        }
    }()

    // If leader election is enabled, runCommand via LeaderElector until done and exit.
    // Run with high availability (leader election).
    if cc.LeaderElection != nil {
        cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
            // Call run once this instance starts leading.
            OnStartedLeading: run,
            OnStoppedLeading: func() {
                utilruntime.HandleError(fmt.Errorf("lost master"))
            },
        }
        leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
        if err != nil {
            return fmt.Errorf("couldn't create leader elector: %v", err)
        }

        leaderElector.Run(ctx)

        return fmt.Errorf("lost lease")
    }

    // Leader election is disabled, so runCommand inline until done.
    run(ctx)
    return fmt.Errorf("finished without leader elect")
}

1. All informers are started.
2. High availability is enabled by default, so sched.Run() is started through leader election.
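
Run bridges the stopCh channel to a context so that the same run function can be used both as the OnStartedLeading callback and inline. The sketch below isolates that bridge using only the standard library. contextFromStopCh and the start callback are hypothetical names, and leader election itself is not modeled.

```go
package main

import (
	"context"
	"fmt"
)

// contextFromStopCh returns a context that is cancelled as soon as stopCh is closed.
// The goroutine also exits if the context is cancelled first, so it does not leak.
func contextFromStopCh(stopCh <-chan struct{}) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		select {
		case <-stopCh:
			cancel()
		case <-ctx.Done():
		}
	}()
	return ctx, cancel
}

// run mirrors the shape of the run closure in Run: start the work, then block until ctx is done.
func run(ctx context.Context, start func()) {
	start()
	<-ctx.Done()
}

func main() {
	stopCh := make(chan struct{})
	ctx, cancel := contextFromStopCh(stopCh)
	defer cancel()

	done := make(chan struct{})
	go func() {
		defer close(done)
		run(ctx, func() { fmt.Println("scheduler loop started") })
	}()

	close(stopCh) // simulate a shutdown signal
	<-done
	fmt.Println("run returned:", ctx.Err())
}
```

In the real Run, the start step is sched.Run(), which itself returns immediately after launching the scheduling loop in a goroutine; that is why run has to block on ctx.Done() afterwards.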

Next, the sched.Run method.

func (sched *Scheduler) Run() {
    if !sched.config.WaitForCacheSync() {
        return
    }

    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
func (sched *Scheduler) scheduleOne() {
    pod := sched.config.NextPod()
    ...
}

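wait.Until is given a period of 0, so scheduleOne runs again as soon as the previous call returns. scheduleOne calls sched.config.NextPod() to pop one pod from podQueue and schedule it.

In short, the scheduler keeps taking one pod at a time from podQueue and scheduling it; if podQueue is empty, it blocks there.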
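
The loop semantics described above can be approximated with a small standard-library sketch. until below is a hypothetical re-implementation written for illustration, not the apimachinery function: it calls f repeatedly, waits period between calls, and returns once stopCh is closed. countCalls is a hypothetical helper that stops the loop after a fixed number of iterations.

```go
package main

import (
	"fmt"
	"time"
)

// until calls f in a loop, sleeping for period between calls, until stopCh is closed.
// With period 0 the next call starts as soon as the previous one returns.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		// Check for shutdown before every call.
		select {
		case <-stopCh:
			return
		default:
		}

		f()

		// Wait for the period to elapse, or stop early on shutdown.
		select {
		case <-stopCh:
			return
		case <-time.After(period):
		}
	}
}

// countCalls runs until with a function that closes the stop channel after limit calls
// and reports how many times the function ran.
func countCalls(limit int) int {
	stopCh := make(chan struct{})
	calls := 0
	until(func() {
		calls++
		if calls == limit {
			close(stopCh)
		}
	}, 0, stopCh)
	return calls
}

func main() {
	fmt.Println("calls before stop:", countCalls(3))
}
```

In the scheduler the loop body never spins idly even with a period of 0, because scheduleOne spends its idle time blocked inside NextPod.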

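4. Summary

Looking back over the whole process:
1. Parse the config file, or fall back to the default configuration, to build a completed config.
2. Start the pod-related informers, which watch the cluster for changes and, following their rules, push pods into a scheduling_queue, i.e. podQueue.
3. Start an endless loop that keeps reading from podQueue and scheduling.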