消息确认与回调、消息序列化与Headers

2020-11-10  本文已影响0人  Doooook

一、消息确认与回调

默认情况下,RabbitMQ发送消息以及接收消息是自动确认的,意思也就是说,消息发送方发送消息的时候,认为消息已经成功发送到了RabbitMQ服务器,而当消息发送给消费者后,RabbitMQ服务器就立即自动确认,然后将消息从队列中删除了。而这样的自动机制会造成消息的丢失,我们常常听到“丢消息”的字眼。

为了解决消息的丢失,RabbitMQ便产生了手动确认的机制:

1.1 修改配置

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: mitter
    password: mitter
    virtual-host: mitter_vhost # 注意:这里前面不能带/,默认的“/”理解成字符串就行,和Linux的目录斜杠还不是一回事
    publisher-confirms: true # 消息发送到交换机确认机制,是否确认回调
    publisher-returns: true # 消息发送到交换机确认机制,是否返回回调
    listener:
      simple:
        acknowledge-mode: manual # 采用手动应答
        concurrency: 1 # 指定最小的消费者数量
        max-concurrency: 100 # 指定最大的消费者数量
        retry:
          enabled: true # 是否支持重试

1.2 配置交换机、队列

@Configuration
public class MeassageAckConfig {

    public static final String MESSAGE_ACK_EXCHANGE = "direct-message-ack-exchange";
    public static final String MESSAGE_ACK_QUEUE = "message-ack-queue";
    public static final String MESSAGE_ACK_ROUTE_KEY = "message.ack.key";

    @Bean
    public Queue messageAckQueue() {
        return QueueBuilder.durable(MESSAGE_ACK_QUEUE).build();
    }

    @Bean
    public DirectExchange directMessageAckExchange() {
        return (DirectExchange) ExchangeBuilder.directExchange(MESSAGE_ACK_EXCHANGE).durable(true).build();
    }

    @Bean
    public Binding directMessageBinding(DirectExchange directMessageAckExchange, Queue messageAckQueue) {
        return BindingBuilder.bind(messageAckQueue).to(directMessageAckExchange).with(MESSAGE_ACK_ROUTE_KEY);
    }

}

1.3 消息生产者

/**
 * 生产者
 */
@Component
public class MessageAckProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 给hello队列发送消息
     */
    public void send() {
        for (int i = 0; i < 100; i++) {
            String msg = "hello,序号: " + i;
            System.out.println("Producer," + msg);
            rabbitTemplate.convertAndSend(MeassageAckConfig.MESSAGE_ACK_EXCHANGE, MeassageAckConfig.MESSAGE_ACK_ROUTE_KEY, msg);
        }
    }

}

1.4 消息消费者

/**
 * 消费者
 */
@Component
public class MessageAckConsumer {

    private static final Logger logger = LoggerFactory.getLogger(MessageAckConsumer.class);

    @RabbitListener(queues = MeassageAckConfig.MESSAGE_ACK_QUEUE)
    public void process(Message message, Channel channel) {
        try {
            // 采用手动应答模式,手动确认应答更为安全稳定
            logger.info("receive: " + new String(message.getBody()));
            // 制造异常,向队列回放消息
            if ("\"hello,序号: 50\"".equals(new String(message.getBody()))) {
                int a = 1/0;
            }
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            logger.error("process error by message: {} and channel: {}", message.getMessageProperties().getCorrelationId(),
                    channel.getConnection().getAddress());
            /*try {
                // 拒绝消息,multiple=false,值拒绝当前的消息,requeue=true,重新放回队列
                // 一般不回放,不然会一致消费,记录日志查找原因
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            } catch (IOException ex) {
                ex.printStackTrace();
            }*/
        }
    }
}

1.5 测试

@RestController
public class MessageAckController {

    @Autowired
    private MessageAckProducer messageAckProducer;

    @GetMapping(value = "/messageAck")
    public void testMessageAck() {
        messageAckProducer.send();
    }

}
1604840762465.png

七、消息序列化

序列化配置:

@Configuration
public class RabbitConfig {

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(this.connectionFactory());
        // 生产者端发送消息,使用Jackson2JsonMessageConverter序列化
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
        rabbitTemplate.setMandatory(true);

        // 消息确认, yml需要配置 publisher-confirms: true
        // 消息回调
        // ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调,即消息发送到exchange ack
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                logger.debug("消息发送到exchange成功!");
            } else {
                logger.debug("消息发送到exchange失败,原因: {}", cause);
            }
        });

        // 消息返回, yml需要配置 publisher-returns: true
        // ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调,即消息发送不到任何一个队列中 ack
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationId();
            logger.debug("消息:{} 发送失败,应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
        });
        return rabbitTemplate;
    }

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // 消费者端,接收消息使用Jackson2JsonMessageConverter反序列化
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

}

发送消息:

@RestController
public class MessageSerializableController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping(value = "/messageSerializable/{name}")
    public void messageSerializable(@PathVariable(value = "name") String name) {
        User user = new User(name, 20, new Date());
        rabbitTemplate.convertAndSend(QueueConstant.QUEUE_NOTIFY_TEST, user);
    }

}

接收消息:

@Component
public class MessageSerilizableConsumer {

    /**
     * 使用@Payload获取body信息
     * @param user 直接反序列化的对象
     */
    @RabbitListener(queues = {QueueConstant.QUEUE_NOTIFY_TEST})
    public void receiveTestQueue(@Payload User user) {
        System.out.println(user.getName());
    }

}

测试:

1604844242748.png

八、Headers

Headers应用场景,比如对于headers中有某些属性的消息可以选择性处理,对应管理台的Headers:


1604845932396.png

生产者:

@RestController
public class HeadersController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private final static MessagePostProcessor MESSAGE_POST_PROCESSOR = message -> {
        message.getMessageProperties().setContentType("application/json");
        message.getMessageProperties().setContentEncoding("UTF-8");
        // 设置Headers
        message.getMessageProperties().setHeader("name", "mitter");
        return message;
    };

    @GetMapping(value = "/headers/{name}")
    public void headers(@PathVariable(value = "name") String name) {
        User user = new User(name, 20, new Date());
        rabbitTemplate.convertAndSend(QueueConstant.QUEUE_NOTIFY_TEST, user, MESSAGE_POST_PROCESSOR);
    }

}

消费者:

@Component
public class MessageSerilizableConsumer {

    /**
     * 使用@Payload获取body信息,使用@Headers获取Headers信息
     * @param user 直接反序列化的对象
     */
    @RabbitListener(queues = {QueueConstant.QUEUE_NOTIFY_TEST})
    public void receiveTestQueue(@Payload User user, @Headers Map<String,Object> headers) {
        System.out.println(user.getName());
        System.out.println(JSON.toJSONString(headers));
    }

}

测试:

1604846139832.png 1604846196198.png
上一篇下一篇

猜你喜欢

热点阅读