读 fluentbit-operator 代码 1.fluentbit_
https://github.com/kubesphere/fluentbit-operator
// Reconcile processes one reconcile request for a FluentBit object.
// This excerpt is the opening of the function: it loads the object and deals
// with deletion and finalizer registration before any resources are created.
func (r *FluentBitReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
ctx := context.Background()
// The logger returned by WithValues is discarded here.
_ = r.Log.WithValues("fluent-bit", req.NamespacedName)
// Load the FluentBit object named by the request.
var fb logging.FluentBit
if err := r.Get(ctx, req.NamespacedName, &fb); err != nil {
// A missing object is not treated as an error: return an empty result.
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}
// Deletion in progress: run the finalizer logic and stop; nothing else is reconciled.
if fb.IsBeingDeleted() {
if err := r.handleFinalizer(ctx, &fb); err != nil {
return ctrl.Result{}, fmt.Errorf("error when handling finalizer: %v", err)
}
return ctrl.Result{}, nil
}
// Finalizer not present yet: add it and return. The rest of the function
// only runs on a call where the finalizer is already set.
if !fb.HasFinalizer(logging.FluentBitFinalizerName) {
if err := r.addFinalizer(ctx, &fb); err != nil {
return ctrl.Result{}, fmt.Errorf("error when adding finalizer: %v", err)
}
return ctrl.Result{}, nil
}
_ = r.Log.WithValues("fluent-bit", req.NamespacedName)
解释:如果此接口没有被实现,一方面 IDE 会出现红色波浪线提示,另一方面编译时会报错。这两方面的提示可以保证底层代码所依赖的接口确实被实现了。
// Excerpt from Reconcile: when the object is marked for deletion, run
// handleFinalizer and return. A failure is returned wrapped in a message that
// names the finalizer step.
if fb.IsBeingDeleted() {
if err := r.handleFinalizer(ctx, &fb); err != nil {
return ctrl.Result{}, fmt.Errorf("error when handling finalizer: %v", err)
}
return ctrl.Result{}, nil
}
这里的 handleFinalizer() 函数(以及上面用到的 addFinalizer())定义如下:
// addFinalizer registers the FluentBit finalizer on instance and persists the
// modified object through the reconciler's Update call. It returns whatever
// error Update reports.
func (r *FluentBitReconciler) addFinalizer(ctx context.Context, instance *logging.FluentBit) error {
	instance.AddFinalizer(logging.FluentBitFinalizerName)

	updateErr := r.Update(ctx, instance)
	return updateErr
}
// handleFinalizer performs the pre-delete work for instance.
//
// If instance does not carry the FluentBit finalizer there is nothing to do
// and nil is returned. Otherwise r.delete is called first (its body is not
// shown here; presumably it removes what the operator created), and only once
// that succeeds is the finalizer removed and the object saved with Update.
func (r *FluentBitReconciler) handleFinalizer(ctx context.Context, instance *logging.FluentBit) error {
	hasFinalizer := instance.HasFinalizer(logging.FluentBitFinalizerName)
	if !hasFinalizer {
		return nil
	}

	// Cleanup must succeed before the finalizer is released.
	cleanupErr := r.delete(ctx, instance)
	if cleanupErr != nil {
		return cleanupErr
	}

	instance.RemoveFinalizer(logging.FluentBitFinalizerName)
	return r.Update(ctx, instance)
}
// FluentBitFinalizerName is the name of the fluentbit finalizer.
// It is the string passed to AddFinalizer, HasFinalizer and RemoveFinalizer
// by the reconciler's finalizer helpers.
const FluentBitFinalizerName = "fluentbit.logging.kubesphere.io"
finalizer
Kubernetes 实战-Operator Finalizers 实现
Finalizers 允许 Operator 控制器实现异步的 pre-delete hook。比如你给 API 类型中的每个对象都创建了对应的外部资源,你希望在 k8s 删除对应资源时同时删除关联的外部资源,那么可以通过 Finalizers 来实现。
Finalizers 是由字符串组成的列表,当 Finalizers 字段存在时,相关资源不允许被强制删除。存在 Finalizers 字段的资源对象在收到第一个删除请求时,只会设置 metadata.deletionTimestamp 字段的值,而不会删除具体资源;该字段设置后,finalizer 列表中的条目只能被移除,不能做其他操作。
当 metadata.deletionTimestamp 字段非空时,controller watch 对象并执行对应 finalizers 的动作,当所有动作执行完后,需要清空 finalizers ,之后 k8s 会删除真正想要删除的资源。
介绍2:
k8s 资源中有 ObjectMeta.DeletionTimestamp 和 DemoMicroService.ObjectMeta.Finalizers 这两个元信息。前者表示删除该资源这一行为的具体发生时间,如果不为 0 则表示删除资源的指令已经下达;而后者表示在真正删除资源之前,还有哪些逻辑没有被执行,如果 Finalizers 不为空,那么该资源不会真正消失。
// Check if Secret exists and requeue when not found.
// The Secret is looked up in the FluentBit object's namespace under the name
// given by fb.Spec.FluentBitConfigName.
var sec corev1.Secret
if err := r.Get(ctx, client.ObjectKey{Namespace: fb.Namespace, Name: fb.Spec.FluentBitConfigName}, &sec); err != nil {
// Not found is reported as a requeue request rather than an error.
if errors.IsNotFound(err) {
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, err
}
ctrl "sigs.k8s.io/controller-runtime" ctrl 是该包
检查 Secret 是否存在;不存在时返回 ctrl.Result{Requeue: true}, nil,让该请求重新入队。Result 的定义如下:
// Result is the value a reconcile call returns alongside its error; its two
// fields control whether and when the reconcile key is queued again.
type Result struct {
// Requeue tells the Controller to requeue the reconcile key. Defaults to false.
Requeue bool
// RequeueAfter if greater than 0, tells the Controller to requeue the reconcile key after the Duration.
// Implies that Requeue is true, there is no need to set Requeue to true at the same time as RequeueAfter.
RequeueAfter time.Duration
}
安装RBAC验证
// Install RBAC resources for the filter plugin kubernetes.
// MakeRBACObjects is not shown here; judging by the variable names, cr/sa/crb
// are presumably a ClusterRole, a ServiceAccount and a ClusterRoleBinding.
cr, sa, crb := operator.MakeRBACObjects(fb.Name, fb.Namespace)
// Set ServiceAccount's owner to this fluentbit.
// Only sa receives a controller reference; cr and crb are created without one.
if err := ctrl.SetControllerReference(&fb, &sa, r.Scheme); err != nil {
return ctrl.Result{}, err
}
// Create each object in turn; an "already exists" error is tolerated so the
// step can run again on later reconciles.
if err := r.Create(ctx, &cr); err != nil && !errors.IsAlreadyExists(err) {
return ctrl.Result{}, err
}
if err := r.Create(ctx, &sa); err != nil && !errors.IsAlreadyExists(err) {
return ctrl.Result{}, err
}
if err := r.Create(ctx, &crb); err != nil && !errors.IsAlreadyExists(err) {
return ctrl.Result{}, err
}
create函数是
// Create dispatches the create call according to the dynamic type of obj:
// objects that are *unstructured.Unstructured go to the unstructured client,
// everything else goes to the typed client. The error of the chosen client is
// returned unchanged.
func (c *client) Create(ctx context.Context, obj runtime.Object, opts ...CreateOption) error {
	if _, isUnstructured := obj.(*unstructured.Unstructured); isUnstructured {
		return c.unstructuredClient.Create(ctx, obj, opts...)
	}
	return c.typedClient.Create(ctx, obj, opts...)
}
在我们编码中,经常会碰到读取数据时,要判断数据是哪种类型,典型的是json格式文本的读取和识别。在golang中主要用 x.(T)的方式来识别类型:x是变量,而且是不确定类型的变量,interface,如果是已知类型的,比如x是string,那么就会报错:invalid type assertion: data.(string) (non-interface type string on left),当然也不能是常量,常量的类型已知,不需要做类型判断。T是类型字面量,就是类型的名称,举例来说:
var data interface{} = "hello"
strValue, ok := data.(string)
if ok {
fmt.Printf("%s is string type\n", strValue)
}
作者:古月晨风
链接:https://www.jianshu.com/p/e9da88e91ce9
来源:简书
部署守护进程fluentbit
// Deploy Fluent Bit DaemonSet.
// The log path is resolved first and passed to MakeDaemonSet, which builds
// the desired DaemonSet for this FluentBit object.
logPath := r.getContainerLogPath(fb)
ds := operator.MakeDaemonSet(fb, logPath)
// Make the FluentBit object the controller owner of the DaemonSet.
if err := ctrl.SetControllerReference(&fb, &ds, r.Scheme); err != nil {
return ctrl.Result{}, err
}
// Create the DaemonSet or update the existing one; r.mutate supplies the
// mutation callback (its body is not shown here).
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, &ds, r.mutate(&ds, fb)); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
logPath := r.getContainerLogPath(fb)
// getContainerLogPath returns the host directory that holds container logs.
//
// Precedence, first non-empty value wins:
//  1. fb.Spec.ContainerLogRealPath, set on the FluentBit object;
//  2. r.ContainerLogRealPath, set on the reconciler;
//  3. the hard-coded fallback "/var/lib/docker/containers".
func (r *FluentBitReconciler) getContainerLogPath(fb logging.FluentBit) string {
	if fb.Spec.ContainerLogRealPath != "" {
		return fb.Spec.ContainerLogRealPath
	}
	if r.ContainerLogRealPath != "" {
		return r.ContainerLogRealPath
	}
	return "/var/lib/docker/containers"
}
对于函数中的第一行fb.Spec.ContainerLogRealPath里的解释如下
fb 对象的 Spec 字段(FluentBitSpec 结构体)中的 ContainerLogRealPath 字段,类型为 string;这里是通过多层嵌套的结构体逐级访问的,相关结构体定义如下。
// FluentBit is the Schema for the fluentbits API.
// It embeds the standard type and object metadata and adds a Spec
// (FluentBitSpec) and a Status (FluentBitStatus, not shown here).
type FluentBit struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec FluentBitSpec `json:"spec,omitempty"`
Status FluentBitStatus `json:"status,omitempty"`
}
// FluentBitSpec defines the desired state of FluentBit.
// Every field is tagged omitempty, so unset fields are left out of the JSON form.
type FluentBitSpec struct {
// Image is the Fluent Bit container image.
Image string `json:"image,omitempty"`
// ImagePullPolicy is the pull policy applied to the Fluent Bit image.
ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
// PositionDB is the storage for the position db. You will use it if tail input is enabled.
// MakeDaemonSet mounts it at /fluent-bit/tail only when it is non-zero.
PositionDB corev1.VolumeSource `json:"positionDB,omitempty"`
// ContainerLogRealPath is the container log path on the host.
// When non-empty it is the first choice of getContainerLogPath.
ContainerLogRealPath string `json:"containerLogRealPath,omitempty"`
// Resources are the compute resources required by the container.
Resources corev1.ResourceRequirements `json:"resources,omitempty"`
// NodeSelector is copied to the pod spec of the DaemonSet.
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
// Affinity holds the pod's scheduling constraints.
Affinity *corev1.Affinity `json:"affinity,omitempty"`
// Tolerations are copied to the pod spec of the DaemonSet.
Tolerations []corev1.Toleration `json:"tolerations,omitempty"`
// FluentBitConfigName is the Fluentbitconfig object associated with this Fluentbit.
// The reconciler looks up a Secret with this name and mounts it as the config volume.
FluentBitConfigName string `json:"fluentBitConfigName,omitempty"`
// Secrets lists Secret names; each is mounted into /fluent-bit/secrets/<secret-name>.
Secrets []string `json:"secrets,omitempty"`
}
守护进程
ds := operator.MakeDaemonSet(fb, logPath)
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"kubesphere.io/fluentbit-operator/api/v1alpha2"
// MakeDaemonSet assembles the Fluent Bit DaemonSet described by fb.
//
// The DaemonSet, its selector and its pod template all take name, namespace
// and labels from fb. The pod runs a single "fluent-bit" container under the
// service account named after fb and mounts, read-only:
//   - logPath from the host, at the same path inside the container;
//   - the Secret named fb.Spec.FluentBitConfigName at /fluent-bit/config;
//   - /var/log and /var/log/journal from the host.
//
// The position DB volume is added only when fb.Spec.PositionDB is non-zero,
// and every name in fb.Spec.Secrets is mounted under /fluent-bit/secrets/.
func MakeDaemonSet(fb v1alpha2.FluentBit, logPath string) appsv1.DaemonSet {
	// hostPathVolume exposes a host directory as a named pod volume.
	hostPathVolume := func(name, path string) corev1.Volume {
		return corev1.Volume{
			Name: name,
			VolumeSource: corev1.VolumeSource{
				HostPath: &corev1.HostPathVolumeSource{
					Path: path,
				},
			},
		}
	}
	// secretVolume exposes a Secret as a named pod volume.
	secretVolume := func(name, secretName string) corev1.Volume {
		return corev1.Volume{
			Name: name,
			VolumeSource: corev1.VolumeSource{
				Secret: &corev1.SecretVolumeSource{
					SecretName: secretName,
				},
			},
		}
	}
	// readOnlyMount mounts the named volume read-only at path.
	readOnlyMount := func(name, path string) corev1.VolumeMount {
		return corev1.VolumeMount{
			Name:      name,
			ReadOnly:  true,
			MountPath: path,
		}
	}

	// Fixed volumes and their mounts, in matching order.
	volumes := []corev1.Volume{
		hostPathVolume("varlibcontainers", logPath),
		secretVolume("config", fb.Spec.FluentBitConfigName),
		hostPathVolume("varlogs", "/var/log"),
		hostPathVolume("systemd", "/var/log/journal"),
	}
	mounts := []corev1.VolumeMount{
		readOnlyMount("varlibcontainers", logPath),
		readOnlyMount("config", "/fluent-bit/config"),
		readOnlyMount("varlogs", "/var/log/"),
		readOnlyMount("systemd", "/var/log/journal"),
	}

	// Mount Position DB (writable: the mount does not set ReadOnly).
	if fb.Spec.PositionDB != (corev1.VolumeSource{}) {
		volumes = append(volumes, corev1.Volume{
			Name:         "positions",
			VolumeSource: fb.Spec.PositionDB,
		})
		mounts = append(mounts, corev1.VolumeMount{
			Name:      "positions",
			MountPath: "/fluent-bit/tail",
		})
	}

	// Mount Secrets; the volume is named after the Secret itself.
	for _, secretName := range fb.Spec.Secrets {
		volumes = append(volumes, secretVolume(secretName, secretName))
		mounts = append(mounts, readOnlyMount(secretName, fmt.Sprintf("/fluent-bit/secrets/%s", secretName)))
	}

	fluentBitContainer := corev1.Container{
		Name:            "fluent-bit",
		Image:           fb.Spec.Image,
		ImagePullPolicy: fb.Spec.ImagePullPolicy,
		Ports: []corev1.ContainerPort{
			{
				Name:          "metrics",
				ContainerPort: 2020,
				Protocol:      "TCP",
			},
		},
		Env: []corev1.EnvVar{
			{
				// NODE_NAME is filled from the pod's spec.nodeName field.
				Name: "NODE_NAME",
				ValueFrom: &corev1.EnvVarSource{
					FieldRef: &corev1.ObjectFieldSelector{
						FieldPath: "spec.nodeName",
					},
				},
			},
		},
		VolumeMounts: mounts,
		Resources:    fb.Spec.Resources,
	}

	// The same name/namespace/labels are used for the DaemonSet and its pod template.
	meta := metav1.ObjectMeta{
		Name:      fb.Name,
		Namespace: fb.Namespace,
		Labels:    fb.Labels,
	}

	return appsv1.DaemonSet{
		ObjectMeta: meta,
		Spec: appsv1.DaemonSetSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: fb.Labels,
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: meta,
				Spec: corev1.PodSpec{
					ServiceAccountName: fb.Name,
					Volumes:            volumes,
					Containers:         []corev1.Container{fluentBitContainer},
					NodeSelector:       fb.Spec.NodeSelector,
					Tolerations:        fb.Spec.Tolerations,
					Affinity:           fb.Spec.Affinity,
				},
			},
		},
	}
}