玩转大数据程序员大数据

DelayedOperationPurgatory--谜之炼狱

2017-04-11  本文已影响545人  扫帚的影子

DelayedOperation
if (completed.compareAndSet(false, true)) {
      // cancel the timeout timer
      cancel()
      onComplete()
      true
    } else {
      false
    }

分两种情况:

  1. 当前操作已经完成,则不再需要强制完成,返回false;
  2. 当前操作未完成, 则首先在Timer中取消这个定时任务, 然后回调onComplete
if (forceComplete())
      onExpiration()

里面的操作比较简单, 调用forceComplete, 如果成功,表明是真的超时了,回调onExpiration;

  1. def onExpiration(): Unit: 超时后的回调处理;
  2. def onComplete(): Unit: 操作完成后的回调处理;
  3. def tryComplete(): Boolean: 在放入到Timer前, 先尝试着执行一下这个操作, 看是否可以完成, 如果可以就不用放到Timer里了, 这是为了确保任务都尽快完成作的一个优化;
Watchers
  1. 上面说的作为超时任务放在Timer中;
  2. 与某些事件关联在一起, 可以关联多个事件, 当这些事件中的某一个发生时, 这个任务即可认为是完成;这个就是 Watchers类要完成的工作;
      var purged = 0
      operations synchronized {
        val iter = operations.iterator()
        while (iter.hasNext) {
          val curr = iter.next()
          if (curr.isCompleted) {
            iter.remove()
            purged += 1
          }
        }
      }
      if (operations.size == 0)
        removeKeyIfEmpty(key, this)

      purged
    }
     var completed = 0
      operations synchronized {
        val iter = operations.iterator()
        while (iter.hasNext) {
          val curr = iter.next()
          if (curr.isCompleted) {
            // another thread has completed this operation, just remove it
            iter.remove()
          } else if (curr synchronized curr.tryComplete()) {
            completed += 1
            iter.remove()
          }
        }
      }

      if (operations.size == 0)
        removeKeyIfEmpty(key, this)

      completed

扫描整个链表:

  1. 如果任务已完成,则移除;
  2. 如果任务未完成, 调用tryComplete尝试立即完成, 如果可以完成, 则移除;
def watch(t: T) {
      operations synchronized operations.add(t)
    }
DelayedOperationPurgatory

A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations

实际上就是用来通过TimerWatchers来管理一批延迟任务;

private class ExpiredOperationReaper extends ShutdownableThread(
    "ExpirationReaper-%d".format(brokerId),
    false) {

    override def doWork() {
      timeoutTimer.advanceClock(200L)

      if (estimatedTotalOperations.get - delayed > purgeInterval) {
        estimatedTotalOperations.getAndSet(delayed)
        debug("Begin purging watch lists")
        val purged = allWatchers.map(_.purgeCompleted()).sum
        debug("Purged %d elements from watch lists.".format(purged))
      }
    }
  }
  1. timeoutTimer.advanceClock(200L): 驱动Timer向前走, pop出超时的延迟任务;
  2. val purged = allWatchers.map(_.purgeCompleted()).sum: 利用阈值(purgeInterval)来周期性地从Watchers中清理掉已经完成的任务;
简图:
DelayedOperation.png
基本上就是这些了

Kafka源码分析-汇总

上一篇 下一篇

猜你喜欢

热点阅读