why&how协程

2020-03-04  本文已影响0人  l1n3x

why 协程

一旦谈到网络相关的编程,总是绕不开同步、异步、阻塞以及非阻塞以及多线程等概念。近几年,有关协程的话题也越来越多。学习一个概念的第一步总是学习它的作用以及它所解决的问题。要研究协程解决的问题则先要弄清楚如果不用协程,存在哪些问题。这也是本文讨论的一个重点。

阻塞、非阻塞

要搞清楚为什么需要协程,首先需要弄清楚网络编程中的一些概念。用爬虫来举例子: 当我们爬一个网页时可以简单分为两个步骤:

  1. 向服务器发送请求
  2. 等待服务器返回数据

我们的电脑所做的事情大概时一层一层向下封装数据包。然后向网卡发送数据,继而等待网卡返回数据。这时便有了一个问题,等待网卡返回数据不需要CPU参与。也就是说,如果一直等待网卡返回数据而不做其他的事,就浪费了这一段等待的时间。例如爬虫函数如下:

def crawl(url):
    fd = send(url)  # 发送请求
    wait(fd)  # 等待数据返回

如果 需要wait获得服务器数据后才能继续执行后续的方法。则将wait法称为阻塞的。采用阻塞的方式,如果爬取1个网页需要两秒,那么爬取10个网页就需要20秒。假如我并不关心 wait 是否获取到了服务器的内容,而希望它一执行就马上返回,继续执行接下来的操作。那么这种方式的方法称之为非阻塞的。使用非阻塞方法时,程序虽然不会阻塞在方法处。因此发送上一个请求后马上可以发送下一个请求,能大量的节省等待时间。一个阻塞与非阻塞时序图如下:

阻塞非阻塞
这里的一个重点是:阻塞的等待是串行的,即上一个请求的等待完成后,才开始下一个请求的等待而非阻塞的请求等待是并行的,所有的等待几乎同时进行
使用多线程(本文所提多线程,都是指单核多线程)可以达到类似非阻塞的效果。由于多线程是在不同线程之间切换。当某个阻塞处于等待阶段时,还可以切换到其他线程去发送请求。因此可以达到非阻塞的效果。但是使用多线程又引入了线程的切换的开销,竞态条件等问题。为了避免竞态条件,又需要引入同步机制等一系列新的开销。当然,多线程的主要问题还是竞态条件的产生。就算有大量的工具和同步机制,在编写具有多线程的程序时依旧十分困难。因此,多线程仍然不是理想的解决方案。

同步与异步

上节提到阻塞与非阻塞的区别。才哟个非阻塞的方式编写代码当然更高效。但是还存在另一个问题: 如果函数不会阻塞直到获得服务器结果,那么如何知道服务器何时返回数据呢?一般来说有两种方式,一种方式是不断的查看代码的执行状况,如果完成才继续运行。例如:

def crawl(url):
    fd = send(url)  # 发送请求
    r = wait(fd)  # 假设 服务器返回数据 wait 会返回一个非空值
    while not r:  # 等待数据返回
        r = wait(fd)

这种方式称之为同步,即不断的主动询问是否完成。当然这种方式与阻塞没什么两样,等待的时间也没办法做其他的事情,因此这种方式显然是行不通的。还有另一种方式,称之为异步。即函数完成之后会进行通知,一般形式为一个回调函数:

def on_success(value):
    pass

def crawl(url):
    fd = send(url)  # 发送请求
    r = wait(fd, on_success)  # wait 等待完成会调用 on_success

这样看来,采用异步非阻塞编写代码的方式似乎解决了所有的问题。即不存在串行的等待,并且也无需多线程之间的开销以及竞态条件。但采用异步的方式编写程序实际也有许多的缺点。例如,如果多个步骤之间存在依赖关系,采用异步编写则表现为多个回调函数的嵌套,非常的不直观。如:

def action1(callback):
    callback()
def action2(callback):
    callback()
def action3(callback):
    callback
def done():
    pass
action1(lambda : action2(lambda: action3(lambda: done)))

协程

这时候就轮到协程出场了。我们先回想一下多线程的解决方案。多线程依赖于多个线程轮番执行,来达到避免串行等待的目的。但是由于多线程切换的机制,导致程序切换的位置并不确定。所以我们才需要各种机制来保证多线程中的线程安全。但是如果能有一种方法,使得不同线程之间切换时机是固定的。即当代码阻塞时切换到其他线程进行执行,阻塞完成后又自动切换回到当前线程,其他时间不进行切换。岂不是就完美的解决了多线程和异步的缺点。没错,这样的方式就是协程。利用协程编写的代码既是同步的,也可以避免串性等待的问题。协程和多线程的区别是: 多线程的切换时机是不固定的,每一行代码都可能切换。而协程的切换则是开发者自己决定的。这样开发者不需要考虑竞态条件的问题。

how协程

不同的语言对于协程的实现是不同的。在 python 里主要依赖于生成器,即包含yield关键字的函数。有关yield关键字的的基本概念可以参考生成器。先编写一个模拟异步非阻塞的类,如下:

op = DelayOP()
op.start_op("spider", 2000, callback=callback) # 定义名为2000ms的 
# 延时操作,即2000ms后会调用 callback 回调
op.loop()

然后编写一个 Future 类保存方法运行的结果。

class Future:
    def __init__(self):
        self.result = None
        self.callback = None
    
    def set_result(self, result):
        self.result = result
        self.callback and self.callback(self)
    
    def set_callback(self, callback):
        self.callback = callback

因此,此时可以利用 yield 可以这样改造:

def crawl(op):
    f = Future()
    op.start_op("download", 2000, lambda result : f.set_result(result))
    yield f

可以看出,方法运行到 yield f 时会主动退出。而由于回调函数为 lambda result : f.set_result(result)。因此在延时完成后,会执行回调,将执行结果设置到 Feture 的 result 中。但是如何驱动方法继续运行呢,此时我们还需要另一个类:

class Task:
    def __init__(self, co):
        self.co = co
        self.step()
    
    def step(self, future=None):
        try:
            next_future = self.co.send(future.result if future else None)
        except StopIteration:
            return
        next_future.set_callback(self.step)

使用时:

def show_res():
    res = yield from crawl(op)
    print(res)
Task(get_res())

这个类为什么可以驱动方法继续运行呢。首先看 Task 中的 co 代表 Task 管理的生成器,也就是 show_res() 方法。当一个 Task 实例生成时就会执行 step 方法。第一次执行 step 方法相当于 co.send(None)。而这里的 show_res 由于其中存在 yield from 语句,相当于这个 co.send(None) 实际上是:

c = crawl(op)
c.send(None)

c.send(None) 会返回 crawl 中的 f 。因此 co.send(None) 返回值即为 f。获得 f 后,f 将 step 函数添加为其完成时的回调。将在 set_result 时调用。而当延时完成后,首先 set_result 将会被调用。然后 step 作为 f 完成时回调被调用,并且参数为自身。此时将执行 co.send(f.result),驱动 crawl 继续执行。由于 crawl 已没有其他代码,因此继续执行将退出方法。会导致 crawl 抛出 StopIteration 异常。该值将会被 yield from 捕获并传递给 res。当然这里只存在一个延时操作,如果有多个延时操作则可以这样写:

def crawl(op):
    f = Future()
    op.start_op("download", 2000, lambda result: f.set_result(result))
    yield f
def save(text, op):
    f = Future()
    op.start_op(f"save {text}", 2000, lambda result: f.set_result(result))
    yield f
def show_res(op):
    text = yield from crawl(op) # 延时操作1
    text = yield from save(op, text) # 延时操作2
op = DelayOP()
Task(show_res(op))
Task(show_res(op)) # 两个延时为 4000 ms 的任务
start_time = op.ms_timestap()
op.loop()
print(f"cost {op.ms_timestap() - start_time} ms") 
# cost 4002 ms

这里进行了两个延时为 4000 ms 的任务。最终的耗时总共为 4002 ms。可以看出这两个任务的等待时并行的。也就是这里的代码用同步的代码,写出了异步非阻塞的效果。没有使用多线程,因此无需考虑竞态条件。并且编写代码的逻辑的是同步的,并不是异步回调的方式。多么的优雅呀,这就是协程的威力。
这里还有可以改进的地方,代码中的 yield 与 yield from 是混用的。这样不太好,因此可以改一下 Future 的实现:

class Future:
    # 新增
    def __iter__(self):
        if self.done:
            return self.result
        else:
            yield self
    def __await__(self):
        if self.done:
            return self.result
        else:
            yield self

然后可将 crawl 以及 down 中的 yield 改为 yield from,变得更统一。
python 还在 3.5 之后推出了 async & await 关键字。 async 表名一个方法为协程,而 await 表示则等待一个协程返回。因此只要 Future 中含有 __await__ 方法。代码还可以继续修改为:

async def crawl(op):
    f = Future()
    op.start_op("download", 2000, lambda result: f.set_result(result))
    await f
async def save(op, text):
    f = Future()
    op.start_op(f"save {text}", 2000, lambda result: f.set_result(result))
    await f
async def show_res(op):
    text = await crawl(op)
    text = await save(op, text)

这就是 python 中使用协程的最新方式。

上一篇下一篇

猜你喜欢

热点阅读