MQpython开发

RabbitMQ pika简单使用

2017-10-06  本文已影响47人  宝宝家的隔壁老王

MQ 全称为 Message Queue, 是一种应用程序对应用程序的通信方法。MQ 是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ 遵循了 AMQP 协议的具体实现和产品。

整篇代码撸下来预计耗时1.5h。

参考博主:anzhsoft
官网


闲聊

一、What RabbitMQ?

MQ 是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ 和 JMS 类似,但不同的是 JMS 是 SUN JAVA 消息中间件服务的一个标准和API定义,而 MQ 则是遵循了 AMQP协议的具体实现和产品。

这个系统架构图版权属于sunjun041640
二、Why RabbitMQ

对于一个大型的软件系统来说,它会有很多的组件或者说模块或者说子系统或者(subsystem or Component or submodule)。那么这些模块如何通信呢?可以使用 socket,或者 url 请求,但是还有很多问题需要解决,如:

三、How RabbitMQ

一套 MQ 完整流程如下:

首先将 RabbitMQ 服务启动

Producer

Consumer

四、example

producer.py

import pika  

if __name__ == '__main__':
    # 创建一个connection
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
    channel = connection.channel()
    
    # 声明一个queue
    channel.queue_declare(queue='hello')  
    
    # exchange为空的时候,routing_key就是指定的queue值
    channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')  
    print(" [x] Sent 'Hello World!'")
    # 关闭连接
    connection.close()

consumer.py

import pika  

def callback(ch, method, properties, body):  
    print(" [x] Received %r" % (body,))

if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
    # 创建频道
    channel = connection.channel()  
    # 声明queue
    channel.queue_declare(queue='hello')  
    
    print(' [*] Waiting for messages. To exit press CTRL+C')

    # 收到指定消息的回调设置 
    channel.basic_consume(callback, queue='hello', no_ack=True)  
    # 开始循环监听 
    channel.start_consuming()

producer.py

import pika
import sys

if __name__ == '__main__':

    message = ' '.join(sys.argv[1:]) or "Hello World!"
    # 创建一个 connection
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    # 创建一个 channel
    channel = connection.channel()
    # 声明一个queue
    channel.queue_declare(queue='hello')

    # 发布消息,exchange 为空的情况下,routing_key 值就是指定的 queue 名字,即将消息直接发送到指定的 queue
    channel.basic_publish(exchange='', routing_key='hello', body=str(message))
    print(" [x] Sent {}".format(message))
    connection.close()

consumer.py

import pika  
import time

def callback(ch, method, properties, body):  
    print(" [x] Received %r" % body)
    time.sleep(str(body).count('.'))
    ch.basic_ack(delivery_tag = method.delivery_tag)
        
if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
    channel = connection.channel()  
      
    channel.queue_declare(queue='hello')  
      
    print(' [*] Waiting for messages. To exit press CTRL+C')
      
    channel.basic_consume(callback, queue='hello',no_ack=False)  
    
    channel.start_consuming()

producer.py

import pika
import sys

if __name__ == '__main__':

    message = ' '.join(sys.argv[1:]) or "Hello World!"

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  
    channel = connection.channel()
    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    channel.basic_publish(exchange='logs', routing_key='', body=str(message))
    print(" [x] Sent {}".format(message))
    connection.close()

consumer.py

import pika  
import time

def callback(ch, method, properties, body):  
    print(" [x] Received %r" % body)
    # 确认消息
    ch.basic_ack(delivery_tag=method.delivery_tag)
    time.sleep(str(body).count('.'))
      
if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
    channel = connection.channel()  
      
    # 声明一个 exchange
    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    # 声明一个随机名字的 queue
    result = channel.queue_declare()  
      
    print(' [*] Waiting for messages. To exit press CTRL+C')
    # 获取 queue 的 name 
    queue_name = result.method.queue
    # 将 queue 绑定到 exchange
    channel.queue_bind(exchange='logs', queue=queue_name)
    # 设置监听的 queue
    channel.basic_consume(callback, queue=queue_name)  
    
    channel.start_consuming()

producer.py

import pika
import sys

if __name__ == '__main__':

    message = ' '.join(sys.argv[1:]) or "Hello World!"

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  
    channel = connection.channel()
    # 声明一个 direct 类型的 exchange
    channel.exchange_declare(exchange='logs_direct', exchange_type='direct')
    # 获取 routing_key 值
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    channel.basic_publish(exchange='logs_direct', routing_key=severity, body=str(message))
    print(" [x] Sent {}".format(message))
    connection.close()

consumer.py

import pika  
import sys
import time

def callback(ch, method, properties, body):  
    print(" [x] Received %r" % body)
    # 确认消息
    ch.basic_ack(delivery_tag = method.delivery_tag)
    time.sleep(str(body).count('.'))
      
if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
    channel = connection.channel()  
      
    # 声明一个 exchange
    channel.exchange_declare(exchange='logs_direct', exchange_type='direct')
    # 声明一个随机名字的 queue
    result = channel.queue_declare()  

    # 设置监听的 routing_key
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
      
    print(' [*] Waiting for messages. To exit press CTRL+C')
    # 获取 queue 的 name 
    queue_name = result.method.queue
    # 将 queue 绑定到 exchange
    channel.queue_bind(exchange='logs_direct', queue=queue_name, routing_key=severity)
    # 设置监听的 queue
    channel.basic_consume(callback, queue=queue_name)  
    
    channel.start_consuming()

producer.py

import pika
import sys

if __name__ == '__main__':

    message = ' '.join(sys.argv[1:]) or "Hello World!"

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  
    channel = connection.channel()
    # 声明一个 topic 类型的 exchange
    channel.exchange_declare(exchange='logs_topic', exchange_type='topic')
    # 获取 routing_key 值
    severity = sys.argv[1] if len(sys.argv) > 1 else '111.111.111'
    channel.basic_publish(exchange='logs_topic', routing_key=severity, body=str(message))
    print(" [x] Sent {}".format(message))
    connection.close()

consumer.py

import pika  
import sys
import time

def callback(ch, method, properties, body):  
    print(" [x] Received %r" % body)
    # 确认消息
    ch.basic_ack(delivery_tag=method.delivery_tag)
    time.sleep(str(body).count('.'))
  
if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
    channel = connection.channel()

    # 声明一个 exchange
    channel.exchange_declare(exchange='logs_topic', exchange_type='topic')
    # 声明一个随机名字的 queue
    result = channel.queue_declare()  

    # 设置监听的 routing_key
    severity = sys.argv[1] if len(sys.argv) > 1 else '111.111.111'
      
    print(' [*] Waiting for messages. To exit press CTRL+C')
    # 获取 queue 的 name 
    queue_name = result.method.queue
    # 将 queue 绑定到 exchange
    channel.queue_bind(exchange='logs_topic', queue=queue_name, routing_key=severity)
    # 设置监听的 queue
    channel.basic_consume(callback, queue=queue_name)  
    
    channel.start_consuming()
五、实现细节部分
六、RabbitMQ 常用指令
启动: rabbitmq-server –detached

关闭: rabbitmqctl stop

若单机有多个实例,则在 rabbitmqctlh 后加 –n 指定名称
服务器状态:rabbitmqctl status
查看所有的消息队列
rabbitmqctl list_queues

清除所有队列
abbitmqctl reset

启动应用
rabbitmqctl start_app

关闭应用
rabbitmqctl stop_app

查看所有的 Exchanges
rabbitmqctl list_exchanges 

查看所有的 bindings
rabbitmqctl list_bindings
上一篇 下一篇

猜你喜欢

热点阅读