MQ

05-Work Queues模式

2021-07-06  本文已影响0人  紫荆秋雪_文
Work Queues模式.png

当多个消费者同时监听一个队列时,默认消费者会轮询消费消息

一、轮询消费

1、Producer

/**
 * 消息生产者
 */
public class Producer {

    // 队列名称
    private static final String QUEUE_NAME = "hello_world";


    public static void main(String[] args) throws IOException, TimeoutException {
        //创建一个连接工程
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("172.16.176.100");
        factory.setUsername("raven");
        factory.setPassword("raven");

        // 获取一个连接
        Connection connection = factory.newConnection();
        // 创建一个Channel
        Channel channel = connection.createChannel();

        /**
         * 创建一个队里
         *
         * 队列名称
         * 队里里面的消息是否持久化
         * 该队列是否只供一个消费者进行消费,是否进行共享,true 可以多个消费者共享
         * 是否自动删除,最后一个消费者端开连接以后,该队列是否自动删除 true 自动删除
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);


        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            //准备消息
            String msg = scanner.next();

            /**
             * 发布消息
             *
             * 发送到那个交换机
             * 路由key
             * 其他参数
             * 发送消息的消息体
             */
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            System.out.println("发送消息完成:" + msg);
        }
    }

}

2、Consumer

/**
 * 接收消息
 */
public class Consumer {

    // 队列名称
    private static final String QUEUE_NAME = "hello_world";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建一个连接工程
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("172.16.176.100");
        factory.setUsername("raven");
        factory.setPassword("raven");

        // 获取一个连接
        Connection connection = factory.newConnection();
        // 创建一个Channel
        Channel channel = connection.createChannel();

        System.out.println("C等待接收消息。。。。。");

        // 接收消息回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody());
            System.out.println(msg);
        };

        // 取消消息回调
        CancelCallback cancelCallback = (String consumerTag) -> {
            System.out.println("消息消费被中断。。。");
        };

        /**
         * 消费消息
         *
         * 消息队列
         * 消费成功之后是否要自动应答
         * 消费成功/失败回调
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

3、Consumer1

/**
 * 接收消息
 */
public class Consumer1 {

    // 队列名称
    private static final String QUEUE_NAME = "hello_world";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建一个连接工程
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("172.16.176.100");
        factory.setUsername("raven");
        factory.setPassword("raven");

        // 获取一个连接
        Connection connection = factory.newConnection();
        // 创建一个Channel
        Channel channel = connection.createChannel();

        System.out.println("C1等待接收消息。。。。。");

        // 接收消息回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody());
            System.out.println(msg);
        };

        // 取消消息回调
        CancelCallback cancelCallback = (String consumerTag) -> {
            System.out.println("消息消费被中断。。。");
        };

        /**
         * 消费消息
         *
         * 消息队列
         * 消费成功之后是否要自动应答
         * 消费成功/失败回调
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

4、发送消息

11
发送消息完成:11
22
发送消息完成:22
33
发送消息完成:33
44
发送消息完成:44
55
发送消息完成:55
66
发送消息完成:66

5、消费消息

C等待接收消息。。。。。
11
33
55

C1等待接收消息。。。。。
22
44
66
上一篇下一篇

猜你喜欢

热点阅读