JavaJava 程序员

rabbitmq确保消息不丢失后带来的那些坑,还是太年轻啊

2022-02-21  本文已影响0人  程序花生

前言

发送消息

 public Map<String, Object> sendMessage(Map<String, Object> params) throws UnsupportedEncodingException {
     Map<String, Object> resultMap = new HashMap<String, Object>(){
         {
             put("code", 200);
         }
     };
     String msg = "";
     Integer index = 0;
     if (params.containsKey("msg")) {
         msg = params.get("msg").toString();
     }
     if (params.containsKey("index")) {
         index = Integer.valueOf(params.get("index").toString());
     }
     if (index != 0) {
         //这里开始模拟异常出现。消息将会丢失
         int i = 1 / 0;
     }
     Map<String, Object> map = new HashMap<>();
     map.put("msg", msg);
     Message message= MessageBuilder.withBody(JSON.toJSONString(map).getBytes("UTF-8")).setContentType(MessageProperties.CONTENT_TYPE_JSON)
             .build();
     CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
     rabbitTemplate.convertAndSend(RabbitConfig.TOPICEXCHANGE, "zxh", message,data);
     return resultMap;
 }
 @RabbitListener(queues = RabbitConfig.QUEUEFIRST)
 @Async("asyncExecutor")
 public void handler(Message msg, Channel channel) {
     //channel.basicReject(msg.getMessageProperties().getDeliveryTag(), true);
     byte[] body = msg.getBody();
     String messages = new String(body);
     JSONObject json = (JSONObject) JSONObject.parse(messages);
     if ("1".equals(json.getString("msg"))) {
         try {
             channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
         } catch (IOException e) {
             e.printStackTrace();
         }
     }
     if ("2".equals(json.getString("msg"))) {
         throw new RuntimeException("异常。。。。。");
     }
     log.info(RabbitConfig.QUEUEFIRST+"队列中消费的信息:"+msg);
 }

场景描述

问题剖析

 Unexpected exception occurred invoking async method: public void xxxxxxxxxxxxxxxx(org.springframework.amqp.core.Message,com.rabbitmq.client.Channel)
 java.lang.IllegalStateException: Channel closed; cannot ack/nack

mq批处理设置

消费者在开启acknowledge的情况下,对接收到的消息可以根据业务的需要异步对消息进行确认。

然而在实际使用过程中,由于消费者自身处理能力有限,从rabbitmq获取一定数量的消息后,希望rabbitmq不再将队列中的消息推送过来,当对消息处理完后(即对消息进行了ack,并且有能力处理更多的消息)再接收来自队列的消息。在这种场景下,我们可以通过设置basic.qos信令中的prefetch_count来达到这种效果

解决办法

总结

原文链接:
https://juejin.cn/post/7056574438849904654

上一篇 下一篇

猜你喜欢

热点阅读