Python 实现RabbitMQ 发布订阅消息队列
2021-03-30 本文已影响0人
夜空最亮的9星
消息生产(发布)者:
# publisher.py
import pika
class Publisher:
def __init__(self, config):
self.config = config
def publish(self, routing_key, message):
connection = self.create_connection()
channel = connection.channel()
channel.exchange_declare(exchange=self.config['exchange']
, exchange_type='topic')
channel.basic_publish(exchange=self.config['exchange']
, routing_key=routing_key, body=message)
print("[x] Sent message %r for %r" % (message, routing_key))
def create_connection(self):
param = pika.ConnectionParameters(host=self.config['host']
, port=self.config['port'])
return pika.BlockingConnection(param)
config = {'host': '192.168.121.83', 'port': 5672, 'exchange': 'my_exchange'}
publisher = Publisher(config)
while True:
message = input('please input message:')
publisher.publish('topic_02', message)
消息消费(订阅)者
import pika
import sys
# https://medium.com/@rahulsamant_2674/python-rabbitmq-8c1c3b79ab3d
class Subscriber:
def __init__(self, queueName, bindingKey, config):
self.queueName = queueName
self.bindingKey = bindingKey
self.config = config
self.connection = self._create_connection()
def __del__(self):
self.connection.close()
def _create_connection(self):
parameters = pika.ConnectionParameters(host=self.config['host'],
port=self.config['port'])
return pika.BlockingConnection(parameters)
def on_message_callback(self, channel, method, properties, body):
binding_key = method.routing_key
print("received new message for -" + binding_key)
print(" [x] Received %r" % body)
def setup(self):
channel = self.connection.channel()
channel.exchange_declare(exchange=self.config['exchange'],
exchange_type='topic')
channel.queue_declare(queue=self.queueName)
channel.queue_bind(queue=self.queueName, exchange=self.config['exchange'], routing_key=self.bindingKey)
channel.basic_consume(queue=self.queueName,
on_message_callback=self.on_message_callback, auto_ack=True)
print('[*] Waiting for data for ' + self.queueName + '. To exit press CTRL+C')
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
config = {'host': '192.168.121.83', 'port': 5672, 'exchange': 'my_exchange'}
subscriber = Subscriber('hello', 'topic_01', config)
subscriber.setup()