rabbitmq(二)订阅模式\路由模式\topic
2019-05-21 本文已影响0人
guideEmotion
一 消息应答和消息持久化
boolan autoAck = true;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
boolan autoAck = true;
时时自动确认模式:一旦rabbitmq将消息分给消费者,就会从内存中删除
缺陷这种情况下,如果杀手正在执行的消费者,就会丢失正在处理的消息
boolan autoAck = true;
自动确认模式:一旦rabbitmq将消息分给消费者,就会从内存中删除
缺陷这种情况下,如果杀手正在执行的消费者,就会丢失正在处理的消息
boolan autoAck = false;
手动确认模式:一旦有一个消费者挂掉,就会交付给其他消费者,rabbitmq支持消息应答。高速rabbitmq这个消息我已经处理完成,你可以删了。然后rabbitmq就会删除内存中的消息
消息应答默认展开的即autoAck = false
ack:message acknowledgment
消息持久化
boolean durable = false;
channel.queueDeclare(QUEUE_NAME,durable,false,false,null)
已经定义的队列不能改变持久化状态
二 订阅模式
前面都是一个消息只能被一个消费者消费,该模式可以实现一个消息发送给多个消费者
模型
特点
- 一个生产者,多个消费者
- 每个消费者都有自己的队列
- 生产者没有直接把消息发送到队列,而是发送到了交换机 转发器 exchange
- 每个队列都要绑定到交换机上
- 生产者发送的消息 经过交换机 到达队列 就能实现 一个消息被多个消费者消费
生产者
private static final String EXCHANGE_NAME = "test_first_exchange";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectiionUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
String msg = "hello excahnge";
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
System.out.println(" 发送完毕 "+msg);
channel.close();
connection.close();
}
效果
注意:交换机没有存储的能力,在rabbitmq中只有队列有存储能力。因为此时还没有队列绑定,所以数据丢失了
消费者
private static final String QUEUE_NAME = "test_first_exchange_queue1";
private static final String EXCHANGE_NAME = "test_first_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectiionUtil.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
DefaultConsumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
// 接收到的消息
String message = new String(body);
System.out.println("Receive 接收到的消息 " + message);
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
channel.close();
connection.close();
}
image.png
三 Exchange(交换机 转发器)
一方面是接收生产者的消息,另一方面是向队列推送消息
匿名和非匿名
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());//匿名往队列发
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());//非匿名,往exchange发,且第二个字符串表示route key
fanout(不处理路由键)
direct(处理路由键)
路由模式
image.png生产者
private static final String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectiionUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
String msg = "hello excahnge direct";
String routingkey = "error";
channel.basicPublish(EXCHANGE_NAME,routingkey,null,msg.getBytes());
System.out.println(" 发送完毕 "+msg);
channel.close();
connection.close();
}
消费者
private static final String QUEUE_NAME = "test_exchange_direct_queue2";
private static final String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectiionUtil.getConnection();
//从连接中获取一个通道
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warn");
channel.basicQos(1);//保证一次只分发一个
DefaultConsumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
// 接收到的消息
String message = new String(body);
System.out.println("Receive2 接收到的消息 " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("Receive2 end" );
//手动回执
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
}
topic模式
将路由键和某模式匹配
匹配一个或或多个
- 匹配一个
模型
生产者
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectiionUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String msg = "test test";
String routingkey = "white.big.s";
channel.basicPublish(EXCHANGE_NAME,routingkey,null,msg.getBytes());
System.out.println(" 发送完毕 "+msg);
channel.close();
connection.close();
}
消费者
private static final String QUEUE_NAME = "test_exchange_topic_queue2";
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectiionUtil.getConnection();
//从连接中获取一个通道
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"white.#");
DefaultConsumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
// 接收到的消息
String message = new String(body);
System.out.println("Receive-2 接收到的消息 " + message);
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}