RabbitMQ使用的一些问题

2019-08-23  本文已影响0人  黄金矿工00七

先说点闲话,这个问题的发生是因为当年我的垃圾代码导致,嗯,垃圾到我自己不想看。刚毕业的年轻人总是想探索一下未知,于是一知半解之下就上了MQ,但是话说回来,这种自讨苦吃是要的,不然真的会一直垃圾下去。

问题的发生

项目中的消息推送使用MQ做了异步处理,有一天消息推送突然中断了,排查了好久,日志也没有,这个也是,嗯,知识太少。好久之后,我想不会是MQ出问题了吧,我一看MQ,果然,这里不是原图,Unacknowledged状态的有8,Ready的有好多,再打开消费端的配置一看,

MQ管理
消费端配置 ,qos设置正好为8,通过以上排查基本可以确定队列堵塞是由于消费者线程取走了消息,但是既没有ACK,也没有NACK,这样的消息个数到达Qos设置的值后,队列就会堵塞。
@Component
public class MsgQueueListener extends MessageListenerAdapter {

  private static Logger logger = LoggerFactory.getLogger(MsgQueueListener.class);
  @Autowired
  private RabbitTemplate rabbitTemplate;
  @Autowired
  private MessageSendFacadeService messageSendFacadeService;
  @Autowired
  private TaskExecutor taskExecutor;
  @Autowired
  private RedisUtil redisUtil;

  @Override
  public void onMessage(final Message message, final Channel channel) throws IOException {
    final WechatTemMessageDTO dto = (WechatTemMessageDTO) rabbitTemplate.getMessageConverter()
        .fromMessage(message);
    try {
     //业务代码处理
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (AnswerException e) {
      if (getRetryCount(message.getMessageProperties() < 3) {
        dto.setErrMsg(e.getErrorMsg());
        //重试次数小于3 ,投递到重试队列
        rabbitTemplate.convertAndSend("*", "*",
            dto, new MessagePostProcessor() {
              @Override
              public Message postProcessMessage(Message message) throws AmqpException {
                MessageProperties properties = new MessageProperties();
                properties.setHeader("x-orig-routing-key", "*");
                return message;
              }
            });
      } else {
        dto.setErrMsg(e.getErrorMsg());
        rabbitTemplate.convertAndSend("*failed", "*failed", dto,
            new MessagePostProcessor() {
              @Override
              public Message postProcessMessage(Message message) throws AmqpException {
                MessageProperties properties = new MessageProperties();
                properties.setHeader("x-orig-routing-key", "*");
                return message;
              }
            });
      }
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
  }

问题出在catch块,

  1. catch只捕获了业务异常
    对于非业务异常,显然无法捕获,导致消费者没有ack,这段代码主要的原因再调用业务代码的时候,我尽可能的将业务代码捕获的异常转换为了业务异常,但是产生了遗漏,而且这里这样做也有很大的坏处,没有将异常分类,来决定失败消息如何处理,下面再细说。
  2. catch块中可能又会抛出异常
    catch块中抛出的异常显然导致消费者没有ack,也没有finally进行处理,所以消费者慢慢的阻塞。
    解决办法应该是在finally语句中来执行这些操作,消费者从队列中取出消息后,无非是三种处理结果:1、处理成功,这种时候应该用basicAck确认消息;2、可重试的处理失败,这时候应该用basicNack将消息重新入列或者丢入死信队列3、不可重试的处理失败,这时候应该使用basicNack将消息丢弃或者丢入失败队列进行相应的业务操作
enum ProcessResult{
//这里只举几个简单的例子
  SUCCESS,  // 处理成功
  RETRY,   // 可以重试的错误
  FAIL,  // 无法重试的错误
}
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        WechatMessageDTO messageDTO = null;
        try {
            messageDTO = rabbitMQService.getMessageBody(message);
        }
        catch (Exception e) {
            logger.error("MQ 数据转换异常",e);
        } finally {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
        ProcessResult result=null;
        try {
            result = messageService.processMsg(messageDTO);
        } catch(UserDefineException e){
          logger.error("消费者处理失败:(消息ID:" + messageDTO.getId() + ")", e);
          //根据业务异常类型进行处理
          result = ;
        }catch (Exception e) {
            logger.error("消费者处理失败:(消息ID:" + messageDTO.getId() + ")", e);
            result = ;
        } finally {
            postProcessByResult(result);
        }
    }

容易出问题的点

一些消息可靠性保证措施

对于生产者:

上一篇 下一篇

猜你喜欢

热点阅读