【RabbitMQ-13】惊!线上的RabbitMQ消费者自己k

2021-08-10  本文已影响0人  小胖学编程

在很早之前,项目出现了一个问题,即有一个消费队列。其中数据处理的过程中出现了OOM,然后导致了RabbitMQ消费者全部挂掉,最终导致消息大量堆积。

在排查这个问题的时候,我深入了解了下RabbitMq源码,最终定位并解决了这个问题。

版本信息:SpringBoot 2.0.4.RELEASE
因为SpringBoot版本不同,RabbitMq源码有一些改动。所以要确定版本号。

1. 问题定位

某个消息出现error级别的异常后,对应的消费者会关闭,且返回unack。消息会回到Mq并转发给下一个消费者去消费。

当然,这个消息在下一个消费者中很大程度上也会出现error异常。故一个error级别的异常,会将整个队列所有的消息者全关闭

1.1 源码分析

每创建一个消息者,则在线程池中获取一个线程。去执行AsyncMessageProcessingConsumer

创建消费者.png

然后消费者将“死循环”式的监听消费消息。
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer

消息队列是死循环去消费数据.png

当出现error异常时,如代码所示,会stop掉消费者。

stop消费者.png

2. 解决方案

2.1 方式一:监听事件,重启消费者

在上图可知,当消费者被终止后,会发送Event事件,那么监听事件后重启消费者即可。

@Slf4j
public class ListenerContainerConsumerFailedEventListener implements ApplicationListener<ListenerContainerConsumerFailedEvent> {

    /**
     * 重启消费者失败的消费者方法
     */
    private Consumer<RestartConsumerFailEvent> restartConsumerFailEventConsumer;

    /**
     * 开启自动重启的回调方法
     * true:表示将要重启消费者
     */
    private Function<ListenerContainerConsumerFailedEvent, Boolean> failedEventListenerBooleanFunction;


    public void setRestartConsumerFailEventConsumer(Consumer<RestartConsumerFailEvent> restartConsumerFailEventConsumer) {
        this.restartConsumerFailEventConsumer = restartConsumerFailEventConsumer;
    }

    public void setFailedEventListenerBooleanFunction(Function<ListenerContainerConsumerFailedEvent, Boolean> failedEventListenerBooleanFunction) {
        this.failedEventListenerBooleanFunction = failedEventListenerBooleanFunction;
    }

    @Override
    public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
        log.error("消费者失败事件发生:{}", event);
        //判断是否需要进行重试
        Boolean restart = false;
        if (failedEventListenerBooleanFunction != null) {
            restart = failedEventListenerBooleanFunction.apply(event);
        }
        if (restart) {
            log.error(String.format("Stopping container from aborted consumer. Reason::%s.",
                    event.getReason()), event.getThrowable());
            //获取到容器
            SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
            String queueNames = Arrays.toString(container.getQueueNames());
            // 重启
            try {
                restart(container);
                log.info("重启队列{}的监听成功!", queueNames);
            } catch (Exception e) {
                log.error(String.format("重启队列%s的监听失败!", queueNames), e);
                //发布事件
                if (restartConsumerFailEventConsumer != null) {
                    RestartConsumerFailEvent restartConsumerFailEvent = new RestartConsumerFailEvent(event.getSource());
                    restartConsumerFailEvent.setThrowable(e);
                    restartConsumerFailEventConsumer.accept(restartConsumerFailEvent);
                }
            }
        }

    }

    /**
     * 重启消费者
     *
     * @param container 容器对象
     */
    private void restart(SimpleMessageListenerContainer container) {
        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            log.error("", e);
        }
        Assert.state(!container.isRunning(), String.format("监听容器%s正在运行!", container));
        //重启
        container.start();
    }

}

缺点是:触发error异常的消息依旧没有被消费掉,依旧可能会将重启的消费者给kill掉。

2.2 方式二:捕获Error异常

首先实现拦截器,当出现Error异常后,捕获处理异常,并向外抛出事件通知。

@Slf4j
public class ErrorHandlerInterceptor implements MethodInterceptor, Serializable {

    private ApplicationContext applicationContext;

    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    /**
     * 当抛出Error异常后,处理方案。
     */
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        Object proceed = null;
        try {
            proceed = invocation.proceed();
        } catch (Error e) {
            log.error("", e);
            //遇到Error异常后的处理方案
            MessageErrorEvent messageErrorEvent = new MessageErrorEvent(applicationContext, RabbitUtil.getMessage(invocation), e);
            applicationContext.publishEvent(messageErrorEvent);
        }
        return proceed;
    }
}

事件信息:

public class MessageErrorEvent extends ApplicationContextEvent {

    /**
     * 消息对象
     */
    private Message message;

    /**
     * Error异常信息
     */
    private Error error;

    /**
     * Create a new ContextStartedEvent.
     *
     * @param source the {@code ApplicationContext} that the event is raised for
     *               (must not be {@code null})
     */
    public MessageErrorEvent(ApplicationContext source) {
        super(source);
    }

    public MessageErrorEvent(ApplicationContext source, Message message, Error error) {
        super(source);
        this.message = message;
        this.error = error;
    }

    public Message getMessage() {
        return message;
    }

    public void setMessage(Message message) {
        this.message = message;
    }

    public Error getError() {
        return error;
    }

    public void setError(Error error) {
        this.error = error;
    }
}

关键点:将拦截器设置到SimpleRabbitListenerContainerFactory中:

@EnableRabbit
@Configuration
public class RabbitConfiguration {

    @Autowired
    private ObjectProvider<ErrorHandlerInterceptor> errorHandlerInterceptorObjectProvider;

    @Bean(name = "sealListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainerFactory(CachingConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(10);
        factory.setPrefetchCount(250);
        /* 设置当rabbitmq收到nack/reject确认信息时的处理方式,设为true,扔回queue头部,设为false,丢弃。 */
        factory.setDefaultRequeueRejected(true);
        //慢消息触发事件通知
        List<Advice> adviceList = new ArrayList<>();
  
        ErrorHandlerInterceptor errorHandlerInterceptor = errorHandlerInterceptorObjectProvider.getIfAvailable();
        if (errorHandlerInterceptor != null) {
            adviceList.add(errorHandlerInterceptor);
        }
        //加入拦截器配置
        List<MessageInterceptor> messageInterceptors = messageInterceptorsObjectProvider.getIfAvailable();
        if (!CollectionUtils.isEmpty(messageInterceptors)) {
            for (MessageInterceptor messageInterceptor : messageInterceptors) {
                if (messageInterceptor != null) {
                    adviceList.add(messageInterceptor);
                }
            }
        }
        factory.setAdviceChain(adviceList.toArray(new Advice[adviceList.size()]));
        //自动确认
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }
}

由此,可以将通过责任链的方式,进行AOP处理,使得消费者不对外抛出Error级别的异常。

上一篇下一篇

猜你喜欢

热点阅读