k8s源码阅读(3) - kube-controller-manager

2022-02-14  本文已影响0人  拿着滋水枪的消防员

启动流程

文件cmd/kube-controller-manager/app/controllermanager.go

入口方法


    // controllerChecks collects one health checker for every controller that is started below.
    var controllerChecks []healthz.HealthChecker
    // Start the controllers one by one and build a health check for each of them.
    // NOTE(review): presumably controllers is a map, in which case Go's range order is
    // unspecified and the start order varies between runs — confirm.
    for controllerName, initFn := range controllers {
        // Controllers that IsControllerEnabled reports as not enabled are skipped with a warning.
        if !controllerCtx.IsControllerEnabled(controllerName) {
            klog.Warningf("%q is disabled", controllerName)
            continue
        }

        // Sleep before each start; the duration is ControllerStartInterval passed through
        // wait.Jitter together with ControllerStartJitter.
        time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))

        klog.V(1).Infof("Starting %q", controllerName)
        ctrl, started, err := initFn(ctx, controllerCtx)
        if err != nil {
            // The first failing controller ends the loop: err is returned to the caller.
            // The log line carries only the controller name, not err itself.
            klog.Errorf("Error starting %q", controllerName)
            return err
        }
        if !started {
            // initFn reported that it did not start the controller; no health check is added for it.
            klog.Warningf("Skipping %q", controllerName)
            continue
        }
        // Default health check for this controller; replaced below when the controller
        // supplies its own non-nil checker.
        check := controllerhealthz.NamedPingChecker(controllerName)
        if ctrl != nil {
            // check if the controller supports and requests a debugHandler
            // and it needs the unsecuredMux to mount the handler onto.
            if debuggable, ok := ctrl.(controller.Debuggable); ok && unsecuredMux != nil {
                if debugHandler := debuggable.DebuggingHandler(); debugHandler != nil {
                    // Mount the handler at /debug/controllers/<name>, both for the exact
                    // path and for the "<path>/" prefix, stripping basePath in each case.
                    basePath := "/debug/controllers/" + controllerName
                    unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
                    unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
                }
            }
            // A controller implementing controller.HealthCheckable may provide a real
            // checker, which then takes the place of the default one.
            if healthCheckable, ok := ctrl.(controller.HealthCheckable); ok {
                if realCheck := healthCheckable.HealthChecker(); realCheck != nil {
                    check = controllerhealthz.NamedHealthChecker(controllerName, realCheck)
                }
            }
        }
        controllerChecks = append(controllerChecks, check)

        klog.Infof("Started %q", controllerName)
    }

    // Hand all collected checks to the healthz handler in a single call.
    healthzHandler.AddHealthChecker(controllerChecks...)
// This section defines the list of controllers that are loaded and started.
    // controllers maps each controller name to the InitFunc that starts it.
    controllers := map[string]InitFunc{}
    controllers["endpoint"] = startEndpointController
    controllers["endpointslice"] = startEndpointSliceController
    controllers["endpointslicemirroring"] = startEndpointSliceMirroringController
    controllers["replicationcontroller"] = startReplicationController
    controllers["podgc"] = startPodGCController
    controllers["resourcequota"] = startResourceQuotaController
    controllers["namespace"] = startNamespaceController
    controllers["serviceaccount"] = startServiceAccountController
    controllers["garbagecollector"] = startGarbageCollectorController
    controllers["daemonset"] = startDaemonSetController
    controllers["job"] = startJobController
    controllers["deployment"] = startDeploymentController
    controllers["replicaset"] = startReplicaSetController
    controllers["horizontalpodautoscaling"] = startHPAController
    controllers["disruption"] = startDisruptionController
    controllers["statefulset"] = startStatefulSetController
    controllers["cronjob"] = startCronJobController
    controllers["csrsigning"] = startCSRSigningController
    controllers["csrapproving"] = startCSRApprovingController
    controllers["csrcleaner"] = startCSRCleanerController
    controllers["ttl"] = startTTLController
    controllers["bootstrapsigner"] = startBootstrapSignerController
    controllers["tokencleaner"] = startTokenCleanerController
    controllers["nodeipam"] = startNodeIpamController
    controllers["nodelifecycle"] = startNodeLifecycleController
    // The service, route and cloud-node-lifecycle controllers are registered only
    // when loopMode equals IncludeCloudLoops.
    if loopMode == IncludeCloudLoops {
        controllers["service"] = startServiceController
        controllers["route"] = startRouteController
        controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
        // TODO: volume controller into the IncludeCloudLoops only set.
    }
    controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
    controllers["attachdetach"] = startAttachDetachController
    controllers["persistentvolume-expander"] = startVolumeExpandController
    controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
    controllers["pvc-protection"] = startPVCProtectionController
    controllers["pv-protection"] = startPVProtectionController
    controllers["ttl-after-finished"] = startTTLAfterFinishedController
    controllers["root-ca-cert-publisher"] = startRootCACertPublisher
    controllers["ephemeral-volume"] = startEphemeralVolumeController
    // storage-version-gc is registered only when both the APIServerIdentity and the
    // StorageVersionAPI feature gates are enabled.
    if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) &&
        utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {
        controllers["storage-version-gc"] = startStorageVersionGCController
    }

NodeLifecycle控制器

文件cmd/kube-controller-manager/app/core.go

启动控制器

文件pkg/controller/nodelifecycle/node_lifecycle_controller.go

新建控制器

// 这里是初始化controller,增加一些client-go的消费事件以及触发函数、缓存等

运行NodeLifecycleController

// From here on, the goroutines that consume pod and node events are started.
    ...
    // When the taint manager is enabled, run it in its own goroutine.
    if nc.runTaintManager {
        go nc.taintManager.Run(ctx)
    }

    // Close node update queue to cleanup go routine.
    // Both shutdowns are deferred, so they run when this function returns, i.e. after
    // the wait on ctx.Done() at the bottom is released.
    defer nc.nodeUpdateQueue.ShutDown()
    defer nc.podUpdateQueue.ShutDown()

    // Start workers to reconcile labels and/or update NoSchedule taint for nodes.
    for i := 0; i < scheduler.UpdateWorkerSize; i++ {
        // Thanks to "workqueue", each worker just need to get item from queue, because
        // the item is flagged when got from queue: if new event come, the new item will
        // be re-queued until "Done", so no more than one worker handle the same item and
        // no event missed.
        go wait.UntilWithContext(ctx, nc.doNodeProcessingPassWorker, time.Second)
    }

    // Start podUpdateWorkerSize goroutines that run doPodProcessingWorker, scheduled
    // through wait.UntilWithContext with a one-second period.
    for i := 0; i < podUpdateWorkerSize; i++ {
        go wait.UntilWithContext(ctx, nc.doPodProcessingWorker, time.Second)
    }

    // Exactly one of the two passes below is started, selected by runTaintManager;
    // both use scheduler.NodeEvictionPeriod as their period.
    if nc.runTaintManager {
        // Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
        // taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
        go wait.UntilWithContext(ctx, nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod)
    } else {
        // Managing eviction of nodes:
        // When we delete pods off a node, if the node was not empty at the time we then
        // queue an eviction watcher. If we hit an error, retry deletion.
        go wait.UntilWithContext(ctx, nc.doEvictionPass, scheduler.NodeEvictionPeriod)
    }

    // Incorporate the results of node health signal pushed from kubelet to master.
    // An error from monitorNodeHealth is only logged; the closure returns normally.
    go wait.UntilWithContext(ctx, func(ctx context.Context) {
        if err := nc.monitorNodeHealth(ctx); err != nil {
            klog.Errorf("Error monitoring node health: %v", err)
        }
    }, nc.nodeMonitorPeriod)

    // Block until the context is done; the deferred queue shutdowns run afterwards.
    <-ctx.Done()
1. doNoScheduleTaintingPass
- 这里根据node的状态(condition)转换出对应的污点:如果node处于不能调度的状态,则打上一个NoSchedule的污点。以下就是会被打上禁止调度污点的状态匹配规则,匹配到了则打污点:
    // nodeConditionToTaintKeyStatusMap maps a node condition type, and then the status
    // of that condition, to the taint key associated with it. Condition/status pairs
    // that are absent from the map have no taint key here.
    nodeConditionToTaintKeyStatusMap = map[v1.NodeConditionType]map[v1.ConditionStatus]string{
        // NodeReady is the only condition with entries for False and Unknown.
        v1.NodeReady: {
            v1.ConditionFalse:   v1.TaintNodeNotReady,
            v1.ConditionUnknown: v1.TaintNodeUnreachable,
        },
        // Every remaining condition has a single entry, for ConditionTrue.
        v1.NodeMemoryPressure: {
            v1.ConditionTrue: v1.TaintNodeMemoryPressure,
        },
        v1.NodeDiskPressure: {
            v1.ConditionTrue: v1.TaintNodeDiskPressure,
        },
        v1.NodeNetworkUnavailable: {
            v1.ConditionTrue: v1.TaintNodeNetworkUnavailable,
        },
        v1.NodePIDPressure: {
            v1.ConditionTrue: v1.TaintNodePIDPressure,
        },
    }

- node.Spec.Unschedulable字段被改为true(cordon一个node)时,也会打上一个污点
- 将分析出的应打污点与node上现有的污点进行对比,得出需要增加和删除的污点,然后调用api进行更新
2. reconcileNodeLabels 
自动补全os跟arch标签:将kubernetes.io/os与beta.kubernetes.io/os的标签互相补全(arch标签同理),应该是为了兼容不同版本的kubelet和调度器,并无其他作用。
控制器的nodeHealthData属性里缓存了node的健康状态,结构体对应的nodeHealthMap
- 消费pod事件,获取pod实例跟node的实例以及node的健康状态的deepcopy
- 如果开启了taint manager则会根据node的状态去平滑的执行evictPod,平滑的删除pod信息
- 根据node的状态,如果状态异常,将pod的状态标记为notready(更新pod的status),并产生event事件
详细逻辑自行阅读代码
详细逻辑自行阅读代码


以上内容以node生命周期控制器为例,讲解了整个kube-controller-manager的启动流程,协助大家根据自己的具体需求,快速定位修改源码.
文章未完结,看到哪补充到哪

上一篇下一篇

猜你喜欢

热点阅读