Kubernetes 源码分析之 "kubectl get"

2024-01-01  本文已影响0人  河码匠

1. 基本环境

操作系统环境 macos
Kubernetes 版本 1.29
go 版本 1.21.4

2. get.go 源码分析

kubect get 命令源码位置。kubernetes/staging/src/k8s.io/kubectl/pkg/cmd/get/get.go

以下是 get.go 源码。一些说明

# variables 这里面定义的变量是 'kubect get -h' 时候的一些说明
var (
    getLong = "..."
    getExample = "..."
)

# GetOptions 是存储和处理与 'kubectl get' 命令相关的参数和状态
type GetOptions struct {
    # 'kubectl'  命令行中 "-o, --output" 指定输入格式。
    # 用于存储和处理与输出格式相关的所有标志(flags)和参数。
    # 它包括了决定如何格式化输出的信息,例如是否显示所有字段、是否以 JSON 或 YAML 格式显示、是否包含标签列等
    PrintFlags             *PrintFlags

    # 作用是基于 PrintFlags 中的设置来创建一个合适的打印器(Printer)实例
    ToPrinter              func(*meta.RESTMapping, *bool, bool, bool) (printers.ResourcePrinterFunc, error)

    # 用于检查当前配置的打印器是否产生人类可读的输出,如表格格式,而非机器可读的格式,如 JSON 或 YAML
    IsHumanReadablePrinter bool

    # 它用于获取当前命令的父命令
    #  例如:在 'kubectl get pods' 命令中,'get' 是 'kubectl' 的子命令,而 'pods' 是 'get '的子命令。在这种情况下,'get' 是 'pods' 命令的父命令
    CmdParent string
  
    # 用于基于标签选择资源的字符串。这允许用户过滤出具有特定标签的资源
    # " -l, --selector "
    LabelSelector     string

    # 用于基于字段选择资源的字符串。这可以用于更细粒度的过滤。 " --field-selector "
    FieldSelector     string

    # 用于指示是否已明确指定了命名空间
    # 'kubectl' 命令行中 " --namespace "
    ExplicitNamespace bool
    # 指定要查询的命名空间。如果未指定,则使用默认命名空间或上下文中指定的命名空间
    Namespace         string

    # 是否指定所有命名空间
    # 'kubectl' 命令行中 " -A, --all-namespaces=false "
    AllNamespaces     bool

    # 否持续监视资源的状态变化
    # 'kubectl' 命令行中 " -w, --watch=false / --watch-only=false "
    Watch     bool
    WatchOnly bool
    # 指定是否以特定格式输出这些 watch 事件
    OutputWatchEvents bool

    # 'kubectl' 命令行中 " --no-headers=false "
    NoHeaders      bool

    # 'kubectl' 命令行中 " --raw='' "
    Raw       string

    # 'kubectl' 命令行中 " -f, --filename=[] "
    resource.FilenameOptions

    # 'kubectl' 命令行中 " --chunk-size=500 "
    ChunkSize int64

    # 'kubectl' 命令行中 " --sort-by='' "
    SortBy            string

    # 'kubectl' 命令行中 " --subresource='' "
    Subresource       string

    # 'kubectl' 命令行中 " --ignore-not-found=false "
    IgnoreNotFound bool

    # 'kubectl' 命令行中 " --server-print=true "
    ServerPrint bool
    
    # 用于表示和管理命令行输入输出流
    genericiooptions.IOStreams
}

# 在使用 "kubectl get" 命令时,返回的是服务器端定义的数据格式
const (
    useServerPrintColumns = "server-print"
)

# 定义支持的子资源类型
# 这个变量列出了 Kubernetes 中某些资源支持的子资源类型。
# 在 Kubernetes API 中,子资源是资源的特定部分,可以单独寻址和修改
var supportedSubresources = []string{"status", "scale"}

# 创建并返回一个新的 GetOptions 实例。
# 默认返回一个 "chunk size = 500 " 的块。
# 这个结构体包含了执行 kubectl get 命令时所需的所有配置和状态信息
# 这种配置主要用于处理大量数据时的分页或分块查询,提高数据检索的效率和管理性
func NewGetOptions(parent string, streams genericiooptions.IOStreams) *GetOptions {
    return &GetOptions{
        PrintFlags: NewGetPrintFlags(),
        CmdParent:  parent,
        IOStreams:   streams,
        ChunkSize:   cmdutil.DefaultChunkSize,
        ServerPrint: true,
    }
}

# 创建和初始化 get 命令
# "f cmdutil.Factory" 参数: "Factory" 接口提供了方法来获取 Kubernetes 集群的配置信息和访问 Kubernetes API 服务器资源的方法。
# 如用于访问 Pods、Deployments 等资源的客户端。
# 这里返回的 cmd 对象将在 cmd.go 中统一添加到 kubectl 父命令中
func NewCmdGet(parent string, f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.Command {
    o := NewGetOptions(parent, streams)
    # cobra go 的命令行模块
    cmd := &cobra.Command{
        Use:                   fmt.Sprintf("get [(-o|--output=)%s] (TYPE[.VERSION][.GROUP] [NAME | -l label] | TYPE[.VERSION][.GROUP]/NAME ...) [flags]", strings.Join(o.PrintFlags.AllowedFormats(), "|")),
        DisableFlagsInUseLine: true,
        Short:                 i18n.T("Display one or many resources"),
        Long:                  getLong + "\n\n" + cmdutil.SuggestAPIResources(parent),
        Example:               getExample,
        // ValidArgsFunction is set when this function is called so that we have access to the util package
        Run: func(cmd *cobra.Command, args []string) {
            # 补全默认参数
            cmdutil.CheckErr(o.Complete(f, cmd, args))
            # 检测参数
            cmdutil.CheckErr(o.Validate())
            # 根据参数执行命令,获取资源数据
            cmdutil.CheckErr(o.Run(f, args))
        },
        SuggestFor: []string{"list", "ps"},
    }

    o.PrintFlags.AddFlags(cmd)
        
    # 下面的 "cmd.Flags()" 是在定义命令的 flag 参数
    # 如 "raw" 在命令行时 "kubectl get --raw"
    # 参数说明
    #     &o.Raw:目标变量的指针
    #     "raw":flag的名称;如 "--raw"
    #      o.Raw: 短名称;如 " -x "
    #     "Raw ... file.": flag的说明描述
    cmd.Flags().StringVar(&o.Raw, "raw", o.Raw, "Raw URI to request from the server.  Uses the transport specified by the kubeconfig file.")
    cmd.Flags().BoolVarP(&o.Watch, "watch", "w", o.Watch, "After listing/getting the requested object, watch for changes.")
    cmd.Flags().BoolVar(&o.WatchOnly, "watch-only", o.WatchOnly, "Watch for changes to the requested object(s), without listing/getting first.")
    cmd.Flags().BoolVar(&o.OutputWatchEvents, "output-watch-events", o.OutputWatchEvents, "Output watch event objects when --watch or --watch-only is used. Existing objects are output as initial ADDED events.")
    cmd.Flags().BoolVar(&o.IgnoreNotFound, "ignore-not-found", o.IgnoreNotFound, "If the requested object does not exist the command will return exit code 0.")
    cmd.Flags().StringVar(&o.FieldSelector, "field-selector", o.FieldSelector, "Selector (field query) to filter on, supports '=', '==', and '!='.(e.g. --field-selector key1=value1,key2=value2). The server only supports a limited number of field queries per type.")
    cmd.Flags().BoolVarP(&o.AllNamespaces, "all-namespaces", "A", o.AllNamespaces, "If present, list the requested object(s) across all namespaces. Namespace in current context is ignored even if specified with --namespace.")

    # 添加 "--server-print" 的 flag
    addServerPrintColumnFlags(cmd, o)

    # 添加 "-f, --filename" 的 flag
    cmdutil.AddFilenameOptionFlags(cmd, &o.FilenameOptions, "identifying the resource to get from a server.")

    # 添加 "--chunk-size" 的 flag
    cmdutil.AddChunkSizeFlag(cmd, &o.ChunkSize)

    # 添加 "-l, --selector" 的 flag
    cmdutil.AddLabelSelectorFlagVar(cmd, &o.LabelSelector)

    # 添加 "--subresource" 的 flag
    cmdutil.AddSubresourceFlags(cmd, &o.Subresource, "If specified, gets the subresource of the requested object.", supportedSubresources...)
    return cmd
}

# 对 GetOptions 中参数进行补全
func (o *GetOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []string) error {
    # 对 bool 的参数的设置和有默认值的参数设置
}

# 对用户输入的 flag 进行验证
func (o *GetOptions) Validate() error {
    # 验证 Watch、WatchOnly、LabelSelector、OutputFormat、OutputFormat等
}


# 接口用于确保在解析命令行参数时,能够保留参数或标志的原始顺序
type OriginalPositioner interface {
    OriginalPosition(int) int
}

# 其中 Nop 通常是 "No Operation" 的缩写
type NopPositioner struct{}

func (t *NopPositioner) OriginalPosition(ix int) int {
    return ix
}

type RuntimeSorter struct {
    #  "--sort-by" 的内容
    field      string
    decoder    runtime.Decoder
    # "get" 搜索出来的对象
    objects    []runtime.Object
    positioner OriginalPositioner
}

# 处理命令行 "--sort-by" flag 时会执行到这里
func (r *RuntimeSorter) Sort() error {
    if len(r.objects) == 0 {
        return nil
    }
    if len(r.objects) == 1 {
        # r.objects[0] 这里面是 get 出来的数据
        _, isTable := r.objects[0].(*metav1.Table)
        if !isTable {
            return nil
        }
    }

    includesTable := false
    includesRuntimeObjs := false

    # 对搜索出来的对象进行类型判断
    for _, obj := range r.objects {
        switch t := obj.(type) {
        case *metav1.Table:
            includesTable = true

            if sorter, err := NewTableSorter(t, r.field); err != nil {
                return err
            } else if err := sorter.Sort(); err != nil {
                return err
            }
        default:
            includesRuntimeObjs = true
        }
    }

    r.positioner = &NopPositioner{}
    # 判断是不是 mixed Table 和 non-Table
    if includesRuntimeObjs && includesTable {
        return fmt.Errorf("sorting is not supported on mixed Table and non-Table object lists")
    }

    # 判断是否执行新排序 "NewTableSorter"
    if includesTable {
        return nil
    }

    # 执行旧排序 "kubernetes/staging/src/k8s.io/kubectl/pkg/cmd/get/storter.gp"
    var err error
    if r.positioner, err = SortObjects(r.decoder, r.objects, r.field); err != nil {
        return err
    }
    return nil
}



// OriginalPosition returns the original position of a runtime object
func (r *RuntimeSorter) OriginalPosition(ix int) int {
    if r.positioner == nil {
        return 0
    }
    return r.positioner.OriginalPosition(ix)
}

// WithDecoder allows custom decoder to be set for testing
func (r *RuntimeSorter) WithDecoder(decoder runtime.Decoder) *RuntimeSorter {
    r.decoder = decoder
    return r
}

// NewRuntimeSorter returns a new instance of RuntimeSorter
# 新排序方法
# sortBy 参数是 "--sort-by" 的内容
func NewRuntimeSorter(objects []runtime.Object, sortBy string) *RuntimeSorter {
    # "RelaxedJSONPathExpression" 这个排序在 "kubernetes/staging/src/k8s.io/kubectl/pkg/cmd/get/customcolumn.go" 中
    parsedField, err := RelaxedJSONPathExpression(sortBy)
    if err != nil {
        parsedField = sortBy
    }

    return &RuntimeSorter{
        field:   parsedField,
        decoder: kubernetesscheme.Codecs.UniversalDecoder(),
        objects: objects,
    }
}



# 执行获取数据函数
func (o *GetOptions) Run(f cmdutil.Factory, args []string) error {
    # 是否设置 "raw" 执行原始的 HTTP 请求。
    # 以获取 Kubernetes API 提供的数据,但未经 kubectl 标准化格式化的数据
    if len(o.Raw) > 0 {
        restClient, err := f.RESTClient()
        if err != nil {
            return err
        }
        return rawhttp.RawGet(restClient, o.IOStreams, o.Raw)
    }

    # 判断是否启用 "--watch" \ "--watch-only" 参数
    if o.Watch || o.WatchOnly {
        return o.watch(f, args)
    }

    chunkSize := o.ChunkSize
    if len(o.SortBy) > 0 {
        // TODO(juanvallejo): in the future, we could have the client use chunking
        // to gather all results, then sort them all at the end to reduce server load.
        chunkSize = 0
    }
    # 创建一个与 CLI 相互的对象
    # "cli-runtime/pkg/resource/builder.go" 在这边api请求获取数据
    r := f.NewBuilder().
        Unstructured().
        NamespaceParam(o.Namespace).DefaultNamespace().AllNamespaces(o.AllNamespaces).
        FilenameParam(o.ExplicitNamespace, &o.FilenameOptions).
        LabelSelectorParam(o.LabelSelector).
        FieldSelectorParam(o.FieldSelector).
        Subresource(o.Subresource).
        RequestChunksOf(chunkSize).
        # 定义获取资源的类型,如 pods nodes
        ResourceTypeOrNameArgs(true, args...).
        ContinueOnError().
        Latest().
        Flatten().
        TransformRequests(o.transformRequests).
        # 通过 client-go 请求资源 API
        Do()

    if o.IgnoreNotFound {
        r.IgnoreErrors(apierrors.IsNotFound)
    }
    if err := r.Err(); err != nil {
        return err
    }

    if !o.IsHumanReadablePrinter {
        return o.printGeneric(r)
    }

    allErrs := []error{}
    errs := sets.NewString()
    # 遍历所有资源数据
    infos, err := r.Infos()
    if err != nil {
        allErrs = append(allErrs, err)
    }
    printWithKind := multipleGVKsRequested(infos)

    objs := make([]runtime.Object, len(infos))
    for ix := range infos {
        objs[ix] = infos[ix].Object
    }

    # 排序
    var positioner OriginalPositioner
    if len(o.SortBy) > 0 {
        sorter := NewRuntimeSorter(objs, o.SortBy)
        if err := sorter.Sort(); err != nil {
            return err
        }
        positioner = sorter
    }

    var printer printers.ResourcePrinter
    var lastMapping *meta.RESTMapping

    // track if we write any output
    trackingWriter := &trackingWriterWrapper{Delegate: o.Out}
    // output an empty line separating output
    separatorWriter := &separatorWriterWrapper{Delegate: trackingWriter}

    # 一次get多种资源时,打印分组
    w := printers.GetNewTabWriter(separatorWriter)
    allResourcesNamespaced := !o.AllNamespaces
    for ix := range objs {
        var mapping *meta.RESTMapping
        var info *resource.Info
        if positioner != nil {
            info = infos[positioner.OriginalPosition(ix)]
            mapping = info.Mapping
        } else {
            info = infos[ix]
            mapping = info.Mapping
        }

        allResourcesNamespaced = allResourcesNamespaced && info.Namespaced()
        printWithNamespace := o.AllNamespaces

        if mapping != nil && mapping.Scope.Name() == meta.RESTScopeNameRoot {
            printWithNamespace = false
        }

        if shouldGetNewPrinterForMapping(printer, lastMapping, mapping) {
            w.Flush()
            w.SetRememberedWidths(nil)

            // add linebreaks between resource groups (if there is more than one)
            // when it satisfies all following 3 conditions:
            // 1) it's not the first resource group
            // 2) it has row header
            // 3) we've written output since the last time we started a new set of headers
            if lastMapping != nil && !o.NoHeaders && trackingWriter.Written > 0 {
                separatorWriter.SetReady(true)
            }

            printer, err = o.ToPrinter(mapping, nil, printWithNamespace, printWithKind)
            if err != nil {
                if !errs.Has(err.Error()) {
                    errs.Insert(err.Error())
                    allErrs = append(allErrs, err)
                }
                continue
            }

            lastMapping = mapping
        }

        printer.PrintObj(info.Object, w)
    }
    w.Flush()
    if trackingWriter.Written == 0 && !o.IgnoreNotFound && len(allErrs) == 0 {
        // if we wrote no output, and had no errors, and are not ignoring NotFound, be sure we output something
        if allResourcesNamespaced {
            fmt.Fprintf(o.ErrOut, "No resources found in %s namespace.\n", o.Namespace)
        } else {
            fmt.Fprintln(o.ErrOut, "No resources found")
        }
    }
    return utilerrors.NewAggregate(allErrs)
}

3. 从输入 kubectl get 开始大概的流程

kubectl get
上一篇 下一篇

猜你喜欢

热点阅读