[k8s源码分析][client-go] informer之de
1. 前言
转载请说明原文出处, 尊重他人劳动成果!
源码位置: https://github.com/nicktming/client-go/tree/tming-v13.0/tools/cache
分支: tming-v13.0 (基于v13.0版本)
本文将分析
tools/cache
包中的DeltaFIFO
. 主要会涉及到delta_fifo.go
, 该类在整个informer
体系中用于接受reflector
送出来的数据. 相当于reflector
是DeltaFIFO
的生产者.
2. 整体接口与实现类
architecture.png
DeltaFIFO is like FIFO, but allows you to process changes to items which is delta.
DeltaFIFO is a producer-consumer queue, where a Reflector is intended to be the producer,
and the consumer is whatever calls the Pop() method.
A note on the KeyLister used by the DeltaFIFO:It's main purpose is to list keys that are "known",
for the purpose of figuring out which items have been deleted when Replace() or Delete() are
called. The deleted object will be included in the DeleteFinalStateUnknown markers.
关于
FIFO
, 在 [k8s源码分析][client-go] cache之fifo 中已经分析了,DeltaFIFO
也是Queue
的一个实现类, 但是稍微比FIFO
复杂一点.
type DeltaFIFO struct {
lock sync.RWMutex
cond sync.Cond
// items里面存的是key 以及该key对应的pod的变化
// queue中存的是key 即出队列的顺序
items map[string]Deltas
queue []string
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int
// 生成key
keyFunc KeyFunc
// knownObjects list keys that are "known", for the
// purpose of figuring out which items have been deleted
// when Replace() or Delete() is called.
// 说白了 就是本地缓存
knownObjects KeyListerGetter
closed bool
closedLock sync.Mutex
}
// It tells you what change happened
type Delta struct {
Type DeltaType
Object interface{}
}
type Deltas []Delta
// DeltaType is the type of a change (addition, deletion, etc)
type DeltaType string
// Change type definition
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
// The other types are obvious. You'll get Sync deltas when:
// * A watch expires/errors out and a new list/watch cycle is started.
// * You've turned on periodic syncs.
// (Anything that trigger's DeltaFIFO's Replace() method.)
Sync DeltaType = "Sync"
)
type DeletedFinalStateUnknown struct {
Key string
Obj interface{}
}
// A KeyListerGetter is anything that knows how to list its keys and look up by key.
type KeyListerGetter interface {
KeyLister
KeyGetter
}
// A KeyLister is anything that knows how to list its keys.
type KeyLister interface {
ListKeys() []string
}
// A KeyGetter is anything that knows how to get the value stored under a given key.
type KeyGetter interface {
GetByKey(key string) (interface{}, bool, error)
}
与
FIFO
相比, 主要有以下几点不同:
1.items
中的value
不再只存着该key
对应的obj
, 而是obj
的一系列变化, 用一个数组来表示. 包括添加/更新/删除等等. 因此衍生出来了很多结构体和方法, 包括Deltas
,Delta
等等.
2. 增加了本地缓存knownObjects KeyListerGetter
,KeyListerGetter
提供了两个方法分别是从本地缓存中获得所有的key
和根据key
找到对应的obj
. 当程序中错过了某些event
, 比如deletion event
, 会造成服务器数据库中没有该obj
, 而本地缓存中有该obj
, 从而造成数据不一致, 那么在同步的过程中会有所操作. (其实KeyListerGetter
在informers
体系中是一个Indexer
. [k8s源码分析][client-go] cache之store和index)
或许有人会疑惑会为什么需要用另外一个属性来缓存呢?items
属性不就可以当做缓存了吗? 理由是: items只是暂时性存储, 当调用pop
的时候对应的数据就会从items
中删除了, 而knownObjects
会维护本地缓存.
3.DeletedFinalStateUnknown
: 当一个obj
被删除了, 但是这个程序这边由于某种原因miss
了这次deletion event
, 那么假如在做同步操作时, 从服务器获取的列表中已经没有了这个obj
, 因为该程序没有接收到deletion event
, 所以该obj
在本地缓存中依然存在, 所以此时会给这个obj
构造成这个DeletedFinalStateUnknown
类型.
3. 方法
在讲方法的同时尽量用个例子来进行说明. 先定义一下类和方法.
func testFifoObjectKeyFunc(obj interface{}) (string, error) {
return obj.(testFifoObject).name, nil
}
type testFifoObject struct {
name string
val interface{}
}
func mkFifoObj(name string, val interface{}) testFifoObject {
return testFifoObject{name: name, val: val}
}
// helper function to reduce stuttering
func testPop(f *DeltaFIFO) testFifoObject {
return Pop(f).(Deltas).Newest().Object.(testFifoObject)
}
// keyLookupFunc adapts a raw function to be a KeyLookup.
type keyLookupFunc func() []testFifoObject
// ListKeys just calls kl.
func (kl keyLookupFunc) ListKeys() []string {
result := []string{}
for _, fifoObj := range kl() {
result = append(result, fifoObj.name)
}
return result
}
// GetByKey returns the key if it exists in the list returned by kl.
func (kl keyLookupFunc) GetByKey(key string) (interface{}, bool, error) {
for _, v := range kl() {
if v.name == key {
return v, true, nil
}
}
return nil, false, nil
}
Add 和 Update
func (f *DeltaFIFO) Add(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Added, obj)
}
func (f *DeltaFIFO) Update(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Updated, obj)
}
func (f *DeltaFIFO) Delete(obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
if f.knownObjects == nil {
// 如果没有设置本地缓存
if _, exists := f.items[id]; !exists {
// 如果items中没有该元素, 返回
return nil
}
} else {
_, exists, err := f.knownObjects.GetByKey(id)
_, itemsExist := f.items[id]
if err == nil && !exists && !itemsExist {
// 如果本地缓存和items中都没有, 返回
return nil
}
}
return f.queueActionLocked(Deleted, obj)
}
1.
Delete
方法有所不一样, 需要判断本地缓存. 这三个方法都是需要调用queueActionLocked
来进行操作.
2. 都设置populated
为true
, 跟在FIFO
中 [k8s源码分析][client-go] cache之fifo 的行为一样.
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
// 如果是Deltas, 也就是该obj的变化, 取最后一个操作的obj
if d, ok := obj.(Deltas); ok {
if len(d) == 0 {
return "", KeyError{obj, ErrZeroLengthDeltasObject}
}
obj = d.Newest().Object
}
// 如果该是DeletedFinalStateUnknown类型, 表明在服务器端已经被删除了, 在本地缓存中依然存在
if d, ok := obj.(DeletedFinalStateUnknown); ok {
return d.Key, nil
}
// 根据obj生成key
return f.keyFunc(obj)
}
// 目前这里的操作只是去判断最后两个元素是不是都是delete, 如果是则进行合并, 就选其中一个即可
func dedupDeltas(deltas Deltas) Deltas {
n := len(deltas)
if n < 2 {
return deltas
}
a := &deltas[n-1]
b := &deltas[n-2]
if out := isDup(a, b); out != nil {
d := append(Deltas{}, deltas[:n-2]...)
return append(d, *out)
}
return deltas
}
func isDup(a, b *Delta) *Delta {
if out := isDeletionDup(a, b); out != nil {
return out
}
// TODO: Detect other duplicate situations? Are there any?
return nil
}
// a:倒数第一个 b:倒数第二个
// 如果倒数第一个和倒数第二个都是Delete
// 如果倒数第二个是DeletedFinalStateUnknown 返回倒数第一个
// 如果倒数第二个不是DeletedFinalStateUnknown 返回倒数第二个
// 选择一个尽量不是DeletedFinalStateUnknown的元素
func isDeletionDup(a, b *Delta) *Delta {
if b.Type != Deleted || a.Type != Deleted {
return nil
}
// Do more sophisticated checks, or is this sufficient?
if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
return a
}
return b
}
// 判断该id的最后一次操作是不是Deleted操作
func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
deltas := f.items[id]
return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
}
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// 如果是Sync并且该元素中最后一次变化是删除操作 就直接返回了
// 因为都已经是删除操作了, 在后面加一个Sync就没有必要了 也可以方便用户操作, 用户判断最后一个是不是delete会很方便
// Resync和Replace方法中有可能会调用Sync操作
if actionType == Sync && f.willObjectBeDeletedLocked(id) {
return nil
}
newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
} else {
delete(f.items, id)
}
return nil
}
1. 首先利用
KeyOf
方法计算出该obj
的id
. (在KeyOf
中如果该obj
是一个Deltas
类型, 则取最后一次变化的元素的id
).
2. 如果是Sync
操作(Resync
和Replace
方法中有可能会调用Sync
操作), 如果该元素目前接受到的最后一次是删除操作, 则这里直接返回.(因为服务器端已经发出删除指令了, 这里没必要再给加Sync
)
3. 判断该元素的最后两次操作是否相同(已经把这次要加入的操作也算进去了), 这里主要是进行Delete
操作判断, 如果相同需要进行合并.
例子
因为在
informers
体系下knownObjects
是真实存在的, 所以为了后面更好的理解informers
, 所以例子中会带有knownObjects
.
f := NewDeltaFIFO(
testFifoObjectKeyFunc,
keyLookupFunc(func() []testFifoObject {
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
}),
)
f.Update(mkFifoObj("baz", 18))
f.Add(mkFifoObj("foo", 10))
f.Update(mkFifoObj("bar", 15))
f.Update(mkFifoObj("foo", 15))
f.Delete(mkFifoObj("baz", 18))
1. 可以看到本地缓存中已经保存了
foo
,bar
和baz
, 说明之前已经有Add
这些event
发生了, 所以本地缓存中有.(因为这是模拟, 所以就这样说明一下)
2. 调用f.Add(mkFifoObj("foo", 10))
, 按照上面的方法, 这个调用肯定是可以成功的.
3.Update
操作与上面一样的, 不多说了. 可以看f.Delete(mkFifoObj("baz", 20))
, 并且本地缓存中有baz
, 所以成功. 那什么时候会从本地缓存中删除这个baz
, 这个在informers
体系中是在pop
的时候去更新本地缓存的.pop
出来的是一个Dletas
(包含一系列该obj
变化的delta
数组), 那么我们自己定义的逻辑要怎么处理就怎么处理, 在informers
的逻辑中, 是for
这个数组对本地缓存进行操作. 所以在没有出队列之前, 本地缓存中的值还是原来的值.
func newInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
clientState Store,
) Controller {
// clientState就是本地缓存 对应的knownObjects
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
Process: func(obj interface{}) error {
// from oldest to newest 出队列的数组 一个一个操作
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
// 更新本地缓存
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
if err := clientState.Update(d.Object); err != nil {
return err
}
h.OnUpdate(old, d.Object)
} else {
// 添加到本地缓存
if err := clientState.Add(d.Object); err != nil {
return err
}
h.OnAdd(d.Object)
}
case Deleted:
// 删除本地缓存
if err := clientState.Delete(d.Object); err != nil {
return err
}
h.OnDelete(d.Object)
}
}
return nil
},
}
return New(cfg)
}
最终的结果如下:
add/update/delete.png
pop方法
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
if f.IsClosed() {
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
continue
}
delete(f.items, id)
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
}
1. 这里的出队列与
FIFO
一样, 但是有一点区别是这里出队列的是一个数组, 而不是某一个obj
. 从上面的例子中看, 第一个出队列的是foo
, 它出来的是一个数组[{Add, {foo, 10}}, {Update, {foo, 15}}]
, 然后用户的process
方法需要处理的是这样的一个数组.
2. 如果initialPopulationCount > 0
, 表明Replace
是比Add/Update/AddIfNotPresent/Delete
先调用 然后设置了initialPopulationCount
就是第一次调用Replace
中加入的元素个数, 那在pop
中对于initialPopulationCount--
做的操作就是每出来一个元素就减少一个, 等到initialPopulationCount=0
的时候, 也就表明第一次调用replace
加入的元素已经全部出队列了.
例子
所以按照上面的例子出一个队列看一下, 然后
process PopProcessFunc
就用上面informers
用的那个process
的逻辑. 所以出队列的是baz
, 然后依次对它的两个变化{Update, {baz, 18}}
和{Delete, {baz, 20}}
进行操作.
delete.png
delta_update.png
delta_delete.png
Replace
Replace
的作用很清楚, 就是用传入的list
来代替之前这里的所有元素, 与FIFO
不同的是, 这里的操作都是针对变化, 这里DeltaFIFO
就给那些要删除的元素发送一个Delete
操作, 给那些不需要删除的元素发送一个Sync
操作表示已经完成同步.
那么问题来了, 哪些元素是要被删除的元素呢? 在传入的
list
中没有出现的元素就是要删除的元素. 再想一下,DeletedFinalStateUnknown
出现的原因是因为某种原因miss
了Delete event
, 现在假设Replace
中的元素是从服务器中最新获取的所有真正存在的元素, 并且该程序没有错误任何的删除事件, 那么传入的list
的keys
与knownObjects
中的keys
应该是一样的. 所以如果错过了某些删除事件, 那自然是knownObjects
中多了一些已经被删除的obj
.
理解了这些, 再看代码就会轻松很多了.
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock()
defer f.lock.Unlock()
keys := make(sets.String, len(list))
// 将要加入的list放到keys中
// 给list中的每一个item发送Sync操作
for _, item := range list {
key, err := f.KeyOf(item)
if err != nil {
return KeyError{item, err}
}
keys.Insert(key)
if err := f.queueActionLocked(Sync, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
if f.knownObjects == nil {
// Do deletion detection against our own list.
// 如果没有设置本地缓存
queuedDeletions := 0
for k, oldItem := range f.items {
// 如果新加的list中有 因为已经发送Sync操作了 所以就不需要了
if keys.Has(k) {
continue
}
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
}
queuedDeletions++
// 不在list中的元素需要被删除
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if !f.populated {
f.populated = true
// While there shouldn't be any queued deletions in the initial
// population of the queue, it's better to be on the safe side.
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
// Detect deletions not already in the queue.
// 这里可能有人会疑惑为什么不删除f.items里面的元素, 因为f.items里面有的元素会出现在本地缓存中的, 所以直接对本地缓存做操作即可
knownKeys := f.knownObjects.ListKeys()
queuedDeletions := 0
for _, k := range knownKeys {
if keys.Has(k) {
continue
}
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
1. 可以看到在做删除操作的时候, 给这些
obj
都是构造成DeletedFinalStateUnknown
类型.
2.initialPopulationCount
将会被设置成len(list) + queuedDeletions
, 也就是说要等待同步的元素和那些错过了删除事件的元素全部出队列完成才可以说同步成功.
例子
在上面的基础上增加个
Replace
操作.
1.pop() --> 本地缓存中就会有foo的记录, queue和item中会删除foo
2.pop() --> 本地缓存中就会有bar的记录, queue和item中会删除bar
3.Add {baz, 15} pop() --> 本地缓存中就会有baz的记录
4.Update {foo, 10} Update {foo, 15} --> foo会加入到queue和item中
5.Update {bar, 15} --> bar会加入到queue和item中
6.Update {baz, 18} Delete {baz, 18} --> baz会加入到queue和item中
7.Replace [{foo, 15}, {baz, 18}]
前面
6
步会得到下面的这个状态, 这里就不一一解释了, 上面已经分析过了.
before_replace.png
可以看到在本地缓存中有
foo
,bar
和baz
. 现在假设Replace
中的list
是从服务器上最新获取来的元素列表. 那么可以看到bar
已经被删除了, 而foo
和baz
还在服务器中.
另外要说明的一点是baz
已经接收到了delete
操作, 怎么服务器上还有呢?这里有可能是获得的列表是在发出了删除命令与完全删除之间这段时间获取的, 也有可能是删除了之后又添加了一个这样的obj
, 但是DeltaQueue
还没有收到Add event
, 不过有待考证, 这里只是想为了说明在Delete
之后发送了Sync
操作是不添加在后面的.
replace.png
从图中可以看到:
foo: 在list
中, 所以发送一个sync
请求.
bar: 不在list
中, 说明服务器端已经删除了, 该DeltaQueue
由于某种原因(比如网络)没有收到delete event
, 也有可能还没有出队列等等原因, 所以需要加一个Delete
操作, 并且构造成DeletedFinalStateUnknown
类型. 另外在这里仔细思考一下之前关于isDeletionDup
的操作是不是更清晰, 就是尽量选从上流程序发过来的Delete event
带有的obj
.
Delete: 因为baz
在list
中, 所以发一个sync
操作, 但是由于该元素之前已经接收到了Delete
请求, 所以再把该Sync
加到后面, 在出队列pop
中informers
(上面有提到), 判断是Sync
并且本地缓存中没有的时候会把该obj
重新加回去.
Resync
func (f *DeltaFIFO) syncKey(key string) error {
f.lock.Lock()
defer f.lock.Unlock()
return f.syncKeyLocked(key)
}
func (f *DeltaFIFO) syncKeyLocked(key string) error {
obj, exists, err := f.knownObjects.GetByKey(key)
if err != nil {
klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
return nil
} else if !exists {
// 如果该元素在本地缓存中不存在 则返回
klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
return nil
}
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
if len(f.items[id]) > 0 {
return nil
}
if err := f.queueActionLocked(Sync, obj); err != nil {
return fmt.Errorf("couldn't queue object: %v", err)
}
return nil
}
向本地缓存中的那些不在
items
里面的元素发一个Sync
操作.
HasSynced
func (f *DeltaFIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
return f.populated && f.initialPopulationCount == 0
}
假设此时该
DeltaFIFQ
对象刚刚初始化.
1. 如果啥方法都没有调用, 那么HasSynced
返回false
, 因为populated=false
.
2. 如果先调用Add/Update/AddIfNotPresent/Delete
后(后面调用什么函数都不用管了), 那么HasSynced
返回true
, 因为populated=true并且initialPopulationCount == 0
.
3. 如果先调用Replace
(后面调用什么函数都不用管了), 那么必须要等待该replace
方法加入元素的个数和DeletedFinalStateUnknown
(也就是那些本地缓存上有服务器上没有的元素)全部pop
之后,HasSynced
才会返回true
, 因为只有全部pop
完了之后initialPopulationCount
才减为0
.
informer整体
整个
informer
体系在k8s
代码中占有重要一环, 理解informer
可以更好理解k8s
的工作机制.
informer.png
1. [k8s源码分析][client-go] informer之store和index
2. [k8s源码分析][client-go] informer之delta_fifo
3. [k8s源码分析][client-go] informer之reflector
4. [k8s源码分析][client-go] informer之controller和shared_informer(1)
5. [k8s源码分析][client-go] informer之controller和shared_informer(2)
6. [k8s源码分析][client-go] informer之SharedInformerFactory