读fluentbit-operator代码1.fluentbit_

2021-06-07  本文已影响0人  文茶君

https://github.com/kubesphere/fluentbit-operator

// Reconcile processes one reconcile request for the FluentBit object named by
// req. The part shown here loads the object, runs finalizer cleanup when the
// object is being deleted, and adds the finalizer when it is missing.
// NOTE(review): this excerpt ends before the function's closing brace; the
// remaining steps are not part of this span.
func (r *FluentBitReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    ctx := context.Background()
      // The logger returned by WithValues is assigned to the blank identifier,
      // i.e. it is discarded and not used below.
      _ = r.Log.WithValues("fluent-bit", req.NamespacedName)

    var fb logging.FluentBit
    // Load the object; a not-found error ends the reconcile without an error,
    // any other error is returned to the caller.
    if err := r.Get(ctx, req.NamespacedName, &fb); err != nil {
        if errors.IsNotFound(err) {
            return ctrl.Result{}, nil
        }
        return ctrl.Result{}, err
    }

    // Deletion path: run the finalizer handling and stop.
    if fb.IsBeingDeleted() {
        if err := r.handleFinalizer(ctx, &fb); err != nil {
            return ctrl.Result{}, fmt.Errorf("error when handling finalizer: %v", err)
        }
        return ctrl.Result{}, nil
    }

    // Finalizer missing: add it and return; nothing else is done in this pass.
    if !fb.HasFinalizer(logging.FluentBitFinalizerName) {
        if err := r.addFinalizer(ctx, &fb); err != nil {
            return ctrl.Result{}, fmt.Errorf("error when adding finalizer: %v", err)
        }
        return ctrl.Result{}, nil
    }



_ = r.Log.WithValues("fluent-bit", req.NamespacedName)
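解释:这一行调用 r.Log.WithValues,并把返回的 logger 赋给空白标识符 _,也就是直接丢弃返回值,它本身并不检查接口实现。用空白标识符检查接口是否被实现的写法是 var _ Interface = (*Type)(nil):如果说此接口没有被实现,那么一方面ide会有红横线出现,另一方面在编译的时候会出现报错。两方面的提示来保证写底层代码的接口是有被实现的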

 // Repeated excerpt from Reconcile: when the object is being deleted, run
 // handleFinalizer and return; its error is wrapped with a short prefix.
 if fb.IsBeingDeleted() {
        if err := r.handleFinalizer(ctx, &fb); err != nil {
            return ctrl.Result{}, fmt.Errorf("error when handling finalizer: %v", err)
        }
        return ctrl.Result{}, nil
    }

这里的handleFinalizer()函数定义如下(addFinalizer()一并列出):

// addFinalizer adds logging.FluentBitFinalizerName to instance and then
// persists the modified object with r.Update. The error from Update, if any,
// is returned unchanged.
func (r *FluentBitReconciler) addFinalizer(ctx context.Context, instance *logging.FluentBit) error {
    instance.AddFinalizer(logging.FluentBitFinalizerName)

    if updateErr := r.Update(ctx, instance); updateErr != nil {
        return updateErr
    }
    return nil
}

// handleFinalizer performs the cleanup guarded by the FluentBit finalizer.
//
// If instance does not carry logging.FluentBitFinalizerName it returns nil
// without doing anything. Otherwise it calls r.delete, and only when that
// succeeds removes the finalizer and persists the object with r.Update.
// Errors from r.delete and r.Update are returned unchanged.
func (r *FluentBitReconciler) handleFinalizer(ctx context.Context, instance *logging.FluentBit) error {
    hasFinalizer := instance.HasFinalizer(logging.FluentBitFinalizerName)
    if !hasFinalizer {
        return nil
    }

    cleanupErr := r.delete(ctx, instance)
    if cleanupErr != nil {
        // Keep the finalizer in place when cleanup did not succeed.
        return cleanupErr
    }

    instance.RemoveFinalizer(logging.FluentBitFinalizerName)
    return r.Update(ctx, instance)
}

// FluentBitFinalizerName is the name of the fluentbit finalizer.
// The reconciler adds this string to FluentBit objects (addFinalizer) and
// removes it again after cleanup has succeeded (handleFinalizer).
const FluentBitFinalizerName = "fluentbit.logging.kubesphere.io"

finalizer
Kubernetes 实战-Operator Finalizers 实现

Finalizers 允许 Operator 控制器实现异步的 pre-delete hook。比如你给 API 类型中的每个对象都创建了对应的外部资源,你希望在 k8s 删除对应资源时同时删除关联的外部资源,那么可以通过 Finalizers 来实现。

Finalizers 是由字符串组成的列表,当 Finalizers 字段存在时,相关资源不允许被强制删除。存在 Finalizers 字段的资源对象接收到的第一个删除请求只会设置 metadata.deletionTimestamp 字段的值,但不删除具体资源;在该字段设置后,finalizers 列表中的条目只能被移除,不能做其他操作。

当 metadata.deletionTimestamp 字段非空时,controller watch 对象并执行对应 finalizers 的动作,当所有动作执行完后,需要清空 finalizers ,之后 k8s 会删除真正想要删除的资源。

介绍2:
k8s 资源中有 ObjectMeta.DeletionTimestamp 和 ObjectMeta.Finalizers(例如 DemoMicroService.ObjectMeta.Finalizers)这两个元信息。前者表示删除该资源的这一行为的具体发生时间,如果不为0则表示删除资源的指令已经下达;而后者表示在真正删除资源之前,还有哪些逻辑没有被执行,如果 Finalizers 不为空,那么该资源则不会真正消失。

// Check if Secret exists and requeue when not found.
// The Secret is looked up in the FluentBit object's namespace under the name
// given by fb.Spec.FluentBitConfigName.
    var sec corev1.Secret
    if err := r.Get(ctx, client.ObjectKey{Namespace: fb.Namespace, Name: fb.Spec.FluentBitConfigName}, &sec); err != nil {
        if errors.IsNotFound(err) {
            // Not found is not reported as an error; the request is requeued instead.
            return ctrl.Result{Requeue: true}, nil
        }
        return ctrl.Result{}, err
    }



ctrl "sigs.k8s.io/controller-runtime":代码中的 ctrl 是 controller-runtime 这个包的导入别名

检查Secret存不存在,不存在 return ctrl.Result{Requeue: true}, nil

// Result is the value a reconcile call returns alongside its error to tell
// the Controller whether the reconcile key should be queued again.
type Result struct {
    // Requeue asks the Controller to requeue the reconcile key.
    // The default is false.
    Requeue bool

    // RequeueAfter, when greater than 0, asks the Controller to requeue the
    // reconcile key after the given Duration. It implies Requeue, so Requeue
    // does not need to be set to true together with RequeueAfter.
    RequeueAfter time.Duration
}

安装RBAC验证

    // Install RBAC resources for the filter plugin kubernetes
    // NOTE(review): cr/sa/crb are presumably a ClusterRole, a ServiceAccount and
    // a ClusterRoleBinding — confirm against operator.MakeRBACObjects.
    cr, sa, crb := operator.MakeRBACObjects(fb.Name, fb.Namespace)
    // Set ServiceAccount's owner to this fluentbit
    // (only sa receives a controller reference here; cr and crb do not).
    if err := ctrl.SetControllerReference(&fb, &sa, r.Scheme); err != nil {
        return ctrl.Result{}, err
    }
    // Create each object; an AlreadyExists error is treated as success,
    // every other error aborts the reconcile.
    if err := r.Create(ctx, &cr); err != nil && !errors.IsAlreadyExists(err) {
        return ctrl.Result{}, err
    }
    if err := r.Create(ctx, &sa); err != nil && !errors.IsAlreadyExists(err) {
        return ctrl.Result{}, err
    }
    if err := r.Create(ctx, &crb); err != nil && !errors.IsAlreadyExists(err) {
        return ctrl.Result{}, err
    }

create函数是

// Create forwards the create call to one of the two wrapped clients, chosen
// by the dynamic type of obj: an *unstructured.Unstructured goes to
// c.unstructuredClient, every other object goes to c.typedClient.
// The selected client's error is returned unchanged.
func (c *client) Create(ctx context.Context, obj runtime.Object, opts ...CreateOption) error {
    if _, isUnstructured := obj.(*unstructured.Unstructured); isUnstructured {
        return c.unstructuredClient.Create(ctx, obj, opts...)
    }
    return c.typedClient.Create(ctx, obj, opts...)
}

在我们编码中,经常会碰到读取数据时,要判断数据是哪种类型,典型的是json格式文本的读取和识别。在golang中主要用 x.(T)的方式来识别类型:x是变量,而且是不确定类型的变量,interface,如果是已知类型的,比如x是string,那么就会报错:invalid type assertion: data.(string) (non-interface type string on left),当然也不能是常量,常量的类型已知,不需要做类型判断。T是类型字面量,就是类型的名称,举例来说:
// Example of the x.(T) type assertion: data is an interface value that holds
// a string.
var data interface{} = "hello"
// The two-value form sets ok to whether data holds a string instead of
// panicking on a mismatch; here ok is true and strValue is "hello".
strValue, ok := data.(string)
if ok {
fmt.Printf("%s is string type\n", strValue)
}
作者:古月晨风
链接:https://www.jianshu.com/p/e9da88e91ce9
来源:简书

部署守护进程fluentbit

// Deploy Fluent Bit DaemonSet
// The log path is resolved first and passed to MakeDaemonSet, which builds
// the DaemonSet object for this FluentBit.
    logPath := r.getContainerLogPath(fb)
    ds := operator.MakeDaemonSet(fb, logPath)
    // Make fb the controller owner of the DaemonSet.
    if err := ctrl.SetControllerReference(&fb, &ds, r.Scheme); err != nil {
        return ctrl.Result{}, err
    }

    // r.mutate(&ds, fb) supplies the mutate callback handed to CreateOrUpdate;
    // its body is not shown in this excerpt.
    if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, &ds, r.mutate(&ds, fb)); err != nil {
        return ctrl.Result{}, err
    }

    return ctrl.Result{}, nil

logPath := r.getContainerLogPath(fb)

// getContainerLogPath returns the container log directory to use for fb.
//
// Precedence, first non-empty value wins:
//  1. fb.Spec.ContainerLogRealPath (set on the FluentBit object),
//  2. r.ContainerLogRealPath (set on the reconciler),
//  3. the hard-coded fallback "/var/lib/docker/containers".
func (r *FluentBitReconciler) getContainerLogPath(fb logging.FluentBit) string {
    // Fallback used when neither the object nor the reconciler specifies a path.
    const defaultContainerLogPath = "/var/lib/docker/containers"

    // A tagless switch replaces the else-after-return chain.
    switch {
    case fb.Spec.ContainerLogRealPath != "":
        return fb.Spec.ContainerLogRealPath
    case r.ContainerLogRealPath != "":
        return r.ContainerLogRealPath
    default:
        return defaultContainerLogPath
    }
}

对于函数中的第一行fb.Spec.ContainerLogRealPath里的解释如下
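fb 是 FluentBit 结构体,它的 Spec 字段是 FluentBitSpec 结构体,而 ContainerLogRealPath 是 FluentBitSpec 中的一个 string 字段;也就是说 fb.Spec.ContainerLogRealPath 是通过嵌套的结构体逐层访问到的字段。相关结构体定义如下: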

// FluentBit is the Schema for the fluentbits API.
// It embeds the type and object metadata and carries the desired state in
// Spec and the status in Status.
type FluentBit struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    // Spec is the desired state; see FluentBitSpec.
    Spec   FluentBitSpec   `json:"spec,omitempty"`
    // Status is serialized under the "status" key; FluentBitStatus is not
    // defined in this excerpt.
    Status FluentBitStatus `json:"status,omitempty"`
}
// FluentBitSpec defines the desired state of FluentBit.
// Every field is optional in the serialized form (all tags use omitempty).
type FluentBitSpec struct {
    // Fluent Bit image.
    Image string `json:"image,omitempty"`
    // Fluent Bit image pull policy.
    ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
    // Storage for position db. You will use it if tail input is enabled.
    // When set, MakeDaemonSet mounts it at /fluent-bit/tail.
    PositionDB corev1.VolumeSource `json:"positionDB,omitempty"`
    // Container log path. When non-empty, getContainerLogPath prefers it over
    // the reconciler-level setting and the built-in default.
    ContainerLogRealPath string `json:"containerLogRealPath,omitempty"`
    // Compute Resources required by container.
    Resources corev1.ResourceRequirements `json:"resources,omitempty"`
    // NodeSelector copied into the pod spec.
    NodeSelector map[string]string `json:"nodeSelector,omitempty"`
    // Pod's scheduling constraints.
    Affinity *corev1.Affinity `json:"affinity,omitempty"`
    // Tolerations copied into the pod spec.
    Tolerations []corev1.Toleration `json:"tolerations,omitempty"`
    // Fluentbitconfig object associated with this Fluentbit.
    // MakeDaemonSet uses this value as the name of the Secret mounted at
    // /fluent-bit/config.
    FluentBitConfigName string `json:"fluentBitConfigName,omitempty"`
    // The Secrets are mounted into /fluent-bit/secrets/<secret-name>.
    Secrets []string `json:"secrets,omitempty"`
}

守护进程

ds := operator.MakeDaemonSet(fb, logPath)

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"kubesphere.io/fluentbit-operator/api/v1alpha2"


// MakeDaemonSet builds the DaemonSet object for the given FluentBit.
//
// The DaemonSet and its pod template reuse fb's name, namespace and labels,
// and fb.Labels is also the selector. The single "fluent-bit" container gets
// these volumes, all mounted read-only:
//   - "varlibcontainers": host path logPath, mounted at logPath
//   - "config": Secret fb.Spec.FluentBitConfigName, mounted at /fluent-bit/config
//   - "varlogs": host path /var/log, mounted at /var/log/
//   - "systemd": host path /var/log/journal, mounted at /var/log/journal
//
// If fb.Spec.PositionDB is set, it is added as volume "positions" and mounted
// (not read-only) at /fluent-bit/tail. Every entry of fb.Spec.Secrets is added
// as a Secret volume named after the secret and mounted read-only at
// /fluent-bit/secrets/<secret>.
func MakeDaemonSet(fb v1alpha2.FluentBit, logPath string) appsv1.DaemonSet {
    // Small constructors that keep the volume and mount lists readable.
    hostPathVolume := func(name, path string) corev1.Volume {
        return corev1.Volume{
            Name: name,
            VolumeSource: corev1.VolumeSource{
                HostPath: &corev1.HostPathVolumeSource{Path: path},
            },
        }
    }
    secretVolume := func(name, secretName string) corev1.Volume {
        return corev1.Volume{
            Name: name,
            VolumeSource: corev1.VolumeSource{
                Secret: &corev1.SecretVolumeSource{SecretName: secretName},
            },
        }
    }
    readOnlyMount := func(name, path string) corev1.VolumeMount {
        return corev1.VolumeMount{Name: name, ReadOnly: true, MountPath: path}
    }

    // Fixed volumes and their mounts, in matching order.
    volumes := []corev1.Volume{
        hostPathVolume("varlibcontainers", logPath),
        secretVolume("config", fb.Spec.FluentBitConfigName),
        hostPathVolume("varlogs", "/var/log"),
        hostPathVolume("systemd", "/var/log/journal"),
    }
    mounts := []corev1.VolumeMount{
        readOnlyMount("varlibcontainers", logPath),
        readOnlyMount("config", "/fluent-bit/config"),
        readOnlyMount("varlogs", "/var/log/"),
        readOnlyMount("systemd", "/var/log/journal"),
    }

    // Mount Position DB (only when a volume source was configured).
    if fb.Spec.PositionDB != (corev1.VolumeSource{}) {
        volumes = append(volumes, corev1.Volume{
            Name:         "positions",
            VolumeSource: fb.Spec.PositionDB,
        })
        mounts = append(mounts, corev1.VolumeMount{
            Name:      "positions",
            MountPath: "/fluent-bit/tail",
        })
    }

    // Mount Secrets: one volume and one read-only mount per listed secret.
    for _, secretName := range fb.Spec.Secrets {
        volumes = append(volumes, secretVolume(secretName, secretName))
        mounts = append(mounts, readOnlyMount(secretName, fmt.Sprintf("/fluent-bit/secrets/%s", secretName)))
    }

    fluentBitContainer := corev1.Container{
        Name:            "fluent-bit",
        Image:           fb.Spec.Image,
        ImagePullPolicy: fb.Spec.ImagePullPolicy,
        Ports: []corev1.ContainerPort{
            {
                Name:          "metrics",
                ContainerPort: 2020,
                Protocol:      "TCP",
            },
        },
        Env: []corev1.EnvVar{
            {
                // NODE_NAME is filled from the pod's spec.nodeName field.
                Name: "NODE_NAME",
                ValueFrom: &corev1.EnvVarSource{
                    FieldRef: &corev1.ObjectFieldSelector{
                        FieldPath: "spec.nodeName",
                    },
                },
            },
        },
        VolumeMounts: mounts,
        Resources:    fb.Spec.Resources,
    }

    // The DaemonSet and its pod template carry the same name, namespace and labels.
    sharedMeta := metav1.ObjectMeta{
        Name:      fb.Name,
        Namespace: fb.Namespace,
        Labels:    fb.Labels,
    }

    return appsv1.DaemonSet{
        ObjectMeta: sharedMeta,
        Spec: appsv1.DaemonSetSpec{
            Selector: &metav1.LabelSelector{
                MatchLabels: fb.Labels,
            },
            Template: corev1.PodTemplateSpec{
                ObjectMeta: sharedMeta,
                Spec: corev1.PodSpec{
                    ServiceAccountName: fb.Name,
                    Volumes:            volumes,
                    Containers:         []corev1.Container{fluentBitContainer},
                    NodeSelector:       fb.Spec.NodeSelector,
                    Tolerations:        fb.Spec.Tolerations,
                    Affinity:           fb.Spec.Affinity,
                },
            },
        },
    }
}
上一篇下一篇

猜你喜欢

热点阅读