Tornado应用笔记04-浅析源码

2017-06-14  本文已影响0人  simplue

索引

本节内容将分析Tornado中利用协程实现异步原理, 主要分析的是装饰器@gen.coroutine, 包括源码分析和异常捕获等问题, 另外也包括了对@asynchronous, Future等相关对象的分析.

"未来的坑" Future

在介绍两个重要的装饰器之前, 先来说说Future, 它是实现异步的一个重要对象. Future就像它的名字一样, 装载的是"未来"(未完成操作的结果), 文档描述它是"异步操作结果的占位符". 在Tornado中, 常见用法有下面两种:

# 在`IOLoop`注册`future`
tornado.ioloop.IOLoop.add_future(future, future_done_callback_func)

# `@gen.coroutine`内部结合`yield`使用
@gen.coroutine
def foo():
    result = yield future

Tornado中内置的Future(tornado.concurrent.Future)与futures包中的Future(concurrent.futures.Future)很相似, 不过Tornado的Future不是"线程安全"的, 因为Tornado本身是单线程, 所以用起来并没什么不妥, 而且速度更快

Tornado 4.0以前, Tornado的Future实际上还是引用的"线程安全"的concurrent.futures.Future, 只有在没有安装future包时才会使用"非线程安全"的Tornado 内置Future. Tornado 4.0以后的版本, 所有的Future都变成内置的, 并为其加入了exc_info方法. 这两种Futrue基本上是"兼容"的, 不过这里所谓的"兼容"只是在"调用"层面上的, 部分操作不一定会生效或执行.

Tornado 4.1后, 如果Future中的异常没有被触发(比如调用result(),exception()exc_info()), 那在Future被垃圾回收时, 会log异常信息. 如果你既想"发现"异常, 又不想让它log, 可以这么做future.add_done_callback(lambda future: future.exception())

下面介绍Future中最主要三个方法:

class Future(object):

    def result(self, timeout=None):
        # 返回future的值(future._result), 如果有执行异常, 那么将会触发异常
        self._clear_tb_log()
        if self._result is not None:
            return self._result
        if self._exc_info is not None:
            raise_exc_info(self._exc_info)
        self._check_done()
        return self._result

    def add_done_callback(self, fn):
        # 为future添加回调到回调列表中, 在`.set_result`后执行,
        # 不过如果future已经完成了, 那么会直接执行这个回调, 不放入回调列表
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        # 为future设置值, 然后执行回调列表中的所有回调, 
        # 回调传入的唯一参数就是future本身
        self._result = result
        self._set_done()

Future对于刚开始接触这个问题的开发者来说, 可能是一个不容易理解的对象, 是需要一定时间的去消化. 虽然你可能在之前已经借助gen.coroutine@asynchronous写过一些异步代码, 但是Future都是被封装到里边的, 你并不清楚其中的原理. 当你看到一些更灵活的异步应用时, 你可能就没有办法理解其中的逻辑. 所以Tornado作者建议大家都用Future练习写异步代码, 以便更好理解其所以然.

下面的例子实现了异步HTTP请求, 一个用@gen.coroutine实现, 一个用较原始的Future实现, 对比其中的不同, 或者动手改改, 但愿能帮助你理解Future.

# @gen.coroutine 实现
class AsyncFetch(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self, *args, **kwargs):
        client = tornado.httpclient.AsyncHTTPClient()
        response = yield client.fetch('http://www.baidu.com', request_timeout=2)
        self.finish(response.body)

# Future 实现
class AsyncFetch(tornado.web.RequestHandler):
    @asynchronous
    def get(self, *args, **kwargs):
        client = tornado.httpclient.AsyncHTTPClient()
        fetch_future = client.fetch('http://www.baidu.com', request_timeout=2)
        tornado.ioloop.IOLoop.current().add_future(fetch_future, callback=self.on_response)

    def on_response(self, future):
        response = future.result()
        self.finish(response .body)
异步装饰器 @asynchronous

这个装饰器适合处理回调式的异步操作, 如果你想使用协程实现异步, 那么应该单独使用@gen.coroutine. 考虑到某些历史遗留问题, 同时使用 @gen.coroutine@asynchronous也是可以的, 但是 @asynchronous 必须放在 @gen.coroutine的前面, 否则@asynchronous将被忽略.

注意, 这个装饰器能且只能用在get post一类方法上, 用在其他任意方法都是无意义的. 同时装饰器并不会"真正"使一个请求变为异步, 而仅仅是"告诉"tornado这个请求是异步的, 要使请求异步化, 则必须要在请求内完成一些异步操作, 里面的阻塞操作是会阻塞整个线程的, 不会响应新的请求, 如果你在里面sleep了, 那线程就sleep了.

另外, 用了这个装饰器以后, 请求并不会在return后结束(因为这个请求是异步的, Tornado"不知道"何时会完成, 所以会一直保持与客户端的连接), 需要显式调用 self.finish() 才会结束请求

附: Tornado 作者对 @gen.coroutine@asynchronous 一起使用的回答:

Order matters because @asynchronous looks at the Future returned by @gen.coroutine, and calls finish for you when the coroutine returns. Since Tornado 3.1, the combination of @asynchronous and @gen.coroutine has been unnecessary and discouraged; in most cases you should use @gen.coroutine alone.

@gen.coroutine@asynchronous共用需要注意顺序是因为, @asynchronous监控着@gen.coroutine 返回的 Future 然后在Future完成的时候自动调用 finish.自tornado 3.1开始, 两者就可以独立使用且并不鼓励共用, 实际上在绝大多数情况下,只需要使用 @gen.coroutine

源码注释:
@functools.wraps(method)
def wrapper(self, *args, **kwargs):

    # 关闭自动finish, 需要显式调用self.finish()
    self._auto_finish = False
    with stack_context.ExceptionStackContext(
            self._stack_context_handle_exception):

        # 执行method内的函数, 并将结果转换成future, 
        # 使用`add_future`将回调函数`future_complete`注册到`ioloop`中,
        # 回调做了两件事, 一是通过调用`future.result()`检查异常
        # 二是自动finish请求, 无需在请求内显式finish
        result = method(self, *args, **kwargs)
        if result is not None:
            result = gen.convert_yielded(result)
            def future_complete(f):
                f.result()
                if not self._finished:
                    self.finish()
            IOLoop.current().add_future(result, future_complete)
            return None
        return result
return wrapper

协程装饰器 @gen.coroutine

在理解这个装饰器前, 需要你已经了解生成器的工作方式, 比如看懂下面这段代码和执行结果. 如果你对此还不了解, 那么建议你先看看这篇文章, 然后再往下读.

>>> def echo(value=None):
...   while 1:
...     value = (yield value)
...     print("The value is", value)
...     if value:
...       value += 1
...
>>> g = echo(1)
>>> next(g)
1
>>> g.send(2)
The value is 2
3
>>> g.send(5)
The value is 5
6
>>> next(g)
The value is None

Py 3.3以前的版本, 使用了这个装饰器的生成器(含yield的函数)都不能直接使用return来返回值, 需要触发一种特殊的异常gen.Return来达到return的效果, 不过在任意版本中均可通过不带参数的return提前退出生成器.

装饰器返回的是一个Future对象, 如果调用时设置了回调函数callback, 那么callback将会在Futureset_result后调用, 若协程执行失败, callback也不会执行. 需要注意的是, callback并不需要作为被修饰函数的"可见"参数, 因为callback是被gen.coroutine处理的(具体用法见上一节线程池处理阻塞操作部分).

需要特别注意的是其中的异常处理. 执行发生异常时, 异常信息会存储在.Future 对象内. 所以必须检查.Future 对象的结果, 否则潜在的异常将被忽略. 在一个@gen.coroutine内调用另外一个@gen.coroutine, 官方文档推荐两种方式

# 在顶层使用下面的方式调用
tornado.ioloop.IOLoop.run_sync(coroutine_task_func)

# 使用`add_future`
tornado.ioloop.IOLoop.add_future(future, callback)

其实实际上只要调用了futrueresult方法, 那么异常就会被触发, 所以也可以使用下面两种方式

# 使用了`@gen.coroutine`的生成器, 靠`Runner`调用`future.result`触发异常, 下面会分析`Runner`源码
yield tornado.gen.Task(coroutine_task_func)
yield the_coroutine_task(callback=my_callback_func)
异常捕获
@tornado.gen.coroutine
def catch_exc():
    r = yield tornado.gen.sleep(0.1)
    raise KeyError


@tornado.gen.coroutine
def uncatch_exc():
    # 需要注意的是, 这里的阻塞操作, 也是会阻塞整个线程的
    time.sleep(0.1)
    raise KeyError


class CoroutineCatchExc(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        # 直接调用 `catch_exc` 也是可以触发异常的, 不过无法在这里捕获
        # 因为里面有, 在 `Runner` 中对生成器 `send` 操作的时候会触发
        # 不过如果只是想丢到`后台`执行, 这样做也是可以的, 异常都交给任务自身处理
        catch_exc()

        # 如果单独使用下面的调用是会彻底忽略掉协程执行中的异常的, 不会输出任何信息,
        uncatch_exc()

        # 下面的用法也会触发异常, 不过同样的, 并没有办法在这里捕获
        # gen.coroutine 在调用 callback 时自动传入 future.result(), 抛出异常
        uncatch_exc(callback=lambda future_result: -1)

        # 捕获并处理异常的方法

        # 方法1
        # 需要注意的是使用`ioloop`回调传入的是`future`, 不是`future.result()`
        # 所以, 在回调里面不调用`future.result()`也是白搭
        def foo(future):
            fu = future  # 这样做也是触发不了异常的
            try:
                future_result = fu.result()  # 这样才可以
            except:
                import traceback
                print 'catch exc in callback, the exc info is:'
                print traceback.format_exc()
            else:
                print 'future completed and the result is %s' % future_result
        
        fu = uncatch_exc()
        tornado.ioloop.IOLoop.current().add_future(fu, callback=foo)

        # 方法2
        # 使用 yield 后, 就成了生成器, 在`gen.coroutine`中会调用`Runner`
        # 驱动生成器, `Runner`内部有调用`future.result()`
        try:
            future_result = yield uncatch_exc('catch exc')
        except:
            import traceback
            print 'catch exc in yield, the exc info is:'
            print traceback.format_exc()
        else:
            print 'future completed and the result is %s' % future_result

        self.finish("coroutine catch exc test")

源码注释:
def coroutine(func, replace_callback=True):
    # `coroutine`的功能实际上由`_make_coroutine_wrapper`实现
    return _make_coroutine_wrapper(func, replace_callback=True)

def _make_coroutine_wrapper(func, replace_callback):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # 创建一个 `future`
        future = TracebackFuture()

        # 如果调用时设置了`callback`, 则在`IOLoop`注册`future`及其回调事件
        # 因为被修饰的函数没有`callback`这个"可见"参数, 所以需要`pop`掉, 以免报错
        if replace_callback and 'callback' in kwargs:
            callback = kwargs.pop('callback')
            IOLoop.current().add_future(
                future, lambda future: callback(future.result()))

        # 执行被修饰函数, 获取结果
        # 抛出的执行异常将被`set_exc_info`进`Future`内, 在执行`future.result()`时, 异常会被触发,
        # 对于`Return`和`StopIteration`, 这类特殊的异常, 将返回函数的返回值
        # 不过在`Python 3.3+`中`StopIteration`才会有`value`属性, 也就是可以直接使用`return`返回
        try:
            result = func(*args, **kwargs)
        except (Return, StopIteration) as e:
            result = _value_from_stopiteration(e)
        except Exception:
            future.set_exc_info(sys.exc_info())
            return future

        # 这里使用的`else`只有在`try`正常结束时执行
        # 如果被修饰的是一个`生成器`, 获取生成器生成的第一个结果, 异常处理与上面一致
        # 如果只是普通的"同步"函数(不是生成器), 那就跳过这步, 避免创建`Runner`浪费资源
        # 将第一个`yield`的结果, `生成器`(函数本身)和上面新建的`Future`一同传入`Runner`
        # `Runner`是实现协程异步的关键, 下面接着分析其中的代码
        else:
            if isinstance(result, GeneratorType):
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    yielded = next(result)
                    if stack_context._state.contexts is not orig_stack_contexts:
                        yielded = TracebackFuture()
                        yielded.set_exception(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    future.set_result(_value_from_stopiteration(e))
                except Exception:
                    future.set_exc_info(sys.exc_info())
                else:
                    Runner(result, future, yielded)
                try:
                    # 生成器, 经过`Runner`, 已经`set_result`, 直接返回`future`
                    return future
                finally:
                    future = None
        # 非生成器, 没经过`Runner`, `set_result`后返回
        future.set_result(result)
        return future
    return wrapper

# `Runner`主要看`run`和`handle_yield`两个函数
class Runner(object):
    def __init__(self, gen, result_future, first_yielded):
        self.gen = gen
        self.result_future = result_future
        self.future = _null_future
        # 将结果转换成future, 然后判断状态, 择机进入run
        if self.handle_yield(first_yielded):
            self.run()

    # `run`实际上就是一个生成器驱动器, 与`IOLoop.add_future`配合, 利用协程实现异步
    # `run`内部虽然是个死循环, 但是因为调用了`gen.send`, 
    # 所以在`gen.send`时可以暂时离开循环, 返回到生成器中(即yield的`断点`), 使得生成器得以继续工作
    # 当生成器返回一个新的`future`时, 再次调用`handle_yield`, 
    # 若`future`完成了就进入下一次`yield`, 
    # 没完成就等到完成以后在进入到`run`进入下一次`yield`
   
    # 简化的`run`可表示成下面的样子
    # def run(self):
    #    future = self.gen.send(self.next)
    #    def callback(f):
    #        self.next = f.result()
    #        self.run()
    #    future.add_done_callback(callback)

    def run(self):
        # 各种运行状态判断, 异常处理
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                future = self.future
                if not future.done():
                    return
                self.future = None
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    exc_info = None

                    # 查异常, 有则抛出
                    try:
                        value = future.result()
                    except Exception:
                        self.had_exception = True
                        exc_info = sys.exc_info()

                    if exc_info is not None:
                        yielded = self.gen.throw(*exc_info)
                        exc_info = None

                    # 正常情况, 无异常
                    else:
                        # 驱动生成器运行, 恢复到`yield`断点继续执行, 是整个函数的关键
                        yielded = self.gen.send(value)

                    if stack_context._state.contexts is not orig_stack_contexts:
                        self.gen.throw(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))

                # 生成器被掏空, 结束
                except (StopIteration, Return) as e:
                    self.finished = True
                    self.future = _null_future
                    if self.pending_callbacks and not self.had_exception:
                        raise LeakedCallbackError(
                            "finished without waiting for callbacks %r" %
                            self.pending_callbacks)
                    self.result_future.set_result(_value_from_stopiteration(e))
                    self.result_future = None
                    self._deactivate_stack_context()
                    return

                # 其他异常
                except Exception:
                    self.finished = True
                    self.future = _null_future
                    self.result_future.set_exc_info(sys.exc_info())
                    self.result_future = None
                    self._deactivate_stack_context()
                    return
                # 配合`handle_yield`, 使用`IOLoop`注册事件
                if not self.handle_yield(yielded):
                    return
        finally:
            self.running = False

    def handle_yield(self, yielded):
        # 省略部分无关代码
        # 先将传入的第一个生成器结果转换为`Future`对象
        # 如果`Future`还没有执行完毕, 或者是`moment`(一种内置的特殊`Future`, 这里可以忽视)
        # 那就等待`Future`执行完毕后执行`run`
        # 其余情况则直接执行`run`
        ...
        if ...:
            ...
        else:
            try:
                self.future = convert_yielded(yielded)
            except BadYieldError:
                self.future = TracebackFuture()
                self.future.set_exc_info(sys.exc_info())

        if not self.future.done() or self.future is moment:
            self.io_loop.add_future(
                self.future, lambda f: self.run())
            return False
        return True
特殊函数gen.Task

gen.Task的操作就是将回调式异步函数的输出转换成future类型并返回, 目的是方便被yield. 函数会自动为执行函数设置回调, 回调的工作是将操作的返回值传递给内部创建的future. 其代码可以简化为:

def Task(func, *args, **kwargs):
    future = Future()
    callback = lambda func_result: future.set_result(func_result)
    func(*args, callback=callback, **kwargs)
    return future

本节内容就是这些, 下节内容将讨论Tornado内置的异步HTTP客户端.

NEXT ===> Tornado应用笔记05-异步客户端

上一篇下一篇

猜你喜欢

热点阅读