源码学习程序员

Tornado源码分析手记 —— IOLoop核心实现

2016-01-29  本文已影响769人  Cyandev

最近开始阅读开源项目的源码了,鉴于一直用Tornado做项目,就从它着手开始吧。

今天分析的是Tornado的"Killer Technique",哈哈,其实没那么夸张了。我们知道Tornado采用了与Node.js相同的单线程事件驱动模型,那么它就需要一个事件轮询机制,我没有看过Node.js的源码,所以不太清楚它的机制。Tornado在IO层面主要使用了两种解决方案:

接下来我们就逐行进行分析,IOLoop类的代码不会都看,阅读的顺序是从start函数开始。

if self._running:
  raise RuntimeError("IOLoop is already running")
self._setup_logging()
if self._stopped:
  self._stopped = False
  return
old_current = getattr(IOLoop._current, "instance", None)
IOLoop._current.instance = self
self._thread_ident = thread.get_ident()
self._running = True

这部分其实就是进行了一些状态检测,然后设置日志,设置标志位的值。不多啰嗦了。
主要来看下面这个大的循环体:

with self._callback_lock:
  callbacks = self._callbacks
  self._callbacks = []

首先在线程同步锁下将所有的callback获取出来,然后清空原来的callback数组。

due_timeouts = []
if self._timeouts:
  now = self.time()
  while self._timeouts:
    if self._timeouts[0].callback is None:
      heapq.heappop(self._timeouts)
      self._cancellations -= 1
    elif self._timeouts[0].deadline <= now:
      due_timeouts.append(heapq.heappop(self._timeouts))
    else:
      break
if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)):
  self._cancellations = 0
  self._timeouts = [x for x in self._timeouts if x.callback is not None]
  heapq.heapify(self._timeouts)

这部分比较长,主要是处理了延时事件,支持异步的gen.sleep的实现就与这部分有关。

首先声明空数组due_timeouts来存放被trigger的事件回调,这个timeouts队列比较有意思,它用到了heapq这个包来保证数组内的元素存放顺序是一个堆的结构,这样一来每次取出来的元素都是最小的。然后只需要判断取出的最小元素是不是过时了,如果这个事件被取消了,那么直接pop掉进入下次循环;如果这个事件过时了,就直接加入待执行的due_timeouts中,进入下次循环;如果没有过时,由于这是最小元素,所以它后面的元素肯定也没有过时所以干脆直接跳出循环,接着进行下面的内容。

接下来是一个优化操作,如果取消的事件多于512个并且大于总数的一半时,就把timeouts进行清理,清理结束后再进行堆排序。

下面一部分代码就不放了,主要是执行刚才上面准备好的callbacks和timeouts。

if self._callbacks:
  poll_timeout = 0.0
elif self._timeouts:
  poll_timeout = self._timeouts[0].deadline - self.time()
  poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
else:
  poll_timeout = _POLL_TIMEOUT

这里又进行了一次callbacks的检查,如果有需要执行的回调,那么就让poll等待的时间为0,如果有timeouts,就让poll等待的时间为还有多久触发timeout事件的时间,同时这个时间不能超过预设的最长时间。

接下来进行了一次状态检测,如果IOLoop已经停止,那么跳出循环。

try:
  event_pairs = self._impl.poll(poll_timeout)

开始等待IO事件了。

self._events.update(event_pairs)
while self._events:
  fd, events = self._events.popitem()
  try:
    fd_obj, handler_func = self._handlers[fd]
    handler_func(fd_obj, events)
  except (OSError, IOError) as e:
    if errno_from_exception(e) == errno.EPIPE:
      pass
    else:
      self.handle_callback_exception(self._handlers.get(fd))
   except Exception:
      self.handle_callback_exception(self._handlers.get(fd))
fd_obj = handler_func = None

这里得到IO事件的触发者,然后得到它的处理函数,并且执行这个函数,然后进行异常处理。

这就是IOLoop的大致思路,通过IOLoop,我们就可让一个工作变成一组可序列化并且粒度足够小的事件,依次执行。通过select/epoll机制来实现同时对多个socket进行同时处理,避免轮询浪费CPU时间,是效率高的关键因素。

上一篇下一篇

猜你喜欢

热点阅读