kube-scheduler 源码笔记

2020-07-01  本文已影响0人  一叶舟遥

kube-scheduler设计

kube-scheduler是以插件形式存在的组件,正因为以插件形式存在,所以其具有可扩展可定制的特性。kube-scheduler相当于整个集群的调度决策者,其通过预选和优选两个过程决定pod的最佳调度位置。

For given pod:

    +---------------------------------------------+
    |               Schedulable nodes:            |
    |                                             |
    | +--------+    +--------+      +--------+    |
    | | node 1 |    | node 2 |      | node 3 |    |
    | +--------+    +--------+      +--------+    |
    |                                             |
    +-------------------+-------------------------+
                        |
                        |
                        v
    +-------------------+-------------------------+

    Pred. filters: node 3 doesn't have enough resource

    +-------------------+-------------------------+
                        |
                        |
                        v
    +-------------------+-------------------------+
    |             remaining nodes:                |
    |   +--------+                 +--------+     |
    |   | node 1 |                 | node 2 |     |
    |   +--------+                 +--------+     |
    |                                             |
    +-------------------+-------------------------+
                        |
                        |
                        v
    +-------------------+-------------------------+

    Priority function:    node 1: p=2
                          node 2: p=5

    +-------------------+-------------------------+
                        |
                        |
                        v
            select max{node priority} = node 2

kube-scheduler的目的就是为每一个pod选择一个合适的node,整体流程可以概括为三步:

  1. 首先通过Predicates策略过滤掉不能满足资源需求的node;
  2. 利用Priorities策略对筛选出的node进行打分排序;
  3. 将pod调度到得分最高的node上。

主要源码文件

具体源码分析

基于 Kubernetes release-1.17

组件入口

位置:cmd/kube-scheduler/scheduler.go

// main is the kube-scheduler entry point: it seeds the global RNG,
// builds the scheduler's cobra command, and executes it, exiting with
// a non-zero status if execution fails.
func main() {
    rand.Seed(time.Now().UnixNano())
    // Build the cobra command object with default options.
    command := app.NewSchedulerCommand()

    ......
    // Execute the command; on error, terminate the process.
    if err := command.Execute(); err != nil {
        os.Exit(1)
    }
}

类似于所有的k8s组件启动流程,首先解析参数,构建command对象,然后执行command动作。

kube-scheduler 的默认参数在pkg/scheduler/algorithmprovider/defaults/defaults.go中定义,指定默认的Predicate、AlgorithmProvider以及Priority的策略。

构建command对象

位置:cmd/kube-scheduler/app/server.go

NewSchedulerCommand()方法针对默认参数的schedule动作构建cobra.Command对象。

// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
    opts, err := options.NewOptions()
    if err != nil {
        klog.Fatalf("unable to initialize command options: %v", err)
    }

    cmd := &cobra.Command{
        Use: "kube-scheduler",
        Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
workload-specific function that significantly impacts availability, performance,
and capacity. The scheduler needs to take into account individual and collective
resource requirements, quality of service requirements, hardware/software/policy
constraints, affinity and anti-affinity specifications, data locality, inter-workload
interference, deadlines, and so on. Workload-specific requirements will be exposed
through the API as necessary.`,
        //指定command执行的逻辑是runCommand()函数
        Run: func(cmd *cobra.Command, args []string) {
            if err := runCommand(cmd, args, opts, registryOptions...); err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }
        },
    }
    //读取一系列配置参数,注入到需要使用的对象中
    fs := cmd.Flags()
    namedFlagSets := opts.Flags()
    verflag.AddFlags(namedFlagSets.FlagSet("global"))
    globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
    for _, f := range namedFlagSets.FlagSets {
        fs.AddFlagSet(f)
    }

    usageFmt := "Usage:\n  %s\n"
    cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
    cmd.SetUsageFunc(func(cmd *cobra.Command) error {
        fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine())
        cliflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols)
        return nil
    })
    cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
        fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
        cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
    })
    cmd.MarkFlagFilename("config", "yaml", "yml", "json")

    return cmd
}

command启动执行

位置:cmd/kube-scheduler/app/server.go

上一步构建command对象时指定了执行动作是runCommand(),而runCommand函数的本质是调用了server.go的Run()。

Run()主要做了几件事情:

  1. 初始化scheduler对象;
  2. 启动 kube-scheduler server,提供健康检查和指标服务;
  3. 启动和资源相关的informer;
  4. 执行 sched.Run() 方法,执行调度逻辑。

// Run executes the scheduler based on the given configuration.
// It creates the scheduler object, starts event broadcasting, serves
// healthz/metrics endpoints, starts the informers, and finally runs the
// scheduling loop — either directly or via leader election. It only
// returns on error or when leadership/execution ends.
func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTreeRegistryOptions ...Option) error {
    ...
    // 1. Create the scheduler object.
    sched, err := scheduler.New(
        // Initialized from the relevant config (elided here).
        ......
    )
    if err != nil {
        return err
    }

    // Start the event broadcasters so scheduling events reach the API server.
    if cc.Broadcaster != nil && cc.EventClient != nil {
        cc.Broadcaster.StartRecordingToSink(ctx.Done())
    }
    if cc.CoreBroadcaster != nil && cc.CoreEventClient != nil {
        cc.CoreBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")})
    }
    // Setup healthz checks. The leader-election watchdog is included only
    // when leader election is enabled.
    var checks []healthz.HealthChecker
    if cc.ComponentConfig.LeaderElection.LeaderElect {
        checks = append(checks, cc.LeaderElection.WatchDog)
    }

    // 2. Start the health-check (healthz) and metrics servers on the
    // configured insecure and secure endpoints.
    if cc.InsecureServing != nil {
        // If a dedicated insecure metrics endpoint exists, the healthz
        // handler does not also serve metrics.
        separateMetrics := cc.InsecureMetricsServing != nil
        handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
        if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil {
            return fmt.Errorf("failed to start healthz server: %v", err)
        }
    }
    if cc.InsecureMetricsServing != nil {
        handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil)
        if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil {
            return fmt.Errorf("failed to start metrics server: %v", err)
        }
    }
    if cc.SecureServing != nil {
        handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
        // TODO: handle stoppedCh returned by c.SecureServing.Serve
        if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
            // fail early for secure handlers, removing the old error loop from above
            return fmt.Errorf("failed to start secure server: %v", err)
        }
    }

    // 3. Start the pod informer and the shared informer factory.
    go cc.PodInformer.Informer().Run(ctx.Done())
    cc.InformerFactory.Start(ctx.Done())

    // Wait for all caches to sync before scheduling.
    cc.InformerFactory.WaitForCacheSync(ctx.Done())

    // 4a. With leader election enabled: run the scheduling loop only after
    // winning the election; losing leadership is fatal.
    if cc.LeaderElection != nil {
        cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
            OnStartedLeading: sched.Run,
            OnStoppedLeading: func() {
                klog.Fatalf("leaderelection lost")
            },
        }
        leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
        if err != nil {
            return fmt.Errorf("couldn't create leader elector: %v", err)
        }

        // Blocks until leadership is lost or ctx is cancelled.
        leaderElector.Run(ctx)

        return fmt.Errorf("lost lease")
    }

    // 4b. Without leader election: run the scheduling loop directly.
    sched.Run(ctx)
    return fmt.Errorf("finished without leader elect")
}
上一篇下一篇

猜你喜欢

热点阅读