Python 操作 RabbitMQ

2023-02-19  本文已影响0人  木叶苍蓝

实现简单的消息队列

一个 Product 向 queue 发送一个 message , 一个 Client 从该 queue 接收 message 并打印。


952555-20160729110926294-627960883.png
发送消息 Product
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672))  #  定义连接池
channel = connection.channel()
channel.queue_declare(queue='test')  # 声明队列向其发送消息
channel.basic_publish(exchange='', routing_key='test', body='Hello World!')  # 注意当前未定义 exchange ,routing_key 需和 queue 保持一致
print("send success msg to rabbitmq")
connection.close()  # 关闭连接
接收消息 Client
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672))
channel = connection.channel()
channel.queue_declare(queue='test')

def callback(ch, method, properties, body):
    ''' 回调函数,处理从 rabbitmq 中取出的消息 '''
    print(" [x] Received %r" % body)

channel.basic_consume(callback, queue='test', no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 开始建监听 接收消息
执行结果
#product端:
send success msg to rabbitmq

#client端:
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'Hello World!'
消息确认

当客户端从队列中取出消息之后,可以需要一段时间才能处理完成。如果在这个过程中,客户端出错了,异常退出了,而数据还没有处理完成,那么非常不幸这段数据就丢失了,因为 RabbitMQ 默认会把此消息标记为已完成,然后从队列中移除,消息确认是客户端从 RabbitMQ 中取出消息,并处理完成之后,会发送一个 ack 告诉 RabbitMQ ,消息处理完成。当 RabbitMQ 收到客户端的获取消息请求之后,标记为处理中,当再次收到 ack 之后,才会标记为已完成,然后从队列中删除。当 RabbitMQ 检测到客户端和自己断开链接之后,还没有收到 ack ,则会 重新将消息放回消息队列,交给下一个客户端处理,保证消息不丢失,也就是说,RabbitMQ 给了客户端足够长的时间来做数据处理。
在客户端使用 no_ack 来标记是否需要发送 ack,默认是 False 开启状态。
product 向 rabbitmq 发送两条消息

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, ))  # 定义连接池
channel = connection.channel()

channel.queue_declare(queue='test')
channel.basio_publish(exchange='', routing_key='test', body='1')
channel.basio_publish(exchange='', routing_key='test', body='2')
print('send success msg to rabbitmq!')
connection.close()

客户端接收消息,不发送 ack

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672))
channel = connection.channel()
channel.queue_declare(queue='test')

def callback(ch, method, properties, body):
    ''' 回调函数,处理从 rabbitmq 中取出的消息'''
    print("  [x] Received %r" % body)
    time.sleep(5)
    # ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送 ack 消息

channel.basic_consume(callback, queue='test', no_ack=False)
print(' [*] Waiting for message. To exit press CTRL+C')
channel.start_consuming()  # 开始监听,接收消息

执行结果,发现消息并没有从队列中删除

第一次执行:
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'1'
 [x] Received b'2'
第二次执行:
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'1'
 [x] Received b'2'

加入 ack 之后

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672))
channel = connection.channel()
channel.queue_declare(queue='test')

def callback(ch, methond, properties, body):
    ''' 回调函数 '''
    print(" [x] Received %r " % body)
    time.sleep(5)
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送 ack 消息

channel.basic_consume(callback, queue='test', no_ack=False)
print(' [*] Waiting for message. To exit press CTRL+C')
channel.start_consuming()  # 开始监听 接收消息

运行结果,发现第二次运行队列中已经没有消息

第一次:
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'1'
 [x] Received b'2'
 [x] Received b'3
 第二次:
  [*] Waiting for messages. To exit press CTRL+C

消息持久化

消息持久化,消息确认机制使得客户端在崩溃的时候,服务端消息不丢失。但是如果 rabbitmq 崩溃了呢?该如何保证队列中的消息不丢失?此就需要 product 在往队列中 push 消息的时候,告诉 rabbitmq ,此队列中的消息需要持久化,用到的参数:durable = True ,再次强调 producer 和 client 都应该去创建这个 queue ,尽管只有一个地方的创建是真正起作用的:

channel.basic_publish(exchange='', 
                routing_key='test', 
                body=message, 
                properties=pika.BasicProperties(
                    delivery_mode = 2,  # make message persistent
                ))

product 端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='127.0.0.1', port=5672))   # 定义连接池
channel = connection.channel()

channel.queue_declare(queue='test_persistent', durable=True)
for i in range(10):
    channel.basic_publish(exchange='', routing_key='test_persistent', body=str(i), properties=pika.BasicProperties(delivery_mode=2))
    print('send success msg [%s] to rabbitmq' % i)
connection.close()

client 端:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672))
channel = connection.channel()

channel.queue_declare(queue='test_persistent', durable=True)

def callback(ch, method, properties, body):
    ''' 回调函数 '''
    print(" [x] Received %r ' % body)
    # time.sleep(5)
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送 ack 消息

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='test_persistent', no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()    #开始监听 接受消息
注意:client端也需配置durable=True,否则将报下面错误:

pika.exceptions.ChannelClosed: (406, "PRECONDITION_FAILED - parameters for queue 'test_persistent' in vhost '/' not equivalent")

使用 Exchange

exchange 主要负责从 product 那里接受 push 消息,根据 product 定义的规则,投递到 queue 中,是 product 和 queue 的中间件


952555-20160729110926419-1937096317.jpg

Exchange 类型

使用 fanout 实现发布订阅者模型


952555-20160729110926778-1544074422.jpg

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ 实现发布和订阅时会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

订阅者

import pika
import time

connection = pika.BlockConnection(pika.ConnectionParameters(
        host='127.0.0.1', port=5672))    
channel = connection.channel()

channel.exchange_declare(exchange='test123', type='fanout')  # 定义一个 exchange 类型为 fanout
rest = channel.queue_declare(exclusive=True)  # 创建一个随机队列,并启用 exchange
queue_name = rest.method.queue  # 获取队列名称
channel.queue_bind(exchange='test123', queue=queue_name)  # 将随机队列名和 exchange 进行绑定

def callback(ch, method, properties, body):
    '''  回调函数 '''
    print("  [x] Received %r" % body)
    time.sleep(1)
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送 ack 消息

channel.basic_qos(preferch_count = 1)
channel.basic_consume(callback, queue=queue_name, no_ack=False)
print(" [*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()  # 开始监听,接收消息

发布者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='127.0.0.1', port=5672))
channel = connection.channel()

channel.exchange_declare(exchange='test123', type='fanout')
for i in range(10):
    channel.basic_publish(exchange='test123', routing_key='', body=str(i), properties=pika.BasicProperties(delivert_mode=2))
    print("send success mes [%s] to rabbitmq" % i)

connection.close()

注意:需要先定义订阅者,启动订阅者,否则发布者 publish 到一个不存在的 exchange 是被禁止的,如果没有 queue bindings exchange 的话,msg 是被丢弃的

使用 direct 实现根据关键字发布消息
952555-20160729110926653-1969440238.jpg

消息发布订阅者模型是发布者发布一条消息,所有订阅者都可以收到,现在 rabbitmq 还支持根据关键字发送,在发生消息的时候使用 routing_key 参数指定关键字,rabbitmq 的 exchange 会判断 routing_key 的值,然后只将消息转发至匹配的队里,注意,此时需要订阅者先创建队列
配置参数为 exchange 的 type = direct ,然后定义 routing_key 即可
订阅者1:订阅 error warning info

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672))
channel = connection.channel()

channel.exchange_declare(exchange='test321', type='direct')   # 定义一个 exchange ,类型为 direct
rest = channel.queue_declare(exclusive = True)  # 创建一个随机队列,并启用 exchange 
queue_name = rest.method.queue

severities = ['error', 'warning', 'info']  # 定义三个routing_key

for severity in severities:
    channel.queue_bind(exchange='test321', routing_key=severity, queue=queue_name)

def  callback(ch, method, properties, body):
    ''' 回调函数 '''
    print( " [x] Received %r" % body)
    time.sleep(1)
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送 ack 消息

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=queue_name, no_ack=False)
print(" [*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()   # 开始监听,接收消息

订阅者2:订阅 error,warning

import pika
import time

connection = pika.BlockConnection(pika.ConnectionParameters(
                        host='127.0.0.1', port=5672))
channel = connection.channel()

channel.exchange_declare(exchange='test321', type='direct')
rest = channel.queue_declare(exclusive=True)
queue_name = rest.method.queue

severities = ['error', 'warning']

for severity in severities:
    channel.queue_bind(exchange='test321', routing_key=severity, queue=queue_name)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(1)
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送 ack 消息

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=queue_name, no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

发布者:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672, ))     #定义连接池
channel = connection.channel()          #声明队列以向其发送消息消息

channel.exchange_declare(exchange='test321',type='direct')
channel.basic_publish(exchange='test321', routing_key='info', body='info msg',properties=pika.BasicProperties(delivery_mode=2))  #发送info msg到 info routing_key
channel.basic_publish(exchange='test321', routing_key='error', body='error msg',properties=pika.BasicProperties(delivery_mode=2)) #发送error msg到 error routing_key

print('send success msg[] to rabbitmq')
connection.close()   #关闭连接**

效果:发现订阅者1和订阅者2都收到error消息,但是只有订阅者1收到了info消息

订阅者1:
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'info msg'
 [x] Received b'error msg'
订阅者2:
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'error msg'

上一篇 下一篇

猜你喜欢

热点阅读