RabbitMQ工作模式

2020-11-18  本文已影响0人  Cook1fan
image.png
public class ProducerWorkQueues {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2 设置参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/itcast");
        factory.setUsername("heima");
        factory.setPassword("heima");
        // 3 创建连接 Connection
        Connection connection = factory.newConnection();
        // 4 创建 Channel
        Channel channel = connection.createChannel();
        // 5 创建队列 Queue
        channel.queueDeclare(
                "work_queues", //  队列名称
                true, // 是否持久化, 当 mq 重启之后还在
                false, // 是否独占,只有有一个消费者监听这个队列,当 Connection 关闭时,是否删除队列
                false, // 是否自动删除,当没有 Consumer 时,自动删除
                null // 参数
        ); // 没有就创建队列,有就不会创建
        // 6 发送消息
        for (int i = 1; i <= 10; i++) {
            String body = i + " hello rabbitmq ~~~";
            channel.basicPublish(
                    "", // 交换机名称,简单模式下交换机会使用默认的 ""
                    "work_queues", // 路由名称
                    null, // 配置信息
                    body.getBytes() // 发送消息
            );
        }
        // 7 释放资源
        channel.close();
        connection.close();
    }
}
public class ConsumerWorkQueues1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2 设置参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/itcast");
        factory.setUsername("heima");
        factory.setPassword("heima");
        // 3 创建连接 Connection
        Connection connection = factory.newConnection();
        // 4 创建 Channel
        Channel channel = connection.createChannel();
        // 5 创建队列 Queue
        channel.queueDeclare(
                "work_queues", //  队列名称
                true, // 是否持久化, 当 mq 重启之后还在
                false, // 是否独占,只有有一个消费者监听这个队列,当 Connection 关闭时,是否删除队列
                false, // 是否自动删除,当没有 Consumer 时,自动删除
                null // 参数
        ); // 没有就创建队列,有就不会创建
        // 6 接受消息
        Consumer consumer = new DefaultConsumer(channel) {
            // 收到消息后,会自动执行该方法
            @Override
            public void handleDelivery(
                    String consumerTag, // 标识
                    Envelope envelope, // 获取一些信息,交换机,路由 key
                    AMQP.BasicProperties properties, // 配置信息
                    byte[] body // 数据
            ) {
                System.out.println("body: " + new String(body));
            }
        };
        channel.basicConsume(
                "work_queues", // 队列名称
                true, // 是否自动确认
                consumer // 回调对象
        );
    }
}
public class ConsumerWorkQueues2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2 设置参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/itcast");
        factory.setUsername("heima");
        factory.setPassword("heima");
        // 3 创建连接 Connection
        Connection connection = factory.newConnection();
        // 4 创建 Channel
        Channel channel = connection.createChannel();
        // 5 创建队列 Queue
        channel.queueDeclare(
                "work_queues", //  队列名称
                true, // 是否持久化, 当 mq 重启之后还在
                false, // 是否独占,只有有一个消费者监听这个队列,当 Connection 关闭时,是否删除队列
                false, // 是否自动删除,当没有 Consumer 时,自动删除
                null // 参数
        ); // 没有就创建队列,有就不会创建
        // 6 接受消息
        Consumer consumer = new DefaultConsumer(channel) {
            // 收到消息后,会自动执行该方法
            @Override
            public void handleDelivery(
                    String consumerTag, // 标识
                    Envelope envelope, // 获取一些信息,交换机,路由 key
                    AMQP.BasicProperties properties, // 配置信息
                    byte[] body // 数据
            ) {
                System.out.println("body: " + new String(body));
            }
        };
        channel.basicConsume(
                "work_queues", // 队列名称
                true, // 是否自动确认
                consumer // 回调对象
        );
    }
}
image.png
image.png
public class ProducerPubSub {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2 设置参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/itcast");
        factory.setUsername("heima");
        factory.setPassword("heima");
        // 3 创建连接 Connection
        Connection connection = factory.newConnection();
        // 4 创建 Channel
        Channel channel = connection.createChannel();
        // 5 创建交换机
        String exchangeName = "text_fanout";
        channel.exchangeDeclare(
                exchangeName, // 交换机名称
                BuiltinExchangeType.FANOUT, // 交换机类型(direct 定向,fanout 广播(发送消息到每一个绑定队列),topic 通配符方式,headers 参数匹配)
                true, // 是否持久化
                false, // 自动删除
                false, // 内部使用, 比如 plugin 可能用的
                null // 参数
        );
        // 6 创建队列
        String queue1Name = "text_fanout_queue1";
        String queue2Name = "text_fanout_queue2";
        channel.queueDeclare(queue1Name, true, false, false, null);
        channel.queueDeclare(queue2Name, true, false, false, null);
        // 7 绑定队列和交换机
        channel.queueBind(
                queue1Name, // 队列名称
                exchangeName, // 交换机名称
                "" // 路由,如果交换机类型是 fanout, routingKey 设置为 ""
        );
        channel.queueBind(
                queue2Name, // 队列名称
                exchangeName, // 交换机名称
                "" // 路由,如果交换机类型是 fanout, routingKey 设置为 ""
        );
        // 8 发送消息
        String body = "hello rabbitmq ~~~";
        channel.basicPublish(
                exchangeName,
                "", // 路由名称
                null, // 配置信息
                body.getBytes() // 发送消息
        );
        // 9 释放资源
        channel.close();
        connection.close();
    }
}
public class ConsumerPubSub1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2 设置参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/itcast");
        factory.setUsername("heima");
        factory.setPassword("heima");
        // 3 创建连接 Connection
        Connection connection = factory.newConnection();
        // 4 创建 Channel
        Channel channel = connection.createChannel();
        // 5 创建队列 Queue
        String queue1Name = "text_fanout_queue1";
        // 6 接受消息
        Consumer consumer = new DefaultConsumer(channel) {
            // 收到消息后,会自动执行该方法
            @Override
            public void handleDelivery(
                    String consumerTag, // 标识
                    Envelope envelope, // 获取一些信息,交换机,路由 key
                    AMQP.BasicProperties properties, // 配置信息
                    byte[] body // 数据
            ) {
                System.out.println("body: " + new String(body));
                System.out.println("将消息打印到控制台");
            }
        };
        channel.basicConsume(
                queue1Name, // 队列名称
                true, // 是否自动确认
                consumer // 回调对象
        );
    }
}
public class ConsumerPubSub2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2 设置参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/itcast");
        factory.setUsername("heima");
        factory.setPassword("heima");
        // 3 创建连接 Connection
        Connection connection = factory.newConnection();
        // 4 创建 Channel
        Channel channel = connection.createChannel();
        // 5 创建队列 Queue
        String queue2Name = "text_fanout_queue2";
        // 6 接受消息
        Consumer consumer = new DefaultConsumer(channel) {
            // 收到消息后,会自动执行该方法
            @Override
            public void handleDelivery(
                    String consumerTag, // 标识
                    Envelope envelope, // 获取一些信息,交换机,路由 key
                    AMQP.BasicProperties properties, // 配置信息
                    byte[] body // 数据
            ) {
                System.out.println("body: " + new String(body));
                System.out.println("将消息插入到数据库");
            }
        };
        channel.basicConsume(
                queue2Name, // 队列名称
                true, // 是否自动确认
                consumer // 回调对象
        );
    }
}
image.png
public class ProducerRouting {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2 设置参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/itcast");
        factory.setUsername("heima");
        factory.setPassword("heima");
        // 3 创建连接 Connection
        Connection connection = factory.newConnection();
        // 4 创建 Channel
        Channel channel = connection.createChannel();
        // 5 创建交换机
        String exchangeName = "text_direct";
        channel.exchangeDeclare(
                exchangeName, // 交换机名称
                BuiltinExchangeType.DIRECT, // 交换机类型(direct 定向,fanout 广播(发送消息到每一个绑定队列),topic 通配符方式,headers 参数匹配)
                true, // 是否持久化
                false, // 自动删除
                false, // 内部使用, 比如 plugin 可能用的
                null // 参数
        );
        // 6 创建队列
        String queue1Name = "text_direct_queue1";
        String queue2Name = "text_direct_queue2";
        channel.queueDeclare(queue1Name, true, false, false, null);
        channel.queueDeclare(queue2Name, true, false, false, null);
        // 7 绑定队列和交换机
        // 队列1绑定 error
        channel.queueBind(
                queue1Name, // 队列名称
                exchangeName, // 交换机名称
                "error" // 路由,如果交换机类型是 fanout, routingKey 设置为 ""
        );
        // 队列二绑定 info error warning
        channel.queueBind(queue2Name, exchangeName, "info");
        channel.queueBind(queue2Name, exchangeName, "error");
        channel.queueBind(queue2Name, exchangeName, "warning");
        // 8 发送消息
        String body = "hello rabbitmq ~~~";
        channel.basicPublish(exchangeName, "error", null, body.getBytes());
        // 9 释放资源
        channel.close();
        connection.close();
    }
}
public class ConsumerRouting1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2 设置参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/itcast");
        factory.setUsername("heima");
        factory.setPassword("heima");
        // 3 创建连接 Connection
        Connection connection = factory.newConnection();
        // 4 创建 Channel
        Channel channel = connection.createChannel();
        // 5 创建队列 Queue
        String queue1Name = "text_direct_queue1";
        // 6 接受消息
        Consumer consumer = new DefaultConsumer(channel) {
            // 收到消息后,会自动执行该方法
            @Override
            public void handleDelivery(
                    String consumerTag, // 标识
                    Envelope envelope, // 获取一些信息,交换机,路由 key
                    AMQP.BasicProperties properties, // 配置信息
                    byte[] body // 数据
            ) {
                System.out.println("body: " + new String(body));
                System.out.println("将消息插入数据库");
            }
        };
        channel.basicConsume(
                queue1Name, // 队列名称
                true, // 是否自动确认
                consumer // 回调对象
        );
    }
}
public class ConsumerRouting2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2 设置参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/itcast");
        factory.setUsername("heima");
        factory.setPassword("heima");
        // 3 创建连接 Connection
        Connection connection = factory.newConnection();
        // 4 创建 Channel
        Channel channel = connection.createChannel();
        // 5 创建队列 Queue
        String queue2Name = "text_direct_queue2";
        // 6 接受消息
        Consumer consumer = new DefaultConsumer(channel) {
            // 收到消息后,会自动执行该方法
            @Override
            public void handleDelivery(
                    String consumerTag, // 标识
                    Envelope envelope, // 获取一些信息,交换机,路由 key
                    AMQP.BasicProperties properties, // 配置信息
                    byte[] body // 数据
            ) {
                System.out.println("body: " + new String(body));
                System.out.println("将消息打印到控制台");
            }
        };
        channel.basicConsume(
                queue2Name, // 队列名称
                true, // 是否自动确认
                consumer // 回调对象
        );
    }
}
image.png
image.png
public class ProducerTopics {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2 设置参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/itcast");
        factory.setUsername("heima");
        factory.setPassword("heima");
        // 3 创建连接 Connection
        Connection connection = factory.newConnection();
        // 4 创建 Channel
        Channel channel = connection.createChannel();
        // 5 创建交换机
        String exchangeName = "text_topic";
        channel.exchangeDeclare(
                exchangeName, // 交换机名称
                BuiltinExchangeType.TOPIC, // 交换机类型(direct 定向,fanout 广播(发送消息到每一个绑定队列),topic 通配符方式,headers 参数匹配)
                true, // 是否持久化
                false, // 自动删除
                false, // 内部使用, 比如 plugin 可能用的
                null // 参数
        );
        // 6 创建队列
        String queue1Name = "text_topic_queue1";
        String queue2Name = "text_topic_queue2";
        channel.queueDeclare(queue1Name, true, false, false, null);
        channel.queueDeclare(queue2Name, true, false, false, null);
        // 7 绑定队列和交换机
        // routing Key 系统的名称.日志的级别
        // error 存数据库, order 存数据库
        channel.queueBind(queue1Name, exchangeName, "#.error");
        channel.queueBind(queue1Name, exchangeName, "order.*");
        channel.queueBind(queue2Name, exchangeName, "*.*");
        // 8 发送消息
        String body = "hello rabbitmq ~~~";
        channel.basicPublish(exchangeName, "order.info", null, body.getBytes());
        // 9 释放资源
        channel.close();
        connection.close();
    }
}
public class ConsumerTopic1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2 设置参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/itcast");
        factory.setUsername("heima");
        factory.setPassword("heima");
        // 3 创建连接 Connection
        Connection connection = factory.newConnection();
        // 4 创建 Channel
        Channel channel = connection.createChannel();
        // 5 创建队列 Queue
        String queue1Name = "text_topic_queue1";
        // 6 接受消息
        Consumer consumer = new DefaultConsumer(channel) {
            // 收到消息后,会自动执行该方法
            @Override
            public void handleDelivery(
                    String consumerTag, // 标识
                    Envelope envelope, // 获取一些信息,交换机,路由 key
                    AMQP.BasicProperties properties, // 配置信息
                    byte[] body // 数据
            ) {
                System.out.println("body: " + new String(body));
                System.out.println("将消息插入数据库");
            }
        };
        channel.basicConsume(
                queue1Name, // 队列名称
                true, // 是否自动确认
                consumer // 回调对象
        );
    }
}
public class ConsumerTopic2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2 设置参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/itcast");
        factory.setUsername("heima");
        factory.setPassword("heima");
        // 3 创建连接 Connection
        Connection connection = factory.newConnection();
        // 4 创建 Channel
        Channel channel = connection.createChannel();
        // 5 创建队列 Queue
        String queue2Name = "text_topic_queue2";
        // 6 接受消息
        Consumer consumer = new DefaultConsumer(channel) {
            // 收到消息后,会自动执行该方法
            @Override
            public void handleDelivery(
                    String consumerTag, // 标识
                    Envelope envelope, // 获取一些信息,交换机,路由 key
                    AMQP.BasicProperties properties, // 配置信息
                    byte[] body // 数据
            ) {
                System.out.println("body: " + new String(body));
                System.out.println("将消息输出控制台");
            }
        };
        channel.basicConsume(
                queue2Name, // 队列名称
                true, // 是否自动确认
                consumer // 回调对象
        );
    }
}
image.png
image.png
上一篇下一篇

猜你喜欢

热点阅读