Kubesphere 源码分析2 ks-apiserver

2021-07-08  本文已影响0人  梅_梅

1. 整体结构

如下图所示
ks-apiserver 的主要功能是聚合整个系统的业务功能,对外提供统一的API入口。如下图所示,ks-apiserver聚合的功能对象主要包含以下几类:

  1. kubernetes原生的对象,由ks-apiserver连接api-server,直接获取更改etcd中kubernetes的原始数据(origin data)即可,操作的对象即kubernetes原生的configmap、deployment等对象。
  2. ks-controller-manager 封装的对象,ks-controller-manager的封装功能逻辑以crd对象的方式表现在etcd中,ks-apiserver通过连接k8s-apiserver操作etcd中的crd数据(crd data)即可,操作 ks-controller-manager 扩展的逻辑功能。
  3. 第三方的operator对象,如prometheus-operator等第三方完成的模块以operator的方式运行在系统中,其功能对应的对象也以crd的形式存放在etcd中,ks-apiserver也是通过和k8s-apiserver交互操作对应的crd完成。
  4. 普通的服务对象,如jenkins,sonarqube等以普通服务的方式运行在系统中,ks-apiserver直接通过网络调用和此类对象交互。

以上,ks-apiserver就完成了和各个内部对象的交互,即内部API(inner API aggregate)。ks-apiserver再对这些模块的功能进行整合,对外提供统一的API,即外部API(out API aggregate)。

1.png

2. 代码分析

根据整体结构分析的总结,下面主要从ks-apiserver建立各个 业务资源管理句柄,到对外的 接口封装,接口注册,和接口的权限审计等功能的注入进行分析.

2.1 资源管理句柄创建

以下代码主要是建立各个资源管理句柄。

  1. 源码中review 1建立了对k8s-apiserver的连接,并通过informer管理以crd呈现的资源,同时为kubernetes原生资源做直接代理。
  2. 源码中review 2建立了对部分服务的远程调用连接,如对prometheus的数据查询,和对Devops等套件的管理。交互方式为http等普通网络调用。
# "cmd/ks-apiserver/app/options/options.go" 234 lines --49%-
// NewAPIServer creates an APIServer instance using given options
func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIServer, error) {
        apiServer := &apiserver.APIServer{
                Config: s.Config,
        }   
     
        // review 1. 建立和kubernetes的连接通过informer管理以crd呈现的资源以及为kuberentes原生资源做直接代理
        kubernetesClient, err := k8s.NewKubernetesClient(s.KubernetesOptions)
        if err != nil {
                return nil, err 
        }   
        apiServer.KubernetesClient = kubernetesClient
                                                                                                                                                                                                            
        informerFactory := informers.NewInformerFactories(kubernetesClient.Kubernetes(), kubernetesClient.KubeSphere(),
                kubernetesClient.Istio(), kubernetesClient.Snapshot(), kubernetesClient.ApiExtensions(), kubernetesClient.Prometheus())
        apiServer.InformerFactory = informerFactory
        
       // review 2. 通过http等公共协议远程调用管理部分通过服务接口直接提供的资源    
        if s.MonitoringOptions == nil || len(s.MonitoringOptions.Endpoint) == 0 { 
                return nil, fmt.Errorf("moinitoring service address in configuration MUST not be empty, please check configmap/kubesphere-config in kubesphere-system namespace")
        } else {
                monitoringClient, err := prometheus.NewPrometheus(s.MonitoringOptions)
                if err != nil {
                        return nil, fmt.Errorf("failed to connect to prometheus, please check prometheus status, error: %v", err)
                }   
                apiServer.MonitoringClient = monitoringClient
        }   
 
        apiServer.MetricsClient = metricsserver.NewMetricsClient(kubernetesClient.Kubernetes(), s.KubernetesOptions)
 
       ...
       ...

        if s.DevopsOptions.Host != "" {
                devopsClient, err := jenkins.NewDevopsClient(s.DevopsOptions)
                if err != nil {
                        return nil, fmt.Errorf("failed to connect to jenkins, please check jenkins status, error: %v", err)
                }
                apiServer.DevopsClient = devopsClient
        }
        
        if s.SonarQubeOptions.Host != "" {
                sonarClient, err := sonarqube.NewSonarQubeClient(s.SonarQubeOptions)
                if err != nil {
                        return nil, fmt.Errorf("failed to connecto to sonarqube, please check sonarqube status, error: %v", err)
                }
                apiServer.SonarClient = sonarqube.NewSonar(sonarClient.SonarQube())
        }
        
        var cacheClient cache.Interface
        if s.RedisOptions != nil && len(s.RedisOptions.Host) != 0 {
                if s.RedisOptions.Host == fakeInterface && s.DebugMode {
                        apiServer.CacheClient = cache.NewSimpleCache()
                } else {
                        cacheClient, err = cache.NewRedisClient(s.RedisOptions, stopCh)
                        if err != nil {
                                return nil, fmt.Errorf("failed to connect to redis service, please check redis status, error: %v", err)
                        }
                        apiServer.CacheClient = cacheClient
                }
        } else {
                klog.Warning("ks-apiserver starts without redis provided, it will use in memory cache. " +
                        "This may cause inconsistencies when running ks-apiserver with multiple replicas.")
                apiServer.CacheClient = cache.NewSimpleCache()
        }  

2.2 接口封装

以下为工作空间资源的操作代码。handler上的函数DeleteWorkspace是注册给go-restful框架的回调接口,即web调用接口;而其具体操作的对象,则是在tenantOperator的DeleteWorkspace中通过ksclient(即2.1中生成的kubesphere资源句柄)进行操作。
接口封装整体采用该流程。

// DeleteWorkspace is the go-restful route handler for deleting a workspace.
//
// The workspace name comes from the "workspace" path parameter and the delete
// options are decoded from the request body. The actual deletion is delegated
// to h.tenant; a not-found error is answered through api.HandleNotFound, any
// other error through api.HandleBadRequest, and success writes servererr.None.
func (h *tenantHandler) DeleteWorkspace(request *restful.Request, response *restful.Response) {
	name := request.PathParameter("workspace")

	// When the body cannot be decoded, discard whatever was partially read and
	// use the options produced by metav1.NewDeleteOptions(0) instead.
	var opts metav1.DeleteOptions
	if readErr := request.ReadEntity(&opts); readErr != nil {
		opts = *metav1.NewDeleteOptions(0)
	}

	if err := h.tenant.DeleteWorkspace(name, opts); err != nil {
		klog.Error(err)
		switch {
		case errors.IsNotFound(err):
			api.HandleNotFound(response, request, err)
		default:
			api.HandleBadRequest(response, request, err)
		}
		return
	}

	response.WriteEntity(servererr.None)
}

// DeleteWorkspace deletes the WorkspaceTemplate named workspace using opts.
//
// If opts requests the orphan propagation policy, the template is first
// fetched via DescribeWorkspace, orphanFinalizer is appended to its finalizers
// and the template is updated; only then is the delete issued. Any error from
// the describe or update step is logged and returned without deleting.
func (t *tenantOperator) DeleteWorkspace(workspace string, opts metav1.DeleteOptions) error {
	ctx := context.Background()

	policy := opts.PropagationPolicy
	if orphan := policy != nil && *policy == metav1.DeletePropagationOrphan; orphan {
		tmpl, err := t.DescribeWorkspace(workspace)
		if err != nil {
			klog.Error(err)
			return err
		}

		// Mark the template with the orphan finalizer before it is deleted.
		tmpl.Finalizers = append(tmpl.Finalizers, orphanFinalizer)
		if _, err = t.ksclient.TenantV1alpha2().WorkspaceTemplates().Update(ctx, tmpl, metav1.UpdateOptions{}); err != nil {
			klog.Error(err)
			return err
		}
	}

	return t.ksclient.TenantV1alpha2().WorkspaceTemplates().Delete(ctx, workspace, opts)
}

2.3 接口注册

最外层各类接口在下面代码统一注册,而具体的接口注册逻辑是在每个AddToContainer中, 而每个AddToContainer所属的对象,即是代表某一类接口集合。

// pkg/kapis/metering/v1alpha1/register.go" 417 lines --19%--

// Install all kubesphere api groups
// Installation happens before all informers start to cache objects, so
//   any attempt to list objects using listers will get empty results.
func (s *APIServer) installKubeSphereAPIs() {
        imOperator := im.NewOperator(s.KubernetesClient.KubeSphere(),
                user.New(s.InformerFactory.KubeSphereSharedInformerFactory(),
                        s.InformerFactory.KubernetesSharedInformerFactory()),
                loginrecord.New(s.InformerFactory.KubeSphereSharedInformerFactory()),
                s.Config.AuthenticationOptions)
        amOperator := am.NewOperator(s.KubernetesClient.KubeSphere(),
                s.KubernetesClient.Kubernetes(),
                s.InformerFactory)
        rbacAuthorizer := rbac.NewRBACAuthorizer(amOperator)
                     
        urlruntime.Must(configv1alpha2.AddToContainer(s.container, s.Config))
        urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory, s.RuntimeCache))
        urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.MetricsClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.  OpenPitrixOptions))
        urlruntime.Must(meteringv1alpha1.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.RuntimeCache, s.Config.     MeteringOptions, nil))
        urlruntime.Must(openpitrixv1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))
        urlruntime.Must(openpitrixv2alpha1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))
        urlruntime.Must(operationsv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes()))
        urlruntime.Must(resourcesv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.InformerFactory,
                s.KubernetesClient.Master()))
        urlruntime.Must(tenantv1alpha2.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.Kubernetes(),
                s.KubernetesClient.KubeSphere(), s.EventsClient, s.LoggingClient, s.AuditingClient, amOperator, rbacAuthorizer, s.MonitoringClient, s.RuntimeCache, s.Config.MeteringOptions))
        urlruntime.Must(terminalv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), rbacAuthorizer, s.KubernetesClient.Config()))
        urlruntime.Must(clusterkapisv1alpha1.AddToContainer(s.container,
                s.InformerFactory.KubernetesSharedInformerFactory(),
                s.InformerFactory.KubeSphereSharedInformerFactory(),
                s.Config.MultiClusterOptions.ProxyPublishService,
                s.Config.MultiClusterOptions.ProxyPublishAddress,
                s.Config.MultiClusterOptions.AgentImage))
        urlruntime.Must(iamapi.AddToContainer(s.container, imOperator, amOperator,
                group.New(s.InformerFactory, s.KubernetesClient.KubeSphere(), s.KubernetesClient.Kubernetes()),
                rbacAuthorizer))

...
}

关注其中属于包v1alpha1 的AddToContainer代码,如下所示,它将集群以及节点的接口注册进了go-restful 中。
然后启动web服务,即可提供API服务。

package v1alpha1

func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteringClient monitoring.Interface, factory informers.InformerFactory, ksClient versioned.Interface, cache cache.Cache,          meteringOptions *meteringclient.Options, opOptions *openpitrixoptions.Options) error {
        ws := runtime.NewWebService(GroupVersion)
                                                                                                                                                                                                            
        h := newHandler(k8sClient, meteringClient, factory, ksClient, resourcev1alpha3.NewResourceGetter(factory, cache), meteringOptions, opOptions)
                          
        ws.Route(ws.GET("/cluster").
                To(h.HandleClusterMeterQuery).
                Doc("Get cluster-level meter data.").
                Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
                Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which meter data to return. For example, the following filter matches both     cluster CPU usage and disk usage: `meter_cluster_cpu_usage|meter_cluster_memory_usage`.").DataType("string").Required(false)).
                Param(ws.QueryParameter("start", "Start time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1559347200. ").        DataType("string").Required(false)).
                Param(ws.QueryParameter("end", "End time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1561939200. ").            DataType("string").Required(false)).
                Param(ws.QueryParameter("step", "Time interval. Retrieve metric data at a fixed interval within the time range of start and end. It requires both **start** and **end** are provided. The   format is [0-9]+[smhdwy]. Defaults to 10m (i.e. 10 min).").DataType("string").DefaultValue("10m").Required(false)).
                Param(ws.QueryParameter("time", "A timestamp in Unix time format. Retrieve metric data at a single point in time. Defaults to now. Time and the combination of start, end, step are         mutually exclusive.").DataType("string").Required(false)).
                Metadata(restfulspec.KeyOpenAPITags, []string{constants.ClusterMetersTag}).
                Writes(model.Metrics{}).
                Returns(http.StatusOK, respOK, model.Metrics{})).
                Produces(restful.MIME_JSON)
        ws.Route(ws.GET("/nodes").
                To(h.HandleNodeMeterQuery).
                Doc("Get node-level meter data of all nodes.").
                Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
                Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which meter data to return. For example, the following filter matches both     node CPU usage and disk usage: `meter_node_cpu_usage|meter_node_memory_usage`.").DataType("string").Required(false)).
                Param(ws.QueryParameter("resources_filter", "The node filter consists of a regexp pattern. It specifies which node data to return. For example, the following filter matches both node i-   caojnter and i-cmu82ogj: `i-caojnter|i-cmu82ogj`.").DataType("string").Required(false)).
                Param(ws.PathParameter("storageclass", "The name of the storageclass.").DataType("string").Required(false)).
                Param(ws.QueryParameter("pvc_filter", "The PVCs filter consists of a regexp pattern. It specifies which PVC data to return.").DataType("string").Required(false)).
                Param(ws.QueryParameter("start", "Start time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1559347200. ").        DataType("string").Required(false)).
                Param(ws.QueryParameter("end", "End time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1561939200. ").            DataType("string").Required(false)).
                Param(ws.QueryParameter("step", "Time interval. Retrieve metric data at a fixed interval within the time range of start and end. It requires both **start** and **end** are provided. The   format is [0-9]+[smhdwy]. Defaults to 10m (i.e. 10 min).").DataType("string").DefaultValue("10m").Required(false)).
                Param(ws.QueryParameter("time", "A timestamp in Unix time format. Retrieve metric data at a single point in time. Defaults to now. Time and the combination of start, end, step are         mutually exclusive.").DataType("string").Required(false)).
                Param(ws.QueryParameter("sort_metric", "Sort nodes by the specified metric. Not applicable if **start** and **end** are provided.").DataType("string").Required(false)).
                Param(ws.QueryParameter("sort_type", "Sort order. One of asc, desc.").DefaultValue("desc.").DataType("string").Required(false)).
                Param(ws.QueryParameter("page", "The page number. This field paginates result data of each metric, then returns a specific page. For example, setting **page** to 2 returns the second      page. It only applies to sorted metric data.").DataType("integer").Required(false)).                
    ...
}

2.4 权限控制

权限控制是在下面代码注入的,代码中view 1 所示,使用获取的资源句柄,通过im.NewOperator,以及am.NewOperator, rbac.NewRBACAuthorizer进行包装,生成imOperator, amOperator 两个带权限控制的封装句柄以及rbacAuthorizer对象。
在接口注册时将权限对象注入回调接口,从而在接口中做权限控制。


//   any attempt to list objects using listers will get empty results.
func (s *APIServer) installKubeSphereAPIs() {  
       //view 1权限注入 
       imOperator := im.NewOperator(s.KubernetesClient.KubeSphere(),
                user.New(s.InformerFactory.KubeSphereSharedInformerFactory(),
                        s.InformerFactory.KubernetesSharedInformerFactory()),
                loginrecord.New(s.InformerFactory.KubeSphereSharedInformerFactory()),
                s.Config.AuthenticationOptions)          
        amOperator := am.NewOperator(s.KubernetesClient.KubeSphere(),
                s.KubernetesClient.Kubernetes(),         
                s.InformerFactory)                       
        rbacAuthorizer := rbac.NewRBACAuthorizer(amOperator)
                                                                           
        urlruntime.Must(configv1alpha2.AddToContainer(s.container, s.Config))
        urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory, s.RuntimeCache))
        urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.MetricsClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.  OpenPitrixOptions))                     
        urlruntime.Must(meteringv1alpha1.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.RuntimeCache, s.Config.     MeteringOptions, nil))                  
        urlruntime.Must(openpitrixv1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))
        urlruntime.Must(openpitrixv2alpha1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))
        urlruntime.Must(operationsv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes()))
        urlruntime.Must(resourcesv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.InformerFactory,
                s.KubernetesClient.Master()))            
        urlruntime.Must(tenantv1alpha2.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.Kubernetes(),                                                                                      
                s.KubernetesClient.KubeSphere(), s.EventsClient, s.LoggingClient, s.AuditingClient, amOperator, rbacAuthorizer, s.MonitoringClient, s.RuntimeCache, s.Config.MeteringOptions))
        urlruntime.Must(terminalv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), rbacAuthorizer, s.KubernetesClient.Config()))
        urlruntime.Must(clusterkapisv1alpha1.AddToContainer(s.container,
                s.InformerFactory.KubernetesSharedInformerFactory(),
                s.InformerFactory.KubeSphereSharedInformerFactory(),
                s.Config.MultiClusterOptions.ProxyPublishService,
                s.Config.MultiClusterOptions.ProxyPublishAddress,
                s.Config.MultiClusterOptions.AgentImage))
        urlruntime.Must(iamapi.AddToContainer(s.container, imOperator, amOperator,
                group.New(s.InformerFactory, s.KubernetesClient.KubeSphere(), s.KubernetesClient.Kubernetes()), 
                rbacAuthorizer))  
...

权限控制在接口中的具体使用可以参考命名空间的查询代码,其中t.authorizer.Authorize以及t.am.ListRoleBindings都是权限的获取,然后再根据权限判断是否阻断流程。

func (t *tenantOperator) ListNamespaces(user user.Info, workspace string, queryParam *query.Query) (*api.ListResult, error) {
        nsScope := request.ClusterScope
        if workspace != "" {
                nsScope = request.WorkspaceScope
                // filter by workspace
                queryParam.Filters[query.FieldLabel] = query.Value(fmt.Sprintf("%s=%s", tenantv1alpha1.WorkspaceLabel, workspace))
        }
        
        listNS := authorizer.AttributesRecord{
                User:            user,
                Verb:            "list",
                Workspace:       workspace,
                Resource:        "namespaces",
                ResourceRequest: true,
                ResourceScope:   nsScope,
        }
        
        decision, _, err := t.authorizer.Authorize(listNS)
        if err != nil {
                klog.Error(err)
                return nil, err
        }
        
        // allowed to list all namespaces in the specified scope
        if decision == authorizer.DecisionAllow {
                result, err := t.resourceGetter.List("namespaces", "", queryParam)
                if err != nil {
                        klog.Error(err)
                        return nil, err
                }
                return result, nil
        }
        
        // retrieving associated resources through role binding
        roleBindings, err := t.am.ListRoleBindings(user.GetName(), user.GetGroups(), "")
        if err != nil {                                                                                                                                                                                     
                klog.Error(err)
                return nil, err
        }
        
        namespaces := make([]runtime.Object, 0)
        for _, roleBinding := range roleBindings {
                obj, err := t.resourceGetter.Get("namespaces", "", roleBinding.Namespace)
                if err != nil {
                        klog.Error(err)
         ...
}

2.5 日志注入

如下代码中view1为日志注入,使用go-restful完成,每次调用时,将调用注册的回调logRequestAndResponse尝试输出日志。view2中则是注册参数检测回调,对请求检查一些必要的字段,过滤掉外部无关请求。

// PrepareRun creates the go-restful container, attaches filters and a recover
// handler to it, installs the KubeSphere and metrics APIs, and sets the
// container as the HTTP server's handler before building the handler chain.
// It always returns nil.
func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error {
        s.container = restful.NewContainer()
        // view1: register logRequestAndResponse as a container-level filter.
        s.container.Filter(logRequestAndResponse)
        s.container.Router(restful.CurlyRouter{})
        // On a panic inside a handler, hand the reason and writer to logStackOnRecover.
        s.container.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
                logStackOnRecover(panicReason, httpWriter)
        })

        s.installKubeSphereAPIs()

        s.installMetricsAPI()
        // view2: register monitorRequest as a second container-level filter,
        // added after the API installation calls above.
        s.container.Filter(monitorRequest)

        // Log the root path of every registered web service at verbosity level 2.
        for _, ws := range s.container.RegisteredWebServices() {
                klog.V(2).Infof("%s", ws.RootPath())
        }

        s.Server.Handler = s.container

        s.buildHandlerChain(stopCh)

        return nil
}
上一篇下一篇

猜你喜欢

热点阅读