RabbitMQ发布/订阅模式(Publish/Subscrib
2020-11-20 本文已影响0人
裂开的汤圆
模型
模型解读:
1.一个生产者,多个消费者
2.每个消费者都有属于自己的队列
3.生产者没有直接将消息发送到队列中,而是发送到转发器(exchange),再由转发器分发到队列中去
4.订阅模式可以实现,一个消息被多个消费者消费
生产者代码
public class Send {
private static final String EXCHANGE_NAME = "exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 声明交换机
// fanout:分发类型,表示绑定到这个交换机的队列都收到这个消息
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 发送消息
String msg = "hello publish model";
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
channel.close();
connection.close();
}
}
消费者代码
public class Rec1 {
private static final String QUEUE_NAME = "publish_queue_1";
private static final String EXCHANGE_NAME = "exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 将队列绑定到交换机上
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 接收到消息后的回调函数
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(LocalDateTime.now().toString() + " [x] Received '" + message + "'");
};
// 监听队列,每当队列中接收到新消息后会触发回调函数
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
可以在RabbitMQ管理后台中查看该交换机
总结
对比简单消息队列以及发布/订阅模式消息队列,我们可以发现,在发布/订阅模式中,生产者通道需要绑定到交换机上,而不能直接绑定到队列上。消费者的队列需要绑定到交换机上,统一由交换机复制数据的派发,可以参考文章一开始的模型图加深印象