Kafka

Broker是怎么延时处理请求的?

2021-01-11  本文已影响0人  专职掏大粪

Timer 接口


trait Timer {
  // 将给定的定时任务插入到时间轮上,等待后续延迟执行
  def add(timerTask: TimerTask): Unit
  // 向前推进时钟,执行已达过期时间的延迟任务
  def advanceClock(timeoutMs: Long): Boolean
  // 获取时间轮上总的定时任务数
  def size: Int
  // 关闭定时器
  def shutdown(): Unit
}

SystemTimer 类


class SystemTimer(executorName: String,
                  tickMs: Long = 1,
                  wheelSize: Int = 20,
                  startMs: Long = Time.SYSTEM.hiResClockMs) extends Timer {
  // 单线程的线程池用于异步执行定时任务
  private[this] val taskExecutor = Executors.newFixedThreadPool(1,
    (runnable: Runnable) => KafkaThread.nonDaemon("executor-" + executorName, runnable))
  // 延迟队列保存所有Bucket,即所有TimerTaskList对象
  private[this] val delayQueue = new DelayQueue[TimerTaskList]()
  // 总定时任务数
  private[this] val taskCounter = new AtomicInteger(0)
  // 时间轮对象
  private[this] val timingWheel = new TimingWheel(
    tickMs = tickMs,
    wheelSize = wheelSize,
    startMs = startMs,
    taskCounter = taskCounter,
    delayQueue
  )
  // 维护线程安全的读写锁
  private[this] val readWriteLock = new ReentrantReadWriteLock()
  private[this] val readLock = readWriteLock.readLock()
  private[this] val writeLock = readWriteLock.writeLock()
  ......
}

add 方法的作用,是将给定的定时任务插入到时间轮中进行管理。代码如下:


def add(timerTask: TimerTask): Unit = {
  // 获取读锁。在没有线程持有写锁的前提下,
  // 多个线程能够同时向时间轮添加定时任务
  readLock.lock()
  try {
    // 调用addTimerTaskEntry执行插入逻辑
    addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs))
  } finally {
    // 释放读锁
    readLock.unlock()
  }
}

private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
  // 视timerTaskEntry状态决定执行什么逻辑:
  // 1. 未过期未取消:添加到时间轮
  // 2. 已取消:什么都不做
  // 3. 已过期:提交到线程池,等待执行
  if (!timingWheel.add(timerTaskEntry)) {
    // 定时任务未取消,说明定时任务已过期
    // 否则timingWheel.add方法应该返回True
    if (!timerTaskEntry.cancelled)
      taskExecutor.submit(timerTaskEntry.timerTask)
  }
}

advanceClock 方法。顾名思义,它的作用是驱动时钟向前推进



def advanceClock(timeoutMs: Long): Boolean = {
  // 获取delayQueue中下一个已过期的Bucket
  var bucket = delayQueue.poll(
    timeoutMs, TimeUnit.MILLISECONDS)
  if (bucket != null) {
    // 获取写锁
    // 一旦有线程持有写锁,其他任何线程执行add或advanceClock方法时会阻塞
    writeLock.lock()
    try {
      while (bucket != null) {
        // 推动时间轮向前"滚动"到Bucket的过期时间点
        timingWheel.advanceClock(bucket.getExpiration())
        // 将该Bucket下的所有定时任务重写回到时间轮
        bucket.flush(reinsert)
        // 读取下一个Bucket对象
        bucket = delayQueue.poll()
      }
    } finally {
      // 释放写锁
      writeLock.unlock()
    }
    true
  } else {
    false
  }
}
image.png

DelayedOperation 类


abstract class DelayedOperation(override val delayMs: Long,
                                lockOpt: Option[Lock] = None)
  extends TimerTask with Logging {
  // 标识该延迟操作是否已经完成
  private val completed = new AtomicBoolean(false)
  // 防止多个线程同时检查操作是否可完成时发生锁竞争导致操作最终超时
  private val tryCompletePending = new AtomicBoolean(false)
  private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)
  ......
}
上一篇 下一篇

猜你喜欢

热点阅读