Python之多线程、多进程
多线程
多线程的含义
一个进程可以创建多个线程进行并发计算,从而让进程同时处理多个计算逻辑,或者将一个大的计算任务拆分成多个小的计算任务,然后同时计算这些子任务。
通过查阅相关资料,其它某些语言的多线程可以调用多个CPU同时进行运算(并行),但是对于Python来说,它的C解释器设置了一个GIL(全局解释器锁:Global Interpreter Lock),使得同一时刻只有一个线程执行,也就是说,无论你怎么设置你的多线程,CPython的都只能利用一个CPU,这也就无法发挥计算机多核心的优势了。
由于我使用的是CPython,所以下面的Python都是指在C解释器下实现的python。
Python下多线程的作用
由于GIL带来的影响,计算密集型的程序中使用多线程不会带来性能提升,甚至由于线程切换需要耗费资源,它的性能还会下降。
但是Python的多线程依然有它的用武之地,在处理需要等待、需要大量使用慢速设备(内存和硬盘的性能相对于CPU来说非常慢,因此我将这种设备定义为慢速设备)等情况下,多线程仍然有不俗的表现。
如何规避GIL带来的影响呢?有以下几点建议:使用非C的解释器、使用C扩展进行数据运算、使用多进程 ...
编写多线程需要注意的问题
- 注意各个线程之间的通信问题,这包括是否共享对数据的修改,如何向其它线程发送信息,如何设置信号。
- 注意线程内部的资源使用,这包括了对临界资源的争抢,如何保持现有数据不被其它线程意外修改,如何对线程进行状态保存。
- 注意各个线程的调度,这包括了线程启动顺序,如何设置线程之间的同步操作,如何防止程序死锁的问题。
Python的多线程
线程的建立和启动
我们有两种方式建立多线程:直接构建线程,使用类继承构件线程
直接构建线程:
import time
from threading import Thread # 多线程最常使用的库
def t(n):
while n > 0:
print(n)
n -= 1
time.sleep(0.5)
if __name__ == '__main__':
a = Thread(target=t, args=(3,)) # 实例化一个线程
b = Thread(target=t, args=(3,))
c = Thread(target=t, args=(3,), daemon=True) # 实例一个守护线程,守护线程如果不自动退出,只会在程序结束时结束,因此它不可以使用jion
a.start() # 开启线程
b.start()
c.start()
a.join() # 等待线程执行完成,然后才能执行下面的代码
if a.is_alive(): # 判断线程是否在执行
print('a仍然执行')
if b.is_alive():
print('b仍然执行')
- join方法:等待一个线程执行完毕,若线程未执行,则一直阻塞等待
- 守护线程:不可以被join等待的线程,在主程序执行完毕之后会被立即关闭
用类继承:
from threading import Thread
import time
class CountdownThread(Thread): # 继承
def __init__(self, n):
super().__init__()
self.n = n
def run(self):
while self.n > 0:
print(self.n)
self.n -= 1
time.sleep(0.3)
c = CountdownThread(3)
c.start() # 会执行实例的run方法
d = CountdownThread(3)
d.start()
- 这种方法对类构建了一层继承关系,这可能会影响代码的灵活性,因此不提倡使用
进程间的信号传递
Event:全局信号
from threading import Thread, Event
import time
def t(x, e):
e.wait() # 等待信号
print(x)
if __name__ == '__main__':
event = Event() # 设置一个信号
t1 = Thread(target=t, args=('函数运行', event))
t1.start()
time.sleep(2)
event.set() # 传递出信号,所有event.wait()后面的代码开始执行
- 你可以使用 event.clear()移除信号,这样所有 event.wait()会回复阻塞
Condition:条件变量
https://www.jianshu.com/p/5d2579938517
进程间的数据共享
使用队列:
import threading
import time
import queue
def product():
while True:
if que.qsize() == 10:
print('现在有10个包子,我决定休息一会')
time.sleep(6) # 够10个包子就休息6秒
else:
que.put(1)
print('造包子,现在有{}个'.format(que.qsize()))
time.sleep(0.2) # 每0.2秒造一个包子
def consumer(name):
while True:
que.get()
print('{}吃了一个包子,还剩下{}个'.format(name, que.qsize()))
time.sleep(1.2) # 每1.2秒吃一个包子
que = queue.Queue() # 实例化一个可以在进程间共享的队列
p = threading.Thread(target=product)
c1 = threading.Thread(target=consumer, args=('选手1',))
c2 = threading.Thread(target=consumer, args=('选手2',))
p.start()
c1.start()
c2.start()
- .qsize() 获取队列元素的个数
- .get()从队列中获取数据
- .put()向队列中放入数据
- 在实例化一个队列的时候,可以使用maxsize设定队列的最大容量
- 队列空时使用get 和 队列满时使用put 都会引起队列的阻塞,知道其它地方放入/取走了数据
- 可以使用 timeout设置阻塞时间,阻塞超时会引发异常,并丢弃当请求(get 或 put)
- 使用block = False设置为非阻塞状态,遇到队列 空/满 的时候会直接抛出异常并丢弃请求
下面用一个实例解释一下队列中的 task_done 和 join:
import threading
import time
import queue
def product():
while True:
if que.qsize() == 10: # 队列中有10个包子的时候生产者退出生产
print('现在有10个包子,我回家了')
break
else:
que.put(1)
print('造包子,现在有{}个'.format(que.qsize()))
time.sleep(0.1) # 每0.2秒造一个包子
def consumer(name):
while True:
que.get()
print('{}吃了一个包子,还剩下{}个'.format(name, que.qsize()))
time.sleep(0.5) # 每1.2秒吃一个包子
que.task_done() # task_done 要和get配合使用,每进行一次get,就要用一次task_done
if que.qsize() == 0: # 队列为空时消费者推出消费
break
que = queue.Queue() # 实例化一个可以在进程间共享的队列
p = threading.Thread(target=product)
c1 = threading.Thread(target=consumer, args=('选手1',))
c2 = threading.Thread(target=consumer, args=('选手2',))
p.start()
c1.start()
c2.start()
que.join() # 保持阻塞,直到队列内部的计数器认为队列为空为止,注意队列计数器的规则:遇到put就+1,遇到task_done就-1
print('over')
- .join会阻塞程序,直到队列计数判断队列为空为止
- 计数加减的条件:遇到put 就 +1 ,遇到task_done 就 -1
- join不会判断队列中数据元素的个数,只会根据上面提到的计数条件判断。即使队列仍然有数据,但是join计数器为0时,就会放弃阻塞。
- 我们可以根据这个特性定制队列的阻塞条件。
锁
多线程的一大问题是多个线程争抢同一个资源,这样争抢的结果可能会引发错误:A线程正在使用资源S,S状态为S1,这时B线程修改了S,状态改为S2,如此一来,A线程的计算结果和S(修改后的状态为S2)不符。
为了解决这种问题,我们需要对临界资源上锁:保证A线程使用完S之后再允许其它线程修改S。
import threading
import time
n = 0#设置n为一个临界资源
n_lock = threading.Lock()#为n设置一把锁
def a(x):
n_lock.acquire()#上锁,上锁后,所有n_lock.acquire()都会被阻塞
time.sleep(3)#假设一个程序需要花3秒进行运算
global n
n += x
print(n)
n_lock.release()#开锁,释放资源
t1 = threading.Thread(target=a,args=(3,))
t2 = threading.Thread(target=a,args=(5,))
t1.start()
t2.start()
当然,伟大的同行已经为我们写好了Lock的上下文环境,所以你可以使用with达到与上面相同的结果:
import threading
import time
n = 0 # 设置n为一个临界资源
n_lock = threading.Lock() # 为n设置一把锁
def a(x):
with n_lock: # 自动用n_lock锁住下面的代码,当代码执行完毕后自动释放
time.sleep(3) # 假设一个程序需要花3秒进行运算
global n
n += x
print(n)
t1 = threading.Thread(target=a, args=(3,))
t2 = threading.Thread(target=a, args=(5,))
t1.start()
t2.start()
Lock是一把只能一个请求者使用的锁,我们可以使用Semaphore(信号量)设置使用数量:
from threading import Semaphore
import threading
import time
n_lock = Semaphore(2) # 允许两个人同时请求n_lock.acquire()
n = 0
def a(x):
with n_lock: # 自动用n_lock锁住下面的代码,当代码执行完毕后自动释放
time.sleep(3) # 假设一个程序需要花3秒进行运算
global n
n += x
print(n)
t1 = threading.Thread(target=a, args=(3,))
t2 = threading.Thread(target=a, args=(5,))
t3 = threading.Thread(target=a, args=(10,))
t1.start()
t2.start()
t3.start()
- threading库中还为我们提供了RLock( )和BoundedSemaphore( ),他们的功能和上面提到的Lock和Semaphore类似,但是有一些不同。
- Condition条件变量内部包含了资源锁和信号传递两个功能。
其实锁和信号量的作用远远不止于对临界资源的保护,他们还可以同步各个进程(线程)之间的进度。具体可以看一看操作系统的相关内容。
保存线程专有状态
如果你需要保存当前运行线程的专有状态,且这个状态对其它线程是不可见的,那就需要使用这个技巧。
import threading
t = threading.local()
def prin():
t.inner_id = threading.get_ident()
print(t.inner_id)
t1=threading.Thread(target=prin)
t2=threading.Thread(target=prin)
t3=threading.Thread(target=prin)
prin()
t1.start()
t2.start()
t3.start()
- 在底层,threading.local()实例为每个线程维护这一个单独的实例字典。所有对实例的常见操作,比如获取、设定以及删除,都是作用于每个线程专有的字典上。这使得不同线程的数据得到隔离。
创建线程池
可以使用threadin库提供的线程池,但是这里我选了另一个库的线程池,下面的代码是自动调用线程池,你可以使用pool.submit()手动为线程池调用代码。
from concurrent.futures import ThreadPoolExecutor # 引入线程池
import time
def t(n):
return n ** n ** n
if __name__ == '__main__':
start = time.time()
pool = ThreadPoolExecutor(max_workers=4) # 实例化一个线程池,最多使用4个线程
# with ThreadPoolExecutor(max_workers=4) as pool:#这行代码等价于上一行代码
r = pool.map(t, [6 for i in range(20000)]) # 使用这个线程池调用 t(6) 函数 20000次
pool.shutdown() # 等待所有线程执行完毕
print(time.time() - start) # 一个简单的计算
start = time.time()
for i in range(20000): # 这是直接使用单线程进行20000次 t(6)的计算
t(6)
print(time.time() - start)
############### 执行结果 ################
19.726083517074585
17.407222509384155
WTF???为什么线程并发耗费的时间反而比单线程多?
这就是最开始所讲到的,由于GIL的限制,Python只能发挥一个核心的性能,多线程只能发挥一个核心的性能,对于计算密集的程序,反而会因为线程切换增加计算成本。
那么如何破除这种困境呢?解决方案之一就是使用多进程。
多进程
多进程的原理和多线程类似,python通过创建多个进程进行计算,由于一个python进程有一个GIL锁,多进程就可以解除这样的限制。这使得Python可以利用到CPU的多个核心。
依然以上一个例子作为对比:
from concurrent.futures import ProcessPoolExecutor # 引入进程池
import time
def t(n):
return n ** n ** n
if __name__ == '__main__':
start = time.time()
pool = ProcessPoolExecutor(max_workers=8) # 实例化一个进程池
# with ProcessPoolExecutor(4) as pool:#这行代码等价于上一行代码
r = pool.map(t, [6 for i in range(20000)]) # 使用这个进程池调用 t(6) 函数 20000次
pool.shutdown() # 等待所有进程执行完毕
print(time.time() - start) # 一个简单的计算
start = time.time()
for i in range(20000): # 这是直接使用单线程进行20000次 t(6)的计算
t(6)
print(time.time() - start)
############### 执行结果 ################
10.398910760879517
17.622284173965454
我的计算机是8核心,所以我将进程池上限设置为8,你会发现,使用进程池的运行速度比线程池快了将近一倍,这比你想象中的要慢?有以下几点原因:
- 创建进程是非常耗费资源的,上面代码在后台加载了共8个python程序,这需要花费大量时间。
- 进程间的通信非常困难,尽管伟大的同行们已经构建了进程通信工具,但是其内部都是使用pickle编码传递信息,相比进程通信,这很低效。
除此之外,多进程编码还有一些你需要注意的地方: - 这种技术只适用于可以将问题直接分解为多个独立子问题的情况。
- 多进程的任务只能使用函数提交,其它类型的可调用对象(实例的方法,类的方法,lambda等)都不支持并行处理。
- 并行的函数的参数和返回值必须兼容pickle编码,否则无法进行进程间传递。
- 进程建立后就很难控制其行为,因此“大量计算,行为简单”的任务是多进程并行的首选。
- 多进程和多线程结合需要格外小心,一般我们先加载多进程,然后使用多线程。
多进程的编码工具和多线程的工具十分相似,我看到有个小伙伴写的还不错,点这里即可查看。(实际上是我懒)
最后附一份思维导图
并发