[k8s源码分析][client-go] informer之co
1. 前言
转载请说明原文出处, 尊重他人劳动成果!
源码位置: https://github.com/nicktming/client-go/tree/tming-v13.0/tools/cache
分支: tming-v13.0 (基于v13.0版本)
在 [k8s源码分析][client-go] informer之store和index 和 [k8s源码分析][client-go] informer之store和index 和 [k8s源码分析][client-go] informer之reflector 的基础上进行分析, 接下来将会分析如何生成一个
informer
, 并且用户如何添加自己的逻辑, 与用户层越来越接近了.
2. 接口与类
这里先介绍后面需要用到的几个接口与结构体
architecture.png
2.1 ResourceEventHandler
// client-go/tools/cache/controller.go
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
相信对这三个函数比较熟悉, 用户可以在这里定义自己的逻辑.
2.2 processorListener
type processorListener struct {
nextCh chan interface{}
addCh chan interface{}
// 一个自定义处理数据的handler
handler ResourceEventHandler
// 一个环形的buffer 存着那些还没有被分发的notifications
pendingNotifications buffer.RingGrowing
// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
requestedResyncPeriod time.Duration
// informer's overall resync check period.
resyncPeriod time.Duration
// 下次要resync的时候
nextResync time.Time
resyncLock sync.Mutex
}
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
ret := &processorListener{
nextCh: make(chan interface{}),
addCh: make(chan interface{}),
handler: handler,
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
}
ret.determineNextResync(now)
return ret
}
func (p *processorListener) determineNextResync(now time.Time) {
p.resyncLock.Lock()
defer p.resyncLock.Unlock()
// now加上该listener的resyncPeriod就是下次要resync的时间
p.nextResync = now.Add(p.resyncPeriod)
}
关于
buffer.NewRingGrowing
是一个无限的循环数组, 无限的意思是当你想要在增加一个元素, 发现整个数组满了, 此时会进行扩容, 如果一直扩容, 会被OOM
杀死.
关于
shouldResync
和setResyncPeriod
比较简单就不多说了. 这里说一下三个比较重要的方法add
,pop
和run
方法.
add
add
方法是由上层程序调用的, 也就是往该listener
发送了一个新的notification
. 相当于生产者.
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
pop 和 run
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
var ok bool
// notification从缓冲区pendingNotifications中读 然后传递给nextCh
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil {
// 如果notification还没有初始化 则进行初始化notification和nextCh
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
// 直接往pendingNotifications中写
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
func (p *processorListener) run() {
stopCh := make(chan struct{})
wait.Until(func() {
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
// 从nextCh读取并调用该listener的handler进行处理
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
return true, nil
})
// the only way to get here is if the p.nextCh is empty and closed
if err == nil {
close(stopCh)
}
}, 1*time.Minute, stopCh)
}
pop
和run
属于消费者, 消费从add
方法中过来的notification
, 但是为了防止处理速度(调用handler
)跟不上生产速度, 设置了一个缓冲区pendingNotifications
, 把从add
中过来的notification
先加入到pendingNotifications
, 然后从pendingNotifications
读取一个notification
后, 将notification
通过nextCh
这个channel
来进而传递给消费者run
.
work_flow.png
2.3 sharedProcessor
type sharedProcessor struct {
// 判断listeners有没有启动
listenersStarted bool
listenersLock sync.RWMutex
// 所有的processorListener
listeners []*processorListener
// 所有的需要sync的processorListener 动态变化
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}
这里
sharedProcessor
就是管理着所有的processorListener
, 简单一点理解就是当拿到一个数据, 然后可以分发给所有的listeners
.
resyncCheckPeriodChanged
func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
// 根据listener自己要求的requestedResyncPeriod和resyncCheckPeriod来决定该listener真正的resyncPeriod
resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
listener.setResyncPeriod(resyncPeriod)
}
}
// 1. 如果desired或check其中一个是0 则返回0
// 2. 返回max(desired, check)
func determineResyncPeriod(desired, check time.Duration) time.Duration {
if desired == 0 {
return desired
}
if check == 0 {
klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
return 0
}
if desired < check {
klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
return check
}
return desired
}
resyncCheckPeriodChanged
的作用是根据resyncCheckPeriod
会重新生成一下每个listener
自己的resyncPeriod
.
对于每一个listener
:
1. 如果自己要求的requestedResyncPeriod
为0
或被要求的resyncCheckPeriod
其中一个是0
, 则返回0
.
2. 则返回两个其中最大的一个.
shouldResync
func (p *sharedProcessor) shouldResync() bool {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
p.syncingListeners = []*processorListener{}
resyncNeeded := false
now := p.clock.Now()
for _, listener := range p.listeners {
if listener.shouldResync(now) {
resyncNeeded = true
p.syncingListeners = append(p.syncingListeners, listener)
listener.determineNextResync(now)
}
}
return resyncNeeded
}
可以看到该方法会重新生成
syncingListeners
, 遍历所有的listeners
, 判断哪个已经到了resync
时间, 如果到了就加入到syncingListeners
中, 并且它的下一次resync
的时间.
如果所有的
listeners
都没有到resync
时间, 那该sharedProcessor
对象的shouldResync
会返回false
. 否则会返回true
.
run
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 以goroutine的方式启动所有的listeners监听
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
// 等待有信号告知退出
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 关闭所有listener的addCh channel
for _, listener := range p.listeners {
// 通知pop()停止 pop()会告诉run()停止
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
// 等待所有的pop()和run()方法退出
p.wg.Wait() // Wait for all .pop() and .run() to stop
}
run
方法主要是启动所有的listener
进行监听.
其余方法
func (p *sharedProcessor) addListener(listener *processorListener) {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
p.addListenerLocked(listener)
// 如果已经启动了
if p.listenersStarted {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
}
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
p.listeners = append(p.listeners, listener)
p.syncingListeners = append(p.syncingListeners, listener)
}
// 分发信息
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
// 如果是sync操作 只需要分发给那些resync时间到了的listener即可
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
// 如果不是sync操作 则通知所有的listeners
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
addListener: 表示增加一个
processorListener
, 如果sharedProcessor
已经启动run
方法了(listenersStarted=true
), 那么就启动该listener
的run
和pop
监控.
distribute: 分发消息, 也就是说
sharedProcessor
收到一个obj
, 然后把该obj
分发给它的listeners
, 那么每个listeners
都可以收到这个obj
.
informer整体
整个
informer
体系在k8s
代码中占有重要一环, 理解informer
可以更好理解k8s
的工作机制.
informer.png
1. [k8s源码分析][client-go] informer之store和index
2. [k8s源码分析][client-go] informer之delta_fifo
3. [k8s源码分析][client-go] informer之reflector
4. [k8s源码分析][client-go] informer之controller和shared_informer(1)
5. [k8s源码分析][client-go] informer之controller和shared_informer(2)
6. [k8s源码分析][client-go] informer之SharedInformerFactory