中间件

【RabbitMQ-12】监听消息,处理业务逻辑源码分析

2020-11-24  本文已影响0人  小胖学编程

RabbitMQ是如何监听消息,执行业务逻辑?

【RabbitMQ-9】@RabbitListener注解生效的源码分析中分析了注解生效的原理,最终创建了一个消费者线程对象。

@Override // NOSONAR - complexity - many catch blocks
public void run() { // NOSONAR - line count
    ...
    try {
        initialize();
        // Once the consumer thread is running it keeps looping for as long as the consumer is
        // active, still has a delivery, or has not been cancelled (author's note: it is polling
        // the consumer's local blocking queue).
        while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
            // Consume messages. A business exception thrown by the listener method is caught
            // inside mainLoop() and does not propagate to this loop.
            mainLoop();
        }
    // The catch blocks that follow handle failures of the consumer thread itself.
    } catch(InterruptedException e) {
        logger.debug("Consumer thread interrupted, processing stopped.");
        Thread.currentThread().interrupt();
        aborted = true;
        // Publish a consumer-failed event (author's note: an event listener can be configured
        // to react to it, e.g. to keep consuming).
        publishConsumerFailedEvent("Consumer thread interrupted, processing stopped", true, e);
    } ...

mainLoop()中,会消费消息,若业务方法抛出异常,那么异常会被包装为ListenerExecutionFailedException类型的异常。

/**
 * Runs a single iteration of the consumer loop: receives and processes messages, lets the
 * container adjust its consumer count, and publishes an idle event when nothing has been
 * received for longer than the configured idle interval.
 *
 * <p>A {@code ListenerExecutionFailedException} raised by the listener is swallowed here,
 * unless its cause is a {@code NoSuchMethodException}, which is escalated as a
 * {@code FatalListenerExecutionException}. An {@code AmqpRejectAndDontRequeueException} is
 * swallowed as well.
 *
 * @throws Exception any other failure raised while receiving or processing messages
 */
private void mainLoop() throws Exception { // NOSONAR Exception
    try {
        // Process messages and ACK/NACK them; true means at least one message was received.
        final boolean gotMessage = receiveAndExecute(this.consumer);

        // Consumer-count adjustment is only considered when an upper bound is configured.
        if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
            checkAdjust(gotMessage);
        }

        final long idleInterval = getIdleEventInterval();
        if (idleInterval <= 0) {
            // Idle-event detection is switched off.
            return;
        }
        if (gotMessage) {
            updateLastReceive();
            return;
        }

        // Nothing received: decide whether an idle event is due.
        final long now = System.currentTimeMillis();
        final long previousAlert = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
        final long lastReceive = getLastReceive();
        final boolean idleLongEnough = now > lastReceive + idleInterval;
        final boolean alertDue = now > previousAlert + idleInterval;
        // The compare-and-set publishes only if the alert timestamp is still the one read above.
        if (idleLongEnough && alertDue
                && SimpleMessageListenerContainer.this.lastNoMessageAlert.compareAndSet(previousAlert, now)) {
            publishIdleContainerEvent(now - lastReceive);
        }
    } catch (ListenerExecutionFailedException ex) {
        // Exceptions coming from the listener method are caught and not rethrown,
        // except when the listener method itself could not be found.
        if (ex.getCause() instanceof NoSuchMethodException) {
            throw new FatalListenerExecutionException("Invalid listener", ex);
        }
    } catch (AmqpRejectAndDontRequeueException rejectEx) {
        // Caught and deliberately not rethrown.
    }
}

该方法使用了MQ的事务包装。

// Entry point for one receive cycle; delegates to doReceiveAndExecute(consumer).
private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONAR
    ...// transaction handling elided by the author; not relevant to this walkthrough
    return doReceiveAndExecute(consumer);
}

自动ACK和UNACK分别依赖consumer.commitIfNecessary(isChannelLocallyTransacted())和consumer.rollbackOnExceptionIfNecessary(ex)这两个方法实现。

// Processes up to txSize messages from the consumer, then commits (ACKs) them.
// If the listener fails, the deliveries are rolled back (NACKed) and the exception is rethrown.
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR
    Channel channel = consumer.getChannel();

    for (int i = 0; i < this.txSize; i++) {
        logger.trace("Waiting for message from consumer.");
        // Fetch the next message from the consumer (author's note: from its local queue).
        Message message = consumer.nextMessage(this.receiveTimeout);
        if (message == null) {
            // Nothing available: stop this batch.
            break;
        }
        try {
            // Run the listener's business logic.
            executeListener(channel, message);
        } catch(ImmediateAcknowledgeAmqpException e) {
            // The listener asked for an immediate ack: leave the loop so the commit below runs.
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("User requested ack for failed delivery '" + e.getMessage() + "': " + message.getMessageProperties().getDeliveryTag());
            }
            break;
        } catch(Exception ex) {
            // Same treatment when the immediate-ack request is buried in the cause chain.
            if (causeChainHasImmediateAcknowledgeAmqpException(ex)) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("User requested ack for failed delivery: " + message.getMessageProperties().getDeliveryTag());
                }
                break;
            }
          // Transaction handling (elided by the author).
              ...
              else {
                // With automatic acknowledgement, a failure sends a NACK and rethrows.
                consumer.rollbackOnExceptionIfNecessary(ex);
                throw ex;
            }
        }
    }
    // With automatic acknowledgement, success sends the ACK.
    return consumer.commitIfNecessary(isChannelLocallyTransacted());
}

该方法执行UNACK,并通过channel.basicNack的requeue参数决定消息是否重回队列。

public void rollbackOnExceptionIfNecessary(Throwable ex) {
    //注:isAutoAck()=NONE,既不是NONE模式也不是MANUAL模式,返回true
    boolean ackRequired = !this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual();
    try {
        if (this.transactional) {
            if (logger.isDebugEnabled()) {
                logger.debug("Initiating transaction rollback on application exception: " + ex);
            }
            RabbitUtils.rollbackIfNecessary(this.channel);
        }
        if (ackRequired) {
            OptionalLong deliveryTag = this.deliveryTags.stream().mapToLong(l - >l).max();
            if (deliveryTag.isPresent()) {
                //发送UNACK,是否重回队列由ContainerUtils.shouldRequeue(this.defaultRequeueRejected, ex, logger)决定。
                this.channel.basicNack(deliveryTag.getAsLong(), true, ContainerUtils.shouldRequeue(this.defaultRequeueRejected, ex, logger));
            }
            if (this.transactional) {
                // Need to commit the reject (=nack)
                RabbitUtils.commitIfNecessary(this.channel);
            }
        }
    } catch(Exception e) {
        logger.error("Application exception overridden by rollback exception", ex);
        throw RabbitExceptionTranslator.convertRabbitAccessException(e); // NOSONAR stack trace loss
    } finally {
        this.deliveryTags.clear();
    }
}

若是业务方法抛出的是AmqpRejectAndDontRequeueException的异常,那么在自动ACK的模式下,消息将被丢弃。

/**
 * Decides whether a rejected message should be put back on the queue.
 *
 * <p>Requeueing starts out enabled when {@code defaultRequeueRejected} is set, or when the
 * exception is a {@code MessageRejectedWhileStoppingException} or an
 * {@code ImmediateRequeueAmqpException}. It is then vetoed if an
 * {@code AmqpRejectAndDontRequeueException} appears anywhere in the cause chain.
 *
 * @param defaultRequeueRejected the user-configurable default
 * @param throwable the exception that caused the rejection; may be {@code null}
 * @param logger receives a debug line with the decision
 * @return {@code true} if the message should be requeued
 */
public static boolean shouldRequeue(boolean defaultRequeueRejected, Throwable throwable, Log logger) {
    boolean requeue = defaultRequeueRejected
            || throwable instanceof MessageRejectedWhileStoppingException
            || throwable instanceof ImmediateRequeueAmqpException;

    // Walk the cause chain only while requeueing is still on the table.
    for (Throwable cause = throwable; requeue && cause != null; cause = cause.getCause()) {
        if (cause instanceof AmqpRejectAndDontRequeueException) {
            // The listener explicitly asked for reject-without-requeue.
            requeue = false;
        }
    }

    if (logger.isDebugEnabled()) {
        logger.debug("Rejecting messages (requeue=" + requeue + ")");
    }
    return requeue;
}

执行业务逻辑:

/**
 * Runs the listener for one message.
 *
 * <p>The message is rejected with a {@code MessageRejectedWhileStoppingException} when the
 * container is no longer running. A {@code RuntimeException} from the listener is passed to
 * {@code handleListenerException} and rethrown. The exception is the final retry of a
 * message without an id, which is turned into a fatal or reject-and-don't-requeue failure.
 *
 * @param channel the channel the message arrived on
 * @param messageIn the received message
 */
protected void executeListener(Channel channel, Message messageIn) {
    if (!isRunning()) {
        if (logger.isWarnEnabled()) {
            logger.warn("Rejecting received message because the listener container has been stopped: " + messageIn);
        }
        throw new MessageRejectedWhileStoppingException();
    }

    try {
        // Invoke the business method.
        doExecuteListener(channel, messageIn);
    } catch (RuntimeException ex) {
        final boolean finalRetryWithoutId = messageIn.getMessageProperties().isFinalRetryForMessageWithNoId();
        if (!finalRetryWithoutId) {
            // Ordinary failure: let the error handler see it, then propagate.
            handleListenerException(ex);
            throw ex;
        }
        if (this.statefulRetryFatalWithNullMessageId) {
            throw new FatalListenerExecutionException("Illegal null id in message. Failed to manage retry for message: " + messageIn, ex);
        }
        final AmqpRejectAndDontRequeueException rejection =
                new AmqpRejectAndDontRequeueException("Not retryable; rejecting and not requeuing", ex);
        throw new ListenerExecutionFailedException("Cannot retry message more than once without an ID", rejection, messageIn);
    }
}

异常是如何被处理的呢?默认使用org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler#handleError处理器。也可以自定义配置errorHandler,对出现的异常进行全局处理。

// Applies the after-receive post processors to the message and then invokes the listener.
private void doExecuteListener(Channel channel, Message messageIn) {
    Message message = messageIn;
    if (this.afterReceivePostProcessors != null) {
        for (MessagePostProcessor processor: this.afterReceivePostProcessors) {
            message = processor.postProcessMessage(message);
            if (message == null) {
                // A post processor dropped the message: request an immediate ack.
                throw new ImmediateAcknowledgeAmqpException("Message Post Processor returned 'null', discarding message");
            }
        }
    }
    Object batchFormat = message.getMessageProperties().getHeaders().get(MessageProperties.SPRING_BATCH_FORMAT);
     // Batch handling (the matching "if" branch was elided by the author).
     else {
        // Invoke the listener method.
        invokeListener(channel, message);
    }
}
 //调用监听方法时,实际是代理对象在执行。
/**
 * Hands the message to the listener through {@code this.proxy}, so the call is made on the
 * proxy object rather than directly on the target.
 *
 * @param channel the channel the message arrived on
 * @param message the message to deliver
 */
protected void invokeListener(Channel channel, Message message) {
    final ContainerDelegate delegate = this.proxy;
    delegate.invokeListener(channel, message);
}

创建代理对象:

/**
 * Initializes the proxy through which the listener is invoked.
 *
 * <p>With an empty advice chain nothing is done. Otherwise a {@code ContainerDelegate} proxy
 * is built around {@code delegate} with one advisor per configured advice, and stored in
 * {@code this.proxy}.
 *
 * @param delegate the target that performs the actual listener invocation
 */
protected void initializeProxy(Object delegate) {
    if (getAdviceChain().length == 0) {
        // No advices configured: leave the current proxy untouched.
        return;
    }

    final ProxyFactory proxyFactory = new ProxyFactory();
    // Author's note: message retry works by placing a spring-retry advice in this chain.
    for (Advice advice : getAdviceChain()) {
        final DefaultPointcutAdvisor advisor = new DefaultPointcutAdvisor(advice);
        proxyFactory.addAdvisor(advisor);
    }
    proxyFactory.addInterface(ContainerDelegate.class);
    proxyFactory.setTarget(delegate);

    final ClassLoader loader = ContainerDelegate.class.getClassLoader();
    this.proxy = (ContainerDelegate) proxyFactory.getProxy(loader);
}

实际执行的目标方法:

/**
 * Dispatches the message to the configured listener according to the listener's type.
 *
 * <p>A {@code ChannelAwareMessageListener} receives both the message and the channel. A plain
 * {@code MessageListener} receives only the message. For a plain listener, the channel is
 * bound to the current thread for the duration of the call when the listener channel is
 * exposed and the channel is locally transacted.
 *
 * @param channel the channel the message arrived on
 * @param message the message to deliver
 * @throws FatalListenerExecutionException if no listener is set or its type is unsupported
 */
protected void actualInvokeListener(Channel channel, Message message) {
    final Object target = getMessageListener();

    if (target instanceof ChannelAwareMessageListener) {
        // Author's note: this is the branch used by the configuration discussed here.
        doInvokeListener((ChannelAwareMessageListener) target, channel, message);
        return;
    }
    if (target == null) {
        throw new FatalListenerExecutionException("No message listener specified - see property 'messageListener'");
    }
    if (!(target instanceof MessageListener)) {
        throw new FatalListenerExecutionException("Only MessageListener and SessionAwareMessageListener supported: " + target);
    }

    // Plain MessageListener: optionally bind the channel as a transactional resource.
    final boolean bindChannel = isExposeListenerChannel() && isChannelLocallyTransacted();
    if (bindChannel) {
        final RabbitResourceHolder holder = new RabbitResourceHolder(channel, false);
        holder.setSynchronizedWithTransaction(true);
        TransactionSynchronizationManager.bindResource(this.getConnectionFactory(), holder);
    }
    try {
        doInvokeListener((MessageListener) target, message);
    } finally {
        if (bindChannel) {
            // Unbind only what was bound above.
            TransactionSynchronizationManager.unbindResource(this.getConnectionFactory());
        }
    }
}
// Invokes a ChannelAwareMessageListener with the message and the channel to use.
// cleanUpAfterInvoke always runs afterwards, whether or not the listener failed.
protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Message message) {

    RabbitResourceHolder resourceHolder = null;
    Channel channelToUse = channel;
    boolean boundHere = false;
    try {
        ...
        // This is where the listener is actually executed.
        try {
            listener.onMessage(message, channelToUse);
        } catch(Exception e) {
            // Wrap listener failures (if needed) so callers see a ListenerExecutionFailedException.
            throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);
        }
    } finally {
        cleanUpAfterInvoke(resourceHolder, channelToUse, boundHere);
    }
}

@RabbitListener注解使用的listener类为:org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter#onMessage。在解析注解时,会缓存bean和method对象,后续通过反射调用业务逻辑。

我们也可以在自定义的SimpleMessageListenerContainer中通过setMessageListener配置监听器:

@Bean 
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new MySimpleMessageListenerContainer(connectionFactory);
    //同时监听多个队列
    container.setQueues(new Queue("kinson2"));
    //设置当前的消费者数量
    container.setConcurrentConsumers(1);
    container.setMaxConcurrentConsumers(2);
    //设置是否重回队列
    container.setDefaultRequeueRejected(false);
    //设置自动签收
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    //设置监听外露
    container.setExposeListenerChannel(true);
    //设置线程池
    container.setTaskExecutor(taskExecutor());
    container.setErrorHandler((ex) - >{
        //抛出异常后的后置处理器
    });
    container.setAdviceChain();
    //        container.setMessageConverter();
    //设置消费端标签策略
    //        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
    //            @Override
    //            public String createConsumerTag(String queue) {
    //                return queue + "_" + UUID.randomUUID().toString();
    //            }
    //        });
    //      //设置消息监听(手动的设置,最终依赖反射调用)
    //传入的是策略,子类选择特定的子类来运行(监听容器配置的是他,某个队列被运行时,实际上运行的是对应监听容器的配置。)
    container.setMessageListener(new ChannelAwareMessageListener() {@Override public void onMessage(Message message, Channel channel) throws Exception {
            String msg = new String(message.getBody(), "utf-8");
            log.info("队列2—消费消息:" + msg);
        }
    });
    return container;
}
上一篇下一篇

猜你喜欢

热点阅读