Python多线程
进程间同步互斥方法lock
from multiprocessing import Lock
创建 进程锁对象
lock = Lock()
相关方法:
lock.acquire() 给临界区上锁
lock.release() 给临界区解锁
说明:
具体实现上 acquire() 为一个条件阻塞函数,当有任意一个进程先进行了acquire操作后,其他进程再企图进行acquire操作时就会阻塞,直到lock对象被 release 后其他进程才可进行下次acquire操作
with lock: 也可以实现加锁、解锁,类似于with open('directory') as f: 具体如下代码
示例:
from multiprocessing import Process,Lock
import time,sys
def worker1(stream):
lock.acquire() # 加锁
for i in range(5):
time.sleep(1)
stream.write("Lock acquired via\n")
lock.release()#解锁
def worker2(stream):
# lock.acquire()
with lock: #加锁 语句块结束即解锁
for i in range(5):
time.sleep(1)
stream.write("Lock acquired directly\n")
# lock.release()
lock = Lock()
#sys.stdout为所有进程都拥有的资源
w1 = Process(target=worker1,args=(sys.stdout,))
w2 = Process(target=worker2,args=(sys.stdout,))
w1.start()
w2.start()
w1.join()
w2.join()
线程
- 线程也可以使用计算机的多核资源,也是多任务编程方式之一
- 线程又称为轻量级的进程,在并发上和进程相同但是在创建时消耗资源少
一个进程中可以包含多个线程,这多个线程共享进程的资源
多个线程因为共享进程的资源所以在通信上往往采用全局变量的方法
线程也有自己特有的资源,比如 TID 指令集等
多进程和多线程的区别和联系
- 多进程和多线程都是多任务编程方式,都可以使用计算机多核
- 进程的创建要比线程消耗更多的资源
- 进程空间独立数据更安全,有专门的进程间通信方式进行交互
- 一个进程包含多个线程,所以线程共享进程资源,没有专门的通信方法,依赖全局量进行通信。往往需要使用同步互斥机制,逻辑需要考虑更多
- 进程线程都有自己特有的资源。多个关联任务的时候使用多线程资源消耗更少,如果是多个无关任务也不适于全都使用线程
创建线程
import threading
**创建线程函数 **
threading.Tread(name,target,args,kwargs)
功能 : 创建线程
参数 :
- [name] 线程名称(默认Thread-1)
- [target] 线程函数
- [args] 以元组方式给线程函数传参
- [kwargs] 以字典方式给线程函数传参
返回值 : 返回线程对象
线程属性和方法
t.start() #启动一个线程
t.is_alive() #查看一个线程的状态
t.name #查看线程的名称
t.join([sec]) #阻塞等待回收线程,有数值参数,默认一直阻塞
daemon 属性
- 设置该属性默认为False,主线程执行完毕不会影响其他线程的执行
- 如果设置为True 则主线程执行完毕其他线程也终止执行
设置方法:
t.setDaemon(True)
#或,二选一
t.daemon = True
获取daemon属性值
t.isDaemon()
示例:
import threading
from time import sleep,ctime
def fun():
print(1,'this is a thread test')
sleep(5)
print('thread over')
t = threading.Thread(name='mmp',target= fun)
#设置主进程结束后是否停止子线程,True为结束,默认不结束
t.setDaemon(True)
t.start()
print(2,t.is_alive())
print(3,t.name)
t.join(2)
print('all over',ctime())
线程间的通信
主要: 使用全局变量进行通信
全局变量示例:
import threading
from time import sleep,ctime
s = None
def bar():
print('呼叫foo')
global s
s ='天王地虎'
def foo():
print('foo等口令')
sleep(2)
print('foo收到 %s'%s )
def fun():
sleep(1)
print('内奸出现')
global s
s ='小鸡炖蘑菇'
t1 = threading.Thread(name='bar',target=bar)
t2 = threading.Thread(name='foo',target=foo)
t3 = threading.Thread(name='fun',target=fun)
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
线程间的同步和互斥
线程 event
同进程的一样,线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行
Event几种方法:
event.isSet():返回event的状态值;
event.wait():如果 event.isSet()==False将阻塞线程;
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():恢复event的状态值为False。
更多参考链接:https://www.cnblogs.com/shuopython/p/11942122.html
from threading import Thread,Event
import time
event=Event()
def light():
print('红灯正亮着')
time.sleep(3)
event.set() #绿灯亮
def car(name):
print('车%s正在等绿灯' %name)
event.wait() #等灯绿 此时event为False,直到event.set()将其值设置为True,才会继续运行.
print('车%s通行' %name)
if __name__ == '__main__':
# 红绿灯
t1=Thread(target=light)
t1.start()
# 车
for i in range(10):
t=Thread(target=car,args=(i,))
t.start()
e = threading.Event() #创建事件对象
e.wait([timeout]) #如果e被设置则不会阻塞,未被设置则阻塞,timeout为阻塞的超时时间
e.set() #将e变为设置的状态
e.clear() #将e变为未设置的状态
示例1:
from threading import *
import random
from time import sleep
a = 500
#创建事件对象
e = Event()
#子线程不断减少a 但是希望a的值不会少于100
def fun():
global a
while True:
sleep(2)
print('a = ',a)
e.wait() #
a -= random.randint(0,100)
t = Thread(target = fun)
t.start()
#主线程不断的让a增加以确保a不会小于100
while True:
sleep(1)
a += random.randint(1,10)
if a > 100:
e.set()
else:
e.clear()
t.join()
示例2:
import threading
from time import sleep
def test(n, event):
while not event.isSet():
print('Thread %s is ready' , n)
sleep(1)
event.wait()
while event.isSet():
print('Thread %s is running' , n)
sleep(1)
def main():
event = threading.Event()
for i in range(0, 2):
th = threading.Thread(target=test, args=(i, event))
th.start()
sleep(3)
print('----- event is set -----')
event.set()
sleep(3)
print('----- event is clear -----')
event.clear()
if __name__ == '__main__':
main()
线程锁
lock = threading.Lock() #创建锁对象
lock.acquire() #上锁
lock.release() #解锁
示例:
a = b = 0
lock = threading.Lock()
def value():
while True:
lock.acquire()#上锁
if a != b:
print('a = %d,b = %d'%(a,b))
lock.release()#解锁
t = threading.Thread(target=value)
t.start()
while True:
lock.acquire()#上锁
a += 1
b += 1
lock.release()#解锁
t.join()
创建自己的线程类
-
自定义类 继承于 原有线程类 Thread
-
复写原有的run方法
-
创建线程对象调用start 的时候会自动执行run
线程池1
from multiprocessing.dummy import Pool as ThreadPool
from threading import Lock
lock = Lock()
def f(x):
x**x
lock.acquire()
print('{:<20d}----------------'.format(x))
lock.release()
pool = ThreadPool(processes=100) # 线程数量
result = pool.map(f, range(10000))
pool.close()
pool.join()
线程池获取返回值示例:
from multiprocessing import Pool
import time
def func(i): #返回值只有进程池才有,父子进程没有返回值
time.sleep(0.5)
return i*i
if __name__ == '__main__':
p = Pool(5)
res_l = [] #从异步提交任务获取结果
for i in range(10):
res = p.apply_async(func,args=(i,)) #apply_sync的结果就是异步获取func的返回值
res_l.append(res) #从异步提交任务获取结果
for res in res_l: print(res.get()) #等着func的计算结果
# p.close() #关闭进程池事件加入通道,即不能再向进程池中加入事件
# p.join() #阻塞等等进程池处理事件结束后回收进程池
参考链接:https://blog.csdn.net/u012969412/article/details/82768882
线程池2
concurrent.futures
1.ThreadPoolExecutor构造实例的时候,传入max_workers参数来设置线程池中最多能同时运行的线程数目。
2.使用submit函数来提交线程需要执行的任务(函数名和参数)到线程池中,并返回该任务的句柄(类似于文件、画图),注意submit()不是阻塞的,而是立即返回。
3.通过submit函数返回的任务句柄,能够使用done()方法判断该任务是否结束。下面的例子可以看出,由于任务有2s的延时,在task1提交后立刻判断,task1还未完成,而在延时4s之后判断,task1就完成了。
4.使用cancel()方法可以取消提交的任务,如果任务已经在线程池中运行了,就取消不了。这个例子中,线程池的大小设置为2,任务已经在运行了,所以取消失败。如果改变线程池的大小为1,那么先提交的是task1,task2还在排队等候,这是时候就可以成功取消。
5.使用result()方法可以获取任务的返回值,注意:这个方法是阻塞的。
参考链接:https://www.cnblogs.com/Nicholas0707/p/11768462.html
GIL (全局解释器锁)
python -- 》 支持多线程 ----》 同步和互斥 ---》加锁 ---》超级锁 ----》 解释器在同一时刻只能解释一个线程
大量python库为了省事依赖于这种机制 ---》 python多线程效率低
GIL 即为cpython 解释器由于上锁带了的同一时刻只能解释一个线程的问题
解决方案 :
- 不使用线程,转而使用进程
- 不使用c作为解释器,使用java,c#都可以做python解释器
IO 密集型 程序中进行了大量IO操作,只有少量的CPU操作
在内存中进行了数据的交换的操作都可以认为是IO操作
特点 : 速度较慢,使用cpu不高
cpu密集型(计算密集型):大量的程序都在进行运算操作
特点 : cup占有率高
效率测试:
Line cpu 1.224205732345581
Line IO 4.142379522323608
Thread cpu 0.7009162902832031
Thread IO 3.458016872406006
Process cpu 0.6419346332550049
Process IO 1.8482108116149902
- 多线程的工作效率和单线程几乎相近,而多进程要比前两者有明显的效率提升
线程示例:
from test import *
import threading
import time
counts = []
t = time.time()
for x in range(10):
th = threading.Thread\
(target = count,args = (1,1))
th.start()
counts.append(th)
n = len(counts)
while True:
for th in counts:
if not th.is_alive():
n -= 1
if n <= 0:
break
print("Thread cpu",time.time()-t)
设计模式
设计模式代表了一种最佳实践,是被开发人员长期开发总结,用来解决某一类问题的思路方法。这些方法保证了代码的效率也易于理解。
单例模式 工厂模式 生产者模式。。。。
生产者消费者模式
高内聚 : 在同一模块内,实现单一功能,尽量不使功能混杂
低耦合 : 不同的模块之间尽量相互独立,减少模块间的影响
示例:
from threading import Thread
#python标准库中的队列模块
import queue
import time
#创建一个队列模型作为商品的仓库
q = queue.Queue()
class Producer(Thread):
def run(self):
count = 0
while True:
if q.qsize() < 50:
for i in range(3):
count += 1
msg = "产品 %d"%count
q.put(msg) #将产品放入队列
time.sleep(1)
class Customer(Thread):
def run(self):
while True:
if q.qsize() > 20:
for i in range(2):
msg = q.get() #从仓库拿到商品
print("消费了一个 %s"%msg)
time.sleep(1)
#创建三个生产者
for i in range(3):
p = Producer()
p.start()
#创建5个消费者
for i in range(5):
p = Customer()
p.start()