rabbitmq介绍

2019-09-28  本文已影响0人  源来是你啊

RabbitMq

amqp协议

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。

1.简介

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

2.使用

2.1 核心概念

Message :消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、 priority(相对于其他消息的优先权)、 delivery-mode(指出 该消息可能需要持久性存储)等。

Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序

Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange有4种类型: direct(默认), fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别

Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连 接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和Queue的绑定可以是多对多的关系。

Connection:网络连接,比如一个TCP连接。

Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接, AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。 vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

2019-06-14_145924.png

2.2 简单模式

dubbo-service-governance.jpg

一个生产者,一个消费者

 * 获取连接
 * @return Connection
 * @throws Exception
 */
 public static Connection getConnection() throws Exception {
 //定义连接工厂
 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost("192.168.1.235");
 factory.setPort(5672);
 //设置vhost
 factory.setVirtualHost("/tzb");
 factory.setUsername("test");
 factory.setPassword("123456");
 //通过工厂获取连接
 Connection connection = factory.newConnection();
 return connection;
 }
​
 //创建队列,发送消息
 public static void main(String[] args) throws Exception {
 //获取连接
 Connection connection = ConnectionUtil.getConnection();
 //创建通道
 Channel channel = connection.createChannel();
 //声明创建队列
 channel.queueDeclare(QUEUE_NAME,false,false,false,null);
 //消息内容
 String message = "Hello World!";
 channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
 System.out.println("发送消息:"+message);
 //关闭连接和通道
 channel.close();
 connection.close();
 }
​ //消费者消费消息
 public static void main(String[] args) throws Exception {
 //获取连接和通道
 Connection connection = ConnectionUtil.getConnection();
 Channel channel = connection.createChannel();
 //声明通道
 channel.queueDeclare(QUEUE_NAME,false,false,false,null);
 //定义消费者
 QueueingConsumer consumer = new QueueingConsumer(channel);
 //监听队列
 channel.basicConsume(QUEUE_NAME,true,consumer);
​
 while(true){
 //这个方法会阻塞住,直到获取到消息
 QueueingConsumer.Delivery delivery = consumer.nextDelivery();
 String message = new String(delivery.getBody());
 System.out.println("接收到消息:"+message);
 }
 }

2.3 work模式

2019-06-14_175154.png

一个生产者,多个消费者,每个消费者获取到的消息唯一

public static void main(String[] args) throws Exception {
 //获取连接和通道
 Connection connection = ConnectionUtil.getConnection();
 Channel channel = connection.createChannel();
 //声明队列
 channel.queueDeclare(QUEUE_NAME,false,false,false,null);
 String message = "";
 for(int i = 0; i<100; i++){
 message = "" + i;
 channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
 System.out.println("发送消息:"+message);
 Thread.sleep(i);
 }
​
 channel.close();
 connection.close();
 }
​
 //消费者1
 public static void main(String[] args) throws Exception {
 Connection connection = ConnectionUtil.getConnection();
 Channel channel = connection.createChannel();
 channel.queueDeclare(QUEUE_NAME,false,false,false,null);
​
 //同一时刻服务器只发送一条消息给消费端
 channel.basicQos(1);
​
 QueueingConsumer consumer = new QueueingConsumer(channel);
​
 channel.basicConsume(QUEUE_NAME,false,consumer);
​
 while(true){
 QueueingConsumer.Delivery delivery = consumer.nextDelivery();
 String message = new String(delivery.getBody());
 System.out.println("recive1:"+message);
 Thread.sleep(100);
 //消息消费完给服务器返回确认状态,表示该消息已被消费
 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
 }
 }
​
 //生产者2
 public static void main(String[] args) throws Exception {
 Connection connection = ConnectionUtil.getConnection();
 Channel channel = connection.createChannel();
 channel.queueDeclare(QUEUE_NAME,false,false,false,null);
​
 channel.basicQos(1);
​
 QueueingConsumer consumer = new QueueingConsumer(channel);
​
 channel.basicConsume(QUEUE_NAME,true,consumer);
​
 while(true){
 QueueingConsumer.Delivery delivery = consumer.nextDelivery();
 String message = new String(delivery.getBody());
 System.out.println("recive1:"+message);
 Thread.sleep(10);
 //channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
 }
 }

消息消费的两种模式

1、 自动模式

消费者从消息队列获取消息后,服务端就认为该消息已经成功消费。

2、 手动模式

消费者从消息队列获取消息后,服务端并没有标记为成功消费 ​ 消费者成功消费后需要将状态返回到服务端

2.4 订阅模式

一个生产者发送的消息会被多个消费者获取。

生产者:可以将消息发送到队列或者是交换机。

消费者:只能从队列中获取消息。

如果消息发送到没有队列绑定的交换机上,那么消息将丢失。

2019-06-14_175701.png

2.5 路由模式

1、 发送消息到交换机并且要指定路由key

2、 消费者将队列绑定到交换机时需要指定路由key

是一种完全匹配,只有匹配到的消费者才能消费消息

2019-06-14_175852.png
上一篇下一篇

猜你喜欢

热点阅读