线程 进程 协程 工作生活

Python协程

2019-06-30  本文已影响0人  Recalcitrant

目录:
一、基于生成器的协程
二、协程状态
三、协程预激装饰器
四、终止协程和异常处理
五、协程返回值
六、yield from
七、greenlet协程
八、gevent协程

Python并发之协程

协程(coroutine)可以在执行期间暂停(suspend),这样就可以在等待外部数据处理完成之后(例如,等待网络I/O数据),从之前暂停的地方恢复执行(resume)

一、基于生成器的协程

yield item:yield包含”产出“和”让步“两个含义。生成器中 yield x 这行代码会 产出 一个值,提供给 next(...) 的调用方。此外,还会作出让步,暂停执行生成器,让调用方继续工作,直到需要使用另一个值时再调用 next(...)。

send(item):生成器的调用方可以使用send()方法发送数据,发送的数据会成为生成器中yield表达式的值。此时称生成器为协程。协程指与调用方协作的过程,产出由调用方提供的值。

示例:

def simple_coroutine():
    print("->协程开始")
    x = yield
    print("->协程收到x:", x)


my_coro = simple_coroutine()
print(my_coro)
print(next(my_coro))
my_coro.send(50)
运行结果

运行结果解释:

① yield在表达式中的使用:如果协程只需从客户那里接收数据,那么产出的值是None(这个值是隐式指定的,因为yield关键字右边没有表达式)。
② 首先要需要调用next(...)。因为生成器还没启动,未在yield语句处暂停,所以一开始无法发送数据。
③ 调用send()方法后,协程定义体中的yield表达式会计算出50。调用send()方法后协程会恢复,一直运行到下一个yield表达式,或者终止。

二、协程状态

GEN_CREATED:创建状态(等待开始执行)
GEN_SUSPENDED:挂起状态(在yield表达式处暂停)
GEN_RUNNING:运行状态(正在被解释器执行。只有在多线程应用中才能看到这个状态)
GEN_CLOSED:关闭状态(执行结束)

协程激活方法

  • 1.调用next():next(generator)
  • 2.发送None:generator.send(None)

示例:

In[1]:  from inspect import getgeneratorstate
        def simple_coroutine():
            print("->协程开始")
            x = yield
            print(getgeneratorstate(my_coro2))
            print("->协程收到x:", x)

        my_coro = simple_coroutine()
        next(my_coro)
        print(getgeneratorstate(my_coro))
        my_coro.send(50)
In[2]:  print(getgeneratorstate(my_coro))

三、协程预激装饰器

1.@wraps装饰器

Python装饰器(decorator)在实现的时候,被装饰后的函数其实已经是另外一个函数了(函数名等函数属性会发生改变)。为了消除这样的影响,Python的functools包中提供了一个叫wraps的decorator来消除这样的副作用。写一个decorator的时候,最好在实现之前加上functools的wrap,它能保留原有函数的名称和docstring。

示例:

from functools import wraps
def my_decorater(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        """
        wrapper.doc
        :param args:
        :param kwargs:
        :return:
        """
        print("calling decorater start")
        func(*args, **kwargs)
        print("calling decorater end")

    return wrapper


@my_decorater
def exam():
    """
    exam.doc
    :return:
    """
    print("example....")


exam()
print("func_name:", exam.__name__)
print("func_doc:", exam.__doc__)
运行结果

2.协程预激装饰器的实现

from functools import wraps
def coroutine_activator(function):
    @wraps(function)
    def wrapper(*args, **kwargs):
        generator = function(*args, **kwargs)
        next(generator)
        return generator

    return wrapper

示例:

def coroutine_activator(function):
    @wraps(function)
    def wrapper(*args, **kwargs):
        generator = function(*args, **kwargs)
        next(generator)
        return generator

    return wrapper


@coroutine_activator
def averager():
    total = 0.0
    count = 0
    average = 0.0

    while True:
        term = yield average
        total += term
        count += 1
        average = total / count


cor_avg = averager()
print(getgeneratorstate(cor_avg))
运行结果

四、终止协程和异常处理

协程中未处理的异常会向上冒泡,传给next()函数或.send()方法的调用方(即触发协程的对象)。

1.generator.throw(exc_type[, exc_value[, traceback]])

  • 1.该方法会使生成器在暂停的yield表达式处抛出指定的异常。
  • 2.如果生成器处理了该异常,代码会继续执行到下一个yield表达式,而产出的值会成为generator.throw()方法的返回值。
  • 3.如果没有处理这个异常,异常则会向上冒泡。

示例:

class DemoException(Exception):
    pass


def demo_exc_handling():
    print("-->coroutine started")

    while True:
        try:
            x = yield
#         except GeneratorExit:
#             print("-->GeneratorExit has been handled. Call close methond...")
        except DemoException:
            print("-->DemoException has been handled. Continuing...")
        else:
            print("-->coroutine received:{!r}".format(x))
        finally:
            print("-->end...")


exc_coro = demo_exc_handling()
next(exc_coro)
print(getgeneratorstate(exc_coro))

exc_coro.throw(DemoException)
print(getgeneratorstate(exc_coro))

exc_coro.throw(ZeroDivisionError)
print(getgeneratorstate(exc_coro))
运行结果

2.generator.close()

  • 1.该方法会使生成器在暂停的yield表达式处抛出GeneratorExit异常。
  • 2.如果生成器没有处理这个异常,或者抛出了StopIteration异常(通常是指运行到结尾),调用方不会报错。
  • 3.如果收到GeneratorExit异常,生成器一定不能产出值,否则解释器会抛出RuntimeError异常。
  • 4.生成器抛出的其他异常会向上冒泡,传给调用方。
coro = demo_exc_handling()
next(coro)
coro.send(10)

coro.close()
coro.send(20)
运行结果

五、协程return值

有些协程在被激活后,每次驱动(drive)协程时,不会产出值,而是在最后(协程正常终止时)返回一个值(通常是某种累加值)。

from collections import namedtuple
Result = namedtuple('Result', 'count average')
def averager():
    total = 0.0
    count = 0
    average = 0.0

    while True:
        term = yield average
        # 为了返回值,协程必须正常终止。这里使用条件判断,以便退出累计循环
        if term is None:
            break
        total += term
        count += 1
        average = total / count
    # 返回一个 namedtuple,包含 count 和 average 两个字段。在 Python 3.3 之前,如果生成器返回值,解释器会报句法错误
    return Result(count, average)


coro_avg = averager()
next(coro_avg)
coro_avg.send(10)
coro_avg.send(20)
coro_avg.send(30)

#捕获StopIteration异常,获取 averager 返回的值:
try:
    coro_avg.send(None)
except StopIteration as exc_data:
    r = exc_data.value
print(r)
运行结果

六、yield from

  • 在生成器gen中使用yield from subgen()时,子生成器subgen会获得控制权,把产出的值传给gen的调用方,即调用方可以直接控制subgen。与此同时,gen会阻塞,等待subgen终止。
  • yield from 的主要功能是打开双向通道,把最外层的调用方与最内层的子生成器连接起来,这样二者可以直接发送和产出值,还可以直接传入异常,而不用在位于中间的协程中添加大量处理异常的样板代码。通过这个结构,协程可以把功能委托给子生成器。
  • 调用方:调用委派生成器的客户端代码。
  • 委派生成器:包含 yield from <iterable> 表达式语句的生成器函数(包含yield from语句的生成器)。
  • 子生成器:从 yield from 表达式中 <iterable>部分获取的生成器(yield from后面的表达式)。

简单示例:

def gen():
    yield from "AB"
    yield from range(1, 3)
    
    
print(list(gen()))
运行结果

示例:

Result = namedtuple('Result', 'count average')


# 子生成器
def averager():
    total = 0.0
    count = 0
    average = 0.0

    while True:
        term = yield average
        if term is None:
            break
        total += term
        count += 1
        average = total / count

    return Result(count, average)


# 委派生成器
def grouper(results, key):
    while True:
        results[key] = yield from averager()


# 客户端代码
def main(data):
    results = {}
    for key, values in data.items():
        group = grouper(results, key)
        # 激活委派生成器
        next(group)

        for value in values:
            group.send(value)

        group.send(None)

    print(results)


data = {"girls:kg": [49.9, 39.5, 44.3, 44.2, 50.3, 40.5, 40.6, 30.8, 50, 55.5],
        "girls:m": [1.6, 1.5, 1.4, 1.3, 1.6, 1.42, 1.55, 1.66, 1.43, 1.60],
        "boys:kg": [43.9, 48.5, 44.3, 44.2, 50.3, 40.5, 40.6, 30.8, 50, 55.5],
        "boys:m": [1.6, 1.4, 1.4, 1.3, 1.6, 1.42, 1.55, 1.66, 1.43, 1.60],
        }
main(data)
运行结果

运行结果解释:

委派生成器在yield from 表达式处暂停时,调用方可以直接把数据发给子生成器,子生成器再把产出的值发给调用方。子生成器返回之后,解释器会抛出StopIteration异常,并把返回值附加到异常对象上,此时委派生成器会恢复运行。

注意:

  • 如果子生成器不终止,委派生成器会在yield from处永远暂停。
  • 因为委派生成器相当于管道,所以可以把任意数量个委派生成器连接在一起:一个委派生成器使用yield from调用一个子生成器,而那个子生成器本身也是委派生成器,使用yield from调用另一个子生成器,以此类推。最终,这个链条要以一个只使用yield表达式的简单生成器结束;不过,也能以任何可迭代的对象结束。
  • 任何yield from链条都必须由客户驱动,在最外层委派生成器上调用next(...)函数或.send(...)方法。

yield from的意义

  • 1.子生成器产出的值都直接传给委派生成器的调用方(即客户端代码)。
  • 2.使用send()方法发给委派生成器的值都直接传给子生成器。如果发送的值是None,那么会调用子生成器的next()方法。如果发送的值不是None,那么会调用子生成器的send()方法。如果调用的方法抛出StopIteration异常,那么委派生成器恢复运行。任何其他异常都会向上冒泡,传给委派生成器。
  • 3.生成器退出时,生成器(或子生成器)中的return expr表达式会触发StopIteration(expr)异常抛出。
  • 4.yield from表达式的值是子生成器终止时传给StopIteration异常的第一个参数。
  • 5.传入委派生成器的异常,除了GeneratorExit之外都传给子生成器的throw()方法。如果调用throw()方法时抛出StopIteration异常,委派生成器恢复运行。StopIteration之外的异常会向上冒泡,传给委派生成器。
  • 6.如果把GeneratorExit异常传入委派生成器,或者在委派生成器上调用close()方法,那么在子生成器上调用close()方法,如果它有的话。如果调用close()方法导致异常抛出,那么异常会向上冒泡,传给委派生成器;否则,委派生成器抛出GeneratorExit异常。

七、greenlet协程

greenlet由来

虽然CPython(标准Python)能够通过生成器来实现协程,但使用起来还并不是很方便。 与此同时,Python的一个衍生版 Stackless Python。实现了原生的协程,它更利于使用。 于是,大家开始将 Stackless 中关于协程的代码 。单独拿出来做成了CPython的扩展包。 这就是 greenlet 的由来,因此 greenlet 是底层实现了原生协程的 C扩展库。

import greenlet

# 创建greenlet协程
coroutine = greenlet.greenlet() 
# 运行greenlet协程
coroutine.switch()

示例:生产者-消费者模式

import greenlet
import random
def producer():
    while True:
        item = random.randint(0, 99)
        print("生成数字{}".format(item))
        c.switch(item)          # 将data传给c,并切换到c

def consumer():
    while True:
        item = p.switch()       # 切换到p,等待传入数值
        print("消费数字{}".format(item))


if __name__ == "__main__":
    c = greenlet.greenlet(consumer)
    p = greenlet.greenlet(producer)
    c.switch()
运行结果

greenlet 的优势:

  • 1.高性能的原生协程。
  • 2语义更加明确的显式切换。
  • 3.直接将函数包装成协程,保持原有代码风格。

八、gevent协程

gevent是什么:http://www.gevent.org/

gevent是第三方库,通过greenlet实现协程,其基本思想是:

当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。
由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成。

import gevent
# 创建协程
gevent.spawn(参数)
# gevent协程运行列表
gevent.joinall([协程列表])  

示例:生产者-消费者模式

import gevent
from gevent.queue import Queue
import random


def Consumer(queue):
        while True:
            item = queue.get()
            print("消费数字{}".format(item))


def Producer(queue):
        while True:
            item = random.randint(0, 99)
            queue.put(item)
            print("生产数字{}".format(item))


queue = Queue(3)

p = gevent.spawn(Producer, queue)
c = gevent.spawn(Consumer, queue)
gevent.joinall([p, c])
上一篇下一篇

猜你喜欢

热点阅读