用python编写http server(3)-实现协程的服务器

2020-04-19  本文已影响0人  shallows2013

一般的异步服务器基本都是NIO模式,将回调函数和fd注册在select/poll/epoll上面,当io事件发生时,执行回调函数,因此需要个loop来不断循环阻塞-执行。
查看asyncio的代码,可以看出基本符合NIO模式,但python通过生成器,避免了回调地狱,使得开发者可以用一种几乎同步的思维来写异步代码。
asyncio基本的class有 io_loop、future、task等。下面参考asyncio和tornado,自己编写个协程机制

首先是io_loop,io_loop应该不断的检查selector,当有io事件时执行回调函数

class IOLoop():
    def __init__(self):
        self.selector = selectors.DefaultSelector()
        self.alive = True
        self.pipe = os.pipe()

    def add_handler(self, fileobj, events, data):
        self.selector.register(fileobj, events, data)

    def remove_handler(self, fileobj):
        self.selector.unregister(fileobj)

    def run_forever(self):
        self.add_handler(self.pipe[0], selectors.EVENT_READ, self._wake_up)
        while self.alive:
            events = self.selector.select()
            for key, mask in events:
                if key.data:
                    key.data()

    def _wake_up(self):
        os.read(self.pipe[0], 1)

    def stop(self):
        # 简单化的退出逻辑
        self.alive = False
        os.write(self.pipe[1], b'.')

原版的io_loop远比这个复杂,但我们只是实现最基本的功能

然后是future,定义一个对象,代表以后会完成的结果。

class Future():

    def __init__(self):
        self._result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self._result = result
        self._schedule_callbacks()

    def _schedule_callbacks(self):
        callbacks = self._callbacks[:]
        if not callbacks:
            return
        self._callbacks[:] = []
        for callback in callbacks:
            callback()

    def __iter__(self):
        yield self
        return self._result

最后是一个task,用来管理协程,不断触发协程执行下一步。

class Task:
    def __init__(self, coro):
        self.coro = coro
        self._step()

    def _step(self):
        try:
            future = self.coro.send(None)
        except StopIteration:
            return
        future.add_done_callback(self._step)

我们编写这样一个worker,其中关键函数是一个生成器

    def accept(self):
        f = Future()

        def on_accept():
            try:
                self.client, addr = self.sock.accept()
                print("connetc from ", addr)
            except BlockingIOError:
                return
            self.io_loop.remove_handler(self.sock)
            f.set_result(None)
        self.io_loop.add_handler(self.sock, selectors.EVENT_READ, on_accept)
        yield from f

用task来管理生成器:

    Task(worker.run())
    io_loop = get_event_loop()
    io_loop.run_forever()

task初始化协程,协程在需要操作io时,向io_loop注册事件和回调函数,返回future,future的callback设置为_step(当future完成时就会执行_step,使协程继续运行)。协程暂停执行,系统转而执行其他的代码,等到io事件触发时,loop执行之前的回调函数,即将future设置result。此时系统会执行future的回调函数,即task的_step,使协程继续运行。自此一个闭环形成了。
完整的代码在git:https://github.com/shallows2014/learning/blob/master/python/http_server/async_server/server.py

代码只是实现了简单的协程机制,实际会有很多问题,比如说不能创建多个task,因为task用的都是同一个sock,selector不能为同一个fd注册多次,解决方法很多,比如说,不使用协程而只是使用回调函数来处理accept,当遇到连接时创建新的对象,对象再通过协程来处理请求。

上一篇下一篇

猜你喜欢

热点阅读