k8s 之 apiserver 源码简单分析

2020-07-16  本文已影响0人  wwq2020

cmd/kube-apiserver/apiserver.go 中

// main is the kube-apiserver entry point: it builds the command via
// app.NewAPIServerCommand and exits with status 1 if executing it returns an
// error. (Excerpt: "..." marks code elided by the article.)
func main() {
    ...
    command := app.NewAPIServerCommand()
    ...
    // A non-nil error from Execute terminates the process with exit code 1.
    if err := command.Execute(); err != nil {
        os.Exit(1)
    }

cmd/kube-apiserver/app/server.go 中

// NewAPIServerCommand creates the *cobra.Command for kube-apiserver, backed by
// a fresh set of server run options from options.NewServerRunOptions.
// (Excerpt: the trailing "..." marks code elided by the article.)
func NewAPIServerCommand() *cobra.Command {
    s := options.NewServerRunOptions()
    cmd := &cobra.Command{
        Use: "kube-apiserver",
        Long: `The Kubernetes API server validates and configures data
for the api objects which include pods, services, replicationcontrollers, and
others. The API Server services REST operations and provides the frontend to the
cluster's shared state through which all other components interact.`,

        // stop printing usage when the command errors
        SilenceUsage: true,
        PersistentPreRunE: func(*cobra.Command, []string) error {
            // silence client-go warnings.
            // kube-apiserver loopback clients should not log self-issued warnings.
            rest.SetDefaultWarningHandler(rest.NoWarnings{})
            return nil
        },
        // RunE completes the options, validates them, and then hands off to Run
        // with the stop channel returned by SetupSignalHandler.
        RunE: func(cmd *cobra.Command, args []string) error {
            verflag.PrintAndExitIfRequested()
            cliflag.PrintFlags(cmd.Flags())

            // set default options
            completedOptions, err := Complete(s)
            if err != nil {
                return err
            }

            // validate options
            if errs := completedOptions.Validate(); len(errs) != 0 {
                return utilerrors.NewAggregate(errs)
            }

            return Run(completedOptions, genericapiserver.SetupSignalHandler())
        },
        // Args rejects the invocation as soon as any positional argument is a
        // non-empty string; empty-string arguments are tolerated.
        Args: func(cmd *cobra.Command, args []string) error {
            for _, arg := range args {
                if len(arg) > 0 {
                    return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
                }
            }
            return nil
        },
    }
    ...
}

// Run logs the build version, assembles the API server chain from the
// completed options, prepares it, and finally runs the prepared server with
// stopCh. Errors from building or preparing the chain are returned as-is.
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
    // Emit the version first so it is available even if startup fails later.
    klog.Infof("Version: %+v", version.Get())

    apiServerChain, err := CreateServerChain(completeOptions, stopCh)
    if err != nil {
        return err
    }

    runnable, err := apiServerChain.PrepareRun()
    if err != nil {
        return err
    }
    return runnable.Run(stopCh)
}

// CreateServerChain builds the chain of delegating API servers and returns the
// aggregator that fronts it. The servers are created in this order:
//
//  1. the API extensions server, delegating to an empty delegate;
//  2. the kube API server, delegating to the API extensions server;
//  3. the aggregator server, delegating to the kube API server.
//
// If insecure serving info is configured, the aggregator's unprotected handler
// is wrapped in the insecure handler chain and served as well. Any error along
// the way aborts construction and is returned with a nil aggregator.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
    nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
    if err != nil {
        return nil, err
    }

    kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
    if err != nil {
        return nil, err
    }

    // If additional API servers are added, they should be gated.
    apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
        serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
    if err != nil {
        return nil, err
    }
    // The extensions server is the end of the chain, so it gets an empty delegate.
    apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
    if err != nil {
        return nil, err
    }

    // The kube API server delegates to the extensions server's generic server.
    kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
    if err != nil {
        return nil, err
    }

    // aggregator comes last in the chain
    aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
    if err != nil {
        return nil, err
    }
    aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
    if err != nil {
        // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
        return nil, err
    }

    // Optionally expose the aggregator's unprotected handler on the insecure endpoint.
    if insecureServingInfo != nil {
        insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
        if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
            return nil, err
        }
    }

    return aggregatorServer, nil
}

// CreateKubeAPIServer completes the supplied config and constructs the kube
// API server from it, using delegateAPIServer as the delegation target.
// On failure it returns a nil server together with the construction error.
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget) (*master.Master, error) {
    completed := kubeAPIServerConfig.Complete()

    apiServer, err := completed.New(delegateAPIServer)
    if err != nil {
        return nil, err
    }
    return apiServer, nil
}

pkg/master/master.go 中

// Complete returns a CompletedConfig built from c, with defaults filled in for
// settings that were left unset:
//
//   - the service IP range and the API server service IP,
//   - the discovery addresses (default address plus one CIDR rule for the
//     service IP range),
//   - the service node port range,
//   - the endpoint reconciler interval, TTL and reconciler implementation.
//
// A failure to determine the service IP ranges is reported with klog.Fatalf.
func (c *Config) Complete() CompletedConfig {
    cfg := completedConfig{
        c.GenericConfig.Complete(c.ExtraConfig.VersionedInformers),
        &c.ExtraConfig,
    }

    serviceIPRange, apiServerServiceIP, err := ServiceIPRange(cfg.ExtraConfig.ServiceIPRange)
    if err != nil {
        klog.Fatalf("Error determining service IP ranges: %v", err)
    }
    // Only adopt the computed values where the caller did not provide one.
    if cfg.ExtraConfig.ServiceIPRange.IP == nil {
        cfg.ExtraConfig.ServiceIPRange = serviceIPRange
    }
    if cfg.ExtraConfig.APIServerServiceIP == nil {
        cfg.ExtraConfig.APIServerServiceIP = apiServerServiceIP
    }

    // Requests matching the service IP range are pointed at the API server's
    // service IP and port; everything else uses the external address.
    discoveryAddresses := discovery.DefaultAddresses{DefaultAddress: cfg.GenericConfig.ExternalAddress}
    discoveryAddresses.CIDRRules = append(discoveryAddresses.CIDRRules,
        discovery.CIDRRule{IPRange: cfg.ExtraConfig.ServiceIPRange, Address: net.JoinHostPort(cfg.ExtraConfig.APIServerServiceIP.String(), strconv.Itoa(cfg.ExtraConfig.APIServerServicePort))})
    cfg.GenericConfig.DiscoveryAddresses = discoveryAddresses

    if cfg.ExtraConfig.ServiceNodePortRange.Size == 0 {
        // TODO: Currently no way to specify an empty range (do we need to allow this?)
        // We should probably allow this for clouds that don't require NodePort to do load-balancing (GCE)
        // but then that breaks the strict nestedness of ServiceType.
        // Review post-v1
        cfg.ExtraConfig.ServiceNodePortRange = kubeoptions.DefaultServiceNodePortRange
        klog.Infof("Node port range unspecified. Defaulting to %v.", cfg.ExtraConfig.ServiceNodePortRange)
    }

    // Zero values below mean "unset" and are replaced by the package defaults.
    if cfg.ExtraConfig.EndpointReconcilerConfig.Interval == 0 {
        cfg.ExtraConfig.EndpointReconcilerConfig.Interval = DefaultEndpointReconcilerInterval
    }

    if cfg.ExtraConfig.MasterEndpointReconcileTTL == 0 {
        cfg.ExtraConfig.MasterEndpointReconcileTTL = DefaultEndpointReconcilerTTL
    }

    if cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler == nil {
        cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler = c.createEndpointReconciler()
    }

    return CompletedConfig{&cfg}
}

// New builds a Master from the completed config, delegating unhandled requests
// to delegationTarget. In order, it:
//
//  1. rejects an empty KubeletClientConfig,
//  2. creates the underlying generic API server named "kube-apiserver",
//  3. optionally installs the logs routes and the service account issuer
//     discovery (OpenID metadata) routes,
//  4. installs the legacy (core v1) API if that version is enabled,
//  5. installs the remaining API groups from the ordered provider list,
//  6. installs the tunneler if one is configured,
//  7. registers a post-start hook that runs the cluster authentication trust
//     controller.
//
// It returns the Master, or a nil Master and the first error encountered.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
    // A zero-value kubelet client config is treated as a caller error.
    if reflect.DeepEqual(c.ExtraConfig.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {
        return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
    }

    s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
    if err != nil {
        return nil, err
    }

    if c.ExtraConfig.EnableLogsSupport {
        routes.Logs{}.Install(s.Handler.GoRestfulContainer)
    }

    if utilfeature.DefaultFeatureGate.Enabled(features.ServiceAccountIssuerDiscovery) {
        // Metadata and keys are expected to only change across restarts at present,
        // so we just marshal immediately and serve the cached JSON bytes.
        md, err := serviceaccount.NewOpenIDMetadata(
            c.ExtraConfig.ServiceAccountIssuerURL,
            c.ExtraConfig.ServiceAccountJWKSURI,
            c.GenericConfig.ExternalAddress,
            c.ExtraConfig.ServiceAccountPublicKeys,
        )
        if err != nil {
            // If there was an error, skip installing the endpoints and log the
            // error, but continue on. We don't return the error because the
            // metadata responses require additional, backwards incompatible
            // validation of command-line options.
            msg := fmt.Sprintf("Could not construct pre-rendered responses for"+
                " ServiceAccountIssuerDiscovery endpoints. Endpoints will not be"+
                " enabled. Error: %v", err)
            if c.ExtraConfig.ServiceAccountIssuerURL != "" {
                // The user likely expects this feature to be enabled if issuer URL is
                // set and the feature gate is enabled. In the future, if there is no
                // longer a feature gate and issuer URL is not set, the user may not
                // expect this feature to be enabled. We log the former case as an Error
                // and the latter case as an Info.
                klog.Error(msg)
            } else {
                klog.Info(msg)
            }
        } else {
            routes.NewOpenIDMetadataServer(md.ConfigJSON, md.PublicKeysetJSON).
                Install(s.Handler.GoRestfulContainer)
        }
    }

    m := &Master{
        GenericAPIServer:          s,
        ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
    }

    // install legacy rest storage
    if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
        legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
            StorageFactory:              c.ExtraConfig.StorageFactory,
            ProxyTransport:              c.ExtraConfig.ProxyTransport,
            KubeletClientConfig:         c.ExtraConfig.KubeletClientConfig,
            EventTTL:                    c.ExtraConfig.EventTTL,
            ServiceIPRange:              c.ExtraConfig.ServiceIPRange,
            SecondaryServiceIPRange:     c.ExtraConfig.SecondaryServiceIPRange,
            ServiceNodePortRange:        c.ExtraConfig.ServiceNodePortRange,
            LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
            ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
            ExtendExpiration:            c.ExtraConfig.ExtendExpiration,
            ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
            APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
        }
        if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
            return nil, err
        }
    }

    // The order here is preserved in discovery.
    // If resources with identical names exist in more than one of these groups (e.g. "deployments.apps" and "deployments.extensions"),
    // the order of this list determines which group an unqualified resource name (e.g. "deployments") should prefer.
    // This priority order is used for local discovery, but it ends up aggregated in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go`
    // with specific priorities.
    // TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery
    // handlers that we have.
    restStorageProviders := []RESTStorageProvider{
        authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
        authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
        autoscalingrest.RESTStorageProvider{},
        batchrest.RESTStorageProvider{},
        certificatesrest.RESTStorageProvider{},
        coordinationrest.RESTStorageProvider{},
        discoveryrest.StorageProvider{},
        extensionsrest.RESTStorageProvider{},
        networkingrest.RESTStorageProvider{},
        noderest.RESTStorageProvider{},
        policyrest.RESTStorageProvider{},
        rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
        schedulingrest.RESTStorageProvider{},
        settingsrest.RESTStorageProvider{},
        storagerest.RESTStorageProvider{},
        flowcontrolrest.RESTStorageProvider{},
        // keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
        // See https://github.com/kubernetes/kubernetes/issues/42392
        appsrest.StorageProvider{},
        admissionregistrationrest.RESTStorageProvider{},
        eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
    }
    if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
        return nil, err
    }

    if c.ExtraConfig.Tunneler != nil {
        m.installTunneler(c.ExtraConfig.Tunneler, corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes())
    }

    // The hook builds a client from the loopback config, wires the trust
    // controller as a listener on each CA content provider that supports
    // notification, and starts the relevant controllers in goroutines.
    m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
        kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
        if err != nil {
            return err
        }
        controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, kubeClient)

        // prime values and start listeners
        if m.ClusterAuthenticationInfo.ClientCA != nil {
            if notifier, ok := m.ClusterAuthenticationInfo.ClientCA.(dynamiccertificates.Notifier); ok {
                notifier.AddListener(controller)
            }
            // NOTE: inside this if, "controller" shadows the trust controller
            // above and refers to the ClientCA provider as a ControllerRunner.
            if controller, ok := m.ClusterAuthenticationInfo.ClientCA.(dynamiccertificates.ControllerRunner); ok {
                // runonce to be sure that we have a value.
                if err := controller.RunOnce(); err != nil {
                    runtime.HandleError(err)
                }
                go controller.Run(1, hookContext.StopCh)
            }
        }
        if m.ClusterAuthenticationInfo.RequestHeaderCA != nil {
            if notifier, ok := m.ClusterAuthenticationInfo.RequestHeaderCA.(dynamiccertificates.Notifier); ok {
                notifier.AddListener(controller)
            }
            // NOTE: same shadowing as above, this time for the RequestHeaderCA provider.
            if controller, ok := m.ClusterAuthenticationInfo.RequestHeaderCA.(dynamiccertificates.ControllerRunner); ok {
                // runonce to be sure that we have a value.
                if err := controller.RunOnce(); err != nil {
                    runtime.HandleError(err)
                }
                go controller.Run(1, hookContext.StopCh)
            }
        }

        // Finally start the trust controller itself.
        go controller.Run(1, hookContext.StopCh)
        return nil
    })

    return m, nil
}

// InstallAPIs asks each REST storage provider for its API group and registers
// all enabled groups with the generic API server in a single call.
//
// A provider is skipped when no version of its group is enabled in
// apiResourceConfigSource, or when the provider itself reports the group as
// not enabled. Providers that also implement PostStartHookProvider get their
// hook registered. The first storage-initialization error, or an error from
// registering the groups, is returned.
func (m *Master) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
    groups := make([]*genericapiserver.APIGroupInfo, 0, len(restStorageProviders))

    for _, provider := range restStorageProviders {
        group := provider.GroupName()
        if !apiResourceConfigSource.AnyVersionForGroupEnabled(group) {
            klog.V(1).Infof("Skipping disabled API group %q.", group)
            continue
        }

        info, enabled, err := provider.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
        switch {
        case err != nil:
            return fmt.Errorf("problem initializing API group %q : %v", group, err)
        case !enabled:
            klog.Warningf("API group %q is not enabled, skipping.", group)
            continue
        }
        klog.V(1).Infof("Enabling API group %q.", group)

        // Providers may optionally contribute a post-start hook.
        if hookProvider, ok := provider.(genericapiserver.PostStartHookProvider); ok {
            hookName, hookFn, hookErr := hookProvider.PostStartHook()
            if hookErr != nil {
                klog.Fatalf("Error building PostStartHook: %v", hookErr)
            }
            m.GenericAPIServer.AddPostStartHookOrDie(hookName, hookFn)
        }

        // info is declared per iteration, so taking its address is safe.
        groups = append(groups, &info)
    }

    if err := m.GenericAPIServer.InstallAPIGroups(groups...); err != nil {
        return fmt.Errorf("error in registering group versions: %v", err)
    }
    return nil
}


// InstallLegacyAPI builds the legacy (core) REST storage, registers the
// bootstrap controller's post-start and pre-shutdown hooks under the name
// "bootstrap-controller", and installs the legacy API group under the default
// legacy API prefix. Storage-construction and registration failures are
// returned as wrapped errors.
func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
    const hookName = "bootstrap-controller"

    storage, groupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
    if err != nil {
        return fmt.Errorf("error building core storage: %v", err)
    }

    // One loopback core client serves every client role the controller needs.
    client := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
    bootstrap := c.NewBootstrapController(storage, client, client, client, client.RESTClient())
    m.GenericAPIServer.AddPostStartHookOrDie(hookName, bootstrap.PostStartHook)
    m.GenericAPIServer.AddPreShutdownHookOrDie(hookName, bootstrap.PreShutdownHook)

    err = m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &groupInfo)
    if err != nil {
        return fmt.Errorf("error in registering group versions: %v", err)
    }
    return nil
}

k8s.io/apiserver/pkg/server/genericapiserver.go 中

// InstallLegacyAPIGroup installs apiGroupInfo under the legacy prefix
// apiPrefix. The prefix must be one of the server's allowed legacy API
// prefixes. After the group's resources are installed, a root handler is added
// at /<apiPrefix> that enumerates the supported API versions.
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
    if allowed := s.legacyAPIGroupPrefixes.Has(apiPrefix); !allowed {
        return fmt.Errorf("%q is not in the allowed legacy API prefixes: %v", apiPrefix, s.legacyAPIGroupPrefixes.List())
    }

    models, err := s.getOpenAPIModels(apiPrefix, apiGroupInfo)
    if err != nil {
        return fmt.Errorf("unable to get openapi models: %v", err)
    }

    if installErr := s.installAPIResources(apiPrefix, apiGroupInfo, models); installErr != nil {
        return installErr
    }

    // Version discovery handler for the legacy root.
    rootHandler := discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix)
    s.Handler.GoRestfulContainer.Add(rootHandler.WebService())
    return nil
}

// installAPIResources installs the REST handlers for every prioritized version
// of apiGroupInfo that has at least one resource. Versions without resources
// are skipped with a warning. Installation stops at the first failing version
// and returns its error.
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
    for _, gv := range apiGroupInfo.PrioritizedVersions {
        resources := apiGroupInfo.VersionedResourcesStorageMap[gv.Version]
        if len(resources) == 0 {
            klog.Warningf("Skipping API %v because it has no resources.", gv)
            continue
        }

        version := s.getAPIGroupVersion(apiGroupInfo, gv, apiPrefix)
        // Only override the options version when the group specifies one.
        if external := apiGroupInfo.OptionsExternalVersion; external != nil {
            version.OptionsExternalVersion = external
        }
        version.OpenAPIModels = openAPIModels
        version.MaxRequestBodyBytes = s.maxRequestBodyBytes

        err := version.InstallREST(s.Handler.GoRestfulContainer)
        if err != nil {
            return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
        }
    }
    return nil
}

k8s.io/apiserver/pkg/endpoints/groupversion.go 中

// InstallREST registers this group version's REST handlers into container.
// The web service is rooted at <Root>/<Group>/<Version>, receives a version
// discovery handler listing the installed resources, and is added to the
// container even if some resources failed to register. The registration
// errors are returned as a single aggregate.
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
    installer := &APIInstaller{
        group:             g,
        prefix:            path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version),
        minRequestTimeout: g.MinRequestTimeout,
    }
    resources, webService, errs := installer.Install()

    lister := staticLister{resources}
    discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, lister).AddToWebService(webService)
    container.Add(webService)

    return utilerrors.NewAggregate(errs)
}

k8s.io/apiserver/pkg/endpoints/installer.go 中

// Install registers handlers for every storage path of the group on a new web
// service. It returns the API resources that were registered, the web service,
// and one error per path that failed to register. A failing path does not
// stop the remaining paths from being processed.
func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
    var (
        resources []metav1.APIResource
        errs      []error
    )
    ws := a.newWebService()

    // Map iteration order is random; sort the paths so registration order
    // (and therefore the generated swagger spec) is deterministic.
    sortedPaths := make([]string, 0, len(a.group.Storage))
    for p := range a.group.Storage {
        sortedPaths = append(sortedPaths, p)
    }
    sort.Strings(sortedPaths)

    for _, p := range sortedPaths {
        resource, err := a.registerResourceHandlers(p, a.group.Storage[p], ws)
        if err != nil {
            errs = append(errs, fmt.Errorf("error in registering resource: %s, %v", p, err))
        }
        if resource != nil {
            resources = append(resources, *resource)
        }
    }
    return resources, ws, errs
}

// registerResourceHandlers registers routes on ws for the storage found at
// path. (Excerpt: "..." marks code elided by the article; only the rest.Creater
// type assertion, the POST/create case, and the final route-registration loop
// are shown.)
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
    ...
    // Record whether this storage implements rest.Creater.
    creater, isCreater := storage.(rest.Creater)
    ...
        case "POST": // Create a resource.
            // Pick the named or plain create handler depending on the storage.
            var handler restful.RouteFunction
            if isNamedCreater {
                handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
            } else {
                handler = restfulCreateResource(creater, reqScope, admit)
            }
            // Wrap the handler with metrics instrumentation and, when enabled,
            // with the warnings handler.
            handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, handler)
            if enableWarningHeaders {
                handler = utilwarning.AddWarningsHandler(handler, warnings)
            }
            // Build the human-readable route description, e.g. "create" + article + kind.
            article := GetArticleForNoun(kind, " ")
            doc := "create" + article + kind
            if isSubresource {
                doc = "create " + subresource + " of" + article + kind
            }
            route := ws.POST(action.Path).To(handler).
                Doc(doc).
                Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
                Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
                Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
                Returns(http.StatusOK, "OK", producedObject).
                // TODO: in some cases, the API may return a v1.Status instead of the versioned object
                // but currently go-restful can't handle multiple different objects being returned.
                Returns(http.StatusCreated, "Created", producedObject).
                Returns(http.StatusAccepted, "Accepted", producedObject).
                Reads(defaultVersionedObject).
                Writes(producedObject)
            if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
                return nil, err
            }
            addParams(route, action.Params)
            routes = append(routes, route)
        // Tag every collected route with its GroupVersionKind and lower-cased
        // verb, then register it on the web service.
        for _, route := range routes {
            route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
                Group:   reqScope.Kind.Group,
                Version: reqScope.Kind.Version,
                Kind:    reqScope.Kind.Kind,
            })
            route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
            ws.Route(route)
        }
    ...
}

// restfulCreateResource adapts handlers.CreateResource to a go-restful
// RouteFunction. The underlying HTTP handler is built on each request and
// invoked with the raw response writer and request.
func restfulCreateResource(r rest.Creater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
    return func(req *restful.Request, res *restful.Response) {
        createFn := handlers.CreateResource(r, &scope, admit)
        createFn(res.ResponseWriter, req.Request)
    }
}

pkg/registry/core/rest/storage_core.go 中

// NewLegacyRESTStorage builds the REST storage for the legacy (core, group "")
// API. It creates the individual resource stores, the service cluster-IP and
// node-port allocators, and returns:
//
//   - a LegacyRESTStorage carrying the allocator registries,
//   - an APIGroupInfo whose "v1" entry maps each resource/subresource path to
//     its storage,
//   - an error if any store or allocator could not be created (in which case
//     the other two results are zero values).
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
    apiGroupInfo := genericapiserver.APIGroupInfo{
        PrioritizedVersions:          legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
        VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
        Scheme:                       legacyscheme.Scheme,
        ParameterCodec:               legacyscheme.ParameterCodec,
        NegotiatedSerializer:         legacyscheme.Codecs,
    }

    // A PodDisruptionBudget client is only created when policy/v1beta1 is
    // registered in the scheme; otherwise podDisruptionClient stays nil.
    var podDisruptionClient policyclient.PodDisruptionBudgetsGetter
    if policyGroupVersion := (schema.GroupVersion{Group: "policy", Version: "v1beta1"}); legacyscheme.Scheme.IsVersionRegistered(policyGroupVersion) {
        var err error
        podDisruptionClient, err = policyclient.NewForConfig(c.LoopbackClientConfig)
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
    }
    restStorage := LegacyRESTStorage{}

    // Per-resource stores. Each constructor failure aborts with zero values.
    podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }
    limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    resourceQuotaStorage, resourceQuotaStatusStorage, err := resourcequotastore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }
    secretStorage, err := secretstore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }
    persistentVolumeStorage, persistentVolumeStatusStorage, err := pvstore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }
    persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage, err := pvcstore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }
    configMapStorage, err := configmapstore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage, err := namespacestore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    endpointsStorage, err := endpointsstore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    // Pod storage depends on the node storage's kubelet connection info.
    podStorage, err := podstore.NewStorage(
        restOptionsGetter,
        nodeStorage.KubeletConnectionInfo,
        c.ProxyTransport,
        podDisruptionClient,
    )
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    // Token-issuing arguments are passed only when an issuer is configured and
    // the TokenRequest feature gate is enabled; otherwise they are all zero/nil.
    var serviceAccountStorage *serviceaccountstore.REST
    if c.ServiceAccountIssuer != nil && utilfeature.DefaultFeatureGate.Enabled(features.TokenRequest) {
        serviceAccountStorage, err = serviceaccountstore.NewREST(restOptionsGetter, c.ServiceAccountIssuer, c.APIAudiences, c.ServiceAccountMaxExpiration, podStorage.Pod.Store, secretStorage.Store, c.ExtendExpiration)
    } else {
        serviceAccountStorage, err = serviceaccountstore.NewREST(restOptionsGetter, nil, nil, 0, nil, nil, false)
    }
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    // The primary service cluster IP range is mandatory.
    var serviceClusterIPRegistry rangeallocation.RangeRegistry
    serviceClusterIPRange := c.ServiceIPRange
    if serviceClusterIPRange.IP == nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("service clusterIPRange is missing")
    }

    serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services"))
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    // The allocator factory callback captures the etcd-backed registry so it
    // can be exposed on restStorage afterwards.
    serviceClusterIPAllocator, err := ipallocator.NewAllocatorCIDRRange(&serviceClusterIPRange, func(max int, rangeSpec string) (allocator.Interface, error) {
        mem := allocator.NewAllocationMap(max, rangeSpec)
        // TODO etcdallocator package to return a storage interface via the storageFactory
        etcd, err := serviceallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
        if err != nil {
            return nil, err
        }
        serviceClusterIPRegistry = etcd
        return etcd, nil
    })
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster IP allocator: %v", err)
    }
    restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry

    // allocator for secondary service ip range
    // Only created when the IPv6DualStack feature gate is enabled and a
    // secondary range is configured; otherwise the allocator stays nil.
    var secondaryServiceClusterIPAllocator ipallocator.Interface
    if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) && c.SecondaryServiceIPRange.IP != nil {
        var secondaryServiceClusterIPRegistry rangeallocation.RangeRegistry
        secondaryServiceClusterIPAllocator, err = ipallocator.NewAllocatorCIDRRange(&c.SecondaryServiceIPRange, func(max int, rangeSpec string) (allocator.Interface, error) {
            mem := allocator.NewAllocationMap(max, rangeSpec)
            // TODO etcdallocator package to return a storage interface via the storageFactory
            etcd, err := serviceallocator.NewEtcd(mem, "/ranges/secondaryserviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
            if err != nil {
                return nil, err
            }
            secondaryServiceClusterIPRegistry = etcd
            return etcd, nil
        })
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
        }
        restStorage.SecondaryServiceClusterIPAllocator = secondaryServiceClusterIPRegistry
    }

    // Node port allocator, built with the same capture-the-registry pattern.
    var serviceNodePortRegistry rangeallocation.RangeRegistry
    serviceNodePortAllocator, err := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string) (allocator.Interface, error) {
        mem := allocator.NewAllocationMap(max, rangeSpec)
        // TODO etcdallocator package to return a storage interface via the storageFactory
        etcd, err := serviceallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorageConfig)
        if err != nil {
            return nil, err
        }
        serviceNodePortRegistry = etcd
        return etcd, nil
    })
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster port allocator: %v", err)
    }
    restStorage.ServiceNodePortAllocator = serviceNodePortRegistry

    controllerStorage, err := controllerstore.NewStorage(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    // The final boolean argument is true when a secondary IP allocator exists.
    serviceRESTStorage, serviceStatusStorage, err := servicestore.NewGenericREST(restOptionsGetter, serviceClusterIPRange, secondaryServiceClusterIPAllocator != nil)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    serviceRest, serviceRestProxy := servicestore.NewREST(serviceRESTStorage,
        endpointsStorage,
        podStorage.Pod,
        serviceClusterIPAllocator,
        secondaryServiceClusterIPAllocator,
        serviceNodePortAllocator,
        c.ProxyTransport)

    // Resource (and "resource/subresource") path -> storage for the "v1" version.
    restStorageMap := map[string]rest.Storage{
        "pods":             podStorage.Pod,
        "pods/attach":      podStorage.Attach,
        "pods/status":      podStorage.Status,
        "pods/log":         podStorage.Log,
        "pods/exec":        podStorage.Exec,
        "pods/portforward": podStorage.PortForward,
        "pods/proxy":       podStorage.Proxy,
        "pods/binding":     podStorage.Binding,
        "bindings":         podStorage.LegacyBinding,

        "podTemplates": podTemplateStorage,

        "replicationControllers":        controllerStorage.Controller,
        "replicationControllers/status": controllerStorage.Status,

        "services":        serviceRest,
        "services/proxy":  serviceRestProxy,
        "services/status": serviceStatusStorage,

        "endpoints": endpointsStorage,

        "nodes":        nodeStorage.Node,
        "nodes/status": nodeStorage.Status,
        "nodes/proxy":  nodeStorage.Proxy,

        "events": eventStorage,

        "limitRanges":                   limitRangeStorage,
        "resourceQuotas":                resourceQuotaStorage,
        "resourceQuotas/status":         resourceQuotaStatusStorage,
        "namespaces":                    namespaceStorage,
        "namespaces/status":             namespaceStatusStorage,
        "namespaces/finalize":           namespaceFinalizeStorage,
        "secrets":                       secretStorage,
        "serviceAccounts":               serviceAccountStorage,
        "persistentVolumes":             persistentVolumeStorage,
        "persistentVolumes/status":      persistentVolumeStatusStorage,
        "persistentVolumeClaims":        persistentVolumeClaimStorage,
        "persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
        "configMaps":                    configMapStorage,

        "componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
    }
    // Optional subresources, added only when the related API version is
    // registered, the storage is present, or the feature gate is enabled.
    if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "autoscaling", Version: "v1"}) {
        restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
    }
    if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "policy", Version: "v1beta1"}) {
        restStorageMap["pods/eviction"] = podStorage.Eviction
    }
    if serviceAccountStorage.Token != nil {
        restStorageMap["serviceaccounts/token"] = serviceAccountStorage.Token
    }
    if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
        restStorageMap["pods/ephemeralcontainers"] = podStorage.EphemeralContainers
    }
    apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap

    return restStorage, apiGroupInfo, nil
}

k8s.io/apiserver/pkg/endpoints/handlers/create.go 中

// CreateResource returns an http.HandlerFunc that serves create requests for r.
// The rest.Creater is wrapped in a namedCreaterAdapter and handed to
// createHandler with includeName set to false.
func CreateResource(r rest.Creater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
    const includeName = false
    named := &namedCreaterAdapter{r}
    return createHandler(named, scope, admission, includeName)
}

// createHandler returns an http.HandlerFunc that serves a create request against r.
//
// The handler resolves the namespace (and, when includeName is true, the name)
// from the request, decodes the request body with the negotiated serializer,
// applies field management and mutating admission, calls r.Create and finally
// writes the result with HTTP 201 (Created). Every failure along the way is
// reported through scope.err and ends the request.
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
    return func(w http.ResponseWriter, req *http.Request) {
        // For performance tracking purposes.
        trace := utiltrace.New("Create", utiltrace.Field{Key: "url", Value: req.URL.Path}, utiltrace.Field{Key: "user-agent", Value: &lazyTruncatedUserAgent{req}}, utiltrace.Field{Key: "client", Value: &lazyClientIP{req}})
        defer trace.LogIfLong(500 * time.Millisecond)

        // Reject dry-run requests outright while the DryRun feature gate is off.
        if isDryRun(req.URL) && !utilfeature.DefaultFeatureGate.Enabled(features.DryRun) {
            scope.err(errors.NewBadRequest("the dryRun feature is disabled"), w, req)
            return
        }

        // TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer)
        timeout := parseTimeout(req.URL.Query().Get("timeout"))

        // Try to resolve both namespace and name; a failure is only fatal when
        // the caller required the name.
        namespace, name, err := scope.Namer.Name(req)
        if err != nil {
            if includeName {
                // name was required, return
                scope.err(err, w, req)
                return
            }

            // otherwise attempt to look up the namespace
            namespace, err = scope.Namer.Namespace(req)
            if err != nil {
                scope.err(err, w, req)
                return
            }
        }

        // The same timeout bounds both the request context and finishRequest below.
        ctx, cancel := context.WithTimeout(req.Context(), timeout)
        defer cancel()
        ctx = request.WithNamespace(ctx, namespace)
        outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
        if err != nil {
            scope.err(err, w, req)
            return
        }

        // gv is the group/version the request body is expected to be expressed in.
        gv := scope.Kind.GroupVersion()
        s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
        if err != nil {
            scope.err(err, w, req)
            return
        }

        // Decoded objects are converted to the hub group/version of the scope.
        decoder := scope.Serializer.DecoderToVersion(s.Serializer, scope.HubGroupVersion)

        body, err := limitedReadBody(req, scope.MaxRequestBodyBytes)
        if err != nil {
            scope.err(err, w, req)
            return
        }

        // Decode and validate the CreateOptions carried in the query string.
        options := &metav1.CreateOptions{}
        values := req.URL.Query()
        if err := metainternalversionscheme.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, options); err != nil {
            err = errors.NewBadRequest(err.Error())
            scope.err(err, w, req)
            return
        }
        if errs := validation.ValidateCreateOptions(options); len(errs) > 0 {
            err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "CreateOptions"}, "", errs)
            scope.err(err, w, req)
            return
        }
        options.TypeMeta.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("CreateOptions"))

        // Decode the body into a fresh object from r.New(), defaulting to the
        // scope's kind.
        defaultGVK := scope.Kind
        original := r.New()
        trace.Step("About to convert to expected version")
        obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
        if err != nil {
            err = transformDecodeError(scope.Typer, err, original, gvk, body)
            scope.err(err, w, req)
            return
        }
        // The group/version found in the payload must match the one being served.
        if gvk.GroupVersion() != gv {
            err = errors.NewBadRequest(fmt.Sprintf("the API version in the data (%s) does not match the expected API version (%v)", gvk.GroupVersion().String(), gv.String()))
            scope.err(err, w, req)
            return
        }
        trace.Step("Conversion done")

        // Wrap admission with the audit event and record the request object.
        ae := request.AuditEventFrom(ctx)
        admit = admission.WithAudit(admit, ae)
        audit.LogRequestObject(ae, obj, scope.Resource, scope.Subresource, scope.Serializer)

        // The "found" result is discarded here; userInfo is passed on as returned.
        userInfo, _ := request.UserFrom(ctx)

        // On create, get name from new object if unset
        if len(name) == 0 {
            // Errors are ignored: name simply stays as returned by ObjectName.
            _, name, _ = scope.Namer.ObjectName(obj)
        }

        trace.Step("About to store object in database")
        admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo)
        // requestFunc performs the actual create; it is a closure so that it can
        // be invoked a second time by the retry below.
        requestFunc := func() (runtime.Object, error) {
            return r.Create(
                ctx,
                name,
                obj,
                rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
                options,
            )
        }
        result, err := finishRequest(timeout, func() (runtime.Object, error) {
            // When a field manager is configured, update obj against a fresh
            // object of the scope's kind before admission runs.
            if scope.FieldManager != nil {
                liveObj, err := scope.Creater.New(scope.Kind)
                if err != nil {
                    return nil, fmt.Errorf("failed to create new object (Create for %v): %v", scope.Kind, err)
                }
                obj = scope.FieldManager.UpdateNoErrors(liveObj, obj, managerOrUserAgent(options.FieldManager, req.UserAgent()))
            }
            // Mutating admission runs here; validating admission is passed to
            // r.Create through requestFunc.
            if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
                if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
                    return nil, err
                }
            }
            result, err := requestFunc()
            // If the object wasn't committed to storage because it's serialized size was too large,
            // it is safe to remove managedFields (which can be large) and try again.
            if isTooLargeError(err) {
                if accessor, accessorErr := meta.Accessor(obj); accessorErr == nil {
                    accessor.SetManagedFields(nil)
                    result, err = requestFunc()
                }
            }
            return result, err
        })
        if err != nil {
            scope.err(err, w, req)
            return
        }
        trace.Step("Object stored in database")

        // A successful create answers with 201; a returned Status object without
        // a code is given the same code.
        code := http.StatusCreated
        status, ok := result.(*metav1.Status)
        if ok && err == nil && status.Code == 0 {
            status.Code = int32(code)
        }

        transformResponseObject(ctx, scope, trace, req, w, code, outputMediaType, result)
    }
}

pkg/registry/core/pod/storage/storage.go 中

// NewStorage builds the generic registry store for pods and returns a
// PodStorage exposing the pod resource together with its subresources
// (binding, eviction, status, ephemeral containers, log, proxy, exec, attach
// and port-forward).
//
// optsGetter supplies the REST/storage options used to complete the store, k is
// handed to the subresources that are constructed with a KubeletConn,
// proxyTransport is used by the pod and proxy handlers, and
// podDisruptionBudgetClient is passed to the eviction storage.
// An error is returned when the store cannot be completed.
func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) (PodStorage, error) {

    store := &genericregistry.Store{
        NewFunc:                  func() runtime.Object { return &api.Pod{} },
        NewListFunc:              func() runtime.Object { return &api.PodList{} },
        PredicateFunc:            registrypod.MatchPod,
        DefaultQualifiedResource: api.Resource("pods"),

        CreateStrategy:      registrypod.Strategy,
        UpdateStrategy:      registrypod.Strategy,
        DeleteStrategy:      registrypod.Strategy,
        ReturnDeletedObject: true,

        TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
    }
    options := &generic.StoreOptions{
        RESTOptions: optsGetter,
        AttrFunc:    registrypod.GetAttrs,
        TriggerFunc: map[string]storage.IndexerFunc{"spec.nodeName": registrypod.NodeNameTriggerFunc},
        Indexers:    registrypod.Indexers(),
    }
    if err := store.CompleteWithOptions(options); err != nil {
        return PodStorage{}, err
    }

    // The subresource stores are shallow copies of the completed store that
    // differ only in their update strategy.
    statusStore := *store
    statusStore.UpdateStrategy = registrypod.StatusStrategy
    ephemeralContainersStore := *store
    ephemeralContainersStore.UpdateStrategy = registrypod.EphemeralContainersStrategy

    // One BindingREST serves both the binding subresource and the legacy
    // binding wrapper; building a second identical instance is unnecessary.
    bindingREST := &BindingREST{store: store}
    return PodStorage{
        Pod:                 &REST{store, proxyTransport},
        Binding:             bindingREST,
        LegacyBinding:       &LegacyBindingREST{bindingREST},
        Eviction:            newEvictionStorage(store, podDisruptionBudgetClient),
        Status:              &StatusREST{store: &statusStore},
        EphemeralContainers: &EphemeralContainersREST{store: &ephemeralContainersStore},
        Log:                 &podrest.LogREST{Store: store, KubeletConn: k},
        Proxy:               &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
        Exec:                &podrest.ExecREST{Store: store, KubeletConn: k},
        Attach:              &podrest.AttachREST{Store: store, KubeletConn: k},
        PortForward:         &podrest.PortForwardREST{Store: store, KubeletConn: k},
    }, nil
}

k8s.io/apiserver/pkg/registry/generic/registry/store.go中

// CompleteWithOptions validates the mandatory fields of the Store and fills in
// defaults for everything left unset, using the supplied options.
//
// It returns an error when DefaultQualifiedResource, NewFunc, NewListFunc,
// TableConvertor, DeleteStrategy or options.RESTOptions is missing, when only
// one of KeyRootFunc/KeyFunc is set, when neither CreateStrategy nor
// UpdateStrategy is set, when the indexers fail validation, when the REST
// options cannot be obtained or yield an empty resource prefix, or when the
// storage decorator fails.
//
// On success the Store has PredicateFunc, KeyRootFunc, KeyFunc, ObjectNameFunc
// and DeleteCollectionWorkers defaulted where they were unset,
// EnableGarbageCollection taken from the REST options, and, if no storage was
// configured yet, Storage, DestroyFunc and StorageVersioner initialised.
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
    if e.DefaultQualifiedResource.Empty() {
        return fmt.Errorf("store %#v must have a non-empty qualified resource", e)
    }
    if e.NewFunc == nil {
        return fmt.Errorf("store for %s must have NewFunc set", e.DefaultQualifiedResource.String())
    }
    if e.NewListFunc == nil {
        return fmt.Errorf("store for %s must have NewListFunc set", e.DefaultQualifiedResource.String())
    }
    // KeyRootFunc and KeyFunc must be provided together or not at all.
    if (e.KeyRootFunc == nil) != (e.KeyFunc == nil) {
        return fmt.Errorf("store for %s must set both KeyRootFunc and KeyFunc or neither", e.DefaultQualifiedResource.String())
    }

    if e.TableConvertor == nil {
        return fmt.Errorf("store for %s must set TableConvertor; rest.NewDefaultTableConvertor(e.DefaultQualifiedResource) can be used to output just name/creation time", e.DefaultQualifiedResource.String())
    }

    // Namespace scoping is read from the create strategy when present,
    // otherwise from the update strategy.
    var isNamespaced bool
    switch {
    case e.CreateStrategy != nil:
        isNamespaced = e.CreateStrategy.NamespaceScoped()
    case e.UpdateStrategy != nil:
        isNamespaced = e.UpdateStrategy.NamespaceScoped()
    default:
        return fmt.Errorf("store for %s must have CreateStrategy or UpdateStrategy set", e.DefaultQualifiedResource.String())
    }

    if e.DeleteStrategy == nil {
        return fmt.Errorf("store for %s must have DeleteStrategy set", e.DefaultQualifiedResource.String())
    }

    if options.RESTOptions == nil {
        return fmt.Errorf("options for %s must have RESTOptions set", e.DefaultQualifiedResource.String())
    }

    // Fall back to the scope-appropriate default attribute function.
    attrFunc := options.AttrFunc
    if attrFunc == nil {
        if isNamespaced {
            attrFunc = storage.DefaultNamespaceScopedAttr
        } else {
            attrFunc = storage.DefaultClusterScopedAttr
        }
    }
    // The default predicate simply combines the selectors with attrFunc.
    if e.PredicateFunc == nil {
        e.PredicateFunc = func(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
            return storage.SelectionPredicate{
                Label:    label,
                Field:    field,
                GetAttrs: attrFunc,
            }
        }
    }

    err := validateIndexers(options.Indexers)
    if err != nil {
        return err
    }

    opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
    if err != nil {
        return err
    }

    // ResourcePrefix must come from the underlying factory
    prefix := opts.ResourcePrefix
    if !strings.HasPrefix(prefix, "/") {
        prefix = "/" + prefix
    }
    // A bare "/" means the factory returned an empty prefix.
    if prefix == "/" {
        return fmt.Errorf("store for %s has an invalid prefix %q", e.DefaultQualifiedResource.String(), opts.ResourcePrefix)
    }

    // Set the default behavior for storage key generation
    if e.KeyRootFunc == nil && e.KeyFunc == nil {
        if isNamespaced {
            e.KeyRootFunc = func(ctx context.Context) string {
                return NamespaceKeyRootFunc(ctx, prefix)
            }
            e.KeyFunc = func(ctx context.Context, name string) (string, error) {
                return NamespaceKeyFunc(ctx, prefix, name)
            }
        } else {
            e.KeyRootFunc = func(ctx context.Context) string {
                return prefix
            }
            e.KeyFunc = func(ctx context.Context, name string) (string, error) {
                return NoNamespaceKeyFunc(ctx, prefix, name)
            }
        }
    }

    // We adapt the store's keyFunc so that we can use it with the StorageDecorator
    // without making any assumptions about where objects are stored in etcd
    keyFunc := func(obj runtime.Object) (string, error) {
        accessor, err := meta.Accessor(obj)
        if err != nil {
            return "", err
        }

        if isNamespaced {
            return e.KeyFunc(genericapirequest.WithNamespace(genericapirequest.NewContext(), accessor.GetNamespace()), accessor.GetName())
        }

        return e.KeyFunc(genericapirequest.NewContext(), accessor.GetName())
    }

    // Only default the worker count; an explicitly configured value wins.
    if e.DeleteCollectionWorkers == 0 {
        e.DeleteCollectionWorkers = opts.DeleteCollectionWorkers
    }

    // Unlike the fields above, this one is always overwritten from the REST options.
    e.EnableGarbageCollection = opts.EnableGarbageCollection

    // By default the object name is read from the object's metadata.
    if e.ObjectNameFunc == nil {
        e.ObjectNameFunc = func(obj runtime.Object) (string, error) {
            accessor, err := meta.Accessor(obj)
            if err != nil {
                return "", err
            }
            return accessor.GetName(), nil
        }
    }

    // Build the backing storage only when the caller has not injected one.
    if e.Storage.Storage == nil {
        e.Storage.Codec = opts.StorageConfig.Codec
        var err error
        e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
            opts.StorageConfig,
            prefix,
            keyFunc,
            e.NewFunc,
            e.NewListFunc,
            attrFunc,
            options.TriggerFunc,
            options.Indexers,
        )
        if err != nil {
            return err
        }
        e.StorageVersioner = opts.StorageConfig.EncodeVersioner

        // When count polling is enabled, chain its stop function in front of the
        // storage's own destroy function.
        if opts.CountMetricPollPeriod > 0 {
            stopFunc := e.startObservingCount(opts.CountMetricPollPeriod)
            previousDestroy := e.DestroyFunc
            e.DestroyFunc = func() {
                stopFunc()
                if previousDestroy != nil {
                    previousDestroy()
                }
            }
        }
    }

    return nil
}

// Create stores obj and returns the object as written by the storage layer.
//
// rest.BeforeCreate is invoked with the store's CreateStrategy first; then, if
// createValidation is non-nil, it is called with a deep copy of obj. The
// storage key is derived from ObjectNameFunc and KeyFunc, and the write honours
// options.DryRun.
//
// When the write fails with an AlreadyExists error, the existing object is
// fetched; if it carries a deletion timestamp the error message is prefixed
// with "object is being deleted: ". The (possibly amended) error is returned in
// every failure case. After a successful write the optional AfterCreate and
// Decorator hooks are applied to the stored object.
func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
    if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
        return nil, err
    }
    // at this point we have a fully formed object.  It is time to call the validators that the apiserver
    // handling chain wants to enforce.
    if createValidation != nil {
        if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {
            return nil, err
        }
    }

    name, err := e.ObjectNameFunc(obj)
    if err != nil {
        return nil, err
    }
    key, err := e.KeyFunc(ctx, name)
    if err != nil {
        return nil, err
    }
    qualifiedResource := e.qualifiedResourceFromContext(ctx)
    ttl, err := e.calculateTTL(obj, 0, false)
    if err != nil {
        return nil, err
    }
    out := e.NewFunc()
    if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
        err = storeerr.InterpretCreateError(err, qualifiedResource, name)
        err = rest.CheckGeneratedNameError(e.CreateStrategy, err, obj)
        if !apierrors.IsAlreadyExists(err) {
            return nil, err
        }
        // The lookups below only refine the AlreadyExists message; if any of
        // them fails the original error is returned unchanged.
        if errGet := e.Storage.Get(ctx, key, storage.GetOptions{}, out); errGet != nil {
            return nil, err
        }
        accessor, errGetAcc := meta.Accessor(out)
        if errGetAcc != nil {
            return nil, err
        }
        if accessor.GetDeletionTimestamp() != nil {
            // Use a checked assertion: an AlreadyExists error is not guaranteed
            // to be a *StatusError, and an unchecked assertion would panic.
            if statusErr, ok := err.(*apierrors.StatusError); ok {
                statusErr.ErrStatus.Message = fmt.Sprintf("object is being deleted: %s", statusErr.ErrStatus.Message)
            }
        }
        return nil, err
    }
    if e.AfterCreate != nil {
        if err := e.AfterCreate(out); err != nil {
            return nil, err
        }
    }
    if e.Decorator != nil {
        if err := e.Decorator(out); err != nil {
            return nil, err
        }
    }
    return out, nil
}

k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go中

// Create writes obj under key through the wrapped storage unless dryRun is set.
//
// In dry-run mode nothing is written: if a Get for key succeeds a key-exists
// error is returned, otherwise obj is copied into out via copyInto.
func (s *DryRunnableStorage) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64, dryRun bool) error {
    if !dryRun {
        return s.Storage.Create(ctx, key, obj, out, ttl)
    }

    // A successful read means the key is already taken.
    getErr := s.Storage.Get(ctx, key, storage.GetOptions{}, out)
    if getErr == nil {
        return storage.NewKeyExistsError(key, 0)
    }
    return s.copyInto(obj, out)
}

cmd/kube-apiserver/app/aggregator.go 中

// createAggregatorConfig derives the aggregator API server configuration from
// the kube-apiserver's generic configuration and command-line options.
//
// It works on a copy of kubeAPIServerConfig, re-applies admission and API
// enablement with the aggregator's own scheme, installs a REST options getter
// built from a copy of the etcd options, and loads the proxy client
// certificate/key when both file paths are configured. Any failure from those
// steps is returned as-is.
func createAggregatorConfig(
    kubeAPIServerConfig genericapiserver.Config,
    commandOptions *options.ServerRunOptions,
    externalInformers kubeexternalinformers.SharedInformerFactory,
    serviceResolver aggregatorapiserver.ServiceResolver,
    proxyTransport *http.Transport,
    pluginInitializers []admission.PluginInitializer,
) (*aggregatorapiserver.Config, error) {
    // The parameter is passed by value, so this is a shallow copy; only the
    // aggregator-specific pieces are adjusted below.
    genericConfig := kubeAPIServerConfig
    genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}
    genericConfig.RESTOptionsGetter = nil

    // The aggregator must convert its own resources with its own scheme, so
    // admission control is re-applied onto the copied config.
    err := commandOptions.Admission.ApplyTo(
        &genericConfig,
        externalInformers,
        genericConfig.LoopbackClientConfig,
        feature.DefaultFeatureGate,
        pluginInitializers...,
    )
    if err != nil {
        return nil, err
    }

    // Dereference into a local copy so the caller's etcd options stay untouched.
    storageOptions := *commandOptions.Etcd
    storageOptions.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
    storageOptions.StorageConfig.Codec = aggregatorscheme.Codecs.LegacyCodec(v1beta1.SchemeGroupVersion, v1.SchemeGroupVersion)
    storageOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1beta1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName})
    genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: storageOptions}

    // Replace MergedResourceConfig with the aggregator's defaults and registry.
    enablementErr := commandOptions.APIEnablement.ApplyTo(
        &genericConfig,
        aggregatorapiserver.DefaultAPIResourceConfigSource(),
        aggregatorscheme.Scheme,
    )
    if enablementErr != nil {
        return nil, enablementErr
    }

    // The proxy client credentials are only read when both paths are given.
    var proxyCert, proxyKey []byte
    certFile, keyFile := commandOptions.ProxyClientCertFile, commandOptions.ProxyClientKeyFile
    if len(certFile) > 0 && len(keyFile) > 0 {
        if proxyCert, err = ioutil.ReadFile(certFile); err != nil {
            return nil, err
        }
        if proxyKey, err = ioutil.ReadFile(keyFile); err != nil {
            return nil, err
        }
    }

    recommended := &genericapiserver.RecommendedConfig{
        Config:                genericConfig,
        SharedInformerFactory: externalInformers,
    }
    extra := aggregatorapiserver.ExtraConfig{
        ProxyClientCert: proxyCert,
        ProxyClientKey:  proxyKey,
        ServiceResolver: serviceResolver,
        ProxyTransport:  proxyTransport,
    }
    aggregatorConfig := &aggregatorapiserver.Config{
        GenericConfig: recommended,
        ExtraConfig:   extra,
    }

    // we need to clear the poststarthooks so we don't add them multiple times to all the servers (that fails)
    aggregatorConfig.GenericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}

    return aggregatorConfig, nil
}

k8s.io/apiserver/pkg/server/options/etcd.go 中

// GetRESTOptions builds the generic.RESTOptions for resource from the factory's
// etcd options.
//
// The resource prefix is "<group>/<resource>". The storage is undecorated
// unless the watch cache is enabled; with the watch cache enabled, a configured
// size <= 0 for the resource keeps the undecorated storage, and every other
// case uses the cacher-backed decorator. A configured positive size is no
// longer honoured and only produces a warning. An error is returned when the
// watch cache sizes cannot be parsed.
func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
    ret := generic.RESTOptions{
        StorageConfig:           &f.Options.StorageConfig,
        Decorator:               generic.UndecoratedStorage,
        EnableGarbageCollection: f.Options.EnableGarbageCollection,
        DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
        ResourcePrefix:          resource.Group + "/" + resource.Resource,
        CountMetricPollPeriod:   f.Options.StorageConfig.CountMetricPollPeriod,
    }
    if !f.Options.EnableWatchCache {
        return ret, nil
    }

    sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
    if err != nil {
        return generic.RESTOptions{}, err
    }

    size, configured := sizes[resource]
    if configured && size <= 0 {
        // An explicit non-positive size opts this resource out of the cache.
        ret.Decorator = generic.UndecoratedStorage
        return ret, nil
    }
    if configured {
        klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource)
    }
    ret.Decorator = genericregistry.StorageWithCacher()
    return ret, nil
}

k8s.io/apiserver/pkg/registry/generic/storage_decorator.go 中

// UndecoratedStorage returns the raw storage built from config together with
// its destroy function. Every argument other than config is accepted but not
// used by this function.
func UndecoratedStorage(
    config *storagebackend.Config,
    resourcePrefix string,
    keyFunc func(obj runtime.Object) (string, error),
    newFunc func() runtime.Object,
    newListFunc func() runtime.Object,
    getAttrsFunc storage.AttrFunc,
    trigger storage.IndexerFuncs,
    indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
    raw, destroy, err := NewRawStorage(config)
    return raw, destroy, err
}

// NewRawStorage creates the storage backend described by config by handing a
// copy of the configuration to factory.Create. It returns the storage
// interface, its destroy function and any creation error.
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc, error) {
    cfg := *config
    return factory.Create(cfg)
}

k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go 中

// Create instantiates the storage backend selected by c.Type.
//
// An unset type and the etcd3 type both produce etcd3 storage. "etcd2" is
// rejected as no longer supported, and any other value is rejected as unknown.
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
    backend := c.Type
    if backend == "etcd2" {
        return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", backend)
    }
    if backend == storagebackend.StorageTypeUnset || backend == storagebackend.StorageTypeETCD3 {
        return newETCD3Storage(c)
    }
    return nil, nil, fmt.Errorf("unknown storage type: %s", backend)
}

k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go 中

// newETCD3Storage builds an etcd3-backed storage.Interface from c.
//
// It starts the compactor, creates the etcd client and starts the per-endpoint
// DB size monitor. If a later step fails, everything acquired by the earlier
// steps is released before the error is returned. On success the returned
// DestroyFunc stops the compactor and the DB size monitor and closes the
// client; it is safe to call more than once.
func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
    stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval)
    if err != nil {
        return nil, nil, err
    }

    client, err := newETCD3Client(c.Transport)
    if err != nil {
        stopCompactor()
        return nil, nil, err
    }

    stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client, c.DBMetricPollInterval)
    if err != nil {
        // Do not leak the client or the compactor when the monitor cannot start.
        client.Close()
        stopCompactor()
        return nil, nil, err
    }

    var once sync.Once
    destroyFunc := func() {
        // we know that storage destroy funcs are called multiple times (due to reuse in subresources).
        // Hence, we only destroy once.
        // TODO: fix duplicated storage destroy calls higher level
        once.Do(func() {
            stopCompactor()
            stopDBSizeMonitor()
            client.Close()
        })
    }
    // Without a configured transformer, values are stored as-is.
    transformer := c.Transformer
    if transformer == nil {
        transformer = value.IdentityTransformer
    }
    return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
}

上一篇下一篇

猜你喜欢

热点阅读