
[k8s source code analysis][kube-scheduler] schedul

2019-10-16  nicktming

1. Introduction

Please credit the original source when reposting, and respect others' work!

This post analyzes kube-scheduler's custom scheduler support, specifically how to choose your own predicate and priority methods.
Source: https://github.com/nicktming/kubernetes
Branch: tming-v1.13 (based on v1.13)

2. Without Extender Methods

(Figure: architecture.png)

2.1 Example

For cluster setup, see the earlier post on compiling k8s from source and installing from binaries (for a source-level development and debugging setup).

2.1.1 Preparing the configuration files

Since we want to customize the predicate, priority, and extender methods, a configuration file is needed (schedulerConfig.yaml):

apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
schedulerName: my-scheduler
algorithmSource:
  policy:
    file:
      path: policy.yaml
leaderElection:
  leaderElect: true
  lockObjectName: my-scheduler
  lockObjectNamespace: kube-system

Below is the policy file (policy.yaml):

{
  "kind": "Policy",
  "apiVersion": "v1",
  "predicates": [
    {"name": "PodFitsHostPorts"},
    {"name": "PodFitsResources"},
    {"name": "NoDiskConflict"},
    {"name": "MatchNodeSelector"},
    {"name": "HostName"}
  ],
  "priorities": [
    {"name": "LeastRequestedPriority", "weight": 1},
    {"name": "BalancedResourceAllocation", "weight": 1},
    {"name": "ServiceSpreadingPriority", "weight": 1},
    {"name": "EqualPriority", "weight": 1}
  ],
  "hardPodAffinitySymmetricWeight": 10
}

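
As a quick sanity check, the policy file above can be decoded with a minimal struct that mirrors the schedulerapi.Policy fields used here. This is a simplified sketch, not the real scheduler types:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// policy is a simplified mirror of schedulerapi.Policy, covering only
// the fields that appear in the policy.yaml above.
type policy struct {
	Kind       string `json:"kind"`
	APIVersion string `json:"apiVersion"`
	Predicates []struct {
		Name string `json:"name"`
	} `json:"predicates"`
	Priorities []struct {
		Name   string `json:"name"`
		Weight int    `json:"weight"`
	} `json:"priorities"`
	HardPodAffinitySymmetricWeight int32 `json:"hardPodAffinitySymmetricWeight"`
}

// policyJSON is the policy file from the post, verbatim.
const policyJSON = `{
  "kind": "Policy",
  "apiVersion": "v1",
  "predicates": [
    {"name": "PodFitsHostPorts"},
    {"name": "PodFitsResources"},
    {"name": "NoDiskConflict"},
    {"name": "MatchNodeSelector"},
    {"name": "HostName"}
  ],
  "priorities": [
    {"name": "LeastRequestedPriority", "weight": 1},
    {"name": "BalancedResourceAllocation", "weight": 1},
    {"name": "ServiceSpreadingPriority", "weight": 1},
    {"name": "EqualPriority", "weight": 1}
  ],
  "hardPodAffinitySymmetricWeight": 10
}`

func parsePolicy(data []byte) (policy, error) {
	var p policy
	err := json.Unmarshal(data, &p)
	return p, err
}

func main() {
	p, err := parsePolicy([]byte(policyJSON))
	if err != nil {
		panic(err)
	}
	fmt.Printf("predicates: %d, priorities: %d\n", len(p.Predicates), len(p.Priorities))
}
```
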
2.1.2 Running and Testing

Next, restart kube-scheduler with ./kube-scheduler --master=http://localhost:8080 --config=schedulerConfig.yaml. (Adjust the paths in schedulerConfig.yaml and policy.yaml as needed.)

Then deploy one pod with a schedulerName and one pod without:

[root@master kubectl]# cat pod-scheduler.yaml 
apiVersion: v1
kind: Pod
metadata:
  name: test-schduler
spec:
  schedulerName: my-scheduler
  containers:
  - name: podtest-scheduler
    image: nginx
    ports:
    - containerPort: 80
[root@master kubectl]# cat pod.yaml 
apiVersion: v1
kind: Pod
metadata:
  name: test
spec:
  containers:
  - name: podtest
    image: nginx
    ports:
    - containerPort: 80
[root@master kubectl]# ./kubectl apply -f pod-scheduler.yaml 
[root@master kubectl]# ./kubectl apply -f pod.yaml 
[root@master kubectl]# ./kubectl get pods 
NAME            READY   STATUS    RESTARTS   AGE
test            0/1     Pending   0          83s
test-schduler   1/1     Running   0          13m

You can see that the pod with schedulerName set, i.e. test-schduler, is running successfully.

[root@master kubectl]# ./kubectl describe pod test-schduler
...
Events:
  Type     Reason             Age                 From                  Message
  ----     ------             ----                ----                  -------
  Normal   Scheduled          12m                 my-scheduler          Successfully assigned default/test-schduler to 172.21.0.12
  Normal   Pulling            12m                 kubelet, 172.21.0.12  pulling image "nginx"
  Normal   Pulled             11m                 kubelet, 172.21.0.12  Successfully pulled image "nginx"
  Normal   Created            11m                 kubelet, 172.21.0.12  Created container
  Normal   Started            11m                 kubelet, 172.21.0.12  Started container
  Warning  MissingClusterDNS  62s (x12 over 12m)  kubelet, 172.21.0.12  pod: "test-schduler_default(213933b8-efda-11e9-9434-525400d54f7e)". kubelet does not have ClusterDNS IP configured and cannot create Pod using "ClusterFirst" policy. Falling back to "Default" policy.

Meanwhile the pod without a schedulerName, i.e. test, stays Pending: when schedulerName is not set, the pod falls to the default k8s scheduler, which is currently not running, so it cannot be scheduled. If we now start another scheduler without the --config flag, that pod will be scheduled. For the default scheduler, see [k8s source code analysis][kube-scheduler] scheduler/algorithmprovider registering the default-scheduler.

2.2 Source Code Analysis

2.2.1 Parsing the config file

Most of this code was already analyzed in [k8s source code analysis][kube-scheduler] scheduler startup run (1), so we keep it brief here. kube-scheduler parses its config along this call chain:

NewSchedulerCommand -> runCommand -> opts.Config() -> o.ApplyTo(c) 

So we eventually arrive here, where the contents of the config file are loaded:

// cmd/kube-scheduler/app/options/options.go

func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
    // if kube-scheduler was started without --config, use the defaults (o.ComponentConfig)
    if len(o.ConfigFile) == 0 {
        ...
    } else {
        // if --config was specified, read from the config file
        cfg, err := loadConfigFromFile(o.ConfigFile)
        if err != nil {
            return err
        }

        // use the loaded config file only, with the exception of --address and --port. This means that
        // none of the deprecated flags in o.Deprecated are taken into consideration. This is the old
        // behaviour of the flags we have to keep.
        c.ComponentConfig = *cfg

        if err := o.CombinedInsecureServing.ApplyToFromLoadedConfig(c, &c.ComponentConfig); err != nil {
            return err
        }
    }
    ...
}

// cmd/kube-scheduler/app/options/configfile.go

func loadConfigFromFile(file string) (*kubeschedulerconfig.KubeSchedulerConfiguration, error) {
    data, err := ioutil.ReadFile(file)
    if err != nil {
        return nil, err
    }

    return loadConfig(data)
}

func loadConfig(data []byte) (*kubeschedulerconfig.KubeSchedulerConfiguration, error) {
    configObj := &kubeschedulerconfig.KubeSchedulerConfiguration{}
    if err := runtime.DecodeInto(kubeschedulerscheme.Codecs.UniversalDecoder(), data, configObj); err != nil {
        return nil, err
    }

    return configObj, nil
}

This converts the contents of the config file (schedulerConfig.yaml) into a kubeschedulerconfig.KubeSchedulerConfiguration object:

// pkg/scheduler/apis/config/types.go 

type KubeSchedulerConfiguration struct {
    metav1.TypeMeta
    // SchedulerName is name of the scheduler, used to select which pods
    // will be processed by this scheduler, based on pod's "spec.SchedulerName".
    SchedulerName string
    // AlgorithmSource specifies the scheduler algorithm source.
    AlgorithmSource SchedulerAlgorithmSource
    // RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule
    // corresponding to every RequiredDuringScheduling affinity rule.
    // HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100.
    HardPodAffinitySymmetricWeight int32
    // analyzed later when covering high availability
    LeaderElection KubeSchedulerLeaderElectionConfiguration
    ClientConnection apimachineryconfig.ClientConnectionConfiguration
    // defaulting to 0.0.0.0:10251
    HealthzBindAddress string
    // serve on, defaulting to 0.0.0.0:10251.
    MetricsBindAddress string
    apiserverconfig.DebuggingConfiguration
    // whether preemption is disabled
    DisablePreemption bool
    PercentageOfNodesToScore int32
    FailureDomains string
    BindTimeoutSeconds *int64
}

type SchedulerAlgorithmSource struct {
    // Policy is the policy source
    Policy *SchedulerPolicySource
    // Provider is the name of a scheduling algorithm provider to use.
    Provider *string
}
type SchedulerPolicySource struct {
    // read from a file
    File *SchedulerPolicyFileSource
    // read from a ConfigMap
    ConfigMap *SchedulerPolicyConfigMapSource
}

// leader election (high availability)
type KubeSchedulerLeaderElectionConfiguration struct {
    apiserverconfig.LeaderElectionConfiguration
    // LockObjectNamespace defines the namespace of the lock object
    LockObjectNamespace string
    // LockObjectName defines the lock object name
    LockObjectName string
}
type LeaderElectionConfiguration struct {
    LeaderElect bool
    LeaseDuration metav1.Duration
    RenewDeadline metav1.Duration
    RetryPeriod metav1.Duration
    ResourceLock string
}

// k8s.io/apimachinery/pkg/apis/meta/v1/types.go
type TypeMeta struct {
    Kind string `json:"kind,omitempty" protobuf:"bytes,1,opt,name=kind"`
    APIVersion string `json:"apiVersion,omitempty" protobuf:"bytes,2,opt,name=apiVersion"`
}

As you can see, c.ComponentConfig = *cfg is exactly the kubeschedulerconfig.KubeSchedulerConfiguration converted from schedulerConfig.yaml.

2.2.2 Parsing algorithmSource

Next comes:

runCommand -> Run(cc, stopCh) -> scheduler.New

Note: the kubeschedulerconfig.SchedulerAlgorithmSource passed into scheduler.New is cc.ComponentConfig.AlgorithmSource, i.e. the algorithmSource in schedulerConfig.yaml.

// New returns a Scheduler
func New(client clientset.Interface,
    ...
    schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
    stopCh <-chan struct{},
    opts ...func(o *schedulerOptions)) (*Scheduler, error) {
    ...
    var config *factory.Config
    source := schedulerAlgorithmSource
    switch {
    case source.Provider != nil:
        // the default scheduler takes this branch (*source.Provider == DefaultProvider)
        ...
    case source.Policy != nil:
        // a custom scheduler takes this branch
        // Create the config from a user specified policy source.
        policy := &schedulerapi.Policy{}
        switch {
        case source.Policy.File != nil:
            if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
                return nil, err
            }
        case source.Policy.ConfigMap != nil:
            if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
                return nil, err
            }
        }
        sc, err := configurator.CreateFromConfig(*policy)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
        }
        config = sc
    default:
        return nil, fmt.Errorf("unsupported algorithm source: %v", source)
    }
    ...
}
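
The branch selection in New can be reduced to a small standalone sketch. The types and names below are simplified stand-ins, not the real factory API:

```go
package main

import (
	"errors"
	"fmt"
)

// policySource is a simplified stand-in for SchedulerPolicySource.
type policySource struct {
	File      *string // path to a policy file
	ConfigMap *string // name of a policy ConfigMap
}

// algorithmSource is a simplified stand-in for SchedulerAlgorithmSource.
type algorithmSource struct {
	Provider *string
	Policy   *policySource
}

// pickSource mirrors the switch in New: a named provider wins,
// otherwise a policy (from a file or a ConfigMap), otherwise an error.
func pickSource(src algorithmSource) (string, error) {
	switch {
	case src.Provider != nil:
		return "provider:" + *src.Provider, nil
	case src.Policy != nil:
		switch {
		case src.Policy.File != nil:
			return "policy-file:" + *src.Policy.File, nil
		case src.Policy.ConfigMap != nil:
			return "policy-configmap:" + *src.Policy.ConfigMap, nil
		}
		return "", errors.New("empty policy source")
	default:
		return "", errors.New("unsupported algorithm source")
	}
}

func main() {
	file := "policy.yaml"
	got, err := pickSource(algorithmSource{Policy: &policySource{File: &file}})
	if err != nil {
		panic(err)
	}
	fmt.Println(got)
}
```

With the schedulerConfig.yaml from this post, the Policy.File branch is the one taken.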

The initPolicyFromFile method reads the contents of source.Policy.File.Path (the path to policy.yaml), parses it, and turns it into a schedulerapi.Policy object:

type Policy struct {
    metav1.TypeMeta
    Predicates []PredicatePolicy
    Priorities []PriorityPolicy
    ExtenderConfigs []ExtenderConfig
    HardPodAffinitySymmetricWeight int32
    AlwaysCheckAllPredicates bool
}
type PredicatePolicy struct {
    Name string
    Argument *PredicateArgument
}
type PriorityPolicy struct {
    Name string
    Weight int
    Argument *PriorityArgument
}
type PredicateArgument struct {
    ServiceAffinity *ServiceAffinity
    LabelsPresence *LabelsPresence
}
type PriorityArgument struct {
    ServiceAntiAffinity *ServiceAntiAffinity
    LabelPreference *LabelPreference
    RequestedToCapacityRatioArguments *RequestedToCapacityRatioArguments
}
type ExtenderConfig struct {
    URLPrefix string
    FilterVerb string
    PreemptVerb string
    PrioritizeVerb string
    Weight int
    BindVerb string
    EnableHTTPS bool
    TLSConfig *restclient.TLSClientConfig
    HTTPTimeout time.Duration
    NodeCacheCapable bool
    ManagedResources []ExtenderManagedResource
    Ignorable bool
}

The extender configuration types are listed here as well; the struct fields map one-to-one onto the corresponding entries in the yaml.

2.2.3 Generating factory.Config from the policy

func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
    klog.V(2).Infof("Creating scheduler from configuration: %v", policy)

    // validate the policy configuration
    if err := validation.ValidatePolicy(policy); err != nil {
        return nil, err
    }

    // collect the predicate keys
    // if none are given, fall back to the default provider's predicate keys
    predicateKeys := sets.NewString()
    if policy.Predicates == nil {
        klog.V(2).Infof("Using predicates from algorithm provider '%v'", DefaultProvider)
        provider, err := GetAlgorithmProvider(DefaultProvider)
        if err != nil {
            return nil, err
        }
        predicateKeys = provider.FitPredicateKeys
    } else {
        for _, predicate := range policy.Predicates {
            klog.V(2).Infof("Registering predicate: %s", predicate.Name)
            predicateKeys.Insert(RegisterCustomFitPredicate(predicate))
        }
    }

    // collect the priority keys
    // if none are given, fall back to the default provider's priority keys
    priorityKeys := sets.NewString()
    if policy.Priorities == nil {
        klog.V(2).Infof("Using priorities from algorithm provider '%v'", DefaultProvider)
        provider, err := GetAlgorithmProvider(DefaultProvider)
        if err != nil {
            return nil, err
        }
        priorityKeys = provider.PriorityFunctionKeys
    } else {
        for _, priority := range policy.Priorities {
            klog.V(2).Infof("Registering priority: %s", priority.Name)
            priorityKeys.Insert(RegisterCustomPriorityFunction(priority))
        }
    }

    // build the extenders
    var extenders []algorithm.SchedulerExtender
    if len(policy.ExtenderConfigs) != 0 {
        ignoredExtendedResources := sets.NewString()
        for ii := range policy.ExtenderConfigs {
            klog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii])
            extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii])
            if err != nil {
                return nil, err
            }
            extenders = append(extenders, extender)
            for _, r := range policy.ExtenderConfigs[ii].ManagedResources {
                if r.IgnoredByScheduler {
                    ignoredExtendedResources.Insert(string(r.Name))
                }
            }
        }
        predicates.RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources)
    }
    // Providing HardPodAffinitySymmetricWeight in the policy config is the new and preferred way of providing the value.
    // Give it higher precedence than scheduler CLI configuration when it is provided.
    if policy.HardPodAffinitySymmetricWeight != 0 {
        c.hardPodAffinitySymmetricWeight = policy.HardPodAffinitySymmetricWeight
    }
    // When AlwaysCheckAllPredicates is set to true, scheduler checks all the configured
    // predicates even after one or more of them fails.
    if policy.AlwaysCheckAllPredicates {
        c.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates
    }

    // build the Config from the predicate, priority, and extender methods
    return c.CreateFromKeys(predicateKeys, priorityKeys, extenders)
}
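
The key-collection logic in CreateFromConfig boils down to: take the default provider's keys when the policy lists none, otherwise register each listed name. A minimal sketch of that pattern, using a plain map as the set and made-up default keys rather than the real registry:

```go
package main

import (
	"fmt"
	"sort"
)

// defaultPredicateKeys stands in for the DefaultProvider's
// FitPredicateKeys (the real defaults differ).
var defaultPredicateKeys = []string{"PodFitsResources", "PodFitsHostPorts"}

// collectPredicateKeys mirrors the nil-check in CreateFromConfig:
// nil means "use the defaults", while an explicit list means
// "register and use exactly what was listed".
func collectPredicateKeys(policyNames []string) map[string]bool {
	keys := map[string]bool{}
	if policyNames == nil {
		for _, k := range defaultPredicateKeys {
			keys[k] = true
		}
		return keys
	}
	for _, name := range policyNames {
		// RegisterCustomFitPredicate would register the predicate
		// function under this name; here we only record the key.
		keys[name] = true
	}
	return keys
}

func main() {
	keys := collectPredicateKeys([]string{"NoDiskConflict", "HostName"})
	var sorted []string
	for k := range keys {
		sorted = append(sorted, k)
	}
	sort.Strings(sorted)
	fmt.Println(sorted)
}
```

The priority keys are collected the same way, with each name additionally carrying a weight.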

CreateFromKeys was already analyzed in [k8s source code analysis][kube-scheduler] scheduler startup run (1); here we mainly focus on the extenders.

func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
    klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)

    if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
        return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.GetHardPodAffinitySymmetricWeight())
    }

    // resolve the current predicate keys into the actual predicate functions
    predicateFuncs, err := c.GetPredicates(predicateKeys)
    if err != nil {
        return nil, err
    }

    // resolve the current priority keys into the actual priority function configs
    priorityConfigs, err := c.GetPriorityFunctionConfigs(priorityKeys)
    if err != nil {
        return nil, err
    }

    // priorityMetaProducer is used during scoring
    priorityMetaProducer, err := c.GetPriorityMetadataProducer()
    if err != nil {
        return nil, err
    }
    // predicateMetaProducer is used during the actual predicate phase
    predicateMetaProducer, err := c.GetPredicateMetadataProducer()
    if err != nil {
        return nil, err
    }

    // whether the equivalence class cache (which speeds up predicates) is enabled
    // Init equivalence class cache
    if c.enableEquivalenceClassCache {
        c.equivalencePodCache = equivalence.NewCache(predicates.Ordering())
        klog.Info("Created equivalence class cache")
    }

    // build the algorithm.ScheduleAlgorithm that performs the actual scheduling
    algo := core.NewGenericScheduler(
        c.schedulerCache,
        c.equivalencePodCache,
        c.podQueue,
        predicateFuncs,
        predicateMetaProducer,
        priorityConfigs,
        priorityMetaProducer,
        extenders,
        c.volumeBinder,
        c.pVCLister,
        c.pdbLister,
        c.alwaysCheckAllPredicates,
        c.disablePreemption,
        c.percentageOfNodesToScore,
    )
    ...
}
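
Note the range check at the top of CreateFromKeys: although the comment on KubeSchedulerConfiguration says 0-100, CreateFromKeys rejects anything outside 1-100. A tiny sketch of that validation:

```go
package main

import "fmt"

// validateHardPodAffinityWeight mirrors the range check in CreateFromKeys.
func validateHardPodAffinityWeight(w int32) error {
	if w < 1 || w > 100 {
		return fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", w)
	}
	return nil
}

func main() {
	for _, w := range []int32{0, 10, 101} {
		fmt.Println(w, validateHardPodAffinityWeight(w))
	}
}
```

This is why the policy file in this post sets hardPodAffinitySymmetricWeight to 10, a value inside the accepted range.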

3. Summary

This post used an example to show how a custom scheduler is used, for instance choosing which predicate and priority methods to apply. A follow-up post will cover how to use your own extended predicate and priority methods.
