读fluentbit-oprator代码2.fluentbitc

2021-06-07  本文已影响0人  文茶君

https://github.com/kubesphere/fluentbit-operator
先看结构体

type FluentBitConfigReconciler struct {
    client.Client
    Log    logr.Logger
    Scheme *runtime.Scheme
}

执行代码

func (r *FluentBitConfigReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    ctx := context.Background()
    _ = r.Log.WithValues("fluentbitconfig", req.NamespacedName)

    var cfgs logging.FluentBitConfigList
    if err := r.List(ctx, &cfgs, client.InNamespace(req.Namespace)); err != nil {
        if errors.IsNotFound(err) {
            return ctrl.Result{}, nil
        }
        return ctrl.Result{}, err
    }


这常规代码,没什么好说的

for _, cfg := range cfgs.Items {
        // List all inputs matching the label selector.
        var inputs logging.InputList
        selector, err := metav1.LabelSelectorAsSelector(&cfg.Spec.InputSelector)
        if err != nil {
            return ctrl.Result{}, err
        }
        if err = r.List(ctx, &inputs, client.InNamespace(req.Namespace), client.MatchingLabelsSelector{Selector: selector}); err != nil {
            return ctrl.Result{}, err
        }

var cfgs logging.FluentBitConfigList中的结构

ype FluentBitConfigList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`
    Items           []FluentBitConfig `json:"items"`
}

轮询列出所有与label匹配的input

selector, err := metav1.LabelSelectorAsSelector(&cfg.Spec.InputSelector)

// List all filters matching the label selector.
        var filters logging.FilterList
        selector, err = metav1.LabelSelectorAsSelector(&cfg.Spec.FilterSelector)
        if err != nil {
            return ctrl.Result{}, err
        }
        if err = r.List(ctx, &filters, client.InNamespace(req.Namespace), client.MatchingLabelsSelector{Selector: selector}); err != nil {
            return ctrl.Result{}, err
        }
// List all outputs matching the label selector.
        var outputs logging.OutputList
        selector, err = metav1.LabelSelectorAsSelector(&cfg.Spec.OutputSelector)
        if err != nil {
            return ctrl.Result{}, err
        }
        if err = r.List(ctx, &outputs, client.InNamespace(req.Namespace), client.MatchingLabelsSelector{Selector: selector}); err != nil {
            return ctrl.Result{}, err
        }

        // List all parsers matching the label selector.
        var parsers logging.ParserList
        selector, err = metav1.LabelSelectorAsSelector(&cfg.Spec.ParserSelector)
        if err != nil {
            return ctrl.Result{}, err
        }
        if err = r.List(ctx, &parsers, client.InNamespace(req.Namespace), client.MatchingLabelsSelector{Selector: selector}); err != nil {
            return ctrl.Result{}, err
        }

这几个一样,了解fluent-bit的结构。

// Inject config data into Secret
        sl := plugins.NewSecretLoader(r.Client, cfg.Namespace, r.Log)
        mainCfg, err := cfg.RenderMainConfig(sl, inputs, filters, outputs)
        if err != nil {
            return ctrl.Result{}, err
        }
        parserCfg, err := cfg.RenderParserConfig(sl, parsers)
        if err != nil {
            return ctrl.Result{}, err
        }

        cl := plugins.NewConfigMapLoader(r.Client, cfg.Namespace)
        scripts, err := cfg.RenderLuaScript(cl, filters)
        if err != nil {
            return ctrl.Result{}, err
        }

在这里代码引用了一段lua脚本

function add_time(tag, timestamp, record)
  new_record = {}

  timeStr = os.date("!*t", timestamp["sec"])
  t = string.format("%4d-%02d-%02dT%02d:%02d:%02d.%sZ",
        timeStr["year"], timeStr["month"], timeStr["day"],
        timeStr["hour"], timeStr["min"], timeStr["sec"],
        timestamp["nsec"])

  kubernetes = {}
  kubernetes["pod_name"] = record["_HOSTNAME"]
  kubernetes["container_name"] = record["SYSLOG_IDENTIFIER"]
  kubernetes["namespace_name"] = "kube-system"

  new_record["time"] = t
  new_record["log"] = record["MESSAGE"]
  new_record["kubernetes"] = kubernetes

  return 1, timestamp, new_record
end

func NewSecretLoader(c client.Client, ns string, l logr.Logger) SecretLoader {
    return SecretLoader{
        client:    c,
        namespace: ns,
    }
}

创造或者更新一致的秘钥

// Create or update the corresponding Secret
        sec := &corev1.Secret{
            ObjectMeta: metav1.ObjectMeta{
                Name:      cfg.Name,
                Namespace: cfg.Namespace,
            },
            Data: map[string][]byte{
                "fluent-bit.conf": []byte(mainCfg),
                "parsers.conf":    []byte(parserCfg),
            },
        }
           for k, v := range scripts {
            sec.Data[k] = []byte(v)
        }

        if err := ctrl.SetControllerReference(&cfg, sec, r.Scheme); err != nil {
            return ctrl.Result{}, err
        }

        if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, sec, func() error {
            sec.Data = map[string][]byte{
                "fluent-bit.conf": []byte(mainCfg),
                "parsers.conf":    []byte(parserCfg),
            }
            for k, v := range scripts {
                sec.Data[k] = []byte(v)
            }
            sec.SetOwnerReferences(nil)
            if err := ctrl.SetControllerReference(&cfg, sec, r.Scheme); err != nil {
                return err
            }
            return nil
        }); err != nil {
            return ctrl.Result{}, err
        }
    }

    return ctrl.Result{}, nil
}


上一篇 下一篇

猜你喜欢

热点阅读