Kafka

时间轮TimingWheel

2021-01-11  本文已影响0人  专职掏大粪

TimerTask 类


trait TimerTask extends Runnable {
 val delayMs: Long // 通常是request.timeout.ms参数值,表示这个定时任务的超时时间
 // 每个TimerTask实例关联一个TimerTaskEntry
 // 就是说每个定时任务需要知道它在哪个Bucket链表下的哪个链表元素上
 private[this] var timerTaskEntry: TimerTaskEntry = null
 // 取消定时任务,原理就是将关联的timerTaskEntry置空
 def cancel(): Unit = {
   synchronized {
     if (timerTaskEntry != null) timerTaskEntry.remove()
     timerTaskEntry = null
   }
 }
 // 关联timerTaskEntry,原理是给timerTaskEntry字段赋值
 private[timer] def setTimerTaskEntry(entry: TimerTaskEntry)
   : Unit = {
   synchronized {
     if (timerTaskEntry != null && timerTaskEntry != entry)
       timerTaskEntry.remove()
     timerTaskEntry = entry
   }
 }
 // 获取关联的timerTaskEntry实例
 private[timer] def getTimerTaskEntry(): TimerTaskEntry = {
   timerTaskEntry
 }
}

TimerTaskEntry 类

private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {
  @volatile
  var list: TimerTaskList = null   // 绑定的Bucket链表实例 
  var next: TimerTaskEntry = null  // next指针
  var prev: TimerTaskEntry = null  // prev指针
  // 关联给定的定时任务
  if (timerTask != null) timerTask.setTimerTaskEntry(this)
  // 关联定时任务是否已经被取消了
  def cancelled: Boolean = {
    timerTask.getTimerTaskEntry != this
  }
  // 从Bucket链表中移除自己
  def remove(): Unit = {
    var currentList = list
    while (currentList != null) {
      currentList.remove(this)
      currentList = list
    }
  }
  ......
}

TimerTaskList 类

private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
  private[this] val root = new TimerTaskEntry(null, -1)
  root.next = root
  root.prev = root
  private[this] val expiration = new AtomicLong(-1L)
  ......
}

expiration 的 Getter 和 Setter 方法。

 // Set the bucket's expiration time
  // Returns true if the expiration time is changed
  def setExpiration(expirationMs: Long): Boolean = {
    expiration.getAndSet(expirationMs) != expirationMs
  }

  // Get the bucket's expiration time
  def getExpiration(): Long = {
    expiration.get()
  }

add方法

  // Add a timer task entry to this list
  def add(timerTaskEntry: TimerTaskEntry): Unit = {
    var done = false
    while (!done) {
 // 在添加之前尝试移除该定时任务,保证该任务没有在其他链表中
      // Remove the timer task entry if it is already in any other list
      // We do this outside of the sync block below to avoid deadlocking.
      // We may retry until timerTaskEntry.list becomes null.
      timerTaskEntry.remove()

      synchronized {
        timerTaskEntry.synchronized {
          if (timerTaskEntry.list == null) {
            // put the timer task entry to the end of the list. (root.prev points to the tail entry)
            val tail = root.prev
            timerTaskEntry.next = root
            timerTaskEntry.prev = tail
            timerTaskEntry.list = this
 // 把timerTaskEntry添加到链表末尾
            tail.next = timerTaskEntry
            root.prev = timerTaskEntry
            taskCounter.incrementAndGet()
            done = true
          }
        }
      }
    }
  }

remove方法

 // Remove the specified timer task entry from this list
  def remove(timerTaskEntry: TimerTaskEntry): Unit = {
    synchronized {
      timerTaskEntry.synchronized {
        if (timerTaskEntry.list eq this) {
          timerTaskEntry.next.prev = timerTaskEntry.prev
          timerTaskEntry.prev.next = timerTaskEntry.next
          timerTaskEntry.next = null
          timerTaskEntry.prev = null
          timerTaskEntry.list = null
          taskCounter.decrementAndGet()
        }
      }
    }
  }

flush 方法: 将高层次时间轮 Bucket 上的定时任务重新插入回低层次的 Bucket 中

  // Remove all task entries and apply the supplied function to each of them
  def flush(f: (TimerTaskEntry)=>Unit): Unit = {
    synchronized {
      var head = root.next
      while (head ne root) {
        remove(head)
        f(head)
        head = root.next
      }
      expiration.set(-1L)
    }
  }

TimingWheel 类

#定义
private[timer] class TimingWheel(
  tickMs: Long, wheelSize: Int, 
  startMs: Long, taskCounter: AtomicInteger, 
  queue: DelayQueue[TimerTaskList]) {
  private[this] val interval = tickMs * wheelSize
  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
  private[this] var currentTime = startMs - (startMs % tickMs)
  @volatile private[this] var overflowWheel: TimingWheel = null
  ......
}

addOverflowWheel


private[this] def addOverflowWheel(): Unit = {
  synchronized {
    // 只有之前没有创建上层时间轮方法才会继续
    if (overflowWheel == null) {
      // 创建新的TimingWheel实例
      // 滴答时长tickMs等于下层时间轮总时长
      // 每层的轮子数都是相同的
      overflowWheel = new TimingWheel(
        tickMs = interval,
        wheelSize = wheelSize,
        startMs = currentTime,
        taskCounter = taskCounter,
        queue
      )
    }
  }
}

add


def add(timerTaskEntry: TimerTaskEntry): Boolean = {
  // 获取定时任务的过期时间戳
  val expiration = timerTaskEntry.expirationMs
  // 如果该任务已然被取消了,则无需添加,直接返回
  if (timerTaskEntry.cancelled) {
    false
  // 如果该任务超时时间已过期
  } else if (expiration < currentTime + tickMs) {
    false
  // 如果该任务超时时间在本层时间轮覆盖时间范围内
  } else if (expiration < currentTime + interval) {
    val virtualId = expiration / tickMs
    // 计算要被放入到哪个Bucket中
    val bucket = buckets((virtualId % wheelSize.toLong).toInt)
    // 添加到Bucket中
    bucket.add(timerTaskEntry)
    // 设置Bucket过期时间
    // 如果该时间变更过,说明Bucket是新建或被重用,将其加回到DelayQueue
    if (bucket.setExpiration(virtualId * tickMs)) {
      queue.offer(bucket)
    }
    true
  // 本层时间轮无法容纳该任务,交由上层时间轮处理
  } else {
    // 按需创建上层时间轮
    if (overflowWheel == null) addOverflowWheel()
    // 加入到上层时间轮中
    overflowWheel.add(timerTaskEntry)
  }
}

image.png

advanceClock: 向前驱动时钟的方法


def advanceClock(timeMs: Long): Unit = {
  // 向前驱动到的时点要超过Bucket的时间范围,才是有意义的推进,否则什么都不做
  // 更新当前时间currentTime到下一个Bucket的起始时点
  if (timeMs >= currentTime + tickMs) {
    currentTime = timeMs - (timeMs % tickMs)
    // 同时尝试为上一层时间轮做向前推进动作
    if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
  }
}
上一篇 下一篇

猜你喜欢

热点阅读