RabbitMQ轮询分发和公平分发

2020-11-21  本文已影响0人  裂开的汤圆

轮询分发

先看代码,生产者生产十条消息。开启两个消费者,组成工作队列,消费者1消费完一条消息后将线程挂起1秒,消费者2消费完一条消息后将线程挂起两秒。

生产者:

public class Send {
    private static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("sending...");
        for(int i=0; i < 10; i++){
            String msg = "第" + i + "条消息";
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费者1:

public class Rec1 {
    private static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 接收到消息后的回调函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(LocalDateTime.now().toString() + " [x] Received '" + message + "'");
            // 线程挂起1秒
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        // 监听队列,每当队列中接收到新消息后会触发回调函数
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

消费者2:

public class Rec2 {
    private static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 接收到消息后的回调函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(LocalDateTime.now().toString() + " [x] Received '" + message + "'");
            // 线程挂起两秒
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        // 监听队列,每当队列中接收到新消息后会触发回调函数
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

    }
}

轮询分发运行结果

消费者1消费情况 消费者2消费情况

思考

根据上述截图我们可以发现,轮询分发不会根据消费者的消费情况进行分发,永远都是一人一条的分发。但这样带来的问题在于,浪费了强消费者的性能(可以看到上述截图中,消费者1在12:03:51秒就已经消费完毕了,后面处于空闲状态,而消费者2在12:05:55秒才消费完毕,消费者1闲置了51s-55s大约四秒钟的时间)。

发生轮询分发的原因在于RabbitMQ在消息进入队列时才调度消息。它不会查看使用者的未确认消息数。它只是盲目的将第N条消息发送给第N个使用者。

解决上述问题可以采用以下语句。这告诉RabbitMQ一次只给消费者一条消息,并且在消费者处理并确认该消息后,再将新消息发送给消费者。

channel.basicQos(1);

公平分发消费者代码

public class Rec1{
    private static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 一次仅接受一条未经确认的消息,
        channel.basicQos(1);

        // 接收到消息后的回调函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            // 返回确认消息给rabbitmq
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        // 关闭自动消息确认
        // 消费者发送回确认,以告知RabbitMQ已经接收,处理了特定的消息,并且RabbitMQ可以自由删除它。
        // 如果消费者在不发送确认的情况下挂掉,RabbitMQ将了解消息未得到充分处理,并将重新排队。如果同时有其他消费者在线,它将很快将其重新分发给另一个消费者。这样,您可以确保即使工人偶尔死亡也不会丢失任何消息。
        boolean autoAck = false;
        // 监听队列,每当队列中接收到新消息后会触发回调函数
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
    }
}

采用公平分发后的执行结果

消费者1消费截图 消费者2消费截图

可以看到现在就不会出现一个消费者工作,一个消费者闲置的问题

上一篇 下一篇

猜你喜欢

热点阅读