rabbitMQ - 5 可靠性传输

2019-04-28  本文已影响0人  cf6bfeab5260

谈到MQ,一定会涉及一个问题,如何进行可靠性传输,换句话说,就是当发生不可预见的情况时(broker重启,sender重启,receiver重启,网线被挖了等),保证消息的送达。我们先再回顾一波rabbitmq消息传输的过程:


image.png
  1. 消息从producer传输到exchange。
  2. 消息从exchange到queue。
  3. cusumer监听queue接收消息。
    搞清楚把大象装进冰箱需要这3步以后,我们一步一步来谈论:

1 保证producer的消息发送到了exchange

public class HelloSender implements RabbitTemplate.ConfirmCallback{
 @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
       if(ack){
           System.out.println("消息已经送达 exchange,correlationData="+correlationData);
       }else {
           System.out.println("消息未送达exchange,原因是:"+cause+", correlationData="+correlationData);
       }
    }

3、send的时候需要setConfirmCallback:

public void send() {
        User u = new User();
        u.setName("周杰伦");
        u.setAge(23);
        rabbitTemplate.setConfirmCallback(this); 
        this.rabbitTemplate.convertAndSend("hello", u);
        System.out.println("Sender : " + u);
    }

注意:这两个方法是互斥的,两个同时使用,rabbitmq会报错。

2 保证消息发送到了queue

1、添加配置:

spring.rabbitmq.template.mandatory=true
spring.rabbitmq.publisher-returns=true

当这spring.rabbitmq.template.mandatory配置为false时,exchange找不到匹配的queue的时候,消息就丢弃掉。当这两个属性为true时,exchange找不到匹配的队列,会给producer发送消息。
2、implement RabbitTemplate.ReturnCallback,并实现returnedMessage方法:

public class HelloSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
 @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("消息没有送达queue:message="+message+",replyCode="+replyCode+
                " ,replyText="+replyText+",exchange="+exchange+",routingKey="+routingKey);
    }

3、发送的时候需要setReturnCallback

public void send() {
        User u = new User();
        u.setName("周杰伦");
        u.setAge(23);
        
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
        this.rabbitTemplate.convertAndSend("hello", u);
        System.out.println("Sender : " + u);
    }

3 持久化

当消息达到了exchange或者queue,但是还未处理 broker就重启了怎么办呢? 答案是持久化:


image.png
image.png

exchange和queue都是把通过Durability控制是否持久化,Durable表示持久化,Transient表示暂存。但是!这里的持久化是指exchange或者queue的自身元数据的持久化,并不是消息的持久化,我们还需要将消息持久化,通过将消息的投递模式(BasicProperties中的deliveryMode属性)设置为2即可实现消息的持久化。

public void sendPersistent() {
        User u = new User();
        u.setName("周杰伦");
        u.setAge(23);
        byte[] userByte=null;
        try {
            userByte=u.toString().getBytes("utf-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        MessageProperties msgProperties=MessagePropertiesBuilder.newInstance().setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
        Message msg= MessageBuilder.withBody(userByte).andProperties(msgProperties).build();

        this.rabbitTemplate.convertAndSend("persistentQueue", msg);
        System.out.println("Sender : " + u);
    }

4 保证消息被成功消费

springboot下 ack的参数:

#manual - 手动,需要自己调用代码去ack ,
#auto - 自动(默认),在cosumer方法执行完成以后自动进行ack ,
#none - 没有ack
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

所以我们要么配置auto(默认)要么配置manual,可以保证我们消费者消费完成再删queue里的消息。配置成none则无法保证。
还有一点需要注意,rabbitmq等待ack没有超时机制,如果链接不断,那么它会一直等待ack或者nack的返回。
另外,如果配置了ack,由于方法跑了一半挂掉启动的时候消息会再次被收到,所以cosumer的方法逻辑必须做到幂等
手动ack代码:

@RabbitListener(queues = "persistentQueue")
    @RabbitHandler
    public void process(Channel channel, Message message) {
        try {
            System.out.println("Receiver1  : " + new String(message.getBody(),"utf-8"));
            //消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
             channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

            //ack返回false,并重新回到队列,api里面解释得很清楚
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            //拒绝消息
            //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
上一篇 下一篇

猜你喜欢

热点阅读