RabbitMQ-5.消息生产与消费
2021-12-09 本文已影响0人
那钱有着落吗
1.概念
2.实战
2.1基本逻辑
首先创建生产者类,前面我们学习了rabbitmq的架构,所以我们在发送或者消费消息的时候首先需要创建一个工厂,然后配置好rabbitmq的连接信息,然后创建一个connection,然后创建一个channel,然后再发送或者消费消息
2.2首先创建一个基本的springboot项目
创建完之后,添加一个rabbitmq的依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
创建生产者类:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建一个ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//设置mq连接的自动恢复
connectionFactory.setAutomaticRecoveryEnabled(true);
//设置mq连接自动回复的时间间隔
connectionFactory.setNetworkRecoveryInterval(3000);
//2.通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3.通过connection创建一个channel
Channel channel = connection.createChannel();
//4.通过channel发送数据
for(int i=0;i<5;i++){
String body = "this is a rabbit mq,no:"+i;
channel.basicPublish("","test001",null,body.getBytes());
}
channel.close();
connection.close();
}
}
创建消费者类:
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.创建一个ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//设置mq连接的自动恢复
connectionFactory.setAutomaticRecoveryEnabled(true);
//设置mq连接自动回复的时间间隔
connectionFactory.setNetworkRecoveryInterval(3000);
//2.通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3.通过connection创建一个channel
Channel channel = connection.createChannel();
//4.声明(创建)一个队列
String queueName = "test001";
channel.queueDeclare(queueName,true,false,false,null);
//5.创建一个消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName,true,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println(msg);
}
}
}
然后我们首先启动一下消费者,因为在消费者里面我们创建了一个消息队列test001,不然先启动生产者,因为还没有test001,所以自然也就无法往这个队列发送消息了;
启动完消费者,我们这个时候再启动生产者,完了之后看消费者的日志框:
2.3注意
在上面的代码中,我们可以看到,我们在发送消息的时候可以发现我们并没有指定exchange,而是只指定了routingkey为test001。
而其实无论在消费者还是生产者中我们均没有创建exchange,仅仅创建了一个队列,这是为什么呢?
在rabbitmq中,发送消息的时候首先找exchange,如果你没有指定,那么就会进入到默认的exchange中:
而发送的routingkey由于没有指定exchange就会进入到默认的exchange,然后与所有的queue进行匹配,如果能匹配到消息就会发送过去,否则就发送不过去等于说发送失败了,就会把消息给删除掉。