MultipleThread_RabbitMQ_MessageQ

2018-09-19  本文已影响0人  勤学奋进小郎君

环境配置

ubuntu: sudo apt-get install rabbitmq-server
python2.7 + pip install pika(调用RabbitMQ的库)

实例代码

send.py

import pika
import sys

# connect local queue server with RabbitMQ server
connect = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connect.channel()
# create a message queue, named task_queue, and if RabbitMQ server stop, the message that have been sent queue continue to execute
channel.queue_declare(queue='task_queue', durable=True)
# message that need send
message = ' '.join(sys.argv[1:]) or "Hello World!"
# send message
channel.basic_publish(
    exchange = "",
    routing_key = "task_queue",
    body = message,
    properties = pika.BasicProperties(
        # if RabbitMQ restart ,the message queue can not lose
        delivery_mode=2
    ))
print(" [x] Sent %r" % message)
connect.close()

receiver.py

import pika
import sys
import time


# connect local queue server with RabbitMQ server
connect = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connect.channel()
# detect if exist message queue named task_queue
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


# if consumer receiver message from message queue, execute this function
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(5)
    print(" [x] Done")
    # if worker(receiver) died, message queue not receiver ack, will redelivered message
    ch.basic_ack(delivery_tag = method.delivery_tag)

# avoid fair dispatch
channel.basic_qos(prefetch_count = 1)
# setup consumer
channel.basic_consume(
    callback,
    queue = "task_queue"
)
# start receiver message
channel.start_consuming()

详细信息看RabbitMQ官网地址,里面有简单的例子

上一篇下一篇

猜你喜欢

热点阅读