Deployment-Controller Source Code Analysis

2022-06-19  periky

Entry Logic

The startup and initialization entry point is startDeploymentController:

func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
        return nil, false, nil
    }
    // Instantiate the DeploymentController
    dc, err := deployment.NewDeploymentController(
        // Deployment Informer
        ctx.InformerFactory.Apps().V1().Deployments(),
        // ReplicaSet Informer
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        // Pod Informer
        ctx.InformerFactory.Core().V1().Pods(),
        // k8s client
        ctx.ClientBuilder.ClientOrDie("deployment-controller"),
    )
    if err != nil {
        return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
    }
    go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
    return nil, true, nil
}

In startDeploymentController(), the DeploymentController instance is created via NewDeploymentController(); its arguments are the informer interfaces for Deployments, ReplicaSets, and Pods, plus a Kubernetes client.
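
For context, the same wiring can be reproduced outside kube-controller-manager with client-go alone. Below is a minimal, hypothetical sketch, not code from the source: the kubeconfig location and the 30-second resync period are assumptions.

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Assumption: kubeconfig at the default local path; an in-cluster
    // deployment would use rest.InClusterConfig() instead.
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(cfg)

    // The factory hands out the same three informers the controller consumes.
    // Calling Informer() registers each one with the factory.
    factory := informers.NewSharedInformerFactory(client, 30*time.Second)
    dSynced := factory.Apps().V1().Deployments().Informer().HasSynced
    rsSynced := factory.Apps().V1().ReplicaSets().Informer().HasSynced
    podSynced := factory.Core().V1().Pods().Informer().HasSynced

    stopCh := make(chan struct{})
    defer close(stopCh)
    factory.Start(stopCh) // starts every informer requested above
    factory.WaitForCacheSync(stopCh)
    fmt.Println("caches synced:", dSynced(), rsSynced(), podSynced())
}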

The DeploymentController Object

The DeploymentController type definition:

// DeploymentController is responsible for synchronizing Deployment objects stored
// in the system with actual running replica sets and pods.
type DeploymentController struct {
    // rsControl is used for adopting/releasing replica sets.
    rsControl     controller.RSControlInterface
    client        clientset.Interface
    eventRecorder record.EventRecorder

    // To allow injection of syncDeployment for testing.
    syncHandler func(dKey string) error
    // used for unit testing
    enqueueDeployment func(deployment *apps.Deployment)

    // dLister can list/get deployments from the shared informer's store
    dLister appslisters.DeploymentLister
    // rsLister can list/get replica sets from the shared informer's store
    rsLister appslisters.ReplicaSetLister
    // podLister can list/get pods from the shared informer's store
    podLister corelisters.PodLister

    // dListerSynced returns true if the Deployment store has been synced at least once.
    // Added as a member to the struct to allow injection for testing.
    dListerSynced cache.InformerSynced
    // rsListerSynced returns true if the ReplicaSet store has been synced at least once.
    // Added as a member to the struct to allow injection for testing.
    rsListerSynced cache.InformerSynced
    // podListerSynced returns true if the pod store has been synced at least once.
    // Added as a member to the struct to allow injection for testing.
    podListerSynced cache.InformerSynced

    // Deployments that need to be synced
    queue workqueue.RateLimitingInterface
}

DeploymentController initialization:

// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
    // Kubernetes event broadcaster and sink
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartStructuredLogging(0)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

    // Register the prometheus metric `deployment_controller_rate_limiter_use`
    if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
        if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
            return nil, err
        }
    }
    dc := &DeploymentController{
        client:        client,
        eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
        queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
    }
    
    // Kubernetes client wrapper for ReplicaSet operations (adopt/release)
    dc.rsControl = controller.RealRSControl{
        KubeClient: client,
        Recorder:   dc.eventRecorder,
    }

    // deployment informer ResourceEventHandler
    dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addDeployment,     // handler for Deployment create events
        UpdateFunc: dc.updateDeployment,  // handler for Deployment update events
        // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
        DeleteFunc: dc.deleteDeployment,  // handler for Deployment delete events
    })
    // replicaset informer ResourceEventHandler
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addReplicaSet,
        UpdateFunc: dc.updateReplicaSet,
        DeleteFunc: dc.deleteReplicaSet,
    })
    // pod informer ResourceEventHandler (only pod deletions are handled)
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        DeleteFunc: dc.deletePod,
    })

    // Core sync logic; the same deployment key is never processed concurrently
    dc.syncHandler = dc.syncDeployment
    dc.enqueueDeployment = dc.enqueue

    // resource lister
    dc.dLister = dInformer.Lister()
    dc.rsLister = rsInformer.Lister()
    dc.podLister = podInformer.Lister()
    // markers that report whether each cache has completed its initial sync
    dc.dListerSynced = dInformer.Informer().HasSynced
    dc.rsListerSynced = rsInformer.Informer().HasSynced
    dc.podListerSynced = podInformer.Informer().HasSynced
    return dc, nil
}
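
Every Deployment handler registered above eventually calls dc.enqueueDeployment (i.e. dc.enqueue), which turns the object into a namespace/name key and adds it to the rate-limited workqueue. A condensed sketch of that path, with logging and the adoption-related logic of the ReplicaSet/Pod handlers omitted:

func (dc *DeploymentController) addDeployment(obj interface{}) {
    d := obj.(*apps.Deployment)
    dc.enqueueDeployment(d)
}

func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
    // controller.KeyFunc (cache.DeletionHandlingMetaNamespaceKeyFunc) renders
    // the object as a "namespace/name" key; syncDeployment later splits it
    // back apart with cache.SplitMetaNamespaceKey.
    key, err := controller.KeyFunc(deployment)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
        return
    }
    dc.queue.Add(key)
}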

Starting the DeploymentController

The DeploymentController's execution logic lives in the Run() function:

// Run begins watching and syncing.
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer dc.queue.ShutDown()

    klog.InfoS("Starting controller", "controller", "deployment")
    defer klog.InfoS("Shutting down controller", "controller", "deployment")

    // Wait for the deployment, replicaset, and pod caches to finish syncing
    if !cache.WaitForNamedCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.Until(dc.worker, time.Second, stopCh)
    }

    <-stopCh
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (dc *DeploymentController) worker() {
    for dc.processNextWorkItem() {
    }
}
func (dc *DeploymentController) processNextWorkItem() bool {
    key, quit := dc.queue.Get()
    if quit {
        return false
    }
    defer dc.queue.Done(key)

    // syncHandler (syncDeployment) reconciles the state of this resource
    err := dc.syncHandler(key.(string))
    dc.handleErr(err, key)

    return true
}
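
handleErr, called above, implements the retry policy: a failed key is requeued with exponential backoff until a retry budget is exhausted, then dropped until the next informer event re-adds it. A condensed sketch of the upstream function (logging and the namespace-terminating special case are omitted; maxRetries is 15 in the source):

const maxRetries = 15

func (dc *DeploymentController) handleErr(err error, key interface{}) {
    if err == nil {
        // Success: clear the rate limiter's failure history for this key.
        dc.queue.Forget(key)
        return
    }
    if dc.queue.NumRequeues(key) < maxRetries {
        // Transient failure: requeue with exponential backoff.
        dc.queue.AddRateLimited(key)
        return
    }
    // Retry budget exhausted: drop the key until a new event re-adds it.
    utilruntime.HandleError(err)
    dc.queue.Forget(key)
}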
// syncDeployment will sync the deployment with the given key.
// This function is not meant to be invoked concurrently with the same key.
func (dc *DeploymentController) syncDeployment(key string) error {
    // Split the key back into namespace and name
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key)
        return err
    }

    startTime := time.Now()
    klog.V(4).InfoS("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
    defer func() {
        klog.V(4).InfoS("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
    }()

    // Look up the Deployment object in the cache by namespace and name
    deployment, err := dc.dLister.Deployments(namespace).Get(name)
    if errors.IsNotFound(err) {
        klog.V(2).InfoS("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
        return nil
    }
    if err != nil {
        return err
    }

    // Deep-copy otherwise we are mutating our cache.
    // TODO: Deep-copy only when needed.
    d := deployment.DeepCopy()

    everything := metav1.LabelSelector{}
    // An empty LabelSelector matches every pod in the namespace: record a Warning event, bump .Status.ObservedGeneration, and return (a standalone selector demo follows this function)
    if reflect.DeepEqual(d.Spec.Selector, &everything) {
        dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
        if d.Status.ObservedGeneration < d.Generation {
            d.Status.ObservedGeneration = d.Generation
            dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
        }
        return nil
    }

    // List ReplicaSets owned by this Deployment, while reconciling ControllerRef
    // through adoption/orphaning.
    // Fetch all ReplicaSets owned by this Deployment, adopting/orphaning via their ControllerRef along the way
    rsList, err := dc.getReplicaSetsForDeployment(d)
    if err != nil {
        return err
    }
    // List all Pods owned by this Deployment, grouped by their ReplicaSet.
    // Current uses of the podMap are:
    //
    // * check if a Pod is labeled correctly with the pod-template-hash label.
    // * check that no old Pods are running in the middle of Recreate Deployments.
    // podMap is a map[types.UID][]*v1.Pod: the key is a ReplicaSet UID, the value is the list of pods that ReplicaSet manages (a grouping sketch follows this function)
    podMap, err := dc.getPodMapForDeployment(d, rsList)
    if err != nil {
        return err
    }

    // A deployment already marked for deletion only gets a status update
    if d.DeletionTimestamp != nil {
        return dc.syncStatusOnly(d, rsList)
    }

    // Update deployment conditions with an Unknown condition when pausing/resuming
    // a deployment. In this way, we can be sure that we won't timeout when a user
    // resumes a Deployment with a set progressDeadlineSeconds.
    // Depending on .Spec.Paused, decide whether to update the Deployment's conditions
    if err = dc.checkPausedConditions(d); err != nil {
        return err
    }

    // .spec.paused is true: only sync (scaling and status), no rollout proceeds
    if d.Spec.Paused {
        return dc.sync(d, rsList)
    }

    // rollback is not re-entrant in case the underlying replica sets are updated with a new
    // revision so we should ensure that we won't proceed to update replica sets until we
    // make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
    if getRollbackTo(d) != nil {
        // compatibility path for the legacy deployment rollback mechanism (see the getRollbackTo sketch after this function)
        return dc.rollback(d, rsList)
    }

    // Determine whether this reconciliation was triggered purely by a replica-count change (see the sketch after this function)
    scalingEvent, err := dc.isScalingEvent(d, rsList)
    if err != nil {
        return err
    }
    if scalingEvent {
        return dc.sync(d, rsList)
    }

    switch d.Spec.Strategy.Type {
    case apps.RecreateDeploymentStrategyType:
        // Recreate rollout strategy: delete old pods first, then create new ones
        return dc.rolloutRecreate(d, rsList, podMap)
    case apps.RollingUpdateDeploymentStrategyType:
        // RollingUpdate rollout strategy: incrementally replace pods
        return dc.rolloutRolling(d, rsList)
    }
    return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
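
Why the empty selector is dangerous: an empty (non-nil) LabelSelector converted through metav1.LabelSelectorAsSelector becomes labels.Everything() and matches every pod in the namespace. A standalone check:

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
)

func main() {
    // An empty (non-nil) LabelSelector converts to labels.Everything().
    sel, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{})
    if err != nil {
        panic(err)
    }
    fmt.Println(sel.Empty())                           // true: matches everything
    fmt.Println(sel.Matches(labels.Set{"app": "web"})) // true
}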
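
The podMap bucketing is driven by each Pod's ControllerRef. Below is a condensed sketch of what getPodMapForDeployment does after listing pods from podLister with the deployment's selector; the standalone helper name is mine, not from the source:

// Hypothetical helper mirroring the grouping step inside
// getPodMapForDeployment: pods are bucketed under the UID of the
// ReplicaSet named in their ControllerRef; orphans are skipped.
func groupPodsByReplicaSet(rsList []*apps.ReplicaSet, pods []*v1.Pod) map[types.UID][]*v1.Pod {
    podMap := make(map[types.UID][]*v1.Pod, len(rsList))
    for _, rs := range rsList {
        // Pre-create a bucket per ReplicaSet so "no pods" is
        // distinguishable from "unknown ReplicaSet".
        podMap[rs.UID] = []*v1.Pod{}
    }
    for _, pod := range pods {
        controllerRef := metav1.GetControllerOf(pod)
        if controllerRef == nil {
            continue
        }
        if _, ok := podMap[controllerRef.UID]; ok {
            podMap[controllerRef.UID] = append(podMap[controllerRef.UID], pod)
        }
    }
    return podMap
}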
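
getRollbackTo reads the legacy rollback request from an annotation rather than from spec: .spec.rollbackTo no longer exists in apps/v1, so older API versions round-trip it as the deprecated.deployment.rollback.to annotation. A condensed sketch of the upstream helper:

func getRollbackTo(d *apps.Deployment) *extensions.RollbackConfig {
    // apps.DeprecatedRollbackTo == "deprecated.deployment.rollback.to";
    // extensions is k8s.io/api/extensions/v1beta1.
    revision := d.Annotations[apps.DeprecatedRollbackTo]
    if revision == "" {
        return nil
    }
    revision64, err := strconv.ParseInt(revision, 10, 64)
    if err != nil {
        // An unparsable revision is ignored.
        return nil
    }
    return &extensions.RollbackConfig{Revision: revision64}
}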
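
isScalingEvent compares the desired-replicas annotation that the controller stamps on each ReplicaSet (deployment.kubernetes.io/desired-replicas) against .spec.replicas: any mismatch on an active ReplicaSet means only the replica count changed, so a plain sync suffices. A simplified sketch of the comparison; the upstream version first syncs the new revision and filters active ReplicaSets:

// Simplified sketch, assuming activeRSs has already been filtered to
// ReplicaSets with replicas > 0. The helper name is mine.
func isScalingOnly(d *apps.Deployment, activeRSs []*apps.ReplicaSet) bool {
    for _, rs := range activeRSs {
        v, ok := rs.Annotations["deployment.kubernetes.io/desired-replicas"]
        if !ok {
            continue
        }
        desired, err := strconv.ParseInt(v, 10, 32)
        if err != nil {
            continue
        }
        if int32(desired) != *(d.Spec.Replicas) {
            return true
        }
    }
    return false
}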