Python 进程,线程,协程及IO模型
一. 操作系统概念
操作系统位于底层硬件与应用软件之间的一层.工作方式: 向下管理硬件,向上提供接口.
操作系统进行进程切换: 1. 出现IO操作;2. 固定时间.
固定时间很短,人感受不到.每个应用层运行起来的程序都是进程.
二. 进程与线程的概念
1. 进程
程序仅仅只是一堆代码而已,而进程指的是程序的运行过程.需要强调的是:同一个程序执行两次,那也是两个进程.
进程: 资源管理单位(容器).
线程: 最小执行单位,管理线程的是进程.
进程的定义:
进程就是一个程序在一个数据集上的一次动态执行过程.进程一般由程序,数据集,进程控制块三部分组成.我们编写的程序 用来描述进程完成那些功能以及如何完成;数据集 则是程序在执行过程中多需要使用的资源;进程控制块 用来记录进程外部特征,描述进程的执行变化过程,系统可以用它来控制和管理进程,它是系统感知进程存在的唯一标志.
举一例说明进程:
想象一位有一手好厨艺的计算机科学家正在为他的女儿烘制生日蛋糕。他有做生日蛋糕的食谱,厨房里有所需的原料:面粉、鸡蛋、糖、香草汁等。在这个比喻中,做蛋糕的食谱就是程序(即用适当形式描述的算法)计算机科学家就是处理器(cpu),而做蛋糕的各种原料就是输入数据。进程就是厨师阅读食谱、取来各种原料以及烘制蛋糕等一系列动作的总和。现在假设计算机科学家的儿子哭着跑了进来,说他的头被一只蜜蜂蛰了。计算机科学家就记录下他照着食谱做到哪儿了(保存进程的当前状态),然后拿出一本急救手册,按照其中的指示处理蛰伤。这里,我们看到处理机从一个进程(做蛋糕)切换到另一个高优先级的进程(实施医疗救治),每个进程拥有各自的程序(食谱和急救手册)。当蜜蜂蛰伤处理完后,这位计算机科学家又回来做蛋糕,从他离开时的那一步继续做下去。
2. 线程
线程的出现是为了降低上下文切换的消耗,提高系统的并发性,并突破一个进程只能干一样事的缺陷,使得进程内并发成为可能. 假设,一个文本程序,需要接受键盘输入,将内容显示在屏幕上,还需要保存信息到硬盘中。若只有一个进程,势必造成同一时间只能干一样事的尴尬(当保存时,就不能通过键盘输入内容)。若有多个进程,每个进程负责一个任务,进程A负责接收键盘输入的任务,进程B负责将内容显示在屏幕上的任务,进程C负责保存内容到硬盘中的任务。这里进A,B,C间的协作涉及到了进程通信问题,而且有共同都需要拥有的东西——-文本内容,不停的切换造成性能上的损失。若有一种机制,可以使任务A,B,C共享资源,这样上下文切换所需要保存和恢复的内容就少了,同时又可以减少通信所带来的性能损耗,那就好了。是的,这种机制就是线程。
线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由 线程ID
,程序计数器
,寄存器集合
,和堆栈
, 共同组成.线程的引入减小了进程并发执行的开销,提高了操作系统的并发性能.线程没有自己的系统资源.
3. 进程与线程的关系
在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制进程.
多线程(即多个控制线程)的概念是,在一个进程中存在多个控制线程,控制该进程的地址空间.
进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位.
进程是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。或者说进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位。
进程与线程的关系.png线程则是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。
进程和线程的关系:
- (1)一个线程只能属于一个进程,而一个进程可以有多个线程,但至少有一个线程。
- (2)资源分配给进程,同一进程的所有线程共享该进程的所有资源。
- (3)CPU分给线程,即真正在CPU上运行的是线程。(每个线程的空间是进程cpu分出来的 一部分)
4. 并行与并发
无论是并行还是并发,在用户看来都是'同事'运行的,而一个CPU同一时刻只能执行一个任务.
并行: 同时运行,只有具备多个cpu才能实现并行.
并发: 是伪并行,即看起来是同时运行,单个cpu+多道技术.
所有现代计算机经常会在同一时间做很多件事,一个用户的PC(无论是单cpu还是cpu),都可以同时运行多个任务(一个任务可以理解为一个进程)。当启动系统时,会秘密启动许多进程:
启动一个进程来杀毒(360软件)
启动一个进程来看电影(暴风影音)
启动一个进程来聊天(腾讯QQ)
所有的这些进程都需被管理,于是一个支持多进程的多道程序系统是至关重要的。
多道技术: 内存中同时存入多道(多个)程序,cpu从一个进程快速切换到另一个,使每个进程各自运行几十或几百毫秒,这样,虽然在某一个瞬间,一个cpu只能执行一个任务,但在1内,cpu却可以运行多个进程,这就给人产生了并行的错觉,即伪并发,以此来区分多处理器操作系统的真正硬件并行(多个cpu共享同一个物理内存)。
5. 同步和异步
-
同步就是指一个进程在执行某个请求的时候,若该请求需要一段时间才能返回信息,那么这个进程将会一直等待下去,直到收到返回信息才继续执行下去;
-
异步是指进程不需要一直等下去,而是继续执行下面的操作,不管其他进程的状态。当有消息返回时系统会通知进程进行处理,这样可以提高执行的效率。
举个例子,打电话时就是同步通信,发短息时就是异步通信。
6. 进程的创建
但凡是硬件,都需要有操作系统去管理,只要有操作系统,就有进程的概念,就需要有创建进程的方式,一些操作系统只为一个应用程序设计,比如微波炉中的控制器,一旦启动微波炉,所有的进程都已经存在。
而对于通用系统(跑很多应用程序),需要有系统运行过程中创建或撤销进程的能力,主要分为4中形式创建新的进程
- 系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关,运行在后台并且只在需要时才唤醒的进程,称为守护进程,如电子邮件、web页面、新闻、打印)
- 一个进程在运行过程中开启了子进程(如nginx开启多进程,os.fork,subprocess.Popen等)
- 用户的交互式请求,而创建一个新进程(如用户双击暴风影音)
- 一个批处理作业的初始化(只在大型机的批处理系统中应用)
无论哪一种,新进程的创建都是由一个已经存在的进程执行了一个用于创建进程的系统调用而创建的:
- 1. 在UNIX中该系统调用是:fork,fork会创建一个与父进程一模一样的副本,二者有相同的存储映像、同样的环境字符串和同样的打开文件(在shell解释器进程中,执行一个命令就会创建一个子进程)
- 在windows中该系统调用是:CreateProcess,CreateProcess既处理进程的创建,也负责把正确的程序装入新进程。
关于创建的子进程,UNIX和windows
- 1.相同的是:进程创建后,父进程和子进程有各自不同的地址空间,任何一个进程的在其地址空间中的修改都不会影响到另外一个进程。
- 2.不同的是:在UNIX中,子进程的初始地址空间是父进程的一个副本,提示:子进程和父进程是可以有只读的共享内存区的。但是对于windows系统来说,从一开始父进程与子进程的地址空间就是不同的。
7. 进程的终止
- 正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)
- 出错退出(自愿,程序员主动抛出异常,例如raise)
- 严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等)
- 被其他进程杀死(非自愿,如kill -9)
8. 进程的层次结构
无论UNIX还是windows,进程只有一个父进程,不同的是:
- 在UNIX中所有的进程,都是以init进程为根,组成树形结构。父子进程共同组成一个进程组,这样,当从键盘发出一个信号时,该信号被送给当前与键盘相关的进程组中的所有成员。
- 在windows中,没有进程层次的概念,所有的进程都是地位相同的,唯一类似于进程层次的暗示,是在创建进程时,父进程得到一个特别的令牌(称为句柄),该句柄可以用来控制子进程,但是父进程有权把该句柄传给其他子进程,这样就没有层次了。
9. 进程的状态
ail -f access.log |grep '404'
执行程序tail,开启一个子进程,执行程序grep,开启另外一个子进程,两个进程之间基于管道'|'通讯,将tail的结果作为grep的输入。
进程grep在等待输入(即I/O)时的状态称为阻塞,此时grep命令都无法运行
其实在两种情况下会导致一个进程在逻辑上不能运行,
- 进程挂起是自身原因,遇到I/O阻塞,便要让出CPU让其他进程去执行,这样保证CPU一直在工作
- 与进程无关,是操作系统层面,可能会因为一个进程占用时间过多,或者优先级等原因,而调用其他的进程去使用CPU。
因而一个进程有三个状态
进程的三种状态.png
10. 进程并发的实现
进程并发的实现在于,硬件中断一个正在运行的进程,把此时进程运行的所有状态保存下来,为此,操作系统维护一张表格,即进程表(process table),每个进程占用一个进程表项(这些表项也称为进程控制块)
进程表项.png
该表存放了进程状态的重要信息:程序计数器、堆栈指针、内存分配状况、所有打开文件的状态、帐号和调度信息,以及其他在进程由运行态转为就绪态或阻塞态时,必须保存的信息,从而保证该进程在再次启动时,就像从未被中断过一样。
三. threading模块
python的多线程:由于GIL,导致同一进程中只能有一个线程运运行在cpu上,而不能有多个线程同时在一个cpu上运行.
实现多线程的并发需要使用threading模块.
线程对象的创建
1. Thread类直接创建
# 多线程的并发,只能是交给一个cpu执行,不能多个cpu执行.即多个线程不能实现并行.
# 多线程并发方式一:
import threading
import time
def tingge():
print("听歌")
time.sleep(3)
print("听歌结束")
def xieboke():
print("写博客")
time.sleep(5)
print("写博客结束")
print(time.time()-s) # 计算整个程序运行时间,不能放在函数外,不然要和另外三个进程竞争,导致其输出的时间不准确。
s = time.time()
t1 = threading.Thread(target=tingge) # 创建听歌线程,多线程的主进程.
t2 = threading.Thread(target=xieboke) # 创建写博客线程,多线程的主进程
t1.strat() # 运行听歌线程,多线程的子线程
t2.start() #运行写博客线程,多线程的子进程
print("ending") # 多线程的主进程
=========================
听歌
ending
写博客
听歌结束
写博客结束
5.0137574672698975
因为三个线程t1.start(),t2.start() 和print("ending")之间竞争的原因,print("ending")竞争成功,所以先运行print("ending")这个进程。
当tingge函数中睡眠更改为8s的话,不考虑小数点的情况下,真实运行时间为8s
2. Thread类继承式创建
# 调用多线程方式二
import threading
import time
class MyThread(threading.Thread):
def __init__(self,num):
threading.Thread.__init__(self)
self.num = num
def run(self):
print("running on number:%s" %self.num)
time.sleep(3)
t1 = MyThread(56)
t2 = MyThread(78)
t1.start() # 该进程运行run函数原因,请查看源码,一系列的调用最终是调用run函数
t2.start() # 该进程运行run函数原因,请查看源码,一系列的调用最终是调用run函数
print("ending")
3. Thread类的实例方法
join()
- 在子进程完成运行之前,这个子线程的父进程将一直被阻塞.
import threading
from time import ctime,sleep
import time
def Music(name):
print("Begin listenning to {name}.{time}".format(name=name,time=ctime()))
sleep(3)
print("end listening {time}".format(time=ctime()))
def Blog(title):
print("Begin recording the {title}.{time}".format(title=title,time=ctime()))
sleep(5)
print("end recording {time}".format(time=ctime()))
threads = []
t1 = threading.Thread(target=Music,args=("FILL ME",))
t2 = threading.Thread(target=Blog,args=("",))
threads.append(t1)
threads.append(t2)
if __name__ == "__main__":
for t in threads:
t.start()
t.join()
print("all over %s "%ctime())
==============================
Begin listenning to FILL ME.Tue Aug 21 17:15:29 2018
Begin recording the python.Tue Aug 21 17:15:29 2018 #Music函数和Blog函数同时运行
end listening Tue Aug 21 17:15:32 2018
end recording Tue Aug 21 17:15:34 2018
all over Tue Aug 21 17:15:34 2018
- t.join():线程对象t未执行完,会阻塞你的主线程 ,但不会阻塞子进程,子进程没有任何影响。
#添加t1.join()
import threading
from time import ctime,sleep
import time
def Music(name):
print("Begin listenning to {nam
sleep(3)
print("end listening {time}".fo
def Blog(title):
print("Begin recording the {tit
sleep(5)
print("end recording {time}".fo
threads = []
t1 = threading.Thread(target=Music,
t2 = threading.Thread(target=Blog,a
threads.append(t1)
threads.append(t2)
if __name__ == "__main__":
for t in threads:
t.start() #子进程
t1.join() #添加堵塞
print("all over %s" %ctime())
=====================
Begin listenning to FILL ME.Tue Aug 21 17:30:36 2018
Begin recording the python.Tue Aug 21 17:30:36 2018
end listening Tue Aug 21 17:30:39 2018 #但只有当子进程Music函数运行完才能运行主进程,所以这里打印结果与上一个程序顺序不同
all over Tue Aug 21 17:30:39 2018
end recording Tue Aug 21 17:30:41 2018 #整个程序执行时间为5s
- t1.join()改成t2.join()同样证明会阻塞你的主线程 ,但不会阻塞子进程
#t1.join()改成t2.join()查看运行结果
import threading
from time import ctime,sleep
import time
def Music(name):
print("Begin listenning to {name}
sleep(3)
print("end listening {time}".form
def Blog(title):
print("Begin recording the {title
sleep(5)
print("end recording {time}".form
threads = []
t1 = threading.Thread(target=Music,ar
t2 = threading.Thread(target=Blog,arg
threads.append(t1)
threads.append(t2)
if __name__ == "__main__":
for t in threads:
t.start()
t2.join()
print("all over %s" %ctime())
========================
Begin listenning to FILL ME.Tue Aug 21 17:32:44 2018
Begin recording the python.Tue Aug 21 17:32:44 2018
end listening Tue Aug 21 17:32:47 2018
end recording Tue Aug 21 17:32:49 2018
all over Tue Aug 21 17:32:49 2018 #整个程序执行时间为5s
- 当t.join()在for循环内就不能实现多线程了,没有意义。
import threading
from time import ctime,sleep
import time
def Music(name):
print("Begin listenning to {name}.
sleep(3)
print("end listening {time}".forma
def Blog(title):
print("Begin recording the {title}
sleep(5)
print("end recording {time}".forma
threads = []
t1 = threading.Thread(target=Music,arg
t2 = threading.Thread(target=Blog,args
threads.append(t1)
threads.append(t2)
if __name__ == "__main__":
for t in threads:
t.start()
t.join()
print("all over %s" %ctime())
===============================
Begin listenning to FILL ME.Tue Aug 21 17:34:43 2018
end listening Tue Aug 21 17:34:46 2018
Begin recording the python.Tue Aug 21 17:34:46 2018 #函数Music和函数Blog不能同时进行
end recording Tue Aug 21 17:34:51 2018
all over Tue Aug 21 17:34:51 2018 #运行时间8s
4. setDaemon()设置守护线程
将线程声明为守护线程,必须在start() 方法调用之前设置,如果不设置为守护线程程序会被无限挂起。
当我们在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程就分兵两路,分别运行,那么当主线程完
成。想退出时,会检验子线程是否完成。如果子线程未完成,则主线程会等待子线程完成后再退出。
但是有时候我们需要的是只要主线程 完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法了。
# 主进程结束但子进程未结束,整个程序同样结束。
import threading
from time import ctime,sleep
def Music(name):
print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
sleep(3)
print("end listening {time}".format(time=ctime()))
def Blog(title):
print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
sleep(5)
print('end recording {time}'.format(time=ctime()))
threads = []
t1 = threading.Thread(target=Music,args=('FILL ME',))
t2 = threading.Thread(target=Blog,args=('python',))
threads.append(t1)
threads.append(t2)
if __name__ == '__main__':
for t in threads:
t.setDaemon(True) # 注意:一定在start之前设置
t.start()
print ("all over %s" %ctime())
============================
Begin listening to FILL ME. Tue Aug 21 19:02:20 2018
all over Tue Aug 21 19:02:20 2018
Begin recording the python. Tue Aug 21 19:02:20 2018
只设置t1为守护线程
import threading
from time import ctime,sleep
def Music(name):
print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
sleep(3)
print("end listening {time}".format(time=ctime()))
def Blog(title):
print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
sleep(5)
print('end recording {time}'.format(time=ctime()))
threads = []
t1 = threading.Thread(target=Music,args=('FILL ME',))
t2 = threading.Thread(target=Blog,args=('python',))
threads.append(t1)
threads.append(t2)
if __name__ == '__main__':
t1.setDaemon(True) # 注意:一定在start之前设置
t1.start()
t2.start()
print ("all over %s" %ctime())
=============================
Begin listening to FILL ME. Tue Aug 21 19:26:54 2018
all over Tue Aug 21 19:26:54 2018
Begin recording the python. Tue Aug 21 19:26:54 2018
end listening Tue Aug 21 19:26:57 2018
end recording Tue Aug 21 19:26:59 2018 #因为t1运行时间比较长,所以t1运行完其他线程也都运行完毕。
只设置t2为守护线程
import threading
from time import ctime,sleep
def Music(name):
print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
sleep(3)
print("end listening {time}".format(time=ctime()))
def Blog(title):
print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
sleep(5)
print('end recording {time}'.format(time=ctime()))
threads = []
t1 = threading.Thread(target=Music,args=('FILL ME',))
t2 = threading.Thread(target=Blog,args=('python',))
threads.append(t1)
threads.append(t2)
if __name__ == '__main__':
t2.setDaemon(True) # 注意:一定在start之前设置
t1.start()
t2.start()
print ("all over %s" %ctime())
==============================
Begin listening to FILL ME. Mon May 8 17:54:44 2017
Begin recording the python. Mon May 8 17:54:44 2017
all over Mon May 8 17:54:44 2017
end listening Mon May 8 17:54:47 2017 #因为t2进程运行只有3s,而t1进程运行需要5s,所以当t2进程和主进程运行完毕,整个程序就结束,不管t1是否运行完毕。
5. 其它方法
Thread实例对象的方法
t.start()
: 激活线程,
t.getName()
: 获取线程的名称
t.setName()
: 设置线程的名称
t.name
: 获取或设置线程的名称
t.is_alive()
: 判断线程是否为激活状态
t.isAlive()
:判断线程是否为激活状态
t.setDaemon()
设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
t.isDaemon()
: 判断是否为守护线程
t.ident
:获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。
t.join()
:逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
t.run()
:线程被cpu调度后自动执行线程对象的run方法
threading模块提供的一些方法:
threading.currentThread()
: 返回当前的线程变量。
threading.enumerate()
: 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.activeCount()
: 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
import os
import threading
from time import ctime, sleep
def Music(name):
print("Begin listening to {name}. {time}".format(name=name, time=ctime()))
sleep(3)
print("线程数:", threading.activeCount()) # threading.activeCount()线程数:3
print("正在运行的线程:", threading.enumerate()) # 正在运行的线程
print("正在运行Music线程的id:",os.getpid(),"正在运行Music线程的上一级进程id:",os.getppid())
print("end listening {time}".format(time=ctime()))
def Blog(title):
print("Begin recording the {title}. {time}".format(title=title, time=ctime()))
sleep(5)
print('end recording {time}'.format(time=ctime()))
threads = []
t1 = threading.Thread(target=Music, args=('FILL ME',), name="sub_thread") # name="sub_thread"定义线程名
t2 = threading.Thread(target=Blog, args=('python',))
threads.append(t1)
threads.append(t2)
if __name__ == '__main__':
t2.setDaemon(True) # 注意:一定在start之前设置
t1.start()
t2.start()
print("all over %s" % ctime())
================================
Begin listening to FILL ME. Tue Aug 21 19:49:05 2018
all over Tue Aug 21 19:49:05 2018
Begin recording the python. Tue Aug 21 19:49:05 2018
线程数: 3
正在运行的线程: [<_MainThread(MainThread, stopped 140104305641216)>, <Thread(sub_thread, started 140104279365376)>, <Thread(Thread-1, started daemon 140104270972672)>]
正在运行Music线程的id: 124750 正在运行Music线程的上一级进程id: 52971
end listening Tue Aug 21 19:49:08 2018
四. GIL(全局解释器锁)
1. GIL定义
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple
native threads from executing Python bytecodes at once. This lock is necessary mainly
because CPython’s memory management is not thread-safe. (However, since the GIL
exists, other features have grown to depend on the guarantees that it enforces.)
GIL加在cpython解释器中,其他的python解释器不会有GIL.
Python中的线程是操作系统的原生线程,Python虚拟机使用一个全局解释器锁(Global Interpreter Lock)来互斥线程对Python虚拟机的使用。为了支持多线程机制,一个基本的要求就是需要实现不同线程对共享资源访问的互斥,所以引入了GIL。
GIL : 在一个线程拥有了解释器的访问权之后,其他的所有线程都必须等待它释放解释器的访问权,即使这些线程的下一条指令并不会互相影响。
在调用任何Python C API之前,要先获得GIL
GIL优缺点:
-
缺点:
多处理器退化为单处理器; -
优点:
避免大量的加锁解锁操作
2. GIL的早期设计
Python支持多线程,而解决多线程之间数据完整性和状态同步的最简单方法自然就是加锁。 于是有了GIL这把超级大锁,而当越来越多的代码库开发者接受了这种设定后,他们开始大量依赖这种特性(即默认python内部对象是thread-safe的,无需在实现时考虑额外的内存锁和同步操作)。
慢慢的这种实现方式被发现是蛋疼且低效的。但当大家试图去拆分和去除GIL的时候,发现大量库代码开发者已经重度依赖GIL而非常难以去除了。
有多难?做个类比,像MySQL这样的“小项目”为了把Buffer Pool Mutex这把大锁拆分成各个小锁也花了从5.5到5.6再到5.7多个大版为期近5年的时间,并且仍在继续。MySQL这个背后有公司支持且有固定开发团队的产品走的如此艰难,那又更何况Python这样核心开发和代码贡献者高度社区化的团队呢?
3. GIL的影响
无论你启多少个线程,你有多少个cpu, Python在执行一个进程的时候会淡定的在同一时刻只允许一个线程运行。 所以,python是无法利用多核CPU实现多线程的。 这样,python对于计算密集型的任务开多线程的效率甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。
计算密集型:一直在使用CPU.
IO密集型:存在 大量的IO操作.
对于IO密集型任务,python的多线程能够节省时间.
对于计算密集型任务,Python的多线程并没有用处.
- 以下程序为计算密集型任务:
#单线程即cpu串行情况下,查看运行时间
import time
def cal(n):
sum=0
for i in range(n):
sum+=i
s=time.time()
cal(50000000)
cal(50000000)
print("time",time.time()-s)
=========================
time 7.650153636932373 #python3串行运行结果
('time', 12.600000143051147) #python2串行运行结果
- 多线程情况下运行程序:
import time
import threading
def cal(n):
sum=0
for i in range(n):
sum+=i
s=time.time()
t1=threading.Thread(target=cal,args=(50000000,))
t2=threading.Thread(target=cal,args=(50000000,))
t1.start()
t2.start()
t1.join()
t2.join()
print("time",time.time()-s)
==========================
time 7.961973667144775 #python3中多线程运行时间
('time', 20.12600016593933) #python2中多线程运行时间
从上述单线程和多线程运行结果来看,不管在python2或者3中运行结果均显示多线程比单线程运行时间更长。
因为GIL锁限制你只有一个线程执行,切换进程浪费时间,导致多线程话费时间更多。
python3中时间差不明显的原因是因为python3改进了GIL锁,但根本没有解决问题。
4. 解决方案
1. python使用多核,即开多个进程。
- 方法一: 协程+多进程。使用方法简单,效率还可以,一般使用该方法。
协程yield是你自己写的,是自己定义什么时候切换进程。 - 方法二:IO多路复用。使用复杂,但效率很高。不常用。
2. 终极思路:
换C模块实现多线程,即换一个python解释器,或者换门编程语言避免GIL锁。
5. 多进程
用multiprocessing替代Thread multiprocessing库的出现很大程度上是为了弥补thread库因为GIL而低效的缺陷。它完整的复制了一套thread所提供的接口方便迁移。唯一的不同就是它使用了多进程而不是多线程。每个进程有自己的独立的GIL,因此也不会出现进程之间的GIL争抢。
# coding:utf8
from multiprocessing import Process
import time
def counter():
i = 0
for _ in range(40000000):
i = i + 1
return True
def main():
l = []
start_time = time.time()
for _ in range(2):
t = Process(target=counter)
t.start()
l.append(t)
# t.join()
for t in l:
t.join()
end_time = time.time()
print("Total time: {}".format(end_time - start_time))
if __name__ == '__main__':
main()
==============================
py2.7:
串行:6.1565990448 s
并行:3.1639978885 s
py3.5:
串行:6.556925058364868 s
并发:3.5378448963165283 s
当然multiprocessing也不是万能良药。它的引入会增加程序实现时线程间数据通讯和同步的困难。就拿计数器来举例子,如果我们要多个线程累加同一个变量,对于thread来说,申明一个global变量,用thread.Lock的context包裹住三行就搞定了。而multiprocessing由于进程之间无法看到对方的数据,只能通过在主线程申明一个Queue,put再get或者用share memory的方法。
这个额外的实现成本使得本来就非常痛苦的多线程程序编码,变得更加痛苦了。
总结: 因为GIL的存在,只有IO Bound场景下得多线程会得到较好的性能 - 如果对并行计算性能较高的程序可以考虑把核心部分也成C模块,或者索性用其他语言实现 - GIL在较长一段时间内将会继续存在,但是会不断对其进行改进。所以对于GIL,既然不能反抗,那就学会去享受它吧!
五. 同步锁(Lock)
先看代码:
import time
import threading
def addNum():
global num # 在每个线程中都获取这个全局变量
num -= 1
num = 100 # 设定一个共享变量
thread_list = []
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)
for t in thread_list: # 等待所有线程执行完毕
t.join()
print('Result: ', num)
==================================
Result: 0
修改addNum中的代码:
import time
import threading
def addNum():
global num # 在每个线程中都获取这个全局变量
temp = num
time.sleep(1)
num = temp - 1 # 对此公共变量进行-1操作
num = 100 # 设定一个共享变量
thread_list = []
for i in range(100): # 循环进行了100次addNum函数
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)
for t in thread_list: # 等待所有线程执行完毕
t.join()
print('Result: ', num)
============================
Result: 99
100个线程同时竞争运行函数,睡眠1s肯定够100个进程运行到同时处于睡眠的状态,第一个竞争到的肯定率先醒来速度极快计算完,num=99,线程2醒来从上面携带的global num=100同样计算num=99
睡眠时间较短时
import time
import threading
def addNum():
global num #在每个线程中都获取这个全局变量
temp=num
time.sleep(0.001)
num =temp-1 # 对此公共变量进行-1操作
num = 100 #设定一个共享变量
thread_list = []
for i in range(100): #循环进行了100次addNum函数
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)
for t in thread_list: #等待所有线程执行完毕
t.join()
print('Result: ', num)
==============================
93或91或89...
每次执行程序结果都不同。100个线程因为GIL大锁的原因竞争运行函数,for循环第一次时线程1率先运行函数,线程1最快运行到time.sleep(0.001)睡眠时,GIL释放,线程1还未运行完addNum函数。for循环了2,3...次,线程2,3...竞争到运行函数,假设线程1 醒来时不知道有多少个线程在同时运行函数,当线程1计算num值,num值改变了,改变后的num值对在函数中的线程2,3...计算时num已经不是100的初始值了,num值由于一直不停的有线程进入一直在改变。而且线程1睡眠时不知道有多少个线程同时在睡眠,最后的结果肯定也不同。
上述就是线程安全问题,数据不可控,不安全,解决方法就是再创建一把锁。
锁通常被用来实现对共享资源的同步访问。 为每一个共享资源创建一个Lock对象,当你需要访问该资源时,调用acquire方法来获取锁对象(如果其它线程已经获得了该锁,则当前线程需等待其被释放),待资源访问完后,再调用release方法释放锁:
#注意获取锁和释放锁的位置
import time
import threading
def addNum():
global num
lock.acquire() #获取这把锁
temp=num
time.sleep(0.01)
num =temp-1
lock.release() #释放这把锁
num = 100
thread_list = []
lock=threading.Lock() #创建一把锁
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)
for t in thread_list:
t.join()
print('Result: ', num)
==============================
Result: 0
上锁的作用是这个线程未结束其他线程无法竞争,只能等,是一个串行,运行时间为0.001s*100次。
但与join()不同的是:join()是整个程序是串行的,上锁的话只有公共数据部分加锁,是串行的,程序其他内容还是并行的。
但上锁后的程序可能会出现死锁的情况.
六. 死锁与递归锁
所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。
import threading
import time
mutexA = threading.Lock() #创建一把锁
mutexB = threading.Lock()
class MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
self.fun1()
self.fun2()
def fun1(self):
mutexA.acquire() # 如果锁被占用,则阻塞在这里,等待锁的释放
print("I am %s , get res: %s---%s" % (self.name, "ResA", time.time()))
mutexB.acquire()
print("I am %s , get res: %s---%s" % (self.name, "ResB", time.time()))
mutexB.release()
mutexA.release() #释放这把锁
def fun2(self):
mutexB.acquire()
print("I am %s , get res: %s---%s" % (self.name, "ResB", time.time()))
time.sleep(0.2)
mutexA.acquire() #获取这把锁
print("I am %s , get res: %s---%s" % (self.name, "ResA", time.time()))
mutexA.release()
mutexB.release()
if __name__ == "__main__":
print("start-----------%s" % time.time())
for i in range(0, 10):
my_thread = MyThread()
my_thread.start()
出现死锁不停竞争,程序卡住。
解决方法:
在python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:
rl = threading.RLock() # 递归锁
rl.acquire() # 上锁 计数+1 counter=1
rl.acquire() # 上锁 计数+1 counter=2
...
rl.release() # 解锁 计数-1 counter=1
rl.release() # 解锁 计数-1 counter=0
counter记录了acquire的次数,直到一个线程所有的acquire都被release,即count为0时,其他线程才可以访问该资源。
import threading
import time
Rlock = threading.RLock() # 创建一个递归锁
class MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
self.func1()
self.func2()
def func1(self):
Rlock.acquire() # 如果锁被占用,则阻塞在这里,等待锁的释放 counter = 1
print('I am %s ,get res: %s --- %s ' % (self.name, 'ResA', time.time()))
Rlock.acquire() # counter = 2
print('I am %s ,get res: %s --- %s ' % (self.name, 'ResB', time.time()))
Rlock.release() # counter = 1
Rlock.release() # counter = 0
def func2(self):
Rlock.acquire() # counter = 1
print('I am %s ,get res: %s --- %s ' % (self.name, 'ResB', time.time()))
time.sleep(0.2)
Rlock.acquire() # counter = 2
print('I am %s ,get res: %s --- %s ' % (self.name, 'ResA', time.time()))
Rlock.release() # counter = 1 #释放这把锁
Rlock.release() # counter = 0
if __name__ == '__main__':
print('start ----------- %s' % time.time())
for i in range(0, 10):
mt = MyThread()
mt.start()
七. 同步条件 Event对象
线程之间的通信作用
线程的一个关键特性是每个线程都是独立运行且状态不可预测。 如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就 会变得非常棘手。 为了解决这些问题,我们需要使用threading库中的Event对象。
对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。 如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。
Event.png
event.isSet():
返回event的状态值,False或True;
event.wait():
如果event.isSet()==False将阻塞线程,可以加参数,表示等待秒数;
event.set():
设置event的状态值将为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():
恢复event的状态值为False。
可以考虑一种应用场景(仅仅作为说明),例如,我们有多个线程从Redis队列中读取数据来处理,这些线程都要尝试去连接Redis的服务,一般情况下,如果Redis连接不成功,在各个线程的代码中,都会去尝试重新连接。如果我们想要在启动时确保Redis服务正常,才让那些工作线程去连接Redis服务器,那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作:主线程中会去尝试连接Redis服务,如果正常的话,触发事件,各工作线程会尝试连接Redis服务。
import threading
import time
import logging
logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', )
def worker(event):
logging.debug('Waiting for redis ready...')
event.wait()
logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
time.sleep(1)
def main():
readis_ready = threading.Event()
t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
t1.start()
t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
t2.start()
logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
time.sleep(3) # simulate the check progress
readis_ready.set()
if __name__ == "__main__":
main()
=======================================
(t1 ) Waiting for redis ready...
(t2 ) Waiting for redis ready...
(MainThread) first of all, check redis server, make sure it is OK, and then trigger the redis ready event
(t1 ) redis ready, and connect to redis server and do some work [Wed Aug 22 00:43:20 2018]
(t2 ) redis ready, and connect to redis server and do some work [Wed Aug 22 00:43:20 2018]
threading.Event的wait方法还接受一个超时参数,默认情况下如果事件一致没有发生,wait方法会一直阻塞下去,而加入这个超时参数之后,如果阻塞时间超过这个参数设定的值之后,wait方法会返回。对应于上面的应用场景,如果Redis服务器一致没有启动,我们希望子线程能够打印一些日志来不断地提醒我们当前没有一个可以连接的Redis服务,我们就可以通过设置这个超时参数来达成这样的目的:
import logging
import time
import threading
logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', )
def worker(event):
logging.debug('等待redis准备…')
while not event.isSet():
logging.debug('等待连接...')
event.wait(3) # if flag = False阻塞,等待flag = True 继续执行
logging.debug('redis准备好,并连接到redis服务器和做一些工作 %s', time.ctime())
time.sleep(1)
def main():
r = threading.Event() # flag = False
t1 = threading.Thread(target=worker, args=(r,), name='t1')
t1.start()
t2 = threading.Thread(target=worker, args=(r,), name='t2')
t2.start()
logging.debug('首先,检查redis服务器,确保它是OK,然后触发复述事件做好准备')
time.sleep(6)
r.set() # flag = True
============================
(t1 ) 等待redis准备…
(t1 ) 等待连接...
(t2 ) 等待redis准备…
(MainThread) 首先,检查redis服务器,确保它是OK,然后触发复述事件做好准备
(t2 ) 等待连接...
(t1 ) 等待连接...
(t2 ) 等待连接...
(t1 ) 等待连接...
(t2 ) 等待连接...
(t1 ) redis准备好,并连接到redis服务器和做一些工作 Wed Aug 22 00:50:30 2018
(t2 ) redis准备好,并连接到redis服务器和做一些工作 Wed Aug 22 00:50:30 2018
这样,我们就可以在等待Redis服务启动的同时,看到工作线程里正在等待的情况。
八. Semaphore(信号量)
同时只有n个线程可以获得semaphore,即可以限制最大连接数为n)
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):
import threading
import time
semaphore = threading.Semaphore(5)
def func():
if semaphore.acquire():
print(threading.currentThread().getName() + ' get semaphore',time.ctime())
time.sleep(2)
semaphore.release()
for i in range(20):
t1 = threading.Thread(target=func)
t1.start()
========================================
Thread-1 get semaphore Mon Oct 23 18:44:40 2017
Thread-2 get semaphore Mon Oct 23 18:44:40 2017
Thread-3 get semaphore Mon Oct 23 18:44:40 2017
Thread-4 get semaphore Mon Oct 23 18:44:40 2017
Thread-5 get semaphore Mon Oct 23 18:44:40 2017
Thread-8 get semaphore Mon Oct 23 18:44:42 2017
Thread-9 get semaphore Mon Oct 23 18:44:42 2017
Thread-7 get semaphore Mon Oct 23 18:44:42 2017
Thread-6 get semaphore Mon Oct 23 18:44:42 2017
Thread-10 get semaphore Mon Oct 23 18:44:42 2017
Thread-13 get semaphore Mon Oct 23 18:44:44 2017
Thread-11 get semaphore Mon Oct 23 18:44:44 2017
Thread-12 get semaphore Mon Oct 23 18:44:44 2017
Thread-15 get semaphore Mon Oct 23 18:44:44 2017
Thread-14 get semaphore Mon Oct 23 18:44:44 2017
Thread-16 get semaphore Mon Oct 23 18:44:46 2017
Thread-18 get semaphore Mon Oct 23 18:44:46 2017
Thread-17 get semaphore Mon Oct 23 18:44:46 2017
Thread-19 get semaphore Mon Oct 23 18:44:46 2017
Thread-20 get semaphore Mon Oct 23 18:44:46 2017
20个线程同时获取,但每次只能运行5个线程,所以运行程序显示的结果是5个5个的打印出来。
九. multiprocessing模块
由于GIL的存在,Python不存在多线程,要充分利用多核资源,就需要使用多进程。
multiprocessing模块是Python中的多进程管理包。
通过multiprocessing.Process对象来创建一个进程,Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。
multiprocessing与threading一样,调用同一套API。
1. python的进程调用
- Process类调用
from multiprocessing import Process
import time
def f(name):
print('hello', name, time.ctime())
time.sleep(1)
if __name__ == '__main__':
p_list = []
for i in range(3):
p = Process(target=f, args=('xuyaping:%s' % i,))
p_list.append(p)
p.start()
for i in p_list:
p.join()
print('end')
===================================
hello xuyaping:0 Mon Oct 23 18:57:11 2017
hello xuyaping:2 Mon Oct 23 18:57:11 2017
hello xuyaping:1 Mon Oct 23 18:57:11 2017
end
- 继承Process类调用
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self):
super(MyProcess, self).__init__()
# self.name = name
def run(self):
print('hello', self.name, time.ctime())
time.sleep(1)
if __name__ == '__main__':
p_list = []
for i in range(3):
p = MyProcess()
p.start()
p_list.append(p)
for p in p_list:
p.join()
print('end')
================================================
hello MyProcess-1 Mon Oct 23 18:59:08 2017
hello MyProcess-2 Mon Oct 23 18:59:08 2017
hello MyProcess-3 Mon Oct 23 18:59:08 2017
end
2 . process类
- 构造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group:
线程组,目前还没有实现,库引用中提示必须是None;
target:
要执行的方法;
name:
进程名;
args/kwargs:
要传入方法的参数。
- 实例方法:
is_alive():
返回进程是否在运行。
join([timeout]):
阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
start():
进程准备就绪,等待CPU调度
run():
strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
terminate():
不管任务是否完成,立即停止工作进程
- 属性:
daemon:
和线程的setDeamon功能一样
name:
进程名字。
pid:
进程号。
from multiprocessing import Process
import os
import time
def info(name):
print("name:",name)
print('parent process:', os.getppid())
print('process id:', os.getpid())
print("------------------")
time.sleep(1)
def foo(name):
info(name)
if __name__ == '__main__':
info('main process line')
p1 = Process(target=info, args=('xuyaping',))
p2 = Process(target=foo, args=('egon',))
p1.start()
p2.start()
p1.join()
p2.join()
print("ending")
========================================
name: main process line
parent process: 9900
process id: 13264
------------------
name: xuyaping
name: egon
parent process: 13264
process id: 13720
------------------
parent process: 13264
process id: 20128
------------------
ending
通过tasklist(Win)或者ps -elf |grep(linux)命令检测每一个进程号(PID)对应的进程名
十. 进程间通讯
1. 进程队列Queue
from multiprocessing import Process, Queue
import queue
def f(q, n):
# q.put([123, 456, 'hello'])
q.put(n * n + 1)
print("son process", id(q))
if __name__ == '__main__':
q = Queue() # try: q=queue.Queue()
print("main process", id(q))
for i in range(3):
p = Process(target=f, args=(q, i))
p.start()
print(q.get())
print(q.get())
print(q.get())
============================================
main process 140231804541808
son process 140231804541808
1
son process 140231804541808
2
son process 140231804541808
5
2. 管道(pipe)
The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:
from multiprocessing import Process, Pipe
def f(conn):
conn.send([12, {"name":"xyp"}, 'hello'])
response=conn.recv()
print("response",response)
conn.close()
print("q_ID2:",id(child_conn))
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
print("q_ID1:",id(child_conn))
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
parent_conn.send("儿子你好!")
p.join()
=====================================
q_ID1: 140077283723752
[12, {'name': 'xyp'}, 'hello']
response 儿子你好!
q_ID2: 140077283723752
Pipe()返回的两个连接对象代表管道的两端。 每个连接对象都有send()和recv()方法(等等)。 请注意,如果两个进程(或线程)尝试同时读取或写入管道的同一端,管道中的数据可能会损坏。
3. manager
Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据。
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
from multiprocessing import Process, Manager
def f(d, l,n):
d[n] = n
d["name"] ="tom"
l.append(n)
#print("l",l)
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(5))
p_list = []
for i in range(10):
p = Process(target=f, args=(d,l,i))
p.start()
p_list.append(p)
for res in p_list:
res.join()
print(d)
print(l)
================================
{4: 4, 'name': 'tom', 2: 2, 1: 1, 3: 3, 0: 0, 5: 5, 6: 6, 7: 7, 8: 8, 9: 9}
[0, 1, 2, 3, 4, 4, 2, 1, 3, 0, 5, 6, 7, 8, 9]
十一. 进程池(Using a pool of workers)
Pool类描述了一个工作进程池,他有几种不同的方法让任务卸载工作进程。
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
我们可以用Pool类创建一个进程池, 展开提交的任务给进程池。 例:
from multiprocessing import Pool
import time
def foo(args):
time.sleep(1)
print(args)
if __name__ == '__main__':
p = Pool(5)
for i in range(30):
p.apply_async(func=foo, args= (i,))
p.close() # 等子进程执行完毕后关闭线程池
# time.sleep(2)
# p.terminate() # 立刻关闭线程池
p.join()
一个进程池对象可以控制工作进程池的哪些工作可以被提交,它支持超时和回调的异步结果,有一个类似map的实现。
processes :
使用的工作进程的数量,如果processes是None那么使用os.cpu_count()返回的数量。
initializer:
如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
maxtasksperchild:
工作进程退出之前可以完成的任务数,完成后用一个心的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
context:
用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context
注意:Pool对象的方法只可以被创建pool的进程所调用。
- 进程池的方法
apply(func[, args[, kwds]]) :
使用arg和kwds参数调用func函数,结果返回前会一直阻塞,由于这个原因,apply_async()更适合并发执行,另外,func函数仅被pool中的一个进程运行。
apply_async(func[,args[,kwds[,callback[,error_callback]]]]) :
apply()方法的一个变体,会返回一个结果对象。如果callback被指定,那么callback可以接收一个参数然后被调用,当结果准备好回调时会调用callback,调用失败时,则用error_callback替换callback。 Callbacks应被立即完成,否则处理结果的线程会被阻塞。
close() :
阻止更多的任务提交到pool,待任务完成后,工作进程会退出。
terminate() :
不管任务是否完成,立即停止工作进程。在对pool对象进程垃圾回收的时候,会立即调用terminate()。
join() :
wait工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。
map
(func, iterable[, chunksize])
map_async
(func, iterable[, chunksize[, callback[, error_callback]]])
imap
(func, iterable[, chunksize])
imap_unordered
(func, iterable[, chunksize])
starmap
(func, iterable[, chunksize])
starmap_async
(func, iterable[, chunksize[, callback[, error_back]]])
自己做个线程池
简单往队列中传输线程数
import threading
import time
import queue
class Threadingpool():
def __init__(self,max_num = 10):
self.queue = queue.Queue(max_num)
for i in range(max_num):
self.queue.put(threading.Thread)
def getthreading(self):
return self.queue.get()
def addthreading(self):
self.queue.put(threading.Thread)
def func(p,i):
time.sleep(1)
print(i)
p.addthreading()
if __name__ == "__main__":
p = Threadingpool()
for i in range(20):
thread = p.getthreading()
t = thread(target = func, args = (p,i))
t.start()
往队列中无限添加任务
import queue
import threading
import contextlib
import time
StopEvent = object()
class ThreadPool(object):
def __init__(self, max_num):
self.q = queue.Queue()
self.max_num = max_num
self.terminal = False
self.generate_list = []
self.free_list = []
def run(self, func, args, callback=None):
"""
线程池执行一个任务
:param func: 任务函数
:param args: 任务函数所需参数
:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
:return: 如果线程池已经终止,则返回True否则None
"""
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
self.generate_thread()
w = (func, args, callback,)
self.q.put(w)
def generate_thread(self):
"""
创建一个线程
"""
t = threading.Thread(target=self.call)
t.start()
def call(self):
"""
循环去获取任务函数并执行任务函数
"""
current_thread = threading.currentThread
self.generate_list.append(current_thread)
event = self.q.get() # 获取线程
while event != StopEvent: # 判断获取的线程数不等于全局变量
func, arguments, callback = event # 拆分元祖,获得执行函数,参数,回调函数
try:
result = func(*arguments) # 执行函数
status = True
except Exception as e: # 函数执行失败
status = False
result = e
if callback is not None:
try:
callback(status, result)
except Exception as e:
pass
# self.free_list.append(current_thread)
# event = self.q.get()
# self.free_list.remove(current_thread)
with self.work_state():
event = self.q.get()
else:
self.generate_list.remove(current_thread)
def close(self):
"""
关闭线程,给传输全局非元祖的变量来进行关闭
:return:
"""
for i in range(len(self.generate_list)):
self.q.put(StopEvent)
def terminate(self):
"""
突然关闭线程
:return:
"""
self.terminal = True
while self.generate_list:
self.q.put(StopEvent)
self.q.empty()
@contextlib.contextmanager
def work_state(self):
self.free_list.append(threading.currentThread)
try:
yield
finally:
self.free_list.remove(threading.currentThread)
def work(i):
print(i)
return i +1 # 返回给回调函数
def callback(ret):
print(ret)
pool = ThreadPool(10)
for item in range(50):
pool.run(func=work, args=(item,),callback=callback)
pool.terminate()
# pool.close()
十二. yield和协程(Python协程)
线程和进程的操作是由程序触发系统接口,最后的执行者是系统; 协程的操作则是程序员。
-
协程存在的意义: 对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。
-
协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;
-
event loop是协程执行的控制点, 如果你希望执行协程, 就需要用到它们。
-
event loop提供了如下的特性:
- 注册、执行、取消延时调用(异步函数)
- 创建用于通信的client和server协议(工具)
- 创建和别的程序通信的子进程和协议(工具)
- 把函数调用送入线程池中
示例:
import asyncio
async def cor1():
print("COR1 start")
await cor2()
print("COR1 end")
async def cor2():
print("COR2")
loop = asyncio.get_event_loop()
loop.run_until_complete(cor1())
loop.close()
==========================================
COR1 start
COR2
COR1 end
最后三行是重点
asyncio.get_event_loop() : asyncio启动默认的event loop
*run_until_complete() : * 这个函数是阻塞执行的,知道所有的异步函数执行完成,
close() : 关闭event loop。
1. greenlet
方便手动切换
greenlet机制的主要思想是:生成器函数或者协程函数中的yield语句挂起函数的执行,直到稍后使用next()或send()操作进行恢复为止。可以使用一个调度器循环在一组生成器函数之间协作多个任务。greenlet是python中实现我们所谓的"Coroutine(协程)"的一个基础库.
from greenlet import greenlet
def test1():
print(12)
gr2.switch()
print(34)
gr2.switch()
def test2():
print(56)
gr1.switch()
print(78)
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
=================================
COR1 start
COR2
COR1 end
2.gevent
自动切换
- gevent是第三方库,通过greenlet实现协程
当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。
import gevent
import time
def foo():
print("running in foo")
gevent.sleep(2)
print("switch to foo again")
def bar():
print("switch to bar")
gevent.sleep(5)
print("switch to bar again")
start=time.time()
gevent.joinall(
[gevent.spawn(foo),
gevent.spawn(bar)]
)
print(time.time()-start)
============================================
running in foo
switch to bar
switch to foo again
switch to bar again
5.010286569595337
由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成:
from gevent import monkey
monkey.patch_all()
import gevent
from urllib import request
import time
def f(url):
print('GET: %s' % url)
resp = request.urlopen(url)
data = resp.read()
print('%d bytes received from %s.' % (len(data), url))
start=time.time()
gevent.joinall([
gevent.spawn(f, 'https://itk.org/'),
gevent.spawn(f, 'https://www.github.com/'),
gevent.spawn(f, 'https://zhihu.com/'),
])
# f('https://itk.org/')
# f('https://www.github.com/')
# f('https://zhihu.com/')
print(time.time()-start)
===============================================
GET: https://itk.org/
GET: https://www.github.com/
GET: https://zhihu.com/
11785 bytes received from https://zhihu.com/.
12221 bytes received from https://itk.org/.
51166 bytes received from https://www.github.com/.
4.193239688873291
自己做个线程池
简单往队列中传输线程数
import threading
import time
import queue
class Threadingpool():
def __init__(self,max_num = 10):
self.queue = queue.Queue(max_num)
for i in range(max_num):
self.queue.put(threading.Thread)
def getthreading(self):
return self.queue.get()
def addthreading(self):
self.queue.put(threading.Thread)
def func(p,i):
time.sleep(1)
print(i)
p.addthreading()
if __name__ == "__main__":
p = Threadingpool()
for i in range(20):
thread = p.getthreading()
t = thread(target = func, args = (p,i))
t.start()
往队列中无限添加任务
import queue
import threading
import contextlib
import time
StopEvent = object()
class ThreadPool(object):
def __init__(self, max_num):
self.q = queue.Queue()
self.max_num = max_num
self.terminal = False
self.generate_list = []
self.free_list = []
def run(self, func, args, callback=None):
"""
线程池执行一个任务
:param func: 任务函数
:param args: 任务函数所需参数
:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
:return: 如果线程池已经终止,则返回True否则None
"""
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
self.generate_thread()
w = (func, args, callback,)
self.q.put(w)
def generate_thread(self):
"""
创建一个线程
"""
t = threading.Thread(target=self.call)
t.start()
def call(self):
"""
循环去获取任务函数并执行任务函数
"""
current_thread = threading.currentThread
self.generate_list.append(current_thread)
event = self.q.get() # 获取线程
while event != StopEvent: # 判断获取的线程数不等于全局变量
func, arguments, callback = event # 拆分元祖,获得执行函数,参数,回调函数
try:
result = func(*arguments) # 执行函数
status = True
except Exception as e: # 函数执行失败
status = False
result = e
if callback is not None:
try:
callback(status, result)
except Exception as e:
pass
# self.free_list.append(current_thread)
# event = self.q.get()
# self.free_list.remove(current_thread)
with self.work_state():
event = self.q.get()
else:
self.generate_list.remove(current_thread)
def close(self):
"""
关闭线程,给传输全局非元祖的变量来进行关闭
:return:
"""
for i in range(len(self.generate_list)):
self.q.put(StopEvent)
def terminate(self):
"""
突然关闭线程
:return:
"""
self.terminal = True
while self.generate_list:
self.q.put(StopEvent)
self.q.empty()
@contextlib.contextmanager
def work_state(self):
self.free_list.append(threading.currentThread)
try:
yield
finally:
self.free_list.remove(threading.currentThread)
def work(i):
print(i)
return i +1 # 返回给回调函数
def callback(ret):
print(ret)
pool = ThreadPool(10)
for item in range(50):
pool.run(func=work, args=(item,),callback=callback)
pool.terminate()
# pool.close()
十三. IO模型
1 定义:
I/O输入/输出(Input/Output),分为IO设备和IO接口两个部分。
2 . IO模型前戏准备
在进行解释之前,首先要说明几个概念:
- 用户空间和内核空间
- 进程切换
- 进程的阻塞
- 文件描述符
- 缓存 I/O
- 用户空间与内核空间
现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。
操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。
为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。
针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间
,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间
。**
- 进程切换
为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。
*这种行为被称为进程切换
, 这种切换是由操作系统来完成的。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。
从一个进程的运行转到另一个进程上运行,这个过程中经过下面这些变化:
1.保存处理机上下文,包括程序计数器和其他寄存器。
2. 更新PCB信息。
3. 把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。
4. 选择另一个进程执行,并更新其PCB。
5. 更新内存管理的数据结构。
6. 恢复处理机上下文。
注:总而言之就是很耗资源的
- 进程的阻塞
正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的。
- 文件描述符
文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。
文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。
import socket
print(socket.socket())
<socket.socket fd=172, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0>
- 缓存 I/O
缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。用户空间没法直接访问内核空间的,内核态到用户态的数据拷贝
缓存 I/O 的缺点:
数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。
3.常用IO模型
常用的IO模型:
- 1.阻塞IO
- 2.非阻塞IO
- 3.多路复用IO
- 4.异步IO
不常用:
- 驱动信号
1).阻塞IO( blocking IO )
在linux中,默认情况下所有的socket都是blocking,全程阻塞,不管是等待数据或者是从内核态拷贝数据到用户态,一个典型的读操作流程大概是这样:
系统调用两个阶段:
- wait for data 阻塞
- copy data 阻塞
当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。
所以,blocking IO的特点就是在IO执行的两个阶段都被block了。
示例:
阻塞IO,就是简单的客户端和服务端进行交流,但是只能有一个客户端,再开一个客户端就不行了,来看代码的实现:
服务端:
import socket
sk = socket.socket()
print(sk)
sk.bind(("127.0.0.1",8080))
sk.listen(3)
while 1:
conn,addr = sk.accept()
while 1:
data = conn.recv(1024)
print(data.decode("utf8"))
conn.sendall(data)
客户端:
import socket
sk = socket.socket()
sk.connect(("127.0.0.1",8080))
while 1:
inp = input(">>>")
sk.sendall(inp.encode("utf8"))
data = sk.recv(1024)
print(data.decode("utf8"))
2). 非阻塞IO(non-blocking )
linux下,可以通过设置socket使其变为non-blocking。setblocking(False) 设置阻塞状态为非阻塞固定时间循环发起系统调用,请求不到做自己的事情,等待下次请求,内核态拷贝数据到用户态需要等待。当对一个non-blocking socket执行读操作时,流程是这个样子:
系统调用两个阶段:
- wait for data 非阻塞
- copy data 阻塞
从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。
所以,用户进程其实是需要不断的主动询问kernel数据好了没有。
注意:
在网络IO时候,非阻塞IO也会进行recvform系统调用,检查数据是否准备好,与阻塞IO不一样,”非阻塞将大的整片时间的阻塞分成N多的小的阻塞, 所以进程不断地有机会 ‘被’ CPU光顾”。即每次recvform系统调用之间,cpu的权限还在进程手中,这段时间是可以做其他事情的,也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。
示例:
非阻塞IO 这里所举的例子都是以客户端和服务端为模板来写的,来看一个代码实现的例子,其实这个例子和上面的差不多就是在其中加了一些简单的代码,只有几行,可以看看差别就 改变了阻塞的方式:
server端
import socket
sk = socket.socket()
#print(sk)
sk.bind(("127.0.0.1",8080))
sk.listen(3)
sk.setblocking(False)
import time
while 1:
try:
conn,addr = sk.accept()
print(addr)
# while 1:
data = conn.recv(1024)
print(data.decode("utf8"))
#conn.sendall(data)
conn.close()
except Exception as e:
print("error:",e)
time.sleep(2)
client端
import socket
sk = socket.socket()
sk.connect(("127.0.0.1",8080))
while 1:
#inp = input(">>>")
sk.sendall("hello".encode("utf8"))
data = sk.recv(1024)
print(data.decode("utf8"))
3). IO多路复用( IO multiplexing)
IO multiplexing这个词可能有点陌生,但是如果我说select,epoll,大概就都能明白了。有些地方也称这种IO方式为event driven IO。我们都知道,select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。 (全程阻塞,监听多个链接系统调用select完成wait for data工作)
它的流程如图:
系统调用两个阶段:
- wait for data 阻塞
- copy data 阻塞
特点:监听多个文件描述符,实现并发
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection。(多说一句。所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)
在IO multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。
示例:
- IO多路复用 最大的优势就是可以监听多个对象,就是通过select来监听*
select-server端
# import socket
# import select
#
# sk1=socket.socket()
# sk1.bind(('127.0.0.1',8080))
# sk1.listen(3)
#
# sk2=socket.socket()
# sk2.bind(('127.0.0.1',8081))
# sk2.listen(3)
#
# while 1:
# r,w,e=select.select([sk1,sk2],[],[])
# print('rrr')
# for obj in r:#[sk1,]
# conn,addr=obj.accept()
# print(addr)
# conn.send('i am server'.encode('utf8'))
select-client端
# import socket
# sk=socket.socket()
# sk.connect(('127.0.0.1', 8080))
#
# while 1:
# data = sk.recv(1024)
# print(data.decode('utf8'))
# inp=input('>>>')
# sk.sendall(inp.encode('utf8'))
#
import time
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
while True:
sk.connect(('127.0.0.1',8800))
print("hello")
sk.sendall(bytes("hello","utf8"))
time.sleep(2)
break
IO多路复用来实现一个简单的聊天
talk-server端:
import socket
import select
sk = socket.socket()
sk.bind(("127.0.0.1",8800))
sk.listen(5)
inp = [sk,]
while 1:
inputs,outputs,errors = select.select(inp,[],[],)
for obj in inputs: #conn
if obj == sk:
conn,addr = sk.accept()
print(conn)
inp.append(conn)
else:
data = obj.recv(1024)
print(data.decode("utf8"))
Inputs = input("回答%s>>>>"%inp.index(obj))
obj.sendall(Inputs.encode("utf8"))
talk-client端:
import time
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.connect(("127.0.0.1",8800))
while True:
# sk.connect(("127.0.0.1",8800))
#print("hello")
inp = input(">>>:")
sk.sendall(bytes(inp,"utf8"))
data = sk.recv(1024)
print(data.decode("utf8"))
# time.sleep(2)
# break
4). 异步IO(Asynchronous I/O)
linux下的asynchronous IO其实用得很少。全程无阻塞,实现复杂。先看一下它的流程:
Asynchronous IO.gif
用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。
5). 五种IO模型比较:
五种IO模型比较.png4. IO多路复用实现机制
IO多路复用机制:就是单个process可以同时处理多个网络连接的IO,基本原理就是通过select / epoll函数不断轮询所负责的所有socket,当某个socket有数据到达,就通知用户进程。
不同的操作系统提供的函数不同:
- windows系统: select
- linux系统: select、poll、epoll
1). 简单介绍select、poll、epoll三者的特点:
-
select的缺点有以下三点,会导致效率下降:
- 1.每次调用select都要将所有的fd(文件描述符),copy到你的内核空间
- 2.遍历所有的fd,是否有数据访问
- 3.最大连接数(1024),超出链接不再监听
-
poll:
- 与select一样,只是最大连接数没有限制
-
epoll不同于select和poll只有一个函数,epoll通过三个函数实现实现轮询socket:
- 1.第一个函数:创建epoll句柄:将所有的fd(文件描述符),copy到你的内核空间,只copy一次
- 2.回调函数:为所有fd绑定一个回调函数,一旦有数据访问,触发回调函数,回调函数将fd放入一个链表中(回调函数:某一个函数或者某一个动作,成功完成之后,会触发的函数)
- 3.第三个函数:判断链表是否为空
epoll最大连接数没有上线
2). select 介绍
select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。
select目前几乎在所有的平台上支持。
select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。
3). selectors模块
selectors基于select模块实现IO多路复用,调用语句selectors.DefaultSelector(),特点是根据平台自动选择最佳IO多路复用机制,调用顺序:epoll > poll > select
import selectors
import socket
def accept(sock, mask):
conn, addr = sock.accept()
sel.register(conn, selectors.EVENT_READ, read) # 将conn和read函数注册到一起,当conn有变化时执行read函数
def read(conn, mask):
try:
data = conn.recv(1000)
print(data.decode('utf8'))
inputs = input('>>:').strip()
conn.send(inputs.encode('utf8'))
except Exception:
sel.unregister(conn)
conn.close()
sock = socket.socket()
sock.bind(('127.0.0.1', 8080))
sock.listen(100)
sock.setblocking(False) # 设置为非阻塞IO
sel = selectors.DefaultSelector() # 根据平台自动选择最佳IO多路复用机制
sel.register(sock, selectors.EVENT_READ, accept) # 将sock和accept函数注册到一起,当sock有变化时执行accept函数
while True:
events = sel.select() # 监听 [(key1,mask1),(key2),(mask2)]
for key, mask in events:
func = key.data # 1 key.data就是accept # 2 key.data就是read
obj = key.fileobj # 1 key.fileobj就是sock # 2 key.fileobj就是conn
func(obj, mask) # 1 accept(sock,mask) # 2read(conn,mask)
4). 队列queue
队列与线程(和进程)有关,保证多线程信息交换的安全。
队列是一种数据类型(数据结构),可用于存放数据创建队列语法queue.Queue(),默认是先进先出(FIFO)。
队列的优点:保证线程安全
- get与put方法
import queue
q = queue.Queue() #创建队列对象q
q.put(123) #将123放入队列中
q.put('hello')
q.get() #将第一个值从队列中取出
- join和task_done方法
join()阻塞进程,直到所有任务都完成,需要配合另一个方法task_done()。
task_done() 表示某个任务完成。每一条get语句后需要一条task_done。
import queue
q = queue.Queue(5)
q.put(10)
q.put(20)
print(q.get())
q.task_done()
print(q.get())
q.task_done()
q.join()
print("ending!")
- 其他模式
- 先进后出:queue.LifoQueue()后进先出(LIFO)
- 优先级:queue.PriorityQueue()优先级高先出
q.put([1,‘123’]) #1为有限等级,越小越优先
十四. 生产者消费者模型
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这就像,在餐厅,厨师做好菜,不需要直接和客户交流,而是交给前台,而客户去饭菜也不需要不找厨师,直接去前台领取即可,这也是一个结耦的过程。
import time,random
import queue,threading
q = queue.Queue()
def Producer(name):
count = 0
while count <10:
print("making........")
time.sleep(random.randrange(3))
q.put(count)
print('Producer %s has produced %s baozi..' %(name, count))
count +=1
#q.task_done()
#q.join()
print("ok......")
def Consumer(name):
count = 0
while count <10:
time.sleep(random.randrange(4))
if not q.empty():
data = q.get()
#q.task_done()
#q.join()
print(data)
print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
else:
print("-----no baozi anymore----")
count +=1
p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
# c2 = threading.Thread(target=Consumer, args=('C',))
# c3 = threading.Thread(target=Consumer, args=('D',))
p1.start()
c1.start()
# c2.start()
# c3.start()
总结:
进程:最小的资源管理单位(盛放线程的容器)
线程:最小的执行单位
串行、并行、并发
cpython因为存在GIL导致,同一时刻,同一进程只能有一个线程执行
关于daemon:程序直到不存在非守护线程时退出
同步锁:由于多线程处理公共数据
递归锁
event:一个对象,让多个进程间通信
查看原文>>>>