RabbitMQ

RabbitMQ高级特性-1.消息投递的几个问题

2021-12-13  本文已影响0人  那钱有着落吗

本帖子学习一下rabbitMQ在消息的投递中的几个问题:

1.消息如何保障100%的投递成功

1.1什么是生产端的可靠性投递

1.1.1消息落库,对消息状态进行打标

下面我以一个订单为例对上图进行一个流程性的讲解:

1.1.2保障MQ我们思考如果第一种可靠性投递,在高并发的场景下是否适合?

这个时候我们就换一种思路:消息采用延迟投递,做二次确认,回调检查

这里的步骤我详细的解释一下:

上面就是两种方式来保障消息的可靠性,我们可以对比一下第二种相对于第一种的消息发送架构,相对来说减少了数据库持久化或者说操作的次数:1.第一种方式每次消息的发送都首先要持久化两次数据库,持久化订单信息然后持久化消息的记录,如果消息失败第二次依然还是要持久化数据库 2.第二种方式的同一条消息仅仅持久化一次,订单也是,而且没有修改状态的数据库操作,相对于第一种减少了一些数据库的操作,而这些操作如果随着并发数的提升,对于数据库性能会相对的提升很多,虽然一两次的消息发送不明显,可是当量级的请求上去,这种微小的资源节约将会对服务器来说,提升很多性能。

2. 幂等性

2.1概念

上图中就是用了乐观锁的机制,没有加version的控制的话,如果并发情况下两条SQL在执行这个语句的时候,如果count是商品的数量,那么如果一个商品仅剩1件的时候,两条SQL如果同时执行,那么数量很可能就会成为负数,这里就加上version的控制,每次执行完,version加1,这样一条SQL执行完,再次查询version=1的数据就没有了,这就用到了乐观锁机制。

而幂等性就好比我们举的例子,一件事情我们执行一次,两次,甚至几百次,最后的结果总是相同的,这就是幂等性。

2.2 消费端-幂等性保障

在海量订单产生的业务高峰期,如何避免消息的重复消费问题?

消费端实现幂等性,就意味着,我们的消息永远不会消费多次,即使我们收到了多条一样的消息。

2.3 业界主流的幂等性操作


在利用redis原子特性实现的时候,我们要注意这两个问题,就是因为redis和数据库是两个独立的存在,各自有各自的事务,所以两个一起使用的话如何实现原子性这是跟问题;

第二个问题就是如果不落库,存储在redis上的话,那么定时同步的策略就需要保障他的可靠性,同步失败怎么办,有没有什么措施,这些都需要考虑的问题。

3.Confirm确认消息


3.1代码实现

producer


    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2.通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3.通过connection创建一个channel
        Channel channel = connection.createChannel();

        channel.confirmSelect();

        String msg = "this is a rabbit mq,I need a feedback";
        channel.basicPublish("test_confirm_exchange","confirm.save",null,msg.getBytes());


        channel.addConfirmListener(new ConfirmListener() {
            //消息确认成功
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("---消息确认成功!-----");
            }
            //消息确认失败
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("---消息确认失败!-----");
            }
        });

    }

consumer


    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        //1.创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //设置mq连接的自动恢复
        connectionFactory.setAutomaticRecoveryEnabled(true);
        //设置mq连接自动回复的时间间隔
        connectionFactory.setNetworkRecoveryInterval(3000);

        //2.通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3.通过connection创建一个channel
        Channel channel = connection.createChannel();

        //4.声明(创建)一个队列
        String queueName = "test_confirm_queue";
        String exchangeName = "test_confirm_exchange";
        String exchangeType = "topic";
        String routingKey = "confirm.#";

        channel.exchangeDeclare(exchangeName,exchangeType,true);
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueBind(queueName,exchangeName,routingKey);

        //5.创建一个消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName,true,queueingConsumer);

        while(true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("消费端接受的消息:"+msg);
        }
    }

4.Return消息机制


return消息其实就是去获取路由不成功的消息,然后做后续的处理的机制而已:


Return消息机制流程

4.1代码实现

consumer

 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        //1.创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2.通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3.通过connection创建一个channel
        Channel channel = connection.createChannel();

        //4.声明(创建)一个队列
        String queueName = "return_queue";
        String exchangeName = "return_exchange";
        String exchangeType = "topic";
        String routingKey = "return.#";
        String errRoutingKey = "abc.#";

        channel.exchangeDeclare(exchangeName,exchangeType,true);
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueBind(queueName,exchangeName,errRoutingKey);

        //5.创建一个消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName,true,queueingConsumer);

        while(true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("消费端接受的消息:"+msg);
        }
    }

producer

public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2.通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3.通过connection创建一个channel
        Channel channel = connection.createChannel();

        channel.confirmSelect();

        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange,
                                     String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("----handle return ----");
                System.out.println("replayCode: "+replyCode);
                System.out.println("replyText: "+replyText);
                System.out.println("exchange: "+exchange);
                System.out.println("routingKey: "+routingKey);
                System.out.println("properties: "+properties);
                System.out.println("body: "+new String(body));
            }
        });

        String msg = "this is a rabbit mq,I need a feedback";
        channel.basicPublish("return_exchange","return.save",true,null,msg.getBytes());

    }

在上面的consumer我们把exchange绑定一个abc.#的路由,然后这样生产端发送消息,肯定就无法路由到序列了,这样就会出现路由失败的情况,那么在生产端的return方法,我们就可以看到路由失败的信息:

上一篇 下一篇

猜你喜欢

热点阅读