Java 程序员Java

MQ的路由模式(direct)

2021-08-13  本文已影响0人  程序花生

前言

模拟一个场景

一、生产者

    public static void publishMessageIndividually() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(ChangeNameConstant.DIRECT_MODEL, BuiltinExchangeType.DIRECT);
        //创建多个 bindingKey
        Map<String, String> bindingKeyMap = new HashMap<>();
        bindingKeyMap.put("info","普通 info 信息");
        bindingKeyMap.put("warning","警告 warning 信息");
        bindingKeyMap.put("error","错误 error 信息");
        //debug 没有消费这接收这个消息 所有就丢失了
        bindingKeyMap.put("debug","调试 debug 信息");
        for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
            String bindingKey = bindingKeyEntry.getKey();
            String message = bindingKeyEntry.getValue();
            channel.basicPublish(ChangeNameConstant.DIRECT_MODEL,bindingKey, null,
                    message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:" + message);
        }
    }

可以看到:direct_pattern交换机上设置了三个路由

二、消费者

消费者A

/**
 * 这是一个测试的消费者
 *@author DingYongJun
 *@date 2021/8/1
 */
public class DyConsumerTest_direct01 {

    public static void main(String[] args) throws Exception{
        //使用工具类来创建通道
        Channel channel = RabbitMqUtils.getChannel();

        String queueName = "disk";
        channel.queueDeclare(queueName, false, false, false, null);
        //这个专门处理error日志,将其保存至本地
        channel.queueBind(queueName, ChangeNameConstant.DIRECT_MODEL, "error");
        System.out.println("A等待接收消息,把接收到的消息打印在屏幕.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(message+"已经保存到本地啦");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
            System.out.println("消息中断了~");
        });
    }
}

消费者B

/**
 * 这是一个测试的消费者
 *@author DingYongJun
 *@date 2021/8/1
 */
public class DyConsumerTest_direct02 {

    public static void main(String[] args) throws Exception{
        //使用工具类来创建通道
        Channel channel = RabbitMqUtils.getChannel();

        String queueName = "console";
        channel.queueDeclare(queueName, false, false, false, null);
        //这个专门处理error日志,将其保存至本地
        channel.queueBind(queueName, ChangeNameConstant.DIRECT_MODEL, "warning");
        channel.queueBind(queueName, ChangeNameConstant.DIRECT_MODEL, "info");
        System.out.println("B等待接收消息,把接收到的消息打印在屏幕.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(message+"已经消费完并丢弃了");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
            System.out.println("消息中断了~");
        });
    }
}

消费者AB都已准备好。

执行结果

完美符合我们模拟的场景需求!vnice!

三、总结

多重绑定

是不是比发布订阅模式更加智能了呢?

当然如果 exchange 的绑定类型是 direct,但是它绑定的多个队列的 key 如果都相同。

在这种情 况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多。

也就是这玩意是复杂模式可以向下兼容简单模式!

作者:大鱼丶
链接:https://juejin.cn/post/6995709405085827108
来源:掘金

上一篇下一篇

猜你喜欢

热点阅读