Python 实现RabbitMQ 发布订阅消息队列

2021-03-30  本文已影响0人  夜空最亮的9星

消息生产(发布)者:

# publisher.py
import pika


class Publisher:

    def __init__(self, config):
        self.config = config

    def publish(self, routing_key, message):
        connection = self.create_connection()
        channel = connection.channel()
        channel.exchange_declare(exchange=self.config['exchange']
                                 , exchange_type='topic')
        channel.basic_publish(exchange=self.config['exchange']
                              , routing_key=routing_key, body=message)

        print("[x] Sent message %r for %r" % (message, routing_key))

    def create_connection(self):
        param = pika.ConnectionParameters(host=self.config['host']
                                          , port=self.config['port'])
        return pika.BlockingConnection(param)


config = {'host': '192.168.121.83', 'port': 5672, 'exchange': 'my_exchange'}
publisher = Publisher(config)


while True:
    message = input('please input message:')
    publisher.publish('topic_02', message)

消息消费(订阅)者

import pika
import sys


# https://medium.com/@rahulsamant_2674/python-rabbitmq-8c1c3b79ab3d
class Subscriber:
    def __init__(self, queueName, bindingKey, config):
        self.queueName = queueName
        self.bindingKey = bindingKey
        self.config = config
        self.connection = self._create_connection()

    def __del__(self):
        self.connection.close()

    def _create_connection(self):
        parameters = pika.ConnectionParameters(host=self.config['host'],
                                               port=self.config['port'])
        return pika.BlockingConnection(parameters)

    def on_message_callback(self, channel, method, properties, body):
        binding_key = method.routing_key

        print("received new message for -" + binding_key)
        print(" [x] Received %r" % body)


    def setup(self):
        channel = self.connection.channel()
        channel.exchange_declare(exchange=self.config['exchange'],
                                 exchange_type='topic')
        channel.queue_declare(queue=self.queueName)
        channel.queue_bind(queue=self.queueName, exchange=self.config['exchange'], routing_key=self.bindingKey)
        channel.basic_consume(queue=self.queueName,
                              on_message_callback=self.on_message_callback, auto_ack=True)
        print('[*] Waiting for data for ' + self.queueName + '. To exit press CTRL+C')
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            channel.stop_consuming()


config = {'host': '192.168.121.83', 'port': 5672, 'exchange': 'my_exchange'}

subscriber = Subscriber('hello', 'topic_01', config)
subscriber.setup()

上一篇下一篇

猜你喜欢

热点阅读