blinker信号

2020-04-08  本文已影响0人  huashen_9126

安装

pip install blinker

订阅信号connect

from blinker import signal

def subscriber(sender):
    print("Got a signal sent by %r" % sender)

ready = signal('ready')
ready.connect(subscriber)

触发信号send

from blinker import signal

def subscriber(sender):
    print("Got a signal sent by %r" % sender)

def b_subscriber(sender):
    print("Caught signal from processor_b.")
    assert sender.name == 'b'

ready = signal('ready')
ready.connect(subscriber)


class Processor:
   def __init__(self, name):
       self.name = name

   def go(self):
       ready = signal('ready')
       ready.send(self)
       print("Processing.")
       complete = signal('complete')
       complete.send(self)

   def __repr__(self):
       return '<Processor %s>' % self.name

processor_a = Processor('a')
processor_b = Processor('b')
#订阅特定的发布者
ready.connect(b_subscriber, sender=processor_b)
processor_a.go()
print('------------------------')
processor_b.go()
#结果:
Got a signal sent by <Processor a>
Processing.
------------------------
Got a signal sent by <Processor b>
Caught signal from processor_b.
Processing.

通过信号收发数据+使用装饰器订阅信号

from blinker import signal

send_data = signal('send-data')

@send_data.connect
def receive_data(sender, **kw):
    print("Caught signal from %r, data %r" % (sender, kw))
    return 'received!'

result = send_data.send('anonymous', abc=123)
#结果:
Caught signal from 'anonymous', data {'abc': 123}

参考:https://www.jianshu.com/p/d2f2cfd1b140

上一篇 下一篇

猜你喜欢

热点阅读