深入分析kubelet(2)——创建Pod

2018-09-29  本文已影响0人  陈先生_9e91

深入分析kubelet(2)——创建Pod

紧接着上一篇继续学习。上一篇讲到生产者,本篇将介绍消费者。

background

PodManager

k8s.io\kubernetes\pkg\kubelet\pod\pod_manager.go

// Manager stores and manages access to pods, maintaining the mappings
// between static pods and mirror pods.
//
// The kubelet discovers pod updates from 3 sources: file, http, and
// apiserver. Pods from non-apiserver sources are called static pods, and API
// server is not aware of the existence of static pods. In order to monitor
// the status of such pods, the kubelet creates a mirror pod for each static
// pod via the API server.
//
// A mirror pod has the same pod full name (name and namespace) as its static
// counterpart (albeit different metadata such as UID, etc). By leveraging the
// fact that the kubelet reports the pod status using the pod full name, the
// status of the mirror pod always reflects the actual status of the static
// pod. When a static pod gets deleted, the associated orphaned mirror pod
// will also be removed.
type Manager interface {
    // GetPods returns the regular pods bound to the kubelet and their spec.
    GetPods() []*v1.Pod
    // GetPodByFullName returns the (non-mirror) pod that matches full name, as well as
    // whether the pod was found.
    GetPodByFullName(podFullName string) (*v1.Pod, bool)
    // GetPodByName provides the (non-mirror) pod that matches namespace and
    // name, as well as whether the pod was found.
    GetPodByName(namespace, name string) (*v1.Pod, bool)
    // GetPodByUID provides the (non-mirror) pod that matches pod UID, as well as
    // whether the pod is found.
    GetPodByUID(types.UID) (*v1.Pod, bool)
    // GetPodByMirrorPod returns the static pod for the given mirror pod and
    // whether it was known to the pod manger.
    GetPodByMirrorPod(*v1.Pod) (*v1.Pod, bool)
    // GetMirrorPodByPod returns the mirror pod for the given static pod and
    // whether it was known to the pod manager.
    GetMirrorPodByPod(*v1.Pod) (*v1.Pod, bool)
    // GetPodsAndMirrorPods returns the both regular and mirror pods.
    GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod)
    // SetPods replaces the internal pods with the new pods.
    // It is currently only used for testing.
    SetPods(pods []*v1.Pod)
    // AddPod adds the given pod to the manager.
    AddPod(pod *v1.Pod)
    // UpdatePod updates the given pod in the manager.
    UpdatePod(pod *v1.Pod)
    // DeletePod deletes the given pod from the manager.  For mirror pods,
    // this means deleting the mappings related to mirror pods.  For non-
    // mirror pods, this means deleting from indexes for all non-mirror pods.
    DeletePod(pod *v1.Pod)
    // DeleteOrphanedMirrorPods deletes all mirror pods which do not have
    // associated static pods. This method sends deletion requests to the API
    // server, but does NOT modify the internal pod storage in basicManager.
    DeleteOrphanedMirrorPods()
    // TranslatePodUID returns the actual UID of a pod. If the UID belongs to
    // a mirror pod, returns the UID of its static pod. Otherwise, returns the
    // original UID.
    //
    // All public-facing functions should perform this translation for UIDs
    // because user may provide a mirror pod UID, which is not recognized by
    // internal Kubelet functions.
    TranslatePodUID(uid types.UID) kubetypes.ResolvedPodUID
    // GetUIDTranslations returns the mappings of static pod UIDs to mirror pod
    // UIDs and mirror pod UIDs to static pod UIDs.
    GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID)
    // IsMirrorPodOf returns true if mirrorPod is a correct representation of
    // pod; false otherwise.
    IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool

    MirrorClient
}

上一篇介绍过static pod,与之对应的概念是mirror pod

Pod status

参考k8s.io\api\core\v1\types.go

PodWorkers

k8s.io\kubernetes\pkg\kubelet\pod_workers.go

// PodWorkers is an abstract interface for testability.
type PodWorkers interface {
    UpdatePod(options *UpdatePodOptions)
    ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
    ForgetWorker(uid types.UID)
}

type podWorkers struct {
    // Protects all per worker fields.
    podLock sync.Mutex

    // Tracks all running per-pod goroutines - per-pod goroutine will be
    // processing updates received through its corresponding channel.
    podUpdates map[types.UID]chan UpdatePodOptions
    // Track the current state of per-pod goroutines.
    // Currently all update request for a given pod coming when another
    // update of this pod is being processed are ignored.
    isWorking map[types.UID]bool
    // Tracks the last undelivered work item for this pod - a work item is
    // undelivered if it comes in while the worker is working.
    lastUndeliveredWorkUpdate map[types.UID]UpdatePodOptions

    workQueue queue.WorkQueue

    // This function is run to sync the desired stated of pod.
    // NOTE: This function has to be thread-safe - it can be called for
    // different pods at the same time.
    syncPodFn syncPodFnType

    // The EventRecorder to use
    recorder record.EventRecorder

    // backOffPeriod is the duration to back off when there is a sync error.
    backOffPeriod time.Duration

    // resyncInterval is the duration to wait until the next sync.
    resyncInterval time.Duration

    // podCache stores kubecontainer.PodStatus for all pods.
    podCache kubecontainer.Cache
}

code

k8s.io\kubernetes\pkg\kubelet\kubelet.go

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    kl.syncLoop(updates, kl)
}

// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    // The resyncTicker wakes up kubelet to checks if there are any pod workers
    // that need to be sync'd. A one-second period is sufficient because the
    // sync interval is defaulted to 10s.
    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()
    // 2s
    housekeepingTicker := time.NewTicker(housekeepingPeriod)
    defer housekeepingTicker.Stop()
    plegCh := kl.pleg.Watch()
    for {
        kl.syncLoopMonitor.Store(kl.clock.Now())
        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
        kl.syncLoopMonitor.Store(kl.clock.Now())
    }
}

可以看到,updates贯穿整个过程,是一个非常重要的概念,所以上一篇整篇都在分析updates的由来。这里我们重点关注syncLoopIteration

// syncLoopIteration reads from various channels and dispatches pods to the
// given handler.
//
// Arguments:
// 1.  configCh:       a channel to read config events from
// 2.  handler:        the SyncHandler to dispatch pods to
// 3.  syncCh:         a channel to read periodic sync events from
// 4.  houseKeepingCh: a channel to read housekeeping events from
// 5.  plegCh:         a channel to read PLEG updates from
//
// Events are also read from the kubelet liveness manager's update channel.
//
// The workflow is to read from one of the channels, handle that event, and
// update the timestamp in the sync loop monitor.
//
// With that in mind, in truly no particular order, the different channels
// are handled as follows:
//
// * configCh: dispatch the pods for the config change to the appropriate
//             handler callback for the event type
// * plegCh: update the runtime cache; sync pod
// * syncCh: sync all pods waiting for sync
// * houseKeepingCh: trigger cleanup of pods
// * liveness manager: sync pods that have failed or in which one or more
//                     containers have failed liveness checks
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        // Update from a config source; dispatch it to the right handler
        // callback.
        if !open {
            glog.Errorf("Update channel is closed. Exiting the sync loop.")
            return false
        }

        switch u.Op {
        case kubetypes.ADD:
            // After restarting, kubelet will get all existing pods through
            // ADD as if they are new pods. These pods will then go through the
            // admission process and *may* be rejected. This can be resolved
            // once we have checkpointing.
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.REMOVE:
            handler.HandlePodRemoves(u.Pods)
        case kubetypes.RECONCILE:
            handler.HandlePodReconcile(u.Pods)
        case kubetypes.DELETE:
            // DELETE is treated as a UPDATE because of graceful deletion.
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.RESTORE:
            // These are pods restored from the checkpoint. Treat them as new
            // pods.
            handler.HandlePodAdditions(u.Pods)
        }
    }        
    return true
}

本文只关注syncLoopIteration函数的configCh分支,在可预见的未来,将分析其他分支。

这里只是简单地将Pods分给对应的handler处理。

func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    for _, pod := range pods {
        // Always add the pod to the pod manager. Kubelet relies on the pod
        // manager as the source of truth for the desired state. If a pod does
        // not exist in the pod manager, it means that it has been deleted in
        // the apiserver and no action (other than cleanup) is required.
        kl.podManager.AddPod(pod)

        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
        kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
    }
}

// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod is terminated, dispatchWork
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    // Run the sync in an async worker.
    kl.podWorkers.UpdatePod(&UpdatePodOptions{
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
        OnCompleteFunc: func(err error) {},
    })
}

调用podWorkers.UpdatePod执行操作。

k8s.io\kubernetes\pkg\kubelet\pod_workers.go

// Apply the new setting to the specified pod.
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
    pod := options.Pod
    uid := pod.UID

    if podUpdates, exists = p.podUpdates[uid]; !exists {
        podUpdates = make(chan UpdatePodOptions, 1)
        p.podUpdates[uid] = podUpdates

        go func() {
            defer runtime.HandleCrash()
            p.managePodLoop(podUpdates)
        }()
    }
    
    if !p.isWorking[pod.UID] {
        p.isWorking[pod.UID] = true
        podUpdates <- *options
    } 
}

给Pod创建一个goroutine,并创建一个channel管理它。

func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
    for update := range podUpdates {
        err := func() error {  
            err = p.syncPodFn(syncPodOptions{
                mirrorPod:      update.MirrorPod,
                pod:            update.Pod,
                podStatus:      status,
                killPodOptions: update.KillPodOptions,
                updateType:     update.UpdateType,
            })
            return err
        }()
    }
}

这里其实就是回调了podWorkers.syncPodFn方法。

k8s.io\kubernetes\pkg\kubelet\kubelet.go

func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
        klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)   
}

500多行的函数,里面就有我们需要的答案klet.syncPod。这是一个很复杂的函数,包括了所有Pod状态的处理过程,我们慢慢拆解。

// syncPod is the transaction script for the sync of a single pod.
//
// Arguments:
//
// o - the SyncPodOptions for this invocation
//
// The workflow is:
// * If the pod is being created, record pod worker start latency
// * Call generateAPIPodStatus to prepare an v1.PodStatus for the pod
// * If the pod is being seen as running for the first time, record pod
//   start latency
// * Update the status of the pod in the status manager
// * Kill the pod if it should not be running
// * Create a mirror pod if the pod is a static pod, and does not
//   already have a mirror pod
// * Create the data directories for the pod if they do not exist
// * Wait for volumes to attach/mount
// * Fetch the pull secrets for the pod
// * Call the container runtime's SyncPod callback
// * Update the traffic shaping for the pod's ingress and egress limits
//
// If any step of this workflow errors, the error is returned, and is repeated
// on the next syncPod call.
func (kl *Kubelet) syncPod(o syncPodOptions) error {
    pod := o.pod
    mirrorPod := o.mirrorPod
    podStatus := o.podStatus
    updateType := o.updateType
    
    // if we want to kill a pod, do it now!
    if updateType == kubetypes.SyncPodKill {
        return kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride)
    }
    
    apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
    
    // Create Cgroups for the pod and apply resource parameters
    // to them if cgroups-per-qos flag is enabled.
    pcm := kl.containerManager.NewPodContainerManager()
    // If pod has already been terminated then we need not create
    // or update the pod's cgroup
    if !kl.podIsTerminated(pod) {
        // Create and Update pod's Cgroups
        pcm.EnsureExists(pod)
    }
    
    // Create Mirror Pod for Static Pod if it doesn't already exist
    if kubepod.IsStaticPod(pod) {
        if mirrorPod == nil || deleted {
            kl.podManager.CreateMirrorPod(pod)
        }
    }
    
    // Make data directories for the pod
    kl.makePodDataDirs(pod)
    
    // Volume manager will not mount volumes for terminated pods
    if !kl.podIsTerminated(pod) {
        // Wait for volumes to attach/mount
        kl.volumeManager.WaitForAttachAndMount(pod)
    }
    
    // Fetch the pull secrets for the pod
    pullSecrets := kl.getPullSecretsForPod(pod)
    
    // Call the container runtime's SyncPod callback
    result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
}

创建Pod过程:

  1. 如果需要kill,直接kill

  2. 给Pod创建PodStatus对象

  3. 创建cgroups

  4. 如果是static pod,就创建mirror pod,方便通过apiserver查询 static pod,只能查询,其他操作都不可以

  5. 创建数据目录,比如挂载目录

  6. 挂载目录

  7. 获取ImagePullSecrets

  8. 调用CRI创建Pod

k8s.io\kubernetes\pkg\kubelet\kuberuntime\kuberuntime_manager.go

// SyncPod syncs the running pod into the desired pod by executing following steps:
//
//  1. Compute sandbox and container changes.
//  2. Kill pod sandbox if necessary.
//  3. Kill any containers that should not be running.
//  4. Create sandbox if necessary.
//  5. Create init containers.
//  6. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
    // Step 1: Compute sandbox and container changes.
    podContainerChanges := m.computePodActions(pod, podStatus)
    
    // Step 2: Kill the pod if the sandbox has changed.
    if podContainerChanges.KillPod {
        m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
    }else {
        // Step 3: kill any running containers in this pod which are not to keep.
        for containerID, containerInfo := range podContainerChanges.ContainersToKill {
            m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil)
        }
    }
    
    // Step 4: Create a sandbox for the pod if necessary.
    podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
    
    // Step 5: start the init container.
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit)
    }
    
    // Step 6: start containers in podContainerChanges.ContainersToStart.
    for _, idx := range podContainerChanges.ContainersToStart {
        container := &pod.Spec.Containers[idx]
        m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular)
    }
}

这个函数的注释写得很赞,过程如下:

  1. 比较网络和容器变化
  2. 如果网络有变化,就把之前的容器删掉
  3. 创建容器网络
  4. 启动init容器
  5. 启动容器

启动init容器也是有说法的,全部逻辑都在computePodActions里面,必须先按顺序将init容器全部启动之后,再启动容器,大概过程如下:

  1. 第一次只启动pod.Spec.InitContainers[0]changes.ContainersToStart为空
  2. 之后每次启动下一个init容器
  3. init容器启动完,启动容器

k8s.io\kubernetes\pkg\kubelet\kuberuntime\kuberuntime_container.go

// startContainer starts a container and returns a message indicates why it is failed on error.
// It starts the container through the following steps:
// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, containerType kubecontainer.ContainerType) (string, error) {
    // Step 1: pull the image.
    imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)   
    
    // Step 2: create the container.
    m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
    
    // Step 3: start the container.
    m.runtimeService.StartContainer(containerID)
    
    legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,sandboxMeta.Namespace)
    m.osInterface.Symlink(containerLog, legacySymlink)
}

经过一步步抽丝剥茧,这里终于真相了,

  1. 拉镜像
  2. create 容器,runtimeService其实就是通过grpc调用CRI
  3. start 容器
  4. 给容器日志创建soft link,增加K8S相关信息,这里日志采集的时候就很有用,详见K8S Fluentd Mongo日志采集
上一篇 下一篇

猜你喜欢

热点阅读