mac 安装rabbitmq

2019-05-09  本文已影响0人  dongshangtong

我感觉在mac 安装rabbitmq 挺方便的。
在终端:

brew install rabbitmq

然后就等下载完以后就开始编译安装

最后运行:

brew services start rabbitmq

它的web 管理界面是:[-->>管理]http://localhost:15672
默认账号和密码是:guest

下面是ubuntu 安装:
由于rabbitMq需要erlang语言的支持,在安装rabbitMq之前需要安装erlang

sudo apt-get install erlang-nox

更新源:

sudo apt-get update

安装Rabbitmq

sudo apt-get install rabbitmq-server

启动、停止、重启、状态rabbitMq命令

sudo rabbitmq-server start
sudo rabbitmq-server stop
sudo rabbitmq-server restart
sudo rabbitmqctl status

添加admin,并赋予administrator权限

# 添加admin用户,密码设置为admin。
sudo rabbitmqctl add_user  admin  admin  
# 赋予权限
sudo rabbitmqctl set_user_tags admin administrator

# 赋予virtual host中所有资源的配置、写、读权限以便管理其中的资源
sudo rabbitmqctl  set_permissions -p / admin '.*' '.*' '.*'

下面我用python 来链接rabbitMQ
先安装:

pip3 install pika 

(1) 生产和消费
生产端:

import pika
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列)
channel.queue_declare(queue='rabbit-test')

channel.basic_publish(exchange='',
                      routing_key='rabbit-test',   # 消息队列名称
                      body='fafafa999')
connection.close()

消费端:

import pika

credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue="rabbit-test")


def callback(ch, method, properties, body):

    print("消费者接受到了新任务: %r" % body)


channel.basic_consume("rabbit-test", callback, True) # True表示不告诉服务端消息去取走了, 默认False 告诉服务端消息去取走了

channel.start_consuming()

(2) 消息回复
生产端不修改,消费端改一点代码
生产端:

import pika
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列)
channel.queue_declare(queue='rabbit-test')

channel.basic_publish(exchange='',
                      routing_key='rabbit-test',   # 消息队列名称
                      body='fafafa999')
connection.close()

消费端:

import pika

credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue="rabbit-test")


def callback(ch, method, properties, body):
     #  增加一行代码
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print("消费者接受到了新任务: %r" % body)

#  True 改为False
channel.basic_consume("rabbit-test", callback, False) # True表示不告诉服务端消息去取走了, 默认False 告诉服务端消息去取走了

channel.start_consuming()

(3) 持久化操作
当ribbatMQ 服务器 垮了,我们已经做了持久化操作,重启服务器依然消息还在。

我们只需在服务端修改一下代码

import pika

credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()
#  声明一个队列(创建一个队列)-支持持久化  -- durable=True增加代码
channel.queue_declare(queue='rabbit-test1', durable=True)

channel.basic_publish(exchange='',
                      routing_key='rabbit-test1',   # 消息队列名称
                      body='fafafa999ff',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      )  #  -- 增加代码
                      )
connection.close()

(4) 闲置消费
rabbitmq 默认是轮询处理消息。
我们要改动消费端 谁闲置谁消费, 提高效力。
增加谁闲置谁消费:
channel.basic_qos(prefetch_count=1)

import pika

credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue="rabbit-test1")


def callback(ch, method, properties, body):

    print("消费者接受到了新任务: %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)


 # 增加谁闲置谁消费
channel.basic_qos(prefetch_count=1)
channel.basic_consume("rabbit-test1", callback, False)  # True表示不告诉服务端消息去取走了, 默认False 告诉服务端消息去取走了

channel.start_consuming()

(5)消息发布
生产端:

import pika
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='mm1', exchange_type='fanout')

channel.basic_publish(exchange='mm1',
                      routing_key='',
                      body='hello world!')

connection.close()

消费端:

import pika

credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='mm1', exchange_type='fanout')

# 随机生成一个队列
result = channel.queue_declare(queue="rabbit-test5", exclusive=True)
queue_name = result.method.queue
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='mm1', queue=queue_name)


def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)


channel.basic_consume(queue_name, callback, True)
channel.start_consuming()

(6)关键字发布
routing_key='dong',
生产端:

import pika
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='mm7', exchange_type='direct')

channel.basic_publish(exchange='mm7',
                      routing_key='dong',
                      body='hello world! 666655')

connection.close()

消费端1:

import pika

credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='mm7', exchange_type='direct')

# 随机生成一个队列
result = channel.queue_declare(queue="rabbit-test7", exclusive=True)
queue_name = result.method.queue
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='mm7', queue=queue_name, routing_key='keep')
channel.queue_bind(exchange='mm7', queue=queue_name, routing_key='dong')



def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)


channel.basic_consume(queue_name, callback, True)
channel.start_consuming()

消费端2:

import pika

credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='mm7', exchange_type='direct')

# 随机生成一个队列
result = channel.queue_declare(queue="rabbit-test8", exclusive=True)
queue_name = result.method.queue
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='mm7', queue=queue_name, routing_key='keep')




def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)


channel.basic_consume(queue_name, callback, True)
channel.start_consuming()

(7).关键字模糊匹配发布
主要修改
exchange_type='topic'

生产端:

import pika
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='mm8', exchange_type='topic')

channel.basic_publish(exchange='mm8',
                      routing_key='dong.keep.cc',
                      body='hellorrrr world! ong.keep.cc')

connection.close()

消费端1

import pika

credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='mm8', exchange_type='topic')

# 随机生成一个队列
result = channel.queue_declare(queue="rabbit-test8", exclusive=True)
queue_name = result.method.queue
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='mm8', queue=queue_name, routing_key='dong.#')



def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)


channel.basic_consume(queue_name, callback, True)
channel.start_consuming()

消费端2

import pika

credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='mm8', exchange_type='topic')

# 随机生成一个队列
result = channel.queue_declare(queue="rabbit-test9", exclusive=True)
queue_name = result.method.queue
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='mm8', queue=queue_name, routing_key='dong.#')




def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)


channel.basic_consume(queue_name, callback, True)
channel.start_consuming()
上一篇下一篇

猜你喜欢

热点阅读