Kubernetes 源码分析 -- API Server之编解

2018-12-21  本文已影响30人  何约什

在Kubernetes源码分析-- API Server之API Install篇中,我们了解到K8S可以支持多版本的API,但是Rest API的不同版本中接口的输入输出参数的格式是有差别的,Kubernetes是怎么处理这个问题的呢?另外Kubernetes支持yaml、json两个格式的配置,同时又能够支持json、yaml和pb等格式的编解码进行协议传输,那么Kubernetes又是如何实现各种数据对象的序列化、反序列化的呢?

runtime.Scheme

为了同时解决数据对象的序列化、反序列化与多版本数据对象的兼容和转换问题,Kubernetes设计了一套复杂的机制,首先,它设计了runtime.Scheme这个结构体(k8s.io/apimachinery/pkg/runtime/schema.go),以下是对它的定义。

type Scheme struct {
    // versionMap allows one to figure out the go type of an object with
    // the given version and name.
    gvkToType map[schema.GroupVersionKind]reflect.Type

    // typeToGroupVersion allows one to find metadata for a given go object.
    // The reflect.Type we index by should *not* be a pointer.
    typeToGVK map[reflect.Type][]schema.GroupVersionKind

    // unversionedTypes are transformed without conversion in ConvertToVersion.
    unversionedTypes map[reflect.Type]schema.GroupVersionKind

    // unversionedKinds are the names of kinds that can be created in the context of any group
    // or version
    // TODO: resolve the status of unversioned types.
    unversionedKinds map[string]reflect.Type

    // Map from version and resource to the corresponding func to convert
    // resource field labels in that version to internal version.
    fieldLabelConversionFuncs map[string]map[string]FieldLabelConversionFunc

    // defaulterFuncs is an array of interfaces to be called with an object to provide defaulting
    // the provided object must be a pointer.
    defaulterFuncs map[reflect.Type]func(interface{})

    // converter stores all registered conversion functions. It also has
    // default coverting behavior.
    converter *conversion.Converter
}

runtime.Scheme是带版本的API与带版本的配置文件的基础。
runtime.Scheme定义了以下内容:

从上述代码中可以看到,typeToGVK与gkvToType属性就是为了解决数据对象的序列化与反序列化问题,converter属性则负责不同版本的数据对象转换问题;unversionedKind用于映射哪些能够在任意group和version情况下的类型,key是一个string,也就是kind;fieldLabelConversionFuncs:用于解决数据对象的属性名称的兼容性转换和检验,比如讲需要兼容Pod的spec.Host属性改为spec.nodeName的情况。

Kubernetes这个设计思路简单方便地建解决多版本的序列化和数据转换问题,下面是runtime.Scheme里序列化、反序列化的核心方法New()的代码:通过查找gkvToType里匹配的诸恶类型,以反射方法生成一个空的数据对象:

// New returns a new API object of the given version and name, or an error if it hasn't
// been registered. The version and kind fields must be specified.
func (s *Scheme) New(kind schema.GroupVersionKind) (Object, error) {
    if t, exists := s.gvkToType[kind]; exists {
        return reflect.New(t).Interface().(Object), nil
    }

    if t, exists := s.unversionedKinds[kind.Kind]; exists {
        return reflect.New(t).Interface().(Object), nil
    }
    return nil, NewNotRegisteredErrForKind(kind)
}

由于这个方法的实现,也让Scheme实现了ObjectCreater接口。

注意到runtime.Scheme只是实现了一个序列化与类型转换的框架API,提供了注册资源数据类型与转换函数的功能,那么具体的资源数据对象、转换函数又是在哪个包里实现的呢?答案是k8s.io/api/{resource}/{version}/register.go,而核心资源在k8s.io/api/core/v1/register.go中完成注册,以核心资源为例说明一下目录下的关键的代码

在k8s.io/client-go/kubernetes/scheme/register.go中,有一个 init方法,在引用的时候就会被调用。

func init() {
    v1.AddToGroupVersion(Scheme, schema.GroupVersion{Version: "v1"})
    AddToScheme(Scheme)
}

该方法中的AddToScheme方法,会调用各个资源、版本的AddToScheme方法,从而把所有的核心数据资源都注册进了runtime.Scheme实例中了。

Serializer

Serializer是用于对象于序列化格式之间进行转的核心接口,它的定义如下所示:

// Encoders write objects to a serialized form
type Encoder interface {
    // Encode writes an object to a stream. Implementations may return errors if the versions are
    // incompatible, or if no conversion is defined.
    Encode(obj Object, w io.Writer) error
}

// Decoders attempt to load an object from data.
type Decoder interface {
    // Decode attempts to deserialize the provided data using either the innate typing of the scheme or the
    // default kind, group, and version provided. It returns a decoded object as well as the kind, group, and
    // version from the serialized data, or an error. If into is non-nil, it will be used as the target type
    // and implementations may choose to use it rather than reallocating an object. However, the object is not
    // guaranteed to be populated. The returned object is not guaranteed to match into. If defaults are
    // provided, they are applied to the data by default. If no defaults or partial defaults are provided, the
    // type of the into may be used to guide conversion decisions.
    Decode(data []byte, defaults *schema.GroupVersionKind, into Object) (Object, *schema.GroupVersionKind, error)
}

// Serializer is the core interface for transforming objects into a serialized format and back.
// Implementations may choose to perform conversion of the object, but no assumptions should be made.
type Serializer interface {
    Encoder
    Decoder
}

目前系统支持的序列化格式有三种:json、yaml和protobuf。可以参考下一节CodecFactory,这里描述了三种序列化格式的初始化。

json与yaml序列化的代码在k8s.io/apimachinary/pkg/runtime/serializer/json/json.go,他们共享一个结构,如下所示:

type Serializer struct {
    meta    MetaFactory
    creater runtime.ObjectCreater       // 用于创建对应Group、版本、kind的资源数据对象实例,一般来说这里会传入Scheme对象
    typer   runtime.ObjectTyper         // 用于从资源数据对象获取对应的Group、Version和Kind,一般来说这里会传入Scheme对象
    yaml    bool                        // 是否yaml
    pretty  bool                        // 是否pretty格式,只有josn支持
}

通过成员 yaml来区分是json还是yaml序列化器;creater和typer成员传入的都是Scheme对象,前面我们分析过Scheme,它可以让我们根据GVK来创建对应的实例,并可以判断一个对象的GVK,这将在我们后续的编解码过程中起到重要作用;而meta成员是一个MetaFactory的对象,它主要用于从序列化数据(这里就是json或者yaml)中获得对象的GroupVersionKind。(下一节也有说明)

下面我们看看Encode方法,Encode方法比较简单,只需要按需把对象,转换为json或yaml格式即可。代码如下所示:

// Encode serializes the provided object to the given writer.
func (s *Serializer) Encode(obj runtime.Object, w io.Writer) error {
    // 看起来针对yaml和pretty需要用到gitlib下面的jsoniter库来完成
    if s.yaml {
        json, err := jsoniter.ConfigCompatibleWithStandardLibrary.Marshal(obj)
        if err != nil {
            return err
        }
        data, err := yaml.JSONToYAML(json)
        if err != nil {
            return err
        }
        _, err = w.Write(data)
        return err
    }

    if s.pretty {
        data, err := jsoniter.ConfigCompatibleWithStandardLibrary.MarshalIndent(obj, "", "  ")
        if err != nil {
            return err
        }
        _, err = w.Write(data)
        return err
    }
    // 而针对普通的json序列器,直接通过json.Encoder编码即可
    encoder := json.NewEncoder(w)
    return encoder.Encode(obj)
}

Decode比较复杂,它尝试把提供的数据转换成YAML或者JSON,从中提取出其中存储的schema、kind,并合并缺省的gvk,然后在数据加载到对象中,注意该对象满足前面指定的schema、kind。或者也可以指定into字段(这时候会加载到into对象中)。

func (s *Serializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
    if versioned, ok := into.(*runtime.VersionedObjects); ok {
        into = versioned.Last()
        obj, actual, err := s.Decode(originalData, gvk, into)
        if err != nil {
            return nil, actual, err
        }
        versioned.Objects = []runtime.Object{obj}
        return versioned, actual, nil
    }

    data := originalData
    if s.yaml {
        altered, err := yaml.YAMLToJSON(data)
        if err != nil {
            return nil, nil, err
        }
        data = altered
    }

    actual, err := s.meta.Interpret(data)
    if err != nil {
        return nil, nil, err
    }

    if gvk != nil {
        *actual = gvkWithDefaults(*actual, *gvk)
    }

    if unk, ok := into.(*runtime.Unknown); ok && unk != nil {
        unk.Raw = originalData
        unk.ContentType = runtime.ContentTypeJSON
        unk.GetObjectKind().SetGroupVersionKind(*actual)
        return unk, actual, nil
    }

    if into != nil {
        _, isUnstructured := into.(runtime.Unstructured)
        types, _, err := s.typer.ObjectKinds(into)
        switch {
        case runtime.IsNotRegisteredError(err), isUnstructured:
            if err := jsoniter.ConfigCompatibleWithStandardLibrary.Unmarshal(data, into); err != nil {
                return nil, actual, err
            }
            return into, actual, nil
        case err != nil:
            return nil, actual, err
        default:
            *actual = gvkWithDefaults(*actual, types[0])
        }
    }

    if len(actual.Kind) == 0 {
        return nil, actual, runtime.NewMissingKindErr(string(originalData))
    }
    if len(actual.Version) == 0 {
        return nil, actual, runtime.NewMissingVersionErr(string(originalData))
    }

    // use the target if necessary
    obj, err := runtime.UseOrCreateObject(s.typer, s.creater, *actual, into)
    if err != nil {
        return nil, actual, err
    }

    if err := jsoniter.ConfigCompatibleWithStandardLibrary.Unmarshal(data, obj); err != nil {
        return nil, actual, err
    }
    return obj, actual, nil
}

CodecFactory

在这里,我们需要首先说明一下CodecFactory实现的接口StorageSerializer,StorageSerializer是一个接口,用来获取encoders,decoders和serailaizers,通过这些序列化器我们可以基于REST来读写数据。通常被client工具使用,这些客户端工具用于读取文件或者持久化restful对象的server存储接口。

而CodecFactory就是StorageSerializer的实现类。在BuildStorageFactory中,我们可以看到调用kubeapiserver.NewStorageFactory方法时传入了legacyscheme.Codecs到第三个参数(该参数类型为runtime.StorageSerializer)。

CodecFactory提供了方法来获取某个指定版本和内容类型的codecs和serializers,它的定义如下所示:

type CodecFactory struct {
    scheme      *runtime.Scheme
    serializers []serializerType
    universal   runtime.Decoder
    accepts     []runtime.SerializerInfo

    legacySerializer runtime.Serializer
}

NewCodecFactory提供了方法来获取序列化对象,这些序列化对象为支持的传输格式进行编解码,也能为首选的内部和外部版本的转换封装。
在将来,内部版本会越来越少使用,调用者可能会使用缺省序列化器来取而代之,然后只需要转回内部共享的对象,如:Status,常用的API对象。

func NewCodecFactory(scheme *runtime.Scheme) CodecFactory {
    serializers := newSerializersForScheme(scheme, json.DefaultMetaFactory)
    return newCodecFactory(scheme, serializers)
}

这里分为两个步骤:

func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory) []serializerType {
    jsonSerializer := json.NewSerializer(mf, scheme, scheme, false)
    jsonPrettySerializer := json.NewSerializer(mf, scheme, scheme, true)
    yamlSerializer := json.NewYAMLSerializer(mf, scheme, scheme)

    serializers := []serializerType{
        {
            AcceptContentTypes: []string{"application/json"},
            ContentType:        "application/json",
            FileExtensions:     []string{"json"},
            EncodesAsText:      true,
            Serializer:         jsonSerializer,
            PrettySerializer:   jsonPrettySerializer,

            Framer:           json.Framer,
            StreamSerializer: jsonSerializer,
        },
        {
            AcceptContentTypes: []string{"application/yaml"},
            ContentType:        "application/yaml",
            FileExtensions:     []string{"yaml"},
            EncodesAsText:      true,
            Serializer:         yamlSerializer,
        },
    }

    for _, fn := range serializerExtensions {
        if serializer, ok := fn(scheme); ok {
            serializers = append(serializers, serializer)
        }
    }
    return serializers
}

那么这里没有看到protobuf的身影,但是上面的代码中,用到了serializerExtensions,它是一个全局变量,我们在k8s.io/apimachinenry/pkg/runtime/serializer/protobuf_extension.go文件中,找到了它的身影:

func init() {
    serializerExtensions = append(serializerExtensions, protobufSerializer)
}

另外一个MetaFactory的作用是什么?
MetaFactory用于从字符串中解析出version和kind数据。一般是json数据格式的,它有个缺省实现为DefaultMetaFactory,它的实现如下:
输入的二进制数据:{"apiVersion":"apiextensions.k8s.io/v1beta1", "kind": "object"}
那么解析出来的得到:schema.GroupVersionKind{Group:"apiextensions.k8s.io", Version:"v1beta1", Kind: "object"}

newCodeFactory是一个帮助类,用于生成CodecFactory实例,它的主要功能,基于输入的序列化对象集合生成以下几项:

1 统一的decode
2 传统serializer(一般是Json)
3 可接受的编码类型

然后基于这几项数据,生成CodecFactory实例。

func newCodecFactory(scheme *runtime.Scheme, serializers []serializerType) CodecFactory {
    decoders := make([]runtime.Decoder, 0, len(serializers))
    var accepts []runtime.SerializerInfo
    alreadyAccepted := make(map[string]struct{})

    var legacySerializer runtime.Serializer
    for _, d := range serializers {
        decoders = append(decoders, d.Serializer)
        for _, mediaType := range d.AcceptContentTypes {
            if _, ok := alreadyAccepted[mediaType]; ok {
                continue
            }
            alreadyAccepted[mediaType] = struct{}{}
            info := runtime.SerializerInfo{
                MediaType:        d.ContentType,
                EncodesAsText:    d.EncodesAsText,
                Serializer:       d.Serializer,
                PrettySerializer: d.PrettySerializer,
            }
            if d.StreamSerializer != nil {
                info.StreamSerializer = &runtime.StreamSerializerInfo{
                    Serializer:    d.StreamSerializer,
                    EncodesAsText: d.EncodesAsText,
                    Framer:        d.Framer,
                }
            }
            accepts = append(accepts, info)
            if mediaType == runtime.ContentTypeJSON {
                legacySerializer = d.Serializer
            }
        }
    }
    if legacySerializer == nil {
        legacySerializer = serializers[0].Serializer
    }

    return CodecFactory{
        scheme:      scheme,
        serializers: serializers,
        universal:   recognizer.NewDecoder(decoders...),

        accepts: accepts,

        legacySerializer: legacySerializer,
    }
}

我们先看一下StorageSerializer的定义:

type StorageSerializer interface {
    // SupportedMediaTypes are the media types supported for reading and writing objects.
    SupportedMediaTypes() []SerializerInfo

    // UniversalDeserializer returns a Serializer that can read objects in multiple supported formats
    // by introspecting the data at rest.
    UniversalDeserializer() Decoder

    // 返回一个encoder,该encoder能够保证写入底层序列化器的对象是指定的GV
    // EncoderForVersion returns an encoder that ensures objects being written to the provided
    // serializer are in the provided group version.
    EncoderForVersion(serializer Encoder, gv GroupVersioner) Encoder

    // 返回一个decoder,该decoder能够保证被底层序列化器的反序列化的对象是指定的GV
    // DecoderForVersion returns a decoder that ensures objects being read by the provided
    // serializer are in the provided group version by default.
    DecoderToVersion(serializer Decoder, gv GroupVersioner) Decoder
}

Encoder是一个接口,它负责把对象序列化写入到io中,但是它不关心具体的对象的GVK,同样Decoder也是如此,从字节码解码出对象出来。

而K8S在考虑了对多种版本的支撑能力,而它需要在存储时保证写入指定的版本,但是在接口层面程序层面又能够支撑多种版本,所以Codec就应运而生,Codec的定义如下所示:

Codec is a Serializer that deals with the details of versioning objects. It offers the same
// interface as Serializer, so this is a marker to consumers that care about the version of the objects
// they receive.
type Codec Serializer

Codec也是由两部分组成:编码器与解码器,但是它要考虑版本功能,所以StorageSerializer提供了两个方法EncoderForVersion和DecoderToVersion来生成支持按指定版本进行序列化与反序列化的编解码器。我们来看看CodecFactory的这两个函数的具体实现。

// CodecForVersions creates a codec with the provided serializer. If an object is decoded and its group is not in the list,
// it will default to runtime.APIVersionInternal. If encode is not specified for an object's group, the object is not
// converted. If encode or decode are nil, no conversion is performed.
func (f CodecFactory) CodecForVersions(encoder runtime.Encoder, decoder runtime.Decoder, encode runtime.GroupVersioner, decode runtime.GroupVersioner) runtime.Codec {
    // TODO: these are for backcompat, remove them in the future
    if encode == nil {
        encode = runtime.DisabledGroupVersioner
    }
    if decode == nil {
        decode = runtime.InternalGroupVersioner
    }
    return versioning.NewDefaultingCodecForScheme(f.scheme, encoder, decoder, encode, decode)
}

// DecoderToVersion returns a decoder that targets the provided group version.
func (f CodecFactory) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
    return f.CodecForVersions(nil, decoder, nil, gv)
}

// EncoderForVersion returns an encoder that targets the provided group version.
func (f CodecFactory) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
    return f.CodecForVersions(encoder, nil, gv, nil)
}

从上代码来看,EncoderForVersion与DecodeToVersion都最终调用了同样的方法,如下所示:

func NewDefaultingCodecForScheme(
    // TODO: I should be a scheme interface?
    scheme *runtime.Scheme,
    encoder runtime.Encoder,
    decoder runtime.Decoder,
    encodeVersion runtime.GroupVersioner,
    decodeVersion runtime.GroupVersioner,
) runtime.Codec {
    return NewCodec(encoder, decoder, runtime.UnsafeObjectConvertor(scheme), scheme, scheme, scheme, encodeVersion, decodeVersion)
}
func UnsafeObjectConvertor(scheme *Scheme) ObjectConvertor {
    return unsafeObjectConvertor{scheme}
}
// unsafeObjectConvertor的定义
type unsafeObjectConvertor struct {
    *Scheme
}

type ObjectConvertor interface {
    // Convert attempts to convert one object into another, or returns an error. This method does
    // not guarantee the in object is not mutated. The context argument will be passed to
    // all nested conversions.
    Convert(in, out, context interface{}) error
    // ConvertToVersion takes the provided object and converts it the provided version. This
    // method does not guarantee that the in object is not mutated. This method is similar to
    // Convert() but handles specific details of choosing the correct output version.
    ConvertToVersion(in Object, gv GroupVersioner) (out Object, err error)
    ConvertFieldLabel(version, kind, label, value string) (string, string, error)
}

下面分析一下unsafeObjectObject的具体方法的实现,我们在最终变换版本的时候,是如何实现的:

// ConvertToVersion converts in to the provided outVersion without copying the input first, which
// is only safe if the output object is not mutated or reused.
func (c unsafeObjectConvertor) ConvertToVersion(in Object, outVersion GroupVersioner) (Object, error) {
    return c.Scheme.UnsafeConvertToVersion(in, outVersion)
}

再次回到Scheme结构负责版本转换的方法,这里调用的UnsafeConvertToVersion方法,Scheme还有一个ConvertToVersion的方法,两者都是调用底层的convertToVersion方法, 只是第一个参数不同。
安全转换在两种情况下会报错:目标版本不包括inKind类型的资源数据,转换不能得到有效的对象。
不安全转换输出的对象不与输入对象共享字段,会尝试以最高效率执行转换操作。

// UnsafeConvertToVersion will convert in to the provided target if such a conversion is possible,
// but does not guarantee the output object does not share fields with the input object. It attempts to be as
// efficient as possible when doing conversion.
func (s *Scheme) UnsafeConvertToVersion(in Object, target GroupVersioner) (Object, error) {
    return s.convertToVersion(false, in, target)
}
// convertToVersion handles conversion with an optional copy.
func (s *Scheme) convertToVersion(copy bool, in Object, target GroupVersioner) (Object, error) {
    var t reflect.Type

    if u, ok := in.(Unstructured); ok {
        typed, err := s.unstructuredToTyped(u)
        if err != nil {
            return nil, err
        }

        in = typed
        // unstructuredToTyped returns an Object, which must be a pointer to a struct.
        t = reflect.TypeOf(in).Elem()   // 从指针转换为一个结构对象

    } else {
        // determine the incoming kinds with as few allocations as possible.
        t = reflect.TypeOf(in)  // 只有指针对象才能被转换
        if t.Kind() != reflect.Ptr {
            return nil, fmt.Errorf("only pointer types may be converted: %v", t)
        }
        t = t.Elem()        // 得到指针的结构对象
        if t.Kind() != reflect.Struct {
            return nil, fmt.Errorf("only pointers to struct types may be converted: %v", t)
        }
    }

    kinds, ok := s.typeToGVK[t]     // 得到存储在scheme中的GVK数据,可能有多个
    if !ok || len(kinds) == 0 {
        return nil, NewNotRegisteredErrForType(t)
    }

    gvk, ok := target.KindForGroupVersionKinds(kinds)   // 基于GroupVersioner从源GVKs得到统一的最终目标GVK,
                                                        // 譬如存储到ETCD来说,同时希望按照统一的版本来存储。
    if !ok {
        // try to see if this type is listed as unversioned (for legacy support)
        // TODO: when we move to server API versions, we should completely remove the unversioned concept
        if unversionedKind, ok := s.unversionedTypes[t]; ok {
            if gvk, ok := target.KindForGroupVersionKinds([]schema.GroupVersionKind{unversionedKind}); ok {
                return copyAndSetTargetKind(copy, in, gvk)
            }
            return copyAndSetTargetKind(copy, in, unversionedKind)
        }
        return nil, NewNotRegisteredErrForTarget(t, target)
    }

    // target wants to use the existing type, set kind and return (no conversion necessary)
    for _, kind := range kinds {    // 如果gvk 是kinds中的一种,那说明版本一致,资源对象结构一致,我们只需要把对象内容拷贝出来,并设置好目标GVK即可。
        if gvk == kind {
            return copyAndSetTargetKind(copy, in, gvk)
        }
    }

    // 如果类型是unversioned,那么没有转化你的必要
    // type is unversioned, no conversion necessary
    if unversionedKind, ok := s.unversionedTypes[t]; ok {
        if gvk, ok := target.KindForGroupVersionKinds([]schema.GroupVersionKind{unversionedKind}); ok {
            return copyAndSetTargetKind(copy, in, gvk)
        }
        return copyAndSetTargetKind(copy, in, unversionedKind)
    }

    out, err := s.New(gvk)      // 从Scheme中,按照gvk创建出一个对应的gvk的对象出来。
    if err != nil {
        return nil, err
    }

    // copy 标识是需要安全转换,安全转换会执行DeepCopy,这样就不会存在目标对象于源对象共享 Field的情况了。
    if copy {
        in = in.DeepCopyObject()
    }

    // 类型转换,基于converter成员来完成转换工作,这块需要专门分析
    flags, meta := s.generateConvertMeta(in)
    meta.Context = target
    if err := s.converter.Convert(in, out, flags, meta); err != nil {
        return nil, err
    }
    // 设置目标GVK
    setTargetKind(out, gvk)
    return out, nil
}

TODO:具体的converter如何实现不同版本对象的转换,我们分出新的章节来讨论。

前面已经分析了runtime.UnsafeObjectConvertor会生成一个ObjectConvetor,它会负责把对象转换成我们的目标GVK。现在回到NewCodec方法的内容。这里的代码也就不贴出来了,其实他就是生成了一个codec实例。下面是codec结构的定义,它实现了Codec(Serializer)接口:

type codec struct {
    encoder   runtime.Encoder
    decoder   runtime.Decoder
    convertor runtime.ObjectConvertor
    creater   runtime.ObjectCreater
    typer     runtime.ObjectTyper
    defaulter runtime.ObjectDefaulter

    encodeVersion runtime.GroupVersioner
    decodeVersion runtime.GroupVersioner
}

观察一下它的Encode方法,如下所示,它会在有必要的情况调用convertor把源对象转换成目标GVK的对象后,在调用encoder进行序列化。

// Encode ensures the provided object is output in the appropriate group and version, invoking
// conversion if necessary. Unversioned objects (according to the ObjectTyper) are output as is.
func (c *codec) Encode(obj runtime.Object, w io.Writer) error {
    switch obj.(type) {
    case *runtime.Unknown, runtime.Unstructured:
        return c.encoder.Encode(obj, w)
    }

    gvks, isUnversioned, err := c.typer.ObjectKinds(obj)
    if err != nil {
        return err
    }

    if c.encodeVersion == nil || isUnversioned {
        if e, ok := obj.(runtime.NestedObjectEncoder); ok {
            if err := e.EncodeNestedObjects(DirectEncoder{Encoder: c.encoder, ObjectTyper: c.typer}); err != nil {
                return err
            }
        }
        objectKind := obj.GetObjectKind()
        old := objectKind.GroupVersionKind()
        objectKind.SetGroupVersionKind(gvks[0])
        err = c.encoder.Encode(obj, w)
        objectKind.SetGroupVersionKind(old)
        return err
    }

    // Perform a conversion if necessary
    objectKind := obj.GetObjectKind()
    old := objectKind.GroupVersionKind()
    out, err := c.convertor.ConvertToVersion(obj, c.encodeVersion)
    if err != nil {
        return err
    }

    if e, ok := out.(runtime.NestedObjectEncoder); ok {
        if err := e.EncodeNestedObjects(DirectEncoder{Version: c.encodeVersion, Encoder: c.encoder, ObjectTyper: c.typer}); err != nil {
            return err
        }
    }

    // Conversion is responsible for setting the proper group, version, and kind onto the outgoing object
    err = c.encoder.Encode(out, w)
    // restore the old GVK, in case conversion returned the same object
    objectKind.SetGroupVersionKind(old)
    return err
}
上一篇 下一篇

猜你喜欢

热点阅读