RabbitMQ基本用法、消息分发模式、消息持久化、广播模式

2020-04-20  本文已影响0人  Quillagua

RabbitMQ基本用法、消息分发模式、消息持久化、广播模式

RabbitMQ基本用法


示例代码

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#相当于建立一个socket,连接本地的RabbitMQ,默认端口:5672
channel = connection.channel()#声明一个通信管道(信道)

#在管道里什么一个queue
channel.queue_declare(queue='hello')#声明一个名称为hello的queue
#通过管道发送消息
channel.basic_publish(exchange='',
routing_key='hello',#queue的名字
body='Hellow Word!')#消息主体
connection.close()#关闭连接

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
#不确定生产端或消费端谁先运行,因此为了避免错误,消费端也要申请一个queue
#消费端先运行,如果没申请这个queue,生产端还没建立这个queue,因此报错
channel.queue_declare(queue='hello')

def callback(ch,method,properties,body):
    #ch,管道(信道)channel的内存地址
    #method,设置的一些基本信息
    #properties,
    #body,消息主体,二进制数据
    print(ch,method,properties)
    print('[x] Received %r'%body)

#声明要收消息
channel.basic_consume(
callback,#如果收到消息就调用回调函数处理消息
queue='hello',#queue的名字
no_ack=True#不确认,是否处理完callback,给rabbitmq返回确认信息
)
#开始收消息
channel.start_consuming()#开启后一直收消息,没消息则卡住


消息分发

RabbitMQ消息分发(一对多)

no_act设置是否确认消息处理完


消息持久化

rabbitmq目录下启动cmd,命令:rabbitmqctl.bat list_queues查看当前queue列表

当我们需要消息不会丢失(RabbitMQ server宕机时),需要进行消息持久化

channel.basic_publish(
                        exchange='',
                        routing_key='hello',#queue的名字
                        body='Hellow Word!'
                        porperties=pika.BasicProperties(
                            delivery_mode=2#使队列里的消息持久化
                        )
)#消息主体


广播模式

消息公平分发


广播模式(消息是实时的,发送时没有启动接收端,消息丢失)


给所有bind此exchange的发送消息

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                        type='fanout')#广播模式,不用申明queue指定queue名
#设置exchange为fanout模式

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                        type='fanout')

result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = result.method.queue#拿到这个随机分配的queue名
channel.queue_bind(exchange='logs',#绑定发送端的exchange
                  queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()


有选择的广播(接受者过滤接收消息exchange type=direct)

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                        type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                        type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                      queue=queue_name,
                      routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()


细致的消息过滤()

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                        type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                        type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                      queue=queue_name,
                      routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

转自https://www.cnblogs.com/limich/p/7477200.html
我的GitHub

上一篇 下一篇

猜你喜欢

热点阅读