JavaRabbitMQ

RabbitMQ-5.消息生产与消费

2021-12-09  本文已影响0人  那钱有着落吗

1.概念

2.实战

2.1基本逻辑

首先创建生产者类,前面我们学习了rabbitmq的架构,所以我们在发送或者消费消息的时候首先需要创建一个工厂,然后配置好rabbitmq的连接信息,然后创建一个connection,然后创建一个channel,然后再发送或者消费消息

2.2首先创建一个基本的springboot项目

创建完之后,添加一个rabbitmq的依赖:

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
        </dependency>

创建生产者类:


public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //设置mq连接的自动恢复
        connectionFactory.setAutomaticRecoveryEnabled(true);
        //设置mq连接自动回复的时间间隔        
        connectionFactory.setNetworkRecoveryInterval(3000);


        //2.通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3.通过connection创建一个channel
        Channel channel = connection.createChannel();

        //4.通过channel发送数据
        for(int i=0;i<5;i++){
            String body = "this is a rabbit mq,no:"+i;
            channel.basicPublish("","test001",null,body.getBytes());
        }

        channel.close();
        connection.close();
    }

}

创建消费者类:


public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        //1.创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //设置mq连接的自动恢复
        connectionFactory.setAutomaticRecoveryEnabled(true);
        //设置mq连接自动回复的时间间隔        
        connectionFactory.setNetworkRecoveryInterval(3000);

        //2.通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3.通过connection创建一个channel
        Channel channel = connection.createChannel();

        //4.声明(创建)一个队列
        String queueName = "test001";
        channel.queueDeclare(queueName,true,false,false,null);

        //5.创建一个消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName,true,queueingConsumer);

        while(true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println(msg);
        }
    }

}

然后我们首先启动一下消费者,因为在消费者里面我们创建了一个消息队列test001,不然先启动生产者,因为还没有test001,所以自然也就无法往这个队列发送消息了;

启动完消费者,我们这个时候再启动生产者,完了之后看消费者的日志框:

2.3注意

在上面的代码中,我们可以看到,我们在发送消息的时候可以发现我们并没有指定exchange,而是只指定了routingkey为test001。

而其实无论在消费者还是生产者中我们均没有创建exchange,仅仅创建了一个队列,这是为什么呢?

在rabbitmq中,发送消息的时候首先找exchange,如果你没有指定,那么就会进入到默认的exchange中:


而发送的routingkey由于没有指定exchange就会进入到默认的exchange,然后与所有的queue进行匹配,如果能匹配到消息就会发送过去,否则就发送不过去等于说发送失败了,就会把消息给删除掉。

上一篇 下一篇

猜你喜欢

热点阅读