python 观察者模式
2020-09-14 本文已影响0人
假程序员
import abc
import threading
# NOTE: a class-body ``__metaclass__`` attribute is honoured only by Python 2;
# Python 3 ignores it, which silently disables ``@abc.abstractmethod``.
# Inheriting from a throwaway base created by calling ``ABCMeta`` directly
# attaches the metaclass on both Python 2 and Python 3.
class Observer(abc.ABCMeta("_ObserverBase", (object,), {})):
    """Abstract interface for objects that can be notified by a subject.

    Concrete observers must override :meth:`work`; instantiating a class that
    leaves it abstract raises ``TypeError``.
    """

    @abc.abstractmethod
    def work(self):
        """React to a notification. Must be overridden by subclasses."""
class ClientA(Observer):
    """Observer that reports its own class when notified."""

    def work(self):
        """Print the runtime type of this instance."""
        own_type = type(self)
        print(own_type)
class ClientB(Observer):
    """Observer that reports its object identity when notified."""

    def work(self):
        """Print the ``id()`` of this instance."""
        identity = id(self)
        print(identity)
class Subject(object):
    """Keeps a list of observers and notifies each of them on request.

    Every access to ``observers`` is guarded by the re-entrant lock ``lock``,
    so an observer may register or unregister observers from inside its own
    ``work()`` call without deadlocking.
    """

    def __init__(self):
        """Create a subject with no observers and its own re-entrant lock."""
        self.observers = list()
        self.lock = threading.RLock()

    def register_observer(self, observer):
        """Add *observer* unless it is already registered.

        Args:
            observer: Object exposing a ``work()`` method.
        """
        # ``with`` guarantees the lock is released even if the membership
        # test raises (e.g. from a custom ``__eq__``).
        with self.lock:
            if observer not in self.observers:
                self.observers.append(observer)

    def un_register_observer(self, observer):
        """Remove *observer* if it is registered; otherwise do nothing.

        Args:
            observer: The object previously passed to ``register_observer``.
        """
        with self.lock:
            if observer in self.observers:
                self.observers.remove(observer)

    def notify(self):
        """Call ``work()`` on every registered observer, in registration order.

        Exceptions raised by an observer propagate to the caller; the lock is
        released regardless, so a failing observer cannot block other threads.
        """
        with self.lock:
            # Iterate over a snapshot: an observer may (un)register observers
            # from within work(), and mutating the live list while iterating
            # it would cause observers to be skipped.
            for observer in list(self.observers):
                observer.work()
if __name__ == '__main__':
    # Demo: attach one observer of each kind, then fire a single notification.
    subject, a, b = Subject(), ClientA(), ClientB()
    for observer in (a, b):
        subject.register_observer(observer)
    subject.notify()