k8s 之 scheduler 源码简单分析

2020-07-11  本文已影响0人  wwq2020

简介

Kubernetes Scheduler 负责从 apiserver 获取 PodSpec.NodeName 为空的 Pod,并为它创建一个 binding,用来指示该 Pod 应该调度到哪个节点上。

cmd/kube-scheduler/scheduler.go 中

// main is the kube-scheduler entry point. It builds the scheduler command via
// app.NewSchedulerCommand and executes it, exiting with status 1 when
// Execute returns an error. Lines marked "..." are elided in this excerpt.
func main() {
   ...
    command := app.NewSchedulerCommand()
   ...
    if err := command.Execute(); err != nil {
        os.Exit(1)
    }
}

cmd/kube-scheduler/app/server.go 中

// NewSchedulerCommand builds the "kube-scheduler" cobra command.
//
// registryOptions are not used while building the command; they are handed to
// runCommand when the command actually runs. Failure to create the default
// options is fatal (klog.Fatalf). The tail of the function is elided in this
// excerpt ("...").
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
    opts, err := options.NewOptions()
    if err != nil {
        klog.Fatalf("unable to initialize command options: %v", err)
    }

    cmd := &cobra.Command{
        Use: "kube-scheduler",
        Long: `The Kubernetes scheduler is a control plane process which assigns
Pods to Nodes. The scheduler determines which Nodes are valid placements for
each Pod in the scheduling queue according to constraints and available
resources. The scheduler then ranks each valid Node and binds the Pod to a
suitable Node. Multiple different schedulers may be used within a cluster;
kube-scheduler is the reference implementation.
See [scheduling](https://kubernetes.io/docs/concepts/scheduling-eviction/)
for more information about scheduling and the kube-scheduler component.`,
        // Run prints a runCommand error to stderr and exits with status 1.
        Run: func(cmd *cobra.Command, args []string) {
            if err := runCommand(cmd, opts, registryOptions...); err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }
        },
        // Args rejects any non-empty positional argument; empty-string
        // arguments are tolerated.
        Args: func(cmd *cobra.Command, args []string) error {
            for _, arg := range args {
                if len(arg) > 0 {
                    return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
                }
            }
            return nil
        },
    }
   ...
}

// runCommand is the body of the kube-scheduler command.
//
// It handles the version flag, prints the effective flags, builds the
// completed config and the scheduler through Setup, and then either writes the
// component config to opts.WriteConfigTo and returns nil, or hands control to
// Run. The context created here is cancelled when runCommand returns.
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
    verflag.PrintAndExitIfRequested()
    cliflag.PrintFlags(cmd.Flags())

    rootCtx, stop := context.WithCancel(context.Background())
    defer stop()

    completed, sched, setupErr := Setup(rootCtx, opts, registryOptions...)
    if setupErr != nil {
        return setupErr
    }

    // Normal path: no config dump requested, so start the scheduler.
    if len(opts.WriteConfigTo) == 0 {
        return Run(rootCtx, completed, sched)
    }

    // Config dump requested: write the file and stop without scheduling.
    if writeErr := options.WriteConfigFile(opts.WriteConfigTo, &completed.ComponentConfig); writeErr != nil {
        return writeErr
    }
    klog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo)
    return nil
}

// Run starts the scheduler's supporting services and then the scheduling loop.
//
// In order it: logs the version, registers the component config with configz,
// starts the event broadcaster, starts whichever insecure/secure serving
// endpoints are configured, starts the informers and waits for their caches to
// sync, and finally calls sched.Run — through the leader elector when
// cc.LeaderElection is non-nil, directly otherwise.
//
// Every return path yields a non-nil error, including the two reached after
// the leader elector or sched.Run returns.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
    // To help debugging, immediately log version
    klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())

    // Configz registration.
    if cz, err := configz.New("componentconfig"); err == nil {
        cz.Set(cc.ComponentConfig)
    } else {
        return fmt.Errorf("unable to register configz: %s", err)
    }

    // Prepare the event broadcaster.
    cc.EventBroadcaster.StartRecordingToSink(ctx.Done())

    // Setup healthz checks.
    // The leader-election watchdog is added as a check only when leader
    // election is enabled in the component config.
    var checks []healthz.HealthChecker
    if cc.ComponentConfig.LeaderElection.LeaderElect {
        checks = append(checks, cc.LeaderElection.WatchDog)
    }

    // Start up the healthz server.
    if cc.InsecureServing != nil {
        // separateMetrics is true when a dedicated insecure metrics endpoint
        // is configured; it is passed on to newHealthzHandler.
        separateMetrics := cc.InsecureMetricsServing != nil
        handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
        if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil {
            return fmt.Errorf("failed to start healthz server: %v", err)
        }
    }
    if cc.InsecureMetricsServing != nil {
        handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil)
        if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil {
            return fmt.Errorf("failed to start metrics server: %v", err)
        }
    }
    if cc.SecureServing != nil {
        // Only the secure handler chain is given an authenticator and an
        // authorizer; the insecure chains above pass nil for both.
        handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
        // TODO: handle stoppedCh returned by c.SecureServing.Serve
        if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
            // fail early for secure handlers, removing the old error loop from above
            return fmt.Errorf("failed to start secure server: %v", err)
        }
    }

    // Start all informers.
    // The pod informer is run explicitly in its own goroutine, in addition to
    // starting the informer factory.
    go cc.PodInformer.Informer().Run(ctx.Done())
    cc.InformerFactory.Start(ctx.Done())

    // Wait for all caches to sync before scheduling.
    cc.InformerFactory.WaitForCacheSync(ctx.Done())

    // If leader election is enabled, runCommand via LeaderElector until done and exit.
    if cc.LeaderElection != nil {
        cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
            OnStartedLeading: sched.Run,
            // Losing leadership is fatal for the process (klog.Fatalf).
            OnStoppedLeading: func() {
                klog.Fatalf("leaderelection lost")
            },
        }
        leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
        if err != nil {
            return fmt.Errorf("couldn't create leader elector: %v", err)
        }

        leaderElector.Run(ctx)

        return fmt.Errorf("lost lease")
    }

    // Leader election is disabled, so runCommand inline until done.
    sched.Run(ctx)
    return fmt.Errorf("finished without leader elect")
}

// Setup validates the options, derives the completed config from them and
// constructs the Scheduler.
//
// Each entry of outOfTreeRegistryOptions is applied to a fresh out-of-tree
// plugin registry, which is then passed to scheduler.New together with the
// settings taken from the component config. On any failure both returned
// pointers are nil.
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
    if validationErrs := opts.Validate(); len(validationErrs) != 0 {
        return nil, nil, utilerrors.NewAggregate(validationErrs)
    }

    rawConfig, err := opts.Config()
    if err != nil {
        return nil, nil, err
    }
    completed := rawConfig.Complete()

    // Let every caller-supplied option register its plugins.
    extraRegistry := make(runtime.Registry)
    for _, register := range outOfTreeRegistryOptions {
        if regErr := register(extraRegistry); regErr != nil {
            return nil, nil, regErr
        }
    }

    recorders := getRecorderFactory(&completed)
    componentCfg := &completed.ComponentConfig

    // Build the scheduler from the completed configuration.
    sched, err := scheduler.New(
        completed.Client,
        completed.InformerFactory,
        completed.PodInformer,
        recorders,
        ctx.Done(),
        scheduler.WithProfiles(componentCfg.Profiles...),
        scheduler.WithAlgorithmSource(componentCfg.AlgorithmSource),
        scheduler.WithPreemptionDisabled(componentCfg.DisablePreemption),
        scheduler.WithPercentageOfNodesToScore(componentCfg.PercentageOfNodesToScore),
        scheduler.WithFrameworkOutOfTreeRegistry(extraRegistry),
        scheduler.WithPodMaxBackoffSeconds(componentCfg.PodMaxBackoffSeconds),
        scheduler.WithPodInitialBackoffSeconds(componentCfg.PodInitialBackoffSeconds),
        scheduler.WithExtenders(componentCfg.Extenders...),
    )
    if err != nil {
        return nil, nil, err
    }

    return &completed, sched, nil
}

pkg/scheduler/scheduler.go 中

// Run waits for the scheduled-pods cache to sync and then drives the
// scheduling loop: it starts the scheduling queue, invokes scheduleOne
// repeatedly via wait.UntilWithContext with a zero period, and closes the
// queue once that call returns. If the cache sync wait reports failure,
// Run returns without starting anything.
func (sched *Scheduler) Run(ctx context.Context) {
    if synced := cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced); synced {
        sched.SchedulingQueue.Run()
        wait.UntilWithContext(ctx, sched.scheduleOne, 0)
        sched.SchedulingQueue.Close()
    }
}

// scheduleOne runs one scheduling attempt for the next pod returned by
// sched.NextPod.
//
// The scheduling cycle runs synchronously: pick a host via sched.Algorithm,
// run the Reserve plugins, assume the pod on the suggested host, and run the
// Permit plugins. The binding cycle (WaitOnPermit, PreBind, bind, PostBind)
// then runs in its own goroutine.
//
// Each failure is reported through sched.recordSchedulingFailure and a
// matching metric. Failure paths reached after Reserve has been attempted also
// run the Unreserve plugins, and the Permit, WaitOnPermit and PreBind failure
// paths additionally call ForgetPod on the scheduler cache.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
    podInfo := sched.NextPod()
    // pod could be nil when schedulerQueue is closed
    if podInfo == nil || podInfo.Pod == nil {
        return
    }
    pod := podInfo.Pod
    prof, err := sched.profileForPod(pod)
    if err != nil {
        // This shouldn't happen, because we only accept for scheduling the pods
        // which specify a scheduler name that matches one of the profiles.
        klog.Error(err)
        return
    }
    if sched.skipPodSchedule(prof, pod) {
        return
    }

    klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)

    // Synchronously attempt to find a fit for the pod.
    start := time.Now()
    state := framework.NewCycleState()
    // Plugin metrics are recorded only for a random sample of cycles:
    // pluginMetricsSamplePercent out of every 100 on average.
    state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
    schedulingCycleCtx, cancel := context.WithCancel(ctx)
    defer cancel()
    scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
    if err != nil {
        // Schedule() may have failed because the pod would not fit on any host, so we try to
        // preempt, with the expectation that the next time the pod is tried for scheduling it
        // will fit due to the preemption. It is also possible that a different pod will schedule
        // into the resources that were preempted, but this is harmless.
        nominatedNode := ""
        if fitError, ok := err.(*core.FitError); ok {
            if sched.DisablePreemption || !prof.HasPostFilterPlugins() {
                klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
                    " No preemption is performed.")
            } else {
                // Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
                result, status := prof.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses)
                if status.Code() == framework.Error {
                    klog.Errorf("Status after running PostFilter plugins for pod %v/%v: %v", pod.Namespace, pod.Name, status)
                } else {
                    klog.V(5).Infof("Status after running PostFilter plugins for pod %v/%v: %v", pod.Namespace, pod.Name, status)
                }
                if status.IsSuccess() && result != nil {
                    nominatedNode = result.NominatedNodeName
                }
            }
            // Pod did not fit anywhere, so it is counted as a failure. If preemption
            // succeeds, the pod should get counted as a success the next time we try to
            // schedule it. (hopefully)
            metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start))
        } else if err == core.ErrNoNodesAvailable {
            // No nodes available is counted as unschedulable rather than an error.
            metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start))
        } else {
            klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
            metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
        }
        // nominatedNode is non-empty only when the PostFilter plugins succeeded
        // and returned a result.
        sched.recordSchedulingFailure(prof, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
        return
    }
    metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
    // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
    // This allows us to keep scheduling without waiting on binding to occur.
    assumedPodInfo := podInfo.DeepCopy()
    assumedPod := assumedPodInfo.Pod

    // Run the Reserve method of reserve plugins.
    if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
        sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "")
        metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
        // trigger un-reserve to clean up state associated with the reserved Pod
        prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        return
    }

    // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
    err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
    if err != nil {
        // This is most probably result of a BUG in retrying logic.
        // We report an error here so that pod scheduling can be retried.
        // This relies on the fact that Error will check if the pod has been bound
        // to a node and if so will not add it back to the unscheduled pods queue
        // (otherwise this would cause an infinite loop).
        sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, "")
        metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
        // trigger un-reserve plugins to clean up state associated with the reserved Pod
        prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        return
    }

    // Run "permit" plugins.
    // A Wait status is not treated as a failure here; it is resolved later by
    // WaitOnPermit inside the binding goroutine.
    runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
        var reason string
        if runPermitStatus.IsUnschedulable() {
            metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start))
            reason = v1.PodReasonUnschedulable
        } else {
            metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
            reason = SchedulerError
        }
        if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
            klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
        }
        // One of the plugins returned status different than success or wait.
        prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        sched.recordSchedulingFailure(prof, assumedPodInfo, runPermitStatus.AsError(), reason, "")
        return
    }

    // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
    go func() {
        // The binding cycle derives its context from ctx, not from
        // schedulingCycleCtx, which is cancelled when scheduleOne returns.
        bindingCycleCtx, cancel := context.WithCancel(ctx)
        defer cancel()
        metrics.SchedulerGoroutines.WithLabelValues("binding").Inc()
        defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec()

        waitOnPermitStatus := prof.WaitOnPermit(bindingCycleCtx, assumedPod)
        if !waitOnPermitStatus.IsSuccess() {
            var reason string
            if waitOnPermitStatus.IsUnschedulable() {
                metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start))
                reason = v1.PodReasonUnschedulable
            } else {
                metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
                reason = SchedulerError
            }
            if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
                klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
            }
            // trigger un-reserve plugins to clean up state associated with the reserved Pod
            prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
            sched.recordSchedulingFailure(prof, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
            return
        }

        // Run "prebind" plugins.
        preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        if !preBindStatus.IsSuccess() {
            var reason string
            metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
            reason = SchedulerError
            if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
                klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
            }
            // trigger un-reserve plugins to clean up state associated with the reserved Pod
            prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
            sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), reason, "")
            return
        }

        err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
        if err != nil {
            metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
            // trigger un-reserve plugins to clean up state associated with the reserved Pod
            prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
            sched.recordSchedulingFailure(prof, assumedPodInfo, fmt.Errorf("Binding rejected: %v", err), SchedulerError, "")
        } else {
            // Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
            if klog.V(2).Enabled() {
                klog.InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
            }

            metrics.PodScheduled(prof.Name, metrics.SinceInSeconds(start))
            metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
            metrics.PodSchedulingDuration.Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))

            // Run "postbind" plugins.
            prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        }
    }()
}

pkg/scheduler/scheduler.go 中

// New constructs a Scheduler from the given client, informers, recorder
// factory, stop channel and functional options.
//
// Only the branch taken when the algorithm source names a provider is shown in
// this excerpt: the scheduler is then built by configurator.createFromProvider.
// Lines marked "..." are elided; `options` and `configurator` are defined in
// the elided portion.
func New(client clientset.Interface,
    informerFactory informers.SharedInformerFactory,
    podInformer coreinformers.PodInformer,
    recorderFactory profile.RecorderFactory,
    stopCh <-chan struct{},
    opts ...Option) (*Scheduler, error) {
   ...
    var sched *Scheduler
    source := options.schedulerAlgorithmSource
    switch {
    case source.Provider != nil:
        // Create the config from a named algorithm provider.
        sc, err := configurator.createFromProvider(*source.Provider)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
        }
        sched = sc
   ...
   }
   ...
}
   // createFromProvider builds a Scheduler using the default plugin set that
   // the algorithm-provider registry holds under providerName.
   //
   // For every profile, the provider defaults are appended to a fresh plugin
   // set, the profile's own plugin configuration is applied on top, and the
   // result replaces the profile's Plugins. An unregistered provider name
   // yields an error.
   func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) {
    klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
    registry := algorithmprovider.NewRegistry()
    providerDefaults, registered := registry[providerName]
    if !registered {
        return nil, fmt.Errorf("algorithm provider %q is not registered", providerName)
    }

    for idx := range c.profiles {
        merged := &schedulerapi.Plugins{}
        merged.Append(providerDefaults)
        merged.Apply(c.profiles[idx].Plugins)
        c.profiles[idx].Plugins = merged
    }
    return c.create()
}

pkg/scheduler/factory.go 中

// create assembles a Scheduler from the Configurator's state.
//
// The visible part of this excerpt builds the profile map (frameworks are
// produced through c.buildFramework), constructs the generic scheduling
// algorithm, and returns a Scheduler wired to the scheduler cache and the pod
// queue. Lines marked "..." are elided; `nominator`, `extenders` and
// `podQueue` are defined in the elided portion.
func (c *Configurator) create() (*Scheduler, error) {
  ...
    profiles, err := profile.NewMap(c.profiles, c.buildFramework, c.recorderFactory,
        frameworkruntime.WithPodNominator(nominator))
    if err != nil {
        return nil, fmt.Errorf("initializing profiles: %v", err)
    }
   ...
    algo := core.NewGenericScheduler(
        c.schedulerCache,
        c.nodeInfoSnapshot,
        extenders,
        c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
        c.disablePreemption,
        c.percentageOfNodesToScore,
    )
   ...
    return &Scheduler{
        SchedulerCache:  c.schedulerCache,
        Algorithm:       algo,
        Profiles:        profiles,
        NextPod:         internalqueue.MakeNextPodFunc(podQueue),
        Error:           MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
        StopEverything:  c.StopEverything,
        SchedulingQueue: podQueue,
    }, nil
}

// buildFramework creates the framework for profile p.
//
// The Configurator's own options (client set, informer factory, snapshot
// lister, run-all-filters flag) are placed ahead of the caller-supplied opts.
// When a frameworkCapturer is set it is invoked with the profile first.
func (c *Configurator) buildFramework(p schedulerapi.KubeSchedulerProfile, opts ...frameworkruntime.Option) (framework.Framework, error) {
    if capture := c.frameworkCapturer; capture != nil {
        capture(p)
    }

    baseOpts := []frameworkruntime.Option{
        frameworkruntime.WithClientSet(c.client),
        frameworkruntime.WithInformerFactory(c.informerFactory),
        frameworkruntime.WithSnapshotSharedLister(c.nodeInfoSnapshot),
        frameworkruntime.WithRunAllFilters(c.alwaysCheckAllPredicates),
    }
    allOpts := append(baseOpts, opts...)

    return frameworkruntime.NewFramework(c.registry, p.Plugins, p.PluginConfig, allOpts...)
}

pkg/scheduler/core/generic_scheduler.go 中

// NewGenericScheduler returns a ScheduleAlgorithm backed by a genericScheduler
// whose fields are populated one-to-one from the arguments.
func NewGenericScheduler(
    cache internalcache.Cache,
    nodeInfoSnapshot *internalcache.Snapshot,
    extenders []framework.Extender,
    pvcLister corelisters.PersistentVolumeClaimLister,
    disablePreemption bool,
    percentageOfNodesToScore int32) ScheduleAlgorithm {
    gs := new(genericScheduler)
    gs.cache = cache
    gs.extenders = extenders
    gs.nodeInfoSnapshot = nodeInfoSnapshot
    gs.pvcLister = pvcLister
    gs.disablePreemption = disablePreemption
    gs.percentageOfNodesToScore = percentageOfNodesToScore
    return gs
}

// Schedule picks a node for pod.
//
// Steps: basic pod checks, snapshot of the scheduler cache, filtering with
// findNodesThatFitPod, then — unless exactly one node remains — scoring with
// prioritizeNodes and selection with selectHost.
//
// It returns ErrNoNodesAvailable when the snapshot contains no nodes and a
// *FitError when filtering leaves no node. The error from selectHost is
// returned together with the populated ScheduleResult. A trace of the call is
// logged when it takes longer than 100ms.
func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
    trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
    defer trace.LogIfLong(100 * time.Millisecond)

    if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
        return result, err
    }
    trace.Step("Basic checks done")

    if err := g.snapshot(); err != nil {
        return result, err
    }
    trace.Step("Snapshotting scheduler cache and node infos done")

    if g.nodeInfoSnapshot.NumNodes() == 0 {
        return result, ErrNoNodesAvailable
    }

    startPredicateEvalTime := time.Now()
    filteredNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
    if err != nil {
        return result, err
    }
    trace.Step("Computing predicates done")

    // No feasible node: report the per-node statuses so the caller can act on
    // them. The predicate metrics below are not observed on this path.
    if len(filteredNodes) == 0 {
        return result, &FitError{
            Pod:                   pod,
            NumAllNodes:           g.nodeInfoSnapshot.NumNodes(),
            FilteredNodesStatuses: filteredNodesStatuses,
        }
    }

    metrics.DeprecatedSchedulingAlgorithmPredicateEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPredicateEvalTime))
    metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))

    startPriorityEvalTime := time.Now()
    // When only one node after predicate, just use it.
    if len(filteredNodes) == 1 {
        metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
        return ScheduleResult{
            SuggestedHost:  filteredNodes[0].Name,
            EvaluatedNodes: 1 + len(filteredNodesStatuses),
            FeasibleNodes:  1,
        }, nil
    }

    priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, filteredNodes)
    if err != nil {
        return result, err
    }

    metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
    metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))

    host, err := g.selectHost(priorityList)
    trace.Step("Prioritizing done")

    // EvaluatedNodes is the number of feasible nodes plus the number of
    // entries in filteredNodesStatuses.
    return ScheduleResult{
        SuggestedHost:  host,
        EvaluatedNodes: len(filteredNodes) + len(filteredNodesStatuses),
        FeasibleNodes:  len(filteredNodes),
    }, err
}

pkg/scheduler/eventhandlers.go 中

// addAllEventHandlers registers the scheduler's event handlers on the pod
// informer and on the Node, CSINode (only when the CSINodeInfo feature gate is
// enabled), PersistentVolume, PersistentVolumeClaim, Service and StorageClass
// informers obtained from informerFactory.
//
// The pod informer gets two filtered handlers: pods for which assignedPod is
// true are routed to the scheduler-cache handlers, while pods that are not
// assigned and pass responsibleForPod are routed to the scheduling-queue
// handlers. Both filters also unwrap cache.DeletedFinalStateUnknown and report
// unexpected object types through utilruntime.HandleError.
func addAllEventHandlers(
    sched *Scheduler,
    informerFactory informers.SharedInformerFactory,
    podInformer coreinformers.PodInformer,
) {
    // scheduled pod cache
    podInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return assignedPod(t)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return assignedPod(pod)
                    }
                    utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
                    return false
                default:
                    utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    sched.addPodToCache,
                UpdateFunc: sched.updatePodInCache,
                DeleteFunc: sched.deletePodFromCache,
            },
        },
    )
    // unscheduled pod queue
    podInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return !assignedPod(t) && responsibleForPod(t, sched.Profiles)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return !assignedPod(pod) && responsibleForPod(pod, sched.Profiles)
                    }
                    utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
                    return false
                default:
                    utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    sched.addPodToSchedulingQueue,
                UpdateFunc: sched.updatePodInSchedulingQueue,
                DeleteFunc: sched.deletePodFromSchedulingQueue,
            },
        },
    )

    // Node add/update/delete events go to the scheduler's node handlers.
    informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    sched.addNodeToCache,
            UpdateFunc: sched.updateNodeInCache,
            DeleteFunc: sched.deleteNodeFromCache,
        },
    )

    // CSINode add/update handlers are registered only behind the feature gate.
    if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
        informerFactory.Storage().V1().CSINodes().Informer().AddEventHandler(
            cache.ResourceEventHandlerFuncs{
                AddFunc:    sched.onCSINodeAdd,
                UpdateFunc: sched.onCSINodeUpdate,
            },
        )
    }

    // On add and delete of PVs, it will affect equivalence cache items
    // related to persistent volume
    informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            // MaxPDVolumeCountPredicate: since it relies on the counts of PV.
            AddFunc:    sched.onPvAdd,
            UpdateFunc: sched.onPvUpdate,
        },
    )

    // This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
    informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    sched.onPvcAdd,
            UpdateFunc: sched.onPvcUpdate,
        },
    )

    // This is for ServiceAffinity: affected by the selector of the service is updated.
    // Also, if new service is added, equivalence cache will also become invalid since
    // existing pods may be "captured" by this service and change this predicate result.
    informerFactory.Core().V1().Services().Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    sched.onServiceAdd,
            UpdateFunc: sched.onServiceUpdate,
            DeleteFunc: sched.onServiceDelete,
        },
    )

    // Only StorageClass additions are handled.
    informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc: sched.onStorageClassAdd,
        },
    )
}

pkg/scheduler/profile/profile.go 中

// NewMap builds one Profile per entry of cfgs and returns them keyed by
// scheduler name.
//
// Each config is validated first; the first validation or construction
// failure aborts the whole call and a nil Map is returned with the error.
func NewMap(cfgs []config.KubeSchedulerProfile, frameworkFact FrameworkFactory, recorderFact RecorderFactory,
    opts ...frameworkruntime.Option) (Map, error) {
    profiles := make(Map)
    validator := cfgValidator{m: profiles}

    for _, profileCfg := range cfgs {
        if validateErr := validator.validate(profileCfg); validateErr != nil {
            return nil, validateErr
        }

        built, buildErr := NewProfile(profileCfg, frameworkFact, recorderFact, opts...)
        if buildErr != nil {
            return nil, fmt.Errorf("creating profile for scheduler name %s: %v", profileCfg.SchedulerName, buildErr)
        }
        profiles[profileCfg.SchedulerName] = built
    }

    return profiles, nil
}

// NewProfile creates the Profile for a single scheduler configuration.
//
// An event recorder is obtained from recorderFact for the scheduler name, and
// both the recorder and the profile name are appended to opts before the
// framework is built with frameworkFact.
func NewProfile(cfg config.KubeSchedulerProfile, frameworkFact FrameworkFactory, recorderFact RecorderFactory,
    opts ...frameworkruntime.Option) (*Profile, error) {
    schedulerName := cfg.SchedulerName
    eventRecorder := recorderFact(schedulerName)

    opts = append(opts,
        frameworkruntime.WithEventRecorder(eventRecorder),
        frameworkruntime.WithProfileName(schedulerName),
    )

    fwk, err := frameworkFact(cfg, opts...)
    if err != nil {
        return nil, err
    }

    prof := &Profile{
        Name:      schedulerName,
        Framework: fwk,
        Recorder:  eventRecorder,
    }
    return prof, nil
}

pkg/scheduler/framework/runtime/framework.go 中


// getExtensionPoints pairs each configured plugin set in plugins with a
// pointer to the frameworkImpl slice that holds the plugins for that
// extension point. The order of the returned entries is fixed.
func (f *frameworkImpl) getExtensionPoints(plugins *config.Plugins) []extensionPoint {
    points := []extensionPoint{
        {plugins: plugins.PreFilter, slicePtr: &f.preFilterPlugins},
        {plugins: plugins.Filter, slicePtr: &f.filterPlugins},
        {plugins: plugins.PostFilter, slicePtr: &f.postFilterPlugins},
        {plugins: plugins.Reserve, slicePtr: &f.reservePlugins},
        {plugins: plugins.PreScore, slicePtr: &f.preScorePlugins},
        {plugins: plugins.Score, slicePtr: &f.scorePlugins},
        {plugins: plugins.PreBind, slicePtr: &f.preBindPlugins},
        {plugins: plugins.Bind, slicePtr: &f.bindPlugins},
        {plugins: plugins.PostBind, slicePtr: &f.postBindPlugins},
        {plugins: plugins.Permit, slicePtr: &f.permitPlugins},
        {plugins: plugins.QueueSort, slicePtr: &f.queueSortPlugins},
    }
    return points
}

// NewFramework instantiates the plugins from registry r that the configuration
// needs and wires them into the framework's extension points.
//
// The excerpt is abridged ("..."): `f`, `pg` and `pluginConfig` are defined in
// the elided portion. From the visible code, `pg` is indexed by plugin name
// and its entries carry a Weight, and `f` is the framework under construction.
func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfig, opts ...Option) (framework.Framework, error) {

    pluginsMap := make(map[string]framework.Plugin)
    // totalPriority accumulates weight*MaxNodeScore over the instantiated
    // plugins so that a potential overflow can be rejected up front.
    var totalPriority int64
    for name, factory := range r {
        // initialize only needed plugins.
        if _, ok := pg[name]; !ok {
            continue
        }

        // Note: this `args` shadows the function parameter of the same name.
        args, err := getPluginArgsOrDefault(pluginConfig, name)
        if err != nil {
            return nil, fmt.Errorf("getting args for Plugin %q: %w", name, err)
        }
        p, err := factory(args, f)
        if err != nil {
            return nil, fmt.Errorf("error initializing plugin %q: %v", name, err)
        }
        pluginsMap[name] = p

        // a weight of zero is not permitted, plugins can be disabled explicitly
        // when configured.
        // A configured weight of zero is therefore replaced by 1.
        f.pluginNameToWeightMap[name] = int(pg[name].Weight)
        if f.pluginNameToWeightMap[name] == 0 {
            f.pluginNameToWeightMap[name] = 1
        }
        // Checks totalPriority against MaxTotalScore to avoid overflow
        if int64(f.pluginNameToWeightMap[name])*framework.MaxNodeScore > framework.MaxTotalScore-totalPriority {
            return nil, fmt.Errorf("total score of Score plugins could overflow")
        }
        totalPriority += int64(f.pluginNameToWeightMap[name]) * framework.MaxNodeScore
    }
  ...
    // Fill each extension point's plugin slice from the instantiated plugins.
    for _, e := range f.getExtensionPoints(plugins) {
        if err := updatePluginList(e.slicePtr, e.plugins, pluginsMap); err != nil {
            return nil, err
        }
    }
...
}


// updatePluginList appends the enabled plugins of pluginSet to the slice that
// pluginList points to.
//
// pluginList is dereferenced through reflection and its element type is the
// plugin interface each enabled plugin must implement. For every enabled
// entry, in order, an error is returned if the name is missing from
// pluginsMap, if the plugin does not implement the element type, or if the
// name was already added during this call. A nil pluginSet is a no-op.
func updatePluginList(pluginList interface{}, pluginSet *config.PluginSet, pluginsMap map[string]framework.Plugin) error {
    if pluginSet == nil {
        return nil
    }

    target := reflect.ValueOf(pluginList).Elem()
    wantType := target.Type().Elem()
    seen := sets.NewString()

    for _, enabled := range pluginSet.Enabled {
        impl, found := pluginsMap[enabled.Name]
        // Cases are checked top to bottom, so the type check never runs for a
        // plugin that was not found.
        switch {
        case !found:
            return fmt.Errorf("%s %q does not exist", wantType.Name(), enabled.Name)
        case !reflect.TypeOf(impl).Implements(wantType):
            return fmt.Errorf("plugin %q does not extend %s plugin", enabled.Name, wantType.Name())
        case seen.Has(enabled.Name):
            return fmt.Errorf("plugin %q already registered as %q", enabled.Name, wantType.Name())
        }

        seen.Insert(enabled.Name)
        target.Set(reflect.Append(target, reflect.ValueOf(impl)))
    }
    return nil
}
上一篇 下一篇

猜你喜欢

热点阅读