Go语言小技巧(4) - FIFO队列

2018-10-19  本文已影响0人  陈先生_9e91

Go语言小技巧(4) - FIFO队列

队列是非常常见的数据结构,看下K8S是怎么实现的

详见client-go\tools\cache\fifo.go

struct

type Queue interface {
   Store
    
   Pop(PopProcessFunc) (interface{}, error)

   AddIfNotPresent(interface{}) error

   HasSynced() bool

   Close()
}

type Store interface {
   Add(obj interface{}) error
   Update(obj interface{}) error
   Delete(obj interface{}) error
   List() []interface{}
   ListKeys() []string
   Get(obj interface{}) (item interface{}, exists bool, err error)
   GetByKey(key string) (item interface{}, exists bool, err error)

   Replace([]interface{}, string) error
   Resync() error
}
type FIFO struct {
   lock sync.RWMutex
   cond sync.Cond
   items map[string]interface{}
   queue []string

   populated bool

   initialPopulationCount int

   keyFunc KeyFunc

   closed     bool
   closedLock sync.Mutex
}

数据结构和接口没什么好说的,都比较常见,重点关注Add和Pop。

实现

Add

func (f *FIFO) Add(obj interface{}) error {
   id, err := f.keyFunc(obj)
   if err != nil {
      return KeyError{obj, err}
   }
   f.lock.Lock()
   defer f.lock.Unlock()
   f.populated = true
   if _, exists := f.items[id]; !exists {
      f.queue = append(f.queue, id)
   }
   f.items[id] = obj
   f.cond.Broadcast()
   return nil
}
  1. 根据keyFunc计算key
  2. lock
  3. 如果key不存在,就加入queue切片
  4. items赋值
  5. goroutine通信,唤醒wait
  6. unlock

Pop

func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
   f.lock.Lock()
   defer f.lock.Unlock()
   for {
      for len(f.queue) == 0 {
         if f.IsClosed() {
            return nil, FIFOClosedError
         }

         f.cond.Wait()
      }
      id := f.queue[0]
      f.queue = f.queue[1:]
      if f.initialPopulationCount > 0 {
         f.initialPopulationCount--
      }
      item, ok := f.items[id]
      if !ok {
         // Item may have been deleted subsequently.
         continue
      }
      delete(f.items, id)
      err := process(item)
      if e, ok := err.(ErrRequeue); ok {
         f.addIfNotPresent(id, item)
         err = e.Err
      }
      return item, err
   }
}

Note: 如果fifo空,则Pop会block

  1. lock
  2. 如果没有值,就goroutine通信,wait等待有值,释放锁
  3. 取queue切片第一个key
  4. 重点,切片操作f.queue = f.queue[1:],缺点显然底层数组前端一直空闲,空间换时间。
  5. initialPopulationCount操作,暂时不知道用途
  6. 取出对应值,为什么取值会不ok?遗留问题。如果没有值,就继续循环,外面是个死循环。
  7. 删除对应值
  8. unlock

Note: 第四步用了非常讨巧的方法,浪费内存,可以优化

Delete

func (f *FIFO) Delete(obj interface{}) error {
   id, err := f.keyFunc(obj)
   if err != nil {
      return KeyError{obj, err}
   }
   f.lock.Lock()
   defer f.lock.Unlock()
   f.populated = true
   delete(f.items, id)
   return err
}

解谜了。Delete只删除items,而没有删除queue切片,因为删除切片时间复杂度比较高,这个设计nice!

总结

  1. 使用切片实现顺序性
  2. pop的时候,使用切片操作[1:]
  3. 删除的时候只删除items,不要删除切片
  4. pop找不到值的时候,继续下一个
上一篇下一篇

猜你喜欢

热点阅读