工作专题Java学习笔记SpringBoot极简教程 · Spring Boot

RabbitMQ:消息发送确认 与 消息接收确认(ACK)

2018-02-05  本文已影响4735人  聪明的奇瑞

默认情况下如果一个 Message 被消费者所正确接收则会被从 Queue 中移除

如果一个 Queue 没被任何消费者订阅,那么这个 Queue 中的消息会被 Cache(缓存),当有消费者订阅时则会立即发送,当 Message 被消费者正确接收时,就会被从 Queue 中移除

消息发送确认

发送的消息怎么样才算失败或成功?如何确认?

ConfirmCallback

@Component
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);            //指定 ConfirmCallback
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("消息唯一标识:"+correlationData);
        System.out.println("确认结果:"+ack);
        System.out.println("失败原因:"+cause);
    }
spring:
  rabbitmq:
    publisher-confirms: true 

ReturnCallback

@Component
public class RabbitTemplateConfig implements RabbitTemplate.ReturnCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnCallback(this);             //指定 ReturnCallback
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("消息主体 message : "+message);
        System.out.println("消息主体 message : "+replyCode);
        System.out.println("描述:"+replyText);
        System.out.println("消息使用的交换器 exchange : "+exchange);
        System.out.println("消息使用的路由键 routing : "+routingKey);
    }
}
spring:
  rabbitmq:
    publisher-returns: true 

消息接收确认

消息消费者如何通知 Rabbit 消息消费成功?

确认消息(局部方法处理消息)

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);             //开启手动 ack
    return factory;
}
@RabbitHandler
public void processMessage2(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    System.out.println(message);
    try {
        channel.basicAck(tag,false);            // 确认消息
    } catch (IOException e) {
        e.printStackTrace();
    }
}

手动否认、拒绝消息

hducA.png
@RabbitHandler
public void processMessage2(String message, Channel channel,@Headers Map<String,Object> map) {
    System.out.println(message);
    if (map.get("error")!= null){
        System.out.println("错误的消息");
        try {
            channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true);      //否认消息
            return;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    try {
        channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);            //确认消息
    } catch (IOException e) {
        e.printStackTrace();
    }
}
hello
错误的消息
hello
错误的消息
hello
错误的消息
hello
错误的消息
channel.basicReject((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);        //拒绝消息

确认消息(全局处理消息)

@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("consumer_queue");                 // 监听的队列
    container.setAcknowledgeMode(AcknowledgeMode.NONE);     // NONE 代表自动确认
    container.setMessageListener((MessageListener) message -> {         //消息监听处理
        System.out.println("====接收到消息=====");
        System.out.println(new String(message.getBody()));
        //相当于自己的一些消费逻辑抛错误
        throw new NullPointerException("consumer fail");
    });
    return container;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("consumer_queue");              // 监听的队列
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);        // 手动确认
    container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {      //消息处理
        System.out.println("====接收到消息=====");
        System.out.println(new String(message.getBody()));
        if(message.getMessageProperties().getHeaders().get("error") == null){
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            System.out.println("消息已经确认");
        }else {
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            System.out.println("消息拒绝");
        }

    });
    return container;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("consumer_queue");              // 监听的队列
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);     // 根据情况确认消息
    container.setMessageListener((MessageListener) (message) -> {
        System.out.println("====接收到消息=====");
        System.out.println(new String(message.getBody()));
        //抛出NullPointerException异常则重新入队列
        //throw new NullPointerException("消息消费失败");
        //当抛出的异常是AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue=false
        //throw new AmqpRejectAndDontRequeueException("消息消费失败");
        //当抛出ImmediateAcknowledgeAmqpException异常,则消费者会被确认
        throw new ImmediateAcknowledgeAmqpException("消息消费失败");
    });
    return container;
}

消息可靠总结

上一篇下一篇

猜你喜欢

热点阅读