k8s调度器如何调度有pvc的pod
2024-12-24 本文已影响0人
wwq2020
前言
和无pvc pod的差异主要是VolumeZone,VolumeRestrictions和VolumeBinding三个插件
VolumeZone
简单总结
检查node labels中是否存在volumeZoneLabels(通过Filter阶段过滤不满足条件的node)
不存在则认为可调度,
存在则检查pod关联pv的volumeZoneLabels是否在node labels中存在且值相等
源码
pkg/scheduler/framework/plugins/volumezone/volume_zone.go中
var volumeZoneLabels = sets.NewString(
v1.LabelFailureDomainBetaZone, // "failure-domain.beta.kubernetes.io/zone" deprecated
v1.LabelFailureDomainBetaRegion, "failure-domain.beta.kubernetes.io/region" // deprecated
v1.LabelTopologyZone, // "topology.kubernetes.io/zone"
v1.LabelTopologyRegion, // "topology.kubernetes.io/region"
)
func (pl *VolumeZone) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
...
var podPVTopologies []pvTopology
...
从pod关联的pv中获取volumeZoneLabels中的key
podPVTopologies, status = pl.getPVbyPod(logger, pod)
...
hasAnyNodeConstraint := false
检查node labels是否存在volumeZoneLabels中的key
for _, topologyLabel := range topologyLabels {
if _, ok := node.Labels[topologyLabel]; ok {
hasAnyNodeConstraint = true
break
}
}
node labels不存在volumeZoneLabels中的key则认为可调度
if !hasAnyNodeConstraint {
return nil
}
...
判断node label是否存在相应的key且value相等
for _, pvTopology := range podPVTopologies {
v, ok := node.Labels[pvTopology.key]
if !ok {
v, ok = node.Labels[translateToGALabel(pvTopology.key)]
}
不存在或者value不相等则认为此节点不可调度
if !ok || !pvTopology.values.Has(v) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonConflict)
}
}
认为可调度
return nil
...
}
VolumeRestrictions
简单总结
针对GCEPersistentDisk,AWSElasticBlockStore,RBD,ISCSI类型的volume的检查(Filter阶段)
针对ReadWriteOncePod的pvc检查是否被多个pod引用(Filter阶段)
源码
pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go中
针对如下volume的一些检查
// needsRestrictionsCheck reports whether the volume is one of the in-tree
// volume types (GCE PD, AWS EBS, RBD, iSCSI) that the VolumeRestrictions
// plugin must examine for disk conflicts.
func needsRestrictionsCheck(v v1.Volume) bool {
	switch {
	case v.GCEPersistentDisk != nil:
		return true
	case v.AWSElasticBlockStore != nil:
		return true
	case v.RBD != nil:
		return true
	case v.ISCSI != nil:
		return true
	default:
		return false
	}
}
func (pl *VolumeRestrictions) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
针对GCEPersistentDisk,AWSElasticBlockStore,RBD,ISCSI类型的volume的检查
if !satisfyVolumeConflicts(pod, nodeInfo) {
return framework.NewStatus(framework.Unschedulable, ErrReasonDiskConflict)
}
state, err := getPreFilterState(cycleState)
if err != nil {
return framework.AsStatus(err)
}
检查ReadWriteOncePod的pvc被多个pod引用
return satisfyReadWriteOncePod(ctx, state)
}
VolumeBinding
简单总结
在PreFilter阶段
检查pod是否引用了Immediate但是仍未bound的pvc,是则不可调度
在Filter阶段
针对已bound的pvc
检查pv的nodeaffinity是否匹配node labels,不匹配则此节点不可调度
针对unboundDelayBinding的pvc
pvc存在volume.kubernetes.io/selected-node注解时
不等于当前节点,则认为此节点不可调度
等于当前节点则进入provision检查
provisioning检查
pvc是否有注解volume.beta.kubernetes.io/storage-class或者设置了StorageClassName(优先查找注解),如都无则失败
通过className查找class,找不到则失败
检查此class的provisioner是否为kubernetes.io/no-provisioner,是则失败
检查class的AllowedTopologies和node的labels是否匹配,不匹配则失败
检查pvc是否设置storage requests,不设置则成功
通过provisioner查找csidriver,找不到则成功
检查csidriver的StorageCapacity是否为空或者false,是则成功
查看所有CSIStorageCapacity,如果其中存在className等于pvc的className且他的Capacity(如果MaximumVolumeSize有设置则用此值)大于pvc的storage requests且他的NodeTopology匹配node labels则成功,否则失败
findmatching检查
查看满足pvc的className,selector,volumemode,volumeAttributesClassName,满足pvc storage request且pv的nodeaffinity匹配node labels的pv,找不到认为失败
失败的添加到provisioning检查
源码
pkg/scheduler/framework/plugins/volumebinding/volume_binding.go中
func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
...
获取pod引用的pvc信息
...
pod引用了Immediate但是仍未bound的pvc
if len(podVolumeClaims.unboundClaimsImmediate) > 0 {
status := framework.NewStatus(framework.UnschedulableAndUnresolvable)
status.AppendReason("pod has unbound immediate PersistentVolumeClaims")
return nil, status
}
...
}
func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
...
查找pod volume
podVolumes, reasons, err := pl.Binder.FindPodVolumes(logger, pod, state.podVolumeClaims, node)
...
如果有reason则不可调度
if len(reasons) > 0 {
status := framework.NewStatus(framework.UnschedulableAndUnresolvable)
for _, reason := range reasons {
status.AppendReason(string(reason))
}
return status
}
}
func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
...
cache中更新pvc/pv信息
allBound, err := pl.Binder.AssumePodVolumes(klog.FromContext(ctx), pod, nodeName, podVolumes)
...
}
func (pl *VolumeBinding) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
...
调用apiserver执行pvc/pv信息更新
err = pl.Binder.BindPodVolumes(ctx, pod, podVolumes)
...
}
pkg/scheduler/framework/plugins/volumebinding/binder.go
func (b *volumeBinder) FindPodVolumes(logger klog.Logger, pod *v1.Pod, podVolumeClaims *PodVolumeClaims, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error) {
...
针对已bound的pvc
if len(podVolumeClaims.boundClaims) > 0 {
boundVolumesSatisfied, boundPVsFound, err = b.checkBoundClaims(logger, podVolumeClaims.boundClaims, node, pod)
if err != nil {
return
}
}
...
针对unboundDelayBinding的pvc
if len(podVolumeClaims.unboundClaimsDelayBinding) > 0 {
...
for _, claim := range podVolumeClaims.unboundClaimsDelayBinding {
volume.kubernetes.io/selected-node注解检查
if selectedNode, ok := claim.Annotations[volume.AnnSelectedNode]; ok {
if selectedNode != node.Name {
unboundVolumesSatisfied = false
return
}
claimsToProvision = append(claimsToProvision, claim)
} else {
claimsToFindMatching = append(claimsToFindMatching, claim)
}
}
findmatching检查
if len(claimsToFindMatching) > 0 {
var unboundClaims []*v1.PersistentVolumeClaim
unboundVolumesSatisfied, staticBindings, unboundClaims, err = b.findMatchingVolumes(logger, pod, claimsToFindMatching, podVolumeClaims.unboundVolumesDelayBinding, node)
if err != nil {
return
}
claimsToProvision = append(claimsToProvision, unboundClaims...)
}
provision检查
if len(claimsToProvision) > 0 {
unboundVolumesSatisfied, sufficientStorage, dynamicProvisions, err = b.checkVolumeProvisions(logger, pod, claimsToProvision, node)
if err != nil {
return
}
}
}
}
func (b *volumeBinder) AssumePodVolumes(logger klog.Logger, assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (allFullyBound bool, err error) {
...
更新pvc/pv的cache
err = b.pvcCache.Assume(claimClone)
...
}
func (b *volumeBinder) BindPodVolumes(ctx context.Context, assumedPod *v1.Pod, podVolumes *PodVolumes) (err error) {
...
执行实际的pvc/pv更新
err = b.bindAPIUpdate(ctx, assumedPod, bindings, claimsToProvision)
if err != nil {
return err
}
等待pvc/pv bound
err = wait.PollUntilContextTimeout(ctx, time.Second, b.bindTimeout, false, func(ctx context.Context) (bool, error) {
b, err := b.checkBindings(logger, assumedPod, bindings, claimsToProvision)
return b, err
})
...
}