python

Python并发之协程

2024-07-03  本文已影响0人  上善若泪

1 协程

1.1 简介

协程,又称微线程,纤程。英文名Coroutine
协程的概念很早就提出来了,但直到最近几年才在某些语言(如Lua)中得到广泛应用。
子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。
所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。
子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。
协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。

注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。比如子程序A、B:

def A():
    print '1'
    print '2'
    print '3'

def B():
    print 'x'
    print 'y'
    print 'z'

假设由协程执行,在执行A的过程中,可以随时中断,去执行B,B也可能在执行过程中中断再去执行A,结果可能是:

1
2
x
y
3
z

但是在A中是没有调用B的,所以协程的调用比函数调用理解起来要难一些。

看起来A、B的执行有点像多线程,但协程的特点在于是一个线程执行,那和多线程比,协程有何优势?

1.2 协程优势&分类

1.2.1 优势

1.2.2 分类

协程有两种,一种无栈协程,python中以 asyncio 为代表, 一种有栈协程,python 中 以 gevent 为代表

有栈线程 无栈线程 备注
示例 lua thread
python gevent
C# yield return
C# async\await
python asyncio
是否拥有单独的上下文 上下文包括寄存器、栈帧
局部变量保存位置 无栈协程的局部变量保存在堆上,比如generator的数据成员
优点 1. 每个协程有单独的上下文,可以在任意的嵌套函数中任何地方挂起此协程。
2. 不需要编译器做语法支持,通过汇编指令即可实现
1. 不需要为每个协程保存单独的上下文,内存占用低。
2. 切换成本低,性能更高
缺点 1. 需要提前分配一定大小的堆内存保存每个协程上下文,所以会出现内存浪费或者栈溢出。
2. 上下文拷贝和切换成本高,性能低于无栈协程
1. 需要编译器提供语义支持,比如C# yield return语法糖。
2. 只能在这个生成器内挂起此协程,无法在嵌套函数中挂起此协程。
3. 关键字有一定传染性,异步代码必须都有对应的关键字。作为对比,有栈协程只需要做对应的函数调用
无栈协程无法在嵌套函数中挂起此协程,有栈协程由于是通过保存和切换上下文包括寄存器和执行栈实现,可以在协程函数的嵌套函数内部yield这个协程并唤醒。

1.3 generator协程

Python对协程的支持还非常有限,用在generator中的yield可以一定程度上实现协程。虽然支持不完全,但已经可以发挥相当大的威力了。

来看例子:
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。
如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高:

import time

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        time.sleep(1)
        r = '200 OK'

def produce(c):
    next(c)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

if __name__=='__main__':
    c = consumer()
    produce(c)


执行结果:
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK

注意到consumer函数是一个generator(生成器),把一个consumer传入produce后:

整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为协程,而非线程的抢占式多任务。

1.4 gevent协程

Python通过yield提供了对协程的基本支持,但是不完全。而第三方的gevent为Python提供了比较完善的协程支持。
gevent是第三方库,通过greenlet实现协程,其基本思想是:

当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成:

from gevent import monkey; spawn
monkey.patch_all()  # 替换标准库以使用 gevent 兼容的版本

def f(n):
    for i in range(n):
        print(gevent.getcurrent(), i)

g1 = spawn(f, 5)
g2 = spawn(f, 5)
g3 = spawn(f, 5)
g1.join()
g2.join()
g3.join()

运行结果:
<Greenlet at 0x10e49f550: f(5)> 0
<Greenlet at 0x10e49f550: f(5)> 1
<Greenlet at 0x10e49f550: f(5)> 2
<Greenlet at 0x10e49f550: f(5)> 3
<Greenlet at 0x10e49f550: f(5)> 4
<Greenlet at 0x10e49f910: f(5)> 0
<Greenlet at 0x10e49f910: f(5)> 1
<Greenlet at 0x10e49f910: f(5)> 2
<Greenlet at 0x10e49f910: f(5)> 3
<Greenlet at 0x10e49f910: f(5)> 4
<Greenlet at 0x10e49f4b0: f(5)> 0
<Greenlet at 0x10e49f4b0: f(5)> 1
<Greenlet at 0x10e49f4b0: f(5)> 2
<Greenlet at 0x10e49f4b0: f(5)> 3
<Greenlet at 0x10e49f4b0: f(5)> 4

可以看到,3个greenlet是依次运行而不是交替运行。

要让greenlet交替运行,可以通过gevent.sleep()交出控制权:

def f(n):
    for i in range(n):
        print (gevent.getcurrent(), i)
        gevent.sleep(0)

执行结果:

<Greenlet at 0x10cd58550: f(5)> 0
<Greenlet at 0x10cd58910: f(5)> 0
<Greenlet at 0x10cd584b0: f(5)> 0
<Greenlet at 0x10cd58550: f(5)> 1
<Greenlet at 0x10cd584b0: f(5)> 1
<Greenlet at 0x10cd58910: f(5)> 1
<Greenlet at 0x10cd58550: f(5)> 2
<Greenlet at 0x10cd58910: f(5)> 2
<Greenlet at 0x10cd584b0: f(5)> 2
<Greenlet at 0x10cd58550: f(5)> 3
<Greenlet at 0x10cd584b0: f(5)> 3
<Greenlet at 0x10cd58910: f(5)> 3
<Greenlet at 0x10cd58550: f(5)> 4
<Greenlet at 0x10cd58910: f(5)> 4
<Greenlet at 0x10cd584b0: f(5)> 4

3个greenlet交替运行,把循环次数改为500000,让它们的运行时间长一点,然后在操作系统的进程管理器中看,线程数只有1个。
当然,实际代码里,我们不会用gevent.sleep()去切换协程,而是在执行到IO操作时,gevent自动切换,代码如下:

from gevent import monkey
monkey.patch_all()

import gevent
import urllib.request


def f(url):
    print('GET: {}'.format(url))
    resp = urllib.request.urlopen(url)
    data = resp.read()
    print('{} bytes received from {}.'.format(len(data), url))

gevent.joinall([
    gevent.spawn(f, 'https://www.python.org/'),
    gevent.spawn(f, 'https://www.yahoo.com/'),
    gevent.spawn(f, 'https://github.com/'),
])

运行结果:

GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
45661 bytes received from https://www.python.org/.
14823 bytes received from https://github.com/.
304034 bytes received from https://www.yahoo.com/.

从结果看,3个网络操作是并发执行的,而且结束顺序不同,但只有一个线程。

注意gevent只能在Unix/Linux下运行,在Windows下不保证正常安装和运行。

1.5 asyncio

1.5.1 简介

asyncio 是用来编写并发代码的库,使用 async/await 语法。
asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。
asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。

1.5.2 asyncio函数

使用协程中的一般概念:

事件循环函数(包括循环的创建、运行和停止)

1.5.3 async\await

async 关键字:

await 关键字:

挂起与恢复:

关于await后面跟的函数的性质:

1.5.4 asyncio基本操作

1.5.4.1 asyncio协程对象

使用async定义一个协程对象,并创建一个事件循环对象

import asyncio
#定义协程对象
async def get_request(url):
    print("正在请求的url是:",url)
    print('请求成功的url:',url)
    return url
#得到协程对象
coroutine_obj=get_request('www.baidu.com')
#创建一个事件循环对象
loop=asyncio.get_event_loop()
#将协程对象注册到loop中,并启动loop
loop.run_until_complete(coroutine_obj)
loop.close()

1.5.4.2 task对象

task对象需要loop对象基础上建立起来

import asyncio
#定义协程对象
async def get_request(url):
    print("正在请求的url是:",url)
    print('请求成功的url:',url)
    return url
#得到协程对象
coroutine_obj=get_request('www.baidu.com')

#创建一个事件循环对象
loop=asyncio.get_event_loop()
#基于loop创建了一个task对象
task=loop.create_task(coroutine_obj)
print(task)
#基于loop注册任务
loop.run_until_complete(task)
print(task)
loop.close()

1.5.4.3 future对象

主要函数:

Future 对象相关函数:

future对象与task对象不同的是创建基于asyncio空间来创建的

import asyncio
#定义协程对象
async def get_request(url):
    print("正在请求的url是:",url)
    print('请求成功的url:',url)
    return url
#得到协程对象
coroutine_obj=get_request('www.baidu.com')

#创建一个事件循环对象
loop=asyncio.get_event_loop()
#基于loop创建了一个task对象
future=asyncio.ensure_future(coroutine_obj)
print(future)
loop.run_until_complete(future)
print(future)
loop.close()

或者示例:

import sys
import asyncio
import time
 
# 一个对future进行赋值的函数
async def slow_operation(future, num):
    await asyncio.sleep(1)
    # 给future赋值
    future.set_result('Future'+ str(num) +' is done!')
 
def main():
    loop = asyncio.get_event_loop()
    # 创建一个future
    future1 = loop.create_future()
    # 使用ensure_future 创建Task
    asyncio.ensure_future(slow_operation(future1, 1))
 
    future2 = loop.create_future()
    asyncio.ensure_future(slow_operation(future2, 2))
 
    # gather Tasks,并通过run_uniti_complete来启动、终止loop
    loop.run_until_complete(asyncio.gather(future1, future2))
 
    print(future1.result())
    print(future2.result())
 
    loop.close()
 
if __name__ == "__main__":
    main()

1.5.4.4 绑定回调

在使用task或者future绑定回调时,需要先定义回调函数
回调函数中返回的result方法就是任务对象中封装的协程对象对应的函数返回值
注意:回调函数必须有返回值,不然result方法就没有值

def callback_func(task):
    print(task.result())

在使用task或者future绑定回调时,都可以使用方法绑定task.add_done_callback(callback_func)

import asyncio
#定义协程对象
async def get_request(url):
    print("正在请求的url是:",url)
    print('请求成功的url:',url)
    return url
#得到协程对象
coroutine_obj=get_request('www.baidu.com')
loop=asyncio.get_event_loop()
future=asyncio.ensure_future(coroutine_obj)
#把回调函数绑定到任务对象中
future.add_done_callback(callback_func)
loop.run_until_complete(future)
loop.close()

1.5.4.5 异步多任务

在一个异步函数中,可以不止一次挂起,也就是可以用多个await
多任务时,对于run_until_complete方法需要这样用asyncio.wait()方法处理:loop.run_until_complete(asyncio.wait(task_list))
代码示例:

import time
import asyncio
async def get_request(url):
    print("正在请求的url是:",url)
    #在异步协程中如果出现了同步模块相关代码,那么就无法实现异步
    # time.sleep(2)
    #当在asyncio中遇到阻塞操作就必须进行手动挂起
    await asyncio.sleep(2)
    print('请求成功的url:',url)    
start_time=time.time()
urls=['www.baidu.com','www.sogou.com','www.goubanjia.com']

#任务列表
task_list=[]
for url in urls:
    coroutine_obj=get_request(url)
    future=asyncio.ensure_future(coroutine_obj)
    task_list.append(future)
loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(task_list))
loop.close()
print(time.time()-start_time)

1.5.4.6 asyncio.gather和asyncio.wait 区别

asyncio.gatherasyncio.wait都是asyncio库中用于处理异步任务(协程)的重要函数,但它们之间存在一些关键的区别。以下是这两个函数之间的主要差异:

以下是使用asyncio.gather和asyncio.wait的示例代码:

import asyncio  
  
async def task(name, delay):  
    print(f"{name} started")  
    await asyncio.sleep(delay)  
    return f"{name} finished"  
  
async def main():  
    # 使用 asyncio.gather  
    tasks = [task("A", 1), task("B", 2), task("C", 3)]  
    results = await asyncio.gather(*tasks)  
    print(results)  # 输出: ['A finished', 'B finished', 'C finished']  
  
    # 使用 asyncio.wait  
    tasks = [task("D", 1), task("E", 2), task("F", 3)]  
    done, pending = await asyncio.wait(tasks)  
    for d in done:  
        print(d.result())  # 分别打印每个已完成协程的结果  
    # 处理 pending 列表中的协程(这里只是打印出来)  
    for p in pending:  
        print(f"Pending: {p}")  
  
asyncio.run(main())
上一篇 下一篇

猜你喜欢

热点阅读