RabbitMQ-工作队列

2020-07-20  本文已影响0人  jiahzhon
image.png
public class Send {
    private static final String QUEUE_NAME = "test_work_queue";
    public static void main(String[] args) throws IOException, InterruptedException {
        // 获取链接
        Connection connections = ConnectionUtils.getConnections();
        // 获取channel
        Channel channel = connections.createChannel();
        //
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 50; i++) {
            String msg = "workQueue" + i;
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            System.out.println("发送的消息" + msg);
            Thread.sleep(i + 20);
        }
        channel.close();
        connections.close();
    }
}
public class Recv1 {
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws Exception {
        // 获取链接
        Connection connections = ConnectionUtils.getConnections();
        // 获取通道
        Channel channel = connections.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义一个消费者
        Consumer consumer = new DefaultConsumer(channel) {
            // 一旦有消息到达,就触发这个方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("消费者1" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[1]  done");
                }
            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}
public class Recv2 {
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws Exception {
        // 获取链接
        Connection connections = ConnectionUtils.getConnections();
        // 获取通道
        Channel channel = connections.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义一个消费者

        Consumer consumer = new DefaultConsumer(channel) {
            // 一旦有消息到达,就触发这个方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("消费者2" + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[2]  done");
                }
            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

现象:消费者1和消费者2处理的数据消息是一样的,
消费者1都是偶数
消费者2都是奇数
这种方式叫 轮询分发(round-robin)结果是 不管谁忙或者谁闲,任务消息都是一边一个轮询发

公平分发

        /**
         * 每个消费者发送确认消息之前,消息队列不发送下一个消息,一次只处理一个消息
         */
        int prefecthCount=1;
        channel.basicQos(prefecthCount);

1595235320(1).jpg

现象:消费者1处理得比消费者2多(能者多劳,1模拟暂停时间是1000,2模拟暂停时间是2000)

上一篇 下一篇

猜你喜欢

热点阅读