38.2-RLock和Condition
大张旗鼓的离开其实都是试探,真正的离开是没有告别的!
参考:
第32天:Python logging 模块详解 ([Python技术])
总结:
- 锁的操作要看属主是 哪个线程;锁的操作(获得、释放)在同一个线程间操作;
- Condition用于生产者消费者模型中,解决生产者消费者速度匹配的问题;
并发解决问题的时候,大家都在争夺同一个资源,我们就需要去使用lock;是概要更高的效率,还是更精确的结果呢?
1. 可重入锁RLock-递归锁
可重入锁,是线程相关的锁。
RLock内部维护着一个Lock和一个counter变量
counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
from threading import Thread,Lock,RLock
import time
lock = threading.RLock()
print(lock.acquire())
print('------------')
print(lock.acquire(blocking=False))
print(lock.acquire())
print(lock.acquire(timeout=3.55))
print(lock.acquire(blocking=False))
print(lock)
#print(lock.acquire(blocking=False, timeout=10)) # 异常
lock.release()
lock.release()
lock.release()
lock.release()
lock.release()
print(lock)
#----------------------------------------------------------------------------------------------
<locked _thread.RLock object owner=17876 count=5 at 0x000002406C09E080>
<unlocked _thread.RLock object owner=0 count=0 at 0x000002406C09E080>
可重入锁,与线程相关,可在一个线程中获取锁,并可继续在同一线程中不阻塞获取锁。当锁未释放完,其它线程获取锁就会阻塞,直到当前持有锁的线程释放完锁。
2. Condition
构造方法Condition(lock=None),可以传入一个Lock或RLock对象,默认是RLock。
名称 | 含义 |
---|---|
acquire(*args) | 获取锁 |
wait(self, timeout=None) | 等待或超时 |
notify(n=1) | 唤醒至多指定数目个数的等待的线程,没有等待的线程就没有任何操作 |
notify_all() | 唤醒所有等待的线程 |
Condition用于生产者、消费者模型;为了解决生产者消费者速度匹配的问题;
生产者消费速度 大于 生产者生产速度;
import threading,time,logging,random
from threading import Thread, Lock,RLock,Event
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
## 此例只是为了演示,不考虑线程安全问题
class Dispatcher:
def __init__(self):
self.data = None
self.event = Event() # event只是为了使用方便,与逻辑无关
def produce(self, total):
for _ in range(total):
data = random.randint(0, 100)
logging.info(data)
self.data = data
self.event.wait(1)
self.event.set()
def consume(self):
while not self.event.is_set():
data = self.data
logging.info("recieved {}".format(data))
self.data = None
self.event.wait(0.5)
d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
c = Thread(target=d.consume, name='consumer')
c.start()
p.start()
#----------------------------------------------------------
2020-01-14 16:08:21,437 consumer 11948 recieved None
2020-01-14 16:08:21,437 producer 22512 82
2020-01-14 16:08:21,937 consumer 11948 recieved 82
2020-01-14 16:08:22,437 producer 22512 64
2020-01-14 16:08:22,438 consumer 11948 recieved 64
2020-01-14 16:08:22,939 consumer 11948 recieved None
2020-01-14 16:08:23,437 producer 22512 71
2020-01-14 16:08:23,439 consumer 11948 recieved 71
2020-01-14 16:08:23,940 consumer 11948 recieved None
2020-01-14 16:08:24,438 producer 22512 36
2020-01-14 16:08:24,441 consumer 11948 recieved 36
2020-01-14 16:08:24,941 consumer 11948 recieved None
2020-01-14 16:08:25,438 producer 22512 78
2020-01-14 16:08:25,442 consumer 11948 recieved 78
2020-01-14 16:08:25,943 consumer 11948 recieved None
2020-01-14 16:08:26,438 producer 22512 91
2020-01-14 16:08:26,443 consumer 11948 recieved 91
2020-01-14 16:08:26,944 consumer 11948 recieved None
2020-01-14 16:08:27,439 producer 22512 5
2020-01-14 16:08:27,445 consumer 11948 recieved 5
2020-01-14 16:08:27,945 consumer 11948 recieved None
2020-01-14 16:08:28,439 producer 22512 86
2020-01-14 16:08:28,446 consumer 11948 recieved 86
2020-01-14 16:08:28,947 consumer 11948 recieved None
2020-01-14 16:08:29,439 producer 22512 16
2020-01-14 16:08:29,447 consumer 11948 recieved 16
2020-01-14 16:08:29,948 consumer 11948 recieved None
2020-01-14 16:08:30,440 producer 22512 50
2020-01-14 16:08:30,449 consumer 11948 recieved 50
2020-01-14 16:08:30,949 consumer 11948 recieved None
这个例子采用了消费者主动消费,消费者浪费了大量时间,主动来查看有没有数据。
能否换成一种通知机制,有数据通知消费者来消费呢?
使用Condition对象。
上例中,消费者等待数据等待,如果生产者准备好了会通知消费者消费,省得消费者反复来查看数据是否就绪。
如果是1个生产者,多个消费者怎么改
import threading,time,logging,random
from threading import Thread, Lock,RLock,Event,Condition
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
## 此例只是为了演示,不考虑线程安全问题
class Dispatcher:
def __init__(self):
self.data = None
self.event = Event() # event只是为了使用方便,与逻辑无关
self.cond = Condition()
def produce(self, total):
for _ in range(total):
data = random.randint(0, 100)
with self.cond:
logging.info(data)
self.data = data
self.cond.notify_all()
self.event.wait(1)
self.event.set()
def consume(self):
while not self.event.is_set():
with self.cond:
self.cond.wait()
logging.info("recieved {}".format(self.data))
self.data = None
self.event.wait(0.5)
d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
c = Thread(target=d.consume, name='consumer')
c.start()
p.start()
#--------------------------------------------------------------
2020-01-14 16:20:33,578 producer 21688 99
2020-01-14 16:20:33,578 consumer 15252 recieved 99
2020-01-14 16:20:34,579 producer 21688 100
2020-01-14 16:20:34,579 consumer 15252 recieved 100
2020-01-14 16:20:35,580 producer 21688 61
2020-01-14 16:20:35,580 consumer 15252 recieved 61
2020-01-14 16:20:36,580 producer 21688 32
2020-01-14 16:20:36,580 consumer 15252 recieved 32
2020-01-14 16:20:37,581 producer 21688 32
2020-01-14 16:20:37,581 consumer 15252 recieved 32
2020-01-14 16:20:38,581 producer 21688 67
2020-01-14 16:20:38,581 consumer 15252 recieved 67
2020-01-14 16:20:39,581 producer 21688 85
2020-01-14 16:20:39,581 consumer 15252 recieved 85
2020-01-14 16:20:40,582 producer 21688 41
2020-01-14 16:20:40,582 consumer 15252 recieved 41
2020-01-14 16:20:41,582 producer 21688 26
2020-01-14 16:20:41,582 consumer 15252 recieved 26
2020-01-14 16:20:42,583 producer 21688 78
2020-01-14 16:20:42,583 consumer 15252 recieved 78
self.cond.notify_all() # 发通知
修改为self.cond.notify(n=2)
试一试看看结果?
这个例子,可以看到实现了消息的 一对多 ,这其实就是 广播 模式。
注:上例中,程序本身不是线程安全的,程序逻辑有很多瑕疵,但是可以很好的帮助理解Condition的使用,和生产者消费者模型。
Condition总结
Condition用于生产者消费者模型中,解决生产者消费者速度匹配的问题。
采用了通知机制,非常有效率。
使用方式
使用Condition,必须先acquire,用完了要release,因为内部使用了锁,默认使用RLock锁,最好的方式是>>使用with上下文。
消费者wait,等待通知。
生产者生产好消息,对消费者发通知,可以使用notify或者notify_all方法。