Python 协程
2018-11-14 本文已影响0人
程序员同行者
内核级别的程序切换
cpu正在运行一个任务,会在两种情况下切走去执行其他的任务,切换由操作系统强制控制:
- 一种情况是该任务发生了阻塞;
- 该任务计算的时间过长或有一个优先级更高的程序替代了它。
什么是协程
-
单线程下实现实现并发效果:当一个任务遇到阻塞,会自动切换到其他任务去执行,不断切换和保存状态,可以理解为伪线程。
-
python的线程属于内核级别的,即由操作系统控制调度;
-
单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率。
优点
1. 协程的切换开销更小,属于用户程序级别的切换,操作系统完全感知不到,因而更加轻量级;
2. 单线程内就可以实现并发的效果,最大限度地利用cpu。
缺点
-
协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程;
-
协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程。
greenlet模块
greenlet
只是提供了一种比generator
更加便捷的切换方式,当切到一个任务执行时如果遇到I/O
,那就原地阻塞,仍然是没有解决遇到I/O
自动切换来提升效率的问题。
示例:
pip install greenlet
from greenlet import greenlet
from threading import current_thread
def eat(name):
print('<%s>%s eat 1' %(name, current_thread().getName()))
# 启动g2,第一次传入参数
g2.switch('egon')
print('<%s>%s eat 2' %(name, current_thread().getName()))
g2.switch()
def play(name):
print('<%s>%s play 1' %(name, current_thread().getName()))
g1.switch()
print('<%s>%s play 2' %(name, current_thread().getName()))
g1 = greenlet(eat) # 指定要启动的程序
g2 = greenlet(play)
# 启动g1, 且第一次需要传入参数
g1.switch('jack')
'''结果:
<jack>MainThread eat 1
<egon>MainThread play 1
<jack>MainThread eat 2
<egon>MainThread play 2
'''
推荐使用于IO程序切换
在没有io的情况下或者没有重复开辟内存空间的操作反而会降低程序的执行速度。
# 顺序执行 ==================================================
import time
def f1():
res=1
for i in range(100000000):
res+=i
def f2():
res=1
for i in range(100000000):
res*=i
start=time.time()
f1()
f2()
stop=time.time()
print('run time is %s' %(stop-start)) # 11.934s
# 使用greenlet切换 ===========================================
from greenlet import greenlet
import time
def f1():
res = 1
for i in range(100000000):
res += i
# 启动g2
g2.switch()
def f2():
res = 1
for i in range(100000000):
res *= i
# 切换到g1中的上次切换的地方
g1.switch()
start = time.time()
# 1、生成greenlet对象, 传入要启动的程序
g1 = greenlet(f1)
g2 = greenlet(f2)
# 2、启动g1
g1.switch()
stop = time.time()
print('run time is %s' % (stop - start)) # 65s
gevent 模块
安装: pip3 install gevent
创建协程对象:g1=gevent.spawn(func,1,2,3,x=4,y=5)
,spawn括号内第一个参数是函数名,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数函数的实参。
方法:
g1.join() # 等待g1结束
gevent.joinall([g1,g2]) # 同上
g1.value # 拿到func1的返回值
示例:
from gevent import monkey
monkey.patch_all() # 此时就可以检测所有的内置的I/O阻塞了
import gevent
import time, os
from threading import Thread, current_thread
def eat(name):
print('%s 执行 eat中<%s>' % (name, current_thread().getName()))
# alex 执行 eat中<DummyThread-1>
# gevent.sleep(3) # 模拟的是gevent可以识别的io阻塞
# 遇到i/o阻塞会在协程程序间来回切换,实现伪多线程
time.sleep(3)
def play(name):
print('%s 执行 play中<%s>' % (name, current_thread().getName()))
# egon 执行 play中<DummyThread-2> 假线程
# gevent.sleep(1)
time.sleep(2)
# 1. 创建协程对象g1, g2, 传入要启动的程序和参数;
# 2. 传入后会自行执行程序;
g1 = gevent.spawn(eat, 'alex')
g2 = gevent.spawn(play, 'egon')
start_time = time.time()
g1.join() # 等待g1任务运行结束
g2.join() # 等待g2任务运行结束
# gevent.joinall([g1, g2])
end_time = time.time()
print('耗时合计: %s' %(end_time-start_time)) # 3s
Socket单线程并发
# 服务端 ==============================================
from gevent import monkey; monkey.patch_all()
from socket import *
import gevent
def talk(conn, addr):
"""与客户端进行交互"""
try:
while True:
res = conn.recv(1024)
# print('client %s:%s msg: %s' %(addr[0],addr[1],res))
conn.send(res.upper())
except Exception as e:
print(e)
finally: # 始终都会执行的代码
print('正在关闭服务端管道'.center(20, '-'))
conn.close()
def server(server_ip, port):
"""创建并启动服务器"""
s = socket(AF_INET, SOCK_STREAM)
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
s.bind((server_ip, port))
s.listen(5)
while True:
# 1. 进入待连接状态
# 2. 一旦有客户端连接变生成conn对象
print('等待链接') # 主线程在此处等待
conn, addr = s.accept()
# 3. 创建并启动协程程序,进入协程程序内部,
# 若遇到i/o阻塞, 可以在协程程序之间实现来回切换,减少i/o阻塞的影响
g = gevent.spawn(talk, conn, addr)
print(g)
if __name__ == '__main__':
server('127.0.0.1', 8080)