Client-go客户端源码解析--EventRecorder

2021-08-10  本文已影响0人  队长100

EventRecorder

Kubernetes的事件是一种资源对像,用于展示集群内发生的情况,Kubernetes 中的各个组件都会将运行时的各种事件上报给Kubernetes API Server,并存储到Etcd集群中,为了避免磁盘空间被填满,对事件的保存强制执行保留策略:在最后一次事件发生后,删除1小时之前的事件。

示例代码

    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(klog.Infof)//注册事件消费者,将事件打印日志
    eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})//注册事件消费者,将事件记录到Kubernetes API Server,保存到Etcd中
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "myController"})
......
//记录事件(事件生产者)
recorder.Event(obj,corev1.EventTypeWarning,"test reason","test msg")

eventRecorder机制

eventRecorder机制
  1. EventRecorder: 事件生产者,各个组件(包括用户自定义的组件)会通过EventRecorder记录事件。

  2. EventBroadcaster: 事件消费者,也称为事件广播器。消费EventRecorder记录的事件并将事件分发给注册的所有的Watcher。分发过程有两种机制----阻塞和非阻塞两种分法机制。

  3. BroadcasterWatcher: Watcher管理器,用于定义事件的具体处理方式。

EventRecorder

//接口定义
type EventRecorder interface {
    //对刚发生的事件进行记录
    Event(object runtime.Object, eventtype, reason, message string)

    // Eventf is just like Event, but with Sprintf for the message field.
  //格式化输出事件格式
    Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

    // PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
  //允许自定义事件发生的事件,以记录过去发生的事件
    PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})

    // AnnotatedEventf is just like eventf, but with annotations attached
  //同Eventf,附加了Annotations字段
    AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}
......
......
......
//核心代码
// NewRecorder returns an EventRecorder that records events with the given event source.
func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
   return &recorderImpl{scheme, source, eventBroadcaster.Broadcaster, clock.RealClock{}}
}

type recorderImpl struct {
   scheme *runtime.Scheme
   source v1.EventSource
   *watch.Broadcaster
   clock clock.Clock
}

func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
   ref, err := ref.GetReference(recorder.scheme, object)
   if err != nil {
      klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
      return
   }

   if !util.ValidateEventType(eventtype) {
      klog.Errorf("Unsupported event type: '%v'", eventtype)
      return
   }

   event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
   event.Source = recorder.source

   go func() {
      // NOTE: events should be a non-blocking operation
     //新建goroutine执行recorder.Action,以达到非阻塞效果(避免c.incoming channel阻塞,导致后续事件无法正常发送)
      defer utilruntime.HandleCrash()
      recorder.Action(watch.Added, event)
   }()
}

func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
   recorder.generateEvent(object, nil, metav1.Now(), eventtype, reason, message)
}


......
//vendor/k8s.io/apimachinery/pkg/watch/mux.go
// Action distributes the given event among all watchers.
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
    m.incoming <- Event{action, obj} //incoming 默认缓冲25条消息,后续的会阻塞
}

EventBroadcaster

// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
   return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration}
}

///vendor/k8s.io/apimachinery/pkg/watch/mux.go
func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
    m := &Broadcaster{
        watchers:            map[int64]*broadcasterWatcher{},
        incoming:            make(chan Event, incomingQueueLength),//25
        watchQueueLength:    queueLength,//1000
        fullChannelBehavior: fullChannelBehavior,//channel满则丢弃消息
    }
    m.distributing.Add(1)
    go m.loop() //新建goroutine,等待消息到来
    return m
}
......
// loop receives from m.incoming and distributes to all watchers.
func (m *Broadcaster) loop() {
    // Deliberately not catching crashes here. Yes, bring down the process if there's a
    // bug in watch.Broadcaster.
    for event := range m.incoming {
        if event.Type == internalRunFunctionMarker { //添加watcher的时候会生成该类型事件,此事件不分发给watcher,只是内部处理
            event.Object.(functionFakeRuntimeObject)()//执行定义好的函数,生成并注册watcher
            continue
        }
        m.distribute(event)//分发消息到所有的watcher
    }
    m.closeAll()
    m.distributing.Done()
}

// distribute sends event to all watchers. Blocking.
func (m *Broadcaster) distribute(event Event) {
    m.lock.Lock()
    defer m.lock.Unlock()
    if m.fullChannelBehavior == DropIfChannelFull { //若当前watcher的result channel缓冲已满,则丢弃后续消息
        for _, w := range m.watchers {
            select {
            case w.result <- event: //发送消息到watcher的result channel
            case <-w.stopped:
            default: // Don't block if the event can't be queued. 不阻塞,直接返回,丢弃后续的消息
            }
        }
    } else {
        for _, w := range m.watchers { //若当前watcher的result channel缓冲已满,则阻塞,直到watcher有从result内取出消息
            select {
            case w.result <- event:
            case <-w.stopped:
            }
        }
    }
}

BroacasterWatcher

func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
   // The default math/rand package functions aren't thread safe, so create a
   // new Rand object for each StartRecording call.
   randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
   eventCorrelator := NewEventCorrelator(clock.RealClock{})
   return eventBroadcaster.StartEventWatcher(
      func(event *v1.Event) {
         recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)
      })
}
......
func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
    return eventBroadcaster.StartEventWatcher(
        func(e *v1.Event) {
            logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
        })
}

......
func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
    watcher := eventBroadcaster.Watch() //生成watcher并注册
    //新建goroutine启动watcher,等待事件到来并处理
    go func() {
        defer utilruntime.HandleCrash()
        for watchEvent := range watcher.ResultChan() { //从watcher的result channel中读取事件
            event, ok := watchEvent.Object.(*v1.Event)
            if !ok {
                // This is all local, so there's no reason this should
                // ever happen.
                continue
            }
            eventHandler(event)//调用定义好的事件处理函数
        }
    }()
    return watcher
}

......
//vendor/k8s.io/apimachinery/pkg/watch/mux.go
//生成并注册watcher
func (m *Broadcaster) Watch() Interface {
    var w *broadcasterWatcher
    m.blockQueue(func() {
        m.lock.Lock()
        defer m.lock.Unlock()
        id := m.nextWatcher
        m.nextWatcher++
    //生成watcher
        w = &broadcasterWatcher{
            result:  make(chan Event, m.watchQueueLength),
            stopped: make(chan struct{}),
            id:      id,
            m:       m,
        }
    //注册watcher到map中,后续分发事件的时候要用
        m.watchers[id] = w
    })
    return w
}

//生成并注册watcher的时候,阻塞incoming channel.
//这样做的目的是,确保watcher在某个事件之后被添加,看不到该事件。但是能够看到watcher被添加之后的所有事件。
func (b *Broadcaster) blockQueue(f func()) {
    var wg sync.WaitGroup
    wg.Add(1)
    b.incoming <- Event{
        Type: internalRunFunctionMarker,
        Object: functionFakeRuntimeObject(func() {
            defer wg.Done()
            f()
        }),
    }
    wg.Wait()
}
上一篇下一篇

猜你喜欢

热点阅读