Kubernetes Source Code Reading: the Scheduler

2022-02-14  睡不醒的大橘
// cmd\kube-scheduler\scheduler.go
func main() {
    rand.Seed(time.Now().UnixNano())

    pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)

    command := app.NewSchedulerCommand()

    logs.InitLogs()
    defer logs.FlushLogs()

    if err := command.Execute(); err != nil {
        os.Exit(1)
    }
}
// cmd\kube-scheduler\app\server.go
// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
    opts, err := options.NewOptions()
    if err != nil {
        klog.Fatalf("unable to initialize command options: %v", err)
    }

    cmd := &cobra.Command{
        Use: "kube-scheduler",
        Long: `The Kubernetes scheduler is a control plane process which assigns
Pods to Nodes. The scheduler determines which Nodes are valid placements for
each Pod in the scheduling queue according to constraints and available
resources. The scheduler then ranks each valid Node and binds the Pod to a
suitable Node. Multiple different schedulers may be used within a cluster;
kube-scheduler is the reference implementation.
See [scheduling](https://kubernetes.io/docs/concepts/scheduling-eviction/)
for more information about scheduling and the kube-scheduler component.`,
        Run: func(cmd *cobra.Command, args []string) {
            if err := runCommand(cmd, opts, registryOptions...); err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }
        },
        Args: func(cmd *cobra.Command, args []string) error {
            for _, arg := range args {
                if len(arg) > 0 {
                    return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
                }
            }
            return nil
        },
    }
    fs := cmd.Flags()
    namedFlagSets := opts.Flags()
    verflag.AddFlags(namedFlagSets.FlagSet("global"))
    globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
    for _, f := range namedFlagSets.FlagSets {
        fs.AddFlagSet(f)
    }

    usageFmt := "Usage:\n  %s\n"
    cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
    cmd.SetUsageFunc(func(cmd *cobra.Command) error {
        fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine())
        cliflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols)
        return nil
    })
    cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
        fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
        cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
    })
    cmd.MarkFlagFilename("config", "yaml", "yml", "json")

    return cmd
}
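NewSchedulerCommand takes variadic registryOptions, which is the hook for out-of-tree plugins: each Option gets a chance to add entries to the registry that Setup() later merges into the in-tree one. A minimal sketch of a custom scheduler binary built on top of this command (assuming a hypothetical example.com/myplugin package that exports a plugin Name and a framework PluginFactory called New):

// Sketch only: "example.com/myplugin" is a hypothetical out-of-tree plugin package.
package main

import (
    "os"

    "k8s.io/kubernetes/cmd/kube-scheduler/app"

    myplugin "example.com/myplugin" // hypothetical plugin exposing Name and New
)

func main() {
    // app.WithPlugin registers the factory into the out-of-tree registry,
    // so the plugin can be referenced by name from a scheduling profile.
    command := app.NewSchedulerCommand(
        app.WithPlugin(myplugin.Name, myplugin.New),
    )
    if err := command.Execute(); err != nil {
        os.Exit(1)
    }
}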
// cmd\kube-scheduler\app\server.go
// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
    verflag.PrintAndExitIfRequested()
    cliflag.PrintFlags(cmd.Flags())

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    cc, sched, err := Setup(ctx, opts, registryOptions...)
    if err != nil {
        return err
    }

    return Run(ctx, cc, sched)
}

// Setup creates a completed config and a scheduler based on the command args and options
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
    if errs := opts.Validate(); len(errs) > 0 {
        return nil, nil, utilerrors.NewAggregate(errs)
    }

    c, err := opts.Config()
    if err != nil {
        return nil, nil, err
    }

    // Get the completed config
    cc := c.Complete()

    outOfTreeRegistry := make(runtime.Registry)
    for _, option := range outOfTreeRegistryOptions {
        if err := option(outOfTreeRegistry); err != nil {
            return nil, nil, err
        }
    }

    recorderFactory := getRecorderFactory(&cc)
    completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0)
    // Create the scheduler.
    sched, err := scheduler.New(cc.Client,
        cc.InformerFactory,
        recorderFactory,
        ctx.Done(),
        scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
        scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
        scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
        scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
        scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
        scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
        scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
        scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
        scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {
            // Profiles are processed during Framework instantiation to set default plugins and configurations. Capturing them for logging
            completedProfiles = append(completedProfiles, profile)
        }),
    )
    if err != nil {
        return nil, nil, err
    }
    if err := options.LogOrWriteConfig(opts.WriteConfigTo, &cc.ComponentConfig, completedProfiles); err != nil {
        return nil, nil, err
    }

    return &cc, sched, nil
}
// pkg\scheduler\scheduler.go
// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
    // It is expected that changes made via SchedulerCache will be observed
    // by NodeLister and Algorithm.
    SchedulerCache internalcache.Cache

    // Algorithm is the scheduling algorithm used to pick a node for a pod.
    Algorithm core.ScheduleAlgorithm

    // NextPod pops the next pod that needs to be scheduled.
    // NextPod should be a function that blocks until the next pod
    // is available. We don't use a channel for this, because scheduling
    // a pod may take some amount of time and we don't want pods to get
    // stale while they sit in a channel.
    NextPod func() *framework.QueuedPodInfo

    // Error is called if there is an error. It is passed the pod in
    // question, and the error
    Error func(*framework.QueuedPodInfo, error)

    // Close this to shut down the scheduler.
    StopEverything <-chan struct{}

    // SchedulingQueue holds pods to be scheduled
    SchedulingQueue internalqueue.SchedulingQueue

    // Profiles are the scheduling profiles.
    Profiles profile.Map

    // client talks to the kube-apiserver.
    client clientset.Interface
}


// New returns a Scheduler
func New(client clientset.Interface,
    informerFactory informers.SharedInformerFactory,
    recorderFactory profile.RecorderFactory,
    stopCh <-chan struct{},
    opts ...Option) (*Scheduler, error) {

    stopEverything := stopCh
    if stopEverything == nil {
        stopEverything = wait.NeverStop
    }

    options := defaultSchedulerOptions
    for _, opt := range opts {
        opt(&options)
    }

    schedulerCache := internalcache.New(30*time.Second, stopEverything)

    registry := frameworkplugins.NewInTreeRegistry()
    if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
        return nil, err
    }

    snapshot := internalcache.NewEmptySnapshot()

    configurator := &Configurator{
        client:                   client,
        recorderFactory:          recorderFactory,
        informerFactory:          informerFactory,
        schedulerCache:           schedulerCache,
        StopEverything:           stopEverything,
        percentageOfNodesToScore: options.percentageOfNodesToScore,
        podInitialBackoffSeconds: options.podInitialBackoffSeconds,
        podMaxBackoffSeconds:     options.podMaxBackoffSeconds,
        profiles:                 append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...),
        registry:                 registry,
        nodeInfoSnapshot:         snapshot,
        extenders:                options.extenders,
        frameworkCapturer:        options.frameworkCapturer,
        parallellism:             options.parallelism,
    }

    metrics.Register()

    var sched *Scheduler
    source := options.schedulerAlgorithmSource
    switch {
    // Provider-based AlgorithmSource: the scheduling algorithm is instantiated from a named
    // algorithm provider. This is the kube-scheduler default.
    case source.Provider != nil:
        // Create the config from a named algorithm provider.
        sc, err := configurator.createFromProvider(*source.Provider)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
        }
        sched = sc
    // Policy-based AlgorithmSource: the scheduling algorithm is instantiated from a predefined
    // Policy, loaded either from a file (--policy-config-file) or from a ConfigMap.
    case source.Policy != nil:
        // Create the config from a user specified policy source.
        policy := &schedulerapi.Policy{}
        switch {
        case source.Policy.File != nil:
            if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
                return nil, err
            }
        case source.Policy.ConfigMap != nil:
            if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
                return nil, err
            }
        }
        // Set extenders on the configurator now that we've decoded the policy
        // In this case, c.extenders should be nil since we're using a policy (and therefore not componentconfig,
        // which would have set extenders in the above instantiation of Configurator from CC options)
        configurator.extenders = policy.Extenders
        sc, err := configurator.createFromConfig(*policy)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
        }
        sched = sc
    default:
        return nil, fmt.Errorf("unsupported algorithm source: %v", source)
    }
    // Additional tweaks to the config produced by the configurator.
    sched.StopEverything = stopEverything
    sched.client = client

    // Register the event handlers on the shared informers.
    addAllEventHandlers(sched, informerFactory)
    return sched, nil
}
// pkg\scheduler\eventhandlers.go
// addAllEventHandlers is a helper function used in tests and in Scheduler
// to add event handlers for various informers.
func addAllEventHandlers(
    sched *Scheduler,
    informerFactory informers.SharedInformerFactory,
) {
    // scheduled pod cache
    informerFactory.Core().V1().Pods().Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return assignedPod(t)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return assignedPod(pod)
                    }
                    utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
                    return false
                default:
                    utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    sched.addPodToCache,
                UpdateFunc: sched.updatePodInCache,
                DeleteFunc: sched.deletePodFromCache,
            },
        },
    )
    // unscheduled pod queue
    informerFactory.Core().V1().Pods().Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return !assignedPod(t) && responsibleForPod(t, sched.Profiles)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return !assignedPod(pod) && responsibleForPod(pod, sched.Profiles)
                    }
                    utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
                    return false
                default:
                    utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    sched.addPodToSchedulingQueue,
                UpdateFunc: sched.updatePodInSchedulingQueue,
                DeleteFunc: sched.deletePodFromSchedulingQueue,
            },
        },
    )

    informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    sched.addNodeToCache,
            UpdateFunc: sched.updateNodeInCache,
            DeleteFunc: sched.deleteNodeFromCache,
        },
    )

    informerFactory.Storage().V1().CSINodes().Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    sched.onCSINodeAdd,
            UpdateFunc: sched.onCSINodeUpdate,
        },
    )

    // On add and update of PVs.
    informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            // MaxPDVolumeCountPredicate: since it relies on the counts of PV.
            AddFunc:    sched.onPvAdd,
            UpdateFunc: sched.onPvUpdate,
        },
    )

    // This is for MaxPDVolumeCountPredicate: add/update PVC will affect counts of PV when it is bound.
    informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    sched.onPvcAdd,
            UpdateFunc: sched.onPvcUpdate,
        },
    )

    // This is for ServiceAffinity: affected when the selector of the service is updated.
    // Also, if new service is added, equivalence cache will also become invalid since
    // existing pods may be "captured" by this service and change this predicate result.
    informerFactory.Core().V1().Services().Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    sched.onServiceAdd,
            UpdateFunc: sched.onServiceUpdate,
            DeleteFunc: sched.onServiceDelete,
        },
    )

    informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc: sched.onStorageClassAdd,
        },
    )
}
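The two filter helpers used above are tiny. In this version of eventhandlers.go they look roughly like the following: a pod counts as "assigned" once spec.nodeName is set, and a pod is this scheduler's responsibility when one of the configured profiles handles its schedulerName.

// Rough shape of the helpers referenced by the FilterFuncs above.
func assignedPod(pod *v1.Pod) bool {
    // A pod is assigned once the scheduler (or someone else) has set spec.nodeName.
    return len(pod.Spec.NodeName) != 0
}

func responsibleForPod(pod *v1.Pod, profiles profile.Map) bool {
    // Only queue pods whose schedulerName matches one of this scheduler's profiles.
    return profiles.HandlesSchedulerName(pod.Spec.SchedulerName)
}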
// cmd\kube-scheduler\app\server.go
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
    // To help debugging, immediately log version
    klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())

    // Configz registration.
    if cz, err := configz.New("componentconfig"); err == nil {
        cz.Set(cc.ComponentConfig)
    } else {
        return fmt.Errorf("unable to register configz: %s", err)
    }

    // Prepare the event broadcaster.
    cc.EventBroadcaster.StartRecordingToSink(ctx.Done())

    // Setup healthz checks.
    var checks []healthz.HealthChecker
    if cc.ComponentConfig.LeaderElection.LeaderElect {
        checks = append(checks, cc.LeaderElection.WatchDog)
    }

    waitingForLeader := make(chan struct{})
    isLeader := func() bool {
        select {
        case _, ok := <-waitingForLeader:
            // if channel is closed, we are leading
            return !ok
        default:
            // channel is open, we are waiting for a leader
            return false
        }
    }

    // Start the HTTP servers that expose the health-check and metrics (performance monitoring) APIs.
    // Start up the healthz server.
    if cc.InsecureServing != nil {
        separateMetrics := cc.InsecureMetricsServing != nil
        handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, separateMetrics, checks...), nil, nil)
        if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil {
            return fmt.Errorf("failed to start healthz server: %v", err)
        }
    }
    if cc.InsecureMetricsServing != nil {
        handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader), nil, nil)
        if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil {
            return fmt.Errorf("failed to start metrics server: %v", err)
        }
    }
    if cc.SecureServing != nil {
        handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
        // TODO: handle stoppedCh returned by c.SecureServing.Serve
        if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
            // fail early for secure handlers, removing the old error loop from above
            return fmt.Errorf("failed to start secure server: %v", err)
        }
    }

    // Start all informers.
    cc.InformerFactory.Start(ctx.Done())

    // Wait for all running informers to sync so that the local caches are consistent with etcd.
    // Wait for all caches to sync before scheduling.
    cc.InformerFactory.WaitForCacheSync(ctx.Done())


    // If leader election is enabled, runCommand via LeaderElector until done and exit.
    // If this instance wins the leader election, OnStartedLeading is called and the scheduler runs;
    // if leadership is lost afterwards, the scheduler process exits.
    if cc.LeaderElection != nil {
        cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                close(waitingForLeader)
                sched.Run(ctx)
            },
            OnStoppedLeading: func() {
                klog.Fatalf("leaderelection lost")
            },
        }
        leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
        if err != nil {
            return fmt.Errorf("couldn't create leader elector: %v", err)
        }

        leaderElector.Run(ctx)

        return fmt.Errorf("lost lease")
    }

    // Leader election is disabled, so runCommand inline until done.
    close(waitingForLeader)
    // Run the scheduler.
    sched.Run(ctx)
    return fmt.Errorf("finished without leader elect")
}
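The waitingForLeader channel above is a common Go idiom: a closed channel acts as a broadcast latch, so isLeader() can be checked repeatedly without locks. A small self-contained demo of the pattern, separate from the scheduler code:

// Demo of the closed-channel latch behind isLeader(): receiving from a closed
// channel returns immediately with ok == false, while the default branch fires
// as long as the channel is still open.
package main

import "fmt"

func main() {
    waiting := make(chan struct{})

    isLeader := func() bool {
        select {
        case _, ok := <-waiting:
            return !ok // closed => we are leading
        default:
            return false // still open => still waiting for leadership
        }
    }

    fmt.Println(isLeader()) // false
    close(waiting)          // corresponds to OnStartedLeading (or leader election disabled)
    fmt.Println(isLeader()) // true
}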
// pkg\scheduler\scheduler.go
// Run begins watching and scheduling. It starts scheduling and blocks until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
    sched.SchedulingQueue.Run()
    wait.UntilWithContext(ctx, sched.scheduleOne, 0)
    sched.SchedulingQueue.Close()
}
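Note the period of 0 passed to wait.UntilWithContext: scheduleOne is re-invoked as soon as the previous call returns, so the scheduler processes pods back to back until the context is cancelled. A tiny sketch of that behaviour using the same helper:

// Sketch: with period 0, the function runs in a tight loop until ctx is done.
package main

import (
    "context"
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
    defer cancel()

    n := 0
    wait.UntilWithContext(ctx, func(ctx context.Context) { n++ }, 0)
    fmt.Println("loop iterations before shutdown:", n)
}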

// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
    // NextPod blocks until a pod is available, popping the highest-priority pending pod from the scheduling queue.
    podInfo := sched.NextPod()
    // pod could be nil when schedulerQueue is closed
    if podInfo == nil || podInfo.Pod == nil {
        return
    }
    pod := podInfo.Pod
    fwk, err := sched.frameworkForPod(pod)
    if err != nil {
        // This shouldn't happen, because we only accept for scheduling the pods
        // which specify a scheduler name that matches one of the profiles.
        klog.ErrorS(err, "Error occurred")
        return
    }
    if sched.skipPodSchedule(fwk, pod) {
        return
    }

    klog.V(3).InfoS("Attempting to schedule pod", "pod", klog.KObj(pod))

    // Synchronously attempt to find a fit for the pod.
    start := time.Now()
    state := framework.NewCycleState()
    state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
    schedulingCycleCtx, cancel := context.WithCancel(ctx)
    defer cancel()
    // Run the scheduling algorithm to pick a suitable node for the pod.
    scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, fwk, state, pod)
    if err != nil {
        // Schedule() may have failed because the pod would not fit on any host, so we try to
        // preempt, with the expectation that the next time the pod is tried for scheduling it
        // will fit due to the preemption. It is also possible that a different pod will schedule
        // into the resources that were preempted, but this is harmless.
        nominatedNode := ""
        if fitError, ok := err.(*framework.FitError); ok {
            if !fwk.HasPostFilterPlugins() {
                klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
            } else {
                // Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
                result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
                if status.Code() == framework.Error {
                    klog.ErrorS(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
                } else {
                    klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
                }
                if status.IsSuccess() && result != nil {
                    nominatedNode = result.NominatedNodeName
                }
            }
            // Pod did not fit anywhere, so it is counted as a failure. If preemption
            // succeeds, the pod should get counted as a success the next time we try to
            // schedule it. (hopefully)
            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
        } else if err == core.ErrNoNodesAvailable {
            // No nodes available is counted as unschedulable rather than an error.
            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
        } else {
            klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
        }
        sched.recordSchedulingFailure(fwk, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
        return
    }
    metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
    // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
    // This allows us to keep scheduling without waiting on binding to occur.
    assumedPodInfo := podInfo.DeepCopy()
    assumedPod := assumedPodInfo.Pod
    // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
    err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
    if err != nil {
        metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
        // This is most probably result of a BUG in retrying logic.
        // We report an error here so that pod scheduling can be retried.
        // This relies on the fact that Error will check if the pod has been bound
        // to a node and if so will not add it back to the unscheduled pods queue
        // (otherwise this would cause an infinite loop).
        sched.recordSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, "")
        return
    }

    // Run the Reserve method of reserve plugins.
    if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
        metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
        // trigger un-reserve to clean up state associated with the reserved Pod
        fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
            klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
        }
        sched.recordSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, "")
        return
    }

    // Run "permit" plugins.
    runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
        var reason string
        if runPermitStatus.IsUnschedulable() {
            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
            reason = v1.PodReasonUnschedulable
        } else {
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
            reason = SchedulerError
        }
        // One of the plugins returned status different than success or wait.
        fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
            klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
        }
        sched.recordSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, "")
        return
    }

    // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
    go func() {
        bindingCycleCtx, cancel := context.WithCancel(ctx)
        defer cancel()
        metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
        defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()

        waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
        if !waitOnPermitStatus.IsSuccess() {
            var reason string
            if waitOnPermitStatus.IsUnschedulable() {
                metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
                reason = v1.PodReasonUnschedulable
            } else {
                metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
                reason = SchedulerError
            }
            // trigger un-reserve plugins to clean up state associated with the reserved Pod
            fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
            if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
                klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
            }
            sched.recordSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
            return
        }

        // Run "prebind" plugins.
        preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        if !preBindStatus.IsSuccess() {
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
            // trigger un-reserve plugins to clean up state associated with the reserved Pod
            fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
            if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
                klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
            }
            sched.recordSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
            return
        }

        err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
        if err != nil {
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
            // trigger un-reserve plugins to clean up state associated with the reserved Pod
            fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
            if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil {
                klog.ErrorS(err, "scheduler cache ForgetPod failed")
            }
            sched.recordSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, "")
        } else {
            // Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
            if klog.V(2).Enabled() {
                klog.InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
            }
            metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
            metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
            metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))

            // Run "postbind" plugins.
            fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        }
    }()
}
// pkg\scheduler\core\generic_scheduler.go
// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
// onto machines.
// TODO: Rename this type.
type ScheduleAlgorithm interface {
    Schedule(context.Context, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
    // Extenders returns a slice of extender config. This is exposed for
    // testing.
    Extenders() []framework.Extender
}
// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
    trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
    defer trace.LogIfLong(100 * time.Millisecond)

    if err := g.snapshot(); err != nil {
        return result, err
    }
    trace.Step("Snapshotting scheduler cache and node infos done")

    if g.nodeInfoSnapshot.NumNodes() == 0 {
        return result, ErrNoNodesAvailable
    }

    // Filtering phase (predicates): compute the set of feasible candidate nodes.
    feasibleNodes, diagnosis, err := g.findNodesThatFitPod(ctx, fwk, state, pod)
    if err != nil {
        return result, err
    }
    trace.Step("Computing predicates done")

    if len(feasibleNodes) == 0 {
        return result, &framework.FitError{
            Pod:         pod,
            NumAllNodes: g.nodeInfoSnapshot.NumNodes(),
            Diagnosis:   diagnosis,
        }
    }

    // When only one node after predicate, just use it.
    if len(feasibleNodes) == 1 {
        return ScheduleResult{
            SuggestedHost:  feasibleNodes[0].Name,
            EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
            FeasibleNodes:  1,
        }, nil
    }

    // Scoring phase (priorities): score each feasible node.
    priorityList, err := g.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes)
    if err != nil {
        return result, err
    }
    // Pick the node with the highest score.
    host, err := g.selectHost(priorityList)
    trace.Step("Prioritizing done")

    return ScheduleResult{
        SuggestedHost:  host,
        EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
        FeasibleNodes:  len(feasibleNodes),
    }, err
}

// Filters the nodes to find the ones that fit the pod based on the framework
// filter plugins and filter extenders.
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
    diagnosis := framework.Diagnosis{
        NodeToStatusMap:      make(framework.NodeToStatusMap),
        UnschedulablePlugins: sets.NewString(),
    }

    // Run "prefilter" plugins.
    s := fwk.RunPreFilterPlugins(ctx, state, pod)
    allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
    if err != nil {
        return nil, diagnosis, err
    }
    if !s.IsSuccess() {
        if !s.IsUnschedulable() {
            return nil, diagnosis, s.AsError()
        }
        // All nodes will have the same status. Some non trivial refactoring is
        // needed to avoid this copy.
        for _, n := range allNodes {
            diagnosis.NodeToStatusMap[n.Node().Name] = s
        }
        // Status satisfying IsUnschedulable() gets injected into diagnosis.UnschedulablePlugins.
        diagnosis.UnschedulablePlugins.Insert(s.FailedPlugin())
        return nil, diagnosis, nil
    }

    // "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption.
    // This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
    if len(pod.Status.NominatedNodeName) > 0 && feature.DefaultFeatureGate.Enabled(features.PreferNominatedNode) {
        feasibleNodes, err := g.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
        if err != nil {
            klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
        }
        // Nominated node passes all the filters, scheduler is good to assign this node to the pod.
        if len(feasibleNodes) != 0 {
            return feasibleNodes, diagnosis, nil
        }
    }
    feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes)
    if err != nil {
        return nil, diagnosis, err
    }

    feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, diagnosis.NodeToStatusMap)
    if err != nil {
        return nil, diagnosis, err
    }
    return feasibleNodes, diagnosis, nil
}

// findNodesThatPassFilters finds the nodes that fit the filter plugins.
func (g *genericScheduler) findNodesThatPassFilters(
    ctx context.Context,
    fwk framework.Framework,
    state *framework.CycleState,
    pod *v1.Pod,
    diagnosis framework.Diagnosis,
    nodes []*framework.NodeInfo) ([]*v1.Node, error) {
    numNodesToFind := g.numFeasibleNodesToFind(int32(len(nodes)))

    // Create feasible list with enough space to avoid growing it
    // and allow assigning.
    feasibleNodes := make([]*v1.Node, numNodesToFind)

    if !fwk.HasFilterPlugins() {
        length := len(nodes)
        for i := range feasibleNodes {
            feasibleNodes[i] = nodes[(g.nextStartNodeIndex+i)%length].Node()
        }
        g.nextStartNodeIndex = (g.nextStartNodeIndex + len(feasibleNodes)) % length
        return feasibleNodes, nil
    }

    errCh := parallelize.NewErrorChannel()
    var statusesLock sync.Mutex
    var feasibleNodesLen int32
    ctx, cancel := context.WithCancel(ctx)
    checkNode := func(i int) {
        // We check the nodes starting from where we left off in the previous scheduling cycle,
        // this is to make sure all nodes have the same chance of being examined across pods.
        nodeInfo := nodes[(g.nextStartNodeIndex+i)%len(nodes)]
        status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
        if status.Code() == framework.Error {
            errCh.SendErrorWithCancel(status.AsError(), cancel)
            return
        }
        if status.IsSuccess() {
            length := atomic.AddInt32(&feasibleNodesLen, 1)
            if length > numNodesToFind {
                cancel()
                atomic.AddInt32(&feasibleNodesLen, -1)
            } else {
                feasibleNodes[length-1] = nodeInfo.Node()
            }
        } else {
            statusesLock.Lock()
            diagnosis.NodeToStatusMap[nodeInfo.Node().Name] = status
            diagnosis.UnschedulablePlugins.Insert(status.FailedPlugin())
            statusesLock.Unlock()
        }
    }

    beginCheckNode := time.Now()
    statusCode := framework.Success
    defer func() {
        // We record Filter extension point latency here instead of in framework.go because framework.RunFilterPlugins
        // function is called for each node, whereas we want to have an overall latency for all nodes per scheduling cycle.
        // Note that this latency also includes latency for `addNominatedPods`, which calls framework.RunPreFilterAddPod.
        metrics.FrameworkExtensionPointDuration.WithLabelValues(runtime.Filter, statusCode.String(), fwk.ProfileName()).Observe(metrics.SinceInSeconds(beginCheckNode))
    }()

    // Stops searching for more nodes once the configured number of feasible nodes
    // are found.
    fwk.Parallelizer().Until(ctx, len(nodes), checkNode)
    processedNodes := int(feasibleNodesLen) + len(diagnosis.NodeToStatusMap)
    g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(nodes)

    feasibleNodes = feasibleNodes[:feasibleNodesLen]
    if err := errCh.ReceiveError(); err != nil {
        statusCode = framework.Error
        return nil, err
    }
    return feasibleNodes, nil
}
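numNodesToFind comes from numFeasibleNodesToFind, which implements the percentageOfNodesToScore optimization: on large clusters the scheduler stops filtering once it has found "enough" feasible nodes instead of checking every node. A sketch of the heuristic as it exists around this release (the constants, a 100-node minimum and a 5% floor, are this version's values and may differ elsewhere):

// Sketch of numFeasibleNodesToFind. Example with the default (adaptive)
// percentage and 5000 nodes: adaptivePercentage = 50 - 5000/125 = 10,
// so only ~500 nodes are run through the filter plugins.
func numFeasibleNodesToFind(percentageOfNodesToScore, numAllNodes int32) int32 {
    const (
        minFeasibleNodesToFind           = 100
        minFeasibleNodesPercentageToFind = 5
    )
    if numAllNodes < minFeasibleNodesToFind || percentageOfNodesToScore >= 100 {
        return numAllNodes
    }

    adaptivePercentage := percentageOfNodesToScore
    if adaptivePercentage <= 0 {
        // Default: start at 50% and shrink as the cluster grows, never below 5%.
        adaptivePercentage = 50 - numAllNodes/125
        if adaptivePercentage < minFeasibleNodesPercentageToFind {
            adaptivePercentage = minFeasibleNodesPercentageToFind
        }
    }

    numNodes := numAllNodes * adaptivePercentage / 100
    if numNodes < minFeasibleNodesToFind {
        return minFeasibleNodesToFind
    }
    return numNodes
}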

// prioritizeNodes prioritizes the nodes by running the score plugins,
// which return a score for each node from the call to RunScorePlugins().
// The scores from each plugin are added together to make the score for that node, then
// any extenders are run as well.
// All scores are finally combined (added) to get the total weighted scores of all nodes
func (g *genericScheduler) prioritizeNodes(
    ctx context.Context,
    fwk framework.Framework,
    state *framework.CycleState,
    pod *v1.Pod,
    nodes []*v1.Node,
) (framework.NodeScoreList, error) {
    // If no priority configs are provided, then all nodes will have a score of one.
    // This is required to generate the priority list in the required format
    if len(g.extenders) == 0 && !fwk.HasScorePlugins() {
        result := make(framework.NodeScoreList, 0, len(nodes))
        for i := range nodes {
            result = append(result, framework.NodeScore{
                Name:  nodes[i].Name,
                Score: 1,
            })
        }
        return result, nil
    }

    // Run PreScore plugins.
    preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
    if !preScoreStatus.IsSuccess() {
        return nil, preScoreStatus.AsError()
    }

    // Run the Score plugins.
    scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
    if !scoreStatus.IsSuccess() {
        return nil, scoreStatus.AsError()
    }

    if klog.V(10).Enabled() {
        for plugin, nodeScoreList := range scoresMap {
            for _, nodeScore := range nodeScoreList {
                klog.InfoS("Plugin scored node for pod", "pod", klog.KObj(pod), "plugin", plugin, "node", nodeScore.Name, "score", nodeScore.Score)
            }
        }
    }

    // Summarize all scores.
    result := make(framework.NodeScoreList, 0, len(nodes))

    for i := range nodes {
        result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
        for j := range scoresMap {
            result[i].Score += scoresMap[j][i].Score
        }
    }

    if len(g.extenders) != 0 && nodes != nil {
        var mu sync.Mutex
        var wg sync.WaitGroup
        combinedScores := make(map[string]int64, len(nodes))
        for i := range g.extenders {
            if !g.extenders[i].IsInterested(pod) {
                continue
            }
            wg.Add(1)
            go func(extIndex int) {
                metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()
                defer func() {
                    metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
                    wg.Done()
                }()
                prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes)
                if err != nil {
                    // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
                    return
                }
                mu.Lock()
                for i := range *prioritizedList {
                    host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
                    if klog.V(10).Enabled() {
                        klog.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", g.extenders[extIndex].Name(), "node", host, "score", score)
                    }
                    combinedScores[host] += score * weight
                }
                mu.Unlock()
            }(i)
        }
        // wait for all go routines to finish
        wg.Wait()
        for i := range result {
            // MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore,
            // therefore we need to scale the score returned by extenders to the score range used by the scheduler.
            result[i].Score += combinedScores[result[i].Name] * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)
        }
    }

    if klog.V(10).Enabled() {
        for i := range result {
            klog.InfoS("Calculated node's final score for pod", "pod", klog.KObj(pod), "node", result[i].Name, "score", result[i].Score)
        }
    }
    return result, nil
}
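Extender scores live on a smaller scale than framework plugin scores, which is why the combined score is multiplied by framework.MaxNodeScore / extenderv1.MaxExtenderPriority. Assuming the usual values in this release (MaxNodeScore = 100, MaxExtenderPriority = 10), an extender score of 7 with weight 2 contributes 7 * 2 * (100 / 10) = 140 points to that node's total. A quick check of that arithmetic:

// Worked example of the extender score scaling (constants assumed from this
// release: framework.MaxNodeScore = 100, extenderv1.MaxExtenderPriority = 10).
package main

import "fmt"

func main() {
    const (
        maxNodeScore        = 100 // framework.MaxNodeScore
        maxExtenderPriority = 10  // extenderv1.MaxExtenderPriority
    )
    extenderScore, weight := int64(7), int64(2)

    combined := extenderScore * weight                              // 14, in extender units
    contribution := combined * (maxNodeScore / maxExtenderPriority) // 140, in framework units
    fmt.Println("extender contribution to node score:", contribution)
}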

// selectHost takes a prioritized list of nodes and then picks one
// in a reservoir sampling manner from the nodes that had the highest score.
func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
    if len(nodeScoreList) == 0 {
        return "", fmt.Errorf("empty priorityList")
    }
    maxScore := nodeScoreList[0].Score
    selected := nodeScoreList[0].Name
    cntOfMaxScore := 1
    for _, ns := range nodeScoreList[1:] {
        if ns.Score > maxScore {
            maxScore = ns.Score
            selected = ns.Name
            cntOfMaxScore = 1
        } else if ns.Score == maxScore {
            cntOfMaxScore++
            if rand.Intn(cntOfMaxScore) == 0 {
                // Replace the candidate with probability of 1/cntOfMaxScore
                selected = ns.Name
            }
        }
    }
    return selected, nil
}
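selectHost breaks ties with single-element reservoir sampling: the i-th node seen at the current maximum score replaces the current pick with probability 1/i, which leaves every tied node equally likely to be chosen. A self-contained demo of the tie-breaking behaviour (the nodeScore type and node names below are made up for the example):

// Demo: with three nodes tied at the top score, each should be picked
// roughly 1/3 of the time, and lower-scored nodes never win.
package main

import (
    "fmt"
    "math/rand"
)

type nodeScore struct {
    Name  string
    Score int64
}

func selectHost(list []nodeScore) string {
    maxScore, selected, cntOfMaxScore := list[0].Score, list[0].Name, 1
    for _, ns := range list[1:] {
        switch {
        case ns.Score > maxScore:
            maxScore, selected, cntOfMaxScore = ns.Score, ns.Name, 1
        case ns.Score == maxScore:
            cntOfMaxScore++
            if rand.Intn(cntOfMaxScore) == 0 {
                selected = ns.Name // replace with probability 1/cntOfMaxScore
            }
        }
    }
    return selected
}

func main() {
    picks := map[string]int{}
    for i := 0; i < 30000; i++ {
        picks[selectHost([]nodeScore{
            {"node-a", 90}, {"node-b", 90}, {"node-c", 90}, {"node-d", 80},
        })]++
    }
    fmt.Println(picks) // node-a, node-b, node-c each around 10000; node-d absent
}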