【RabbitMQ-12】监听消息,处理业务逻辑源码分析
RabbitMQ是如何监听消息,执行业务逻辑?
【RabbitMQ-9】@RabbitListener注解生效的源码分析中分析了注解生效的原理,最终创建了一个消费者线程对象。
// Consumer thread entry point (excerpt; parts elided by the article are shown as "...").
// It initializes the consumer and then calls mainLoop() repeatedly while the loop condition holds.
@Override // NOSONAR - complexity - many catch blocks
public void run() { // NOSONAR - line count
...
try {
initialize();
// Once the consumer thread has started, it polls in this while loop, watching the local blocking queue for messages.
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
// Consume messages (a business exception thrown by the listener method is not propagated out of mainLoop()).
mainLoop();
}
// What follows is the handling for failures of the consumer thread itself.
} catch(InterruptedException e) {
logger.debug("Consumer thread interrupted, processing stopped.");
// Restore the interrupt flag so code further up the stack can still observe it.
Thread.currentThread().interrupt();
aborted = true;
// Publish an event; a listener can be configured for it in order to keep consuming.
publishConsumerFailedEvent("Consumer thread interrupted, processing stopped", true, e);
} ...
在mainLoop()中会消费消息;若业务方法抛出异常,该异常会被包装为ListenerExecutionFailedException类型的异常。
/**
 * Runs one iteration of the consumer loop: receives and processes messages,
 * optionally adjusts the number of consumers, and publishes an idle-container
 * event when nothing has been received for longer than the idle interval.
 *
 * <p>A {@code ListenerExecutionFailedException} is swallowed unless its cause is a
 * {@code NoSuchMethodException}; an {@code AmqpRejectAndDontRequeueException} is
 * always swallowed, so the surrounding loop keeps running.
 *
 * @throws Exception any other failure raised while receiving or processing
 */
private void mainLoop() throws Exception { // NOSONAR Exception
    try {
        // Process the message(s) and ACK or NACK them.
        final boolean gotMessage = receiveAndExecute(this.consumer); // At least one message received

        // Check whether the number of consumers has to be adjusted dynamically.
        if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
            checkAdjust(gotMessage);
        }

        // Check whether the idle-container event has to be published.
        final long idleInterval = getIdleEventInterval();
        if (idleInterval <= 0) {
            return;
        }
        if (gotMessage) {
            updateLastReceive();
            return;
        }

        final long currentTime = System.currentTimeMillis();
        final long previousAlert = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
        final long lastReceived = getLastReceive();
        final boolean idleLongEnough = currentTime > lastReceived + idleInterval;
        final boolean alertDue = currentTime > previousAlert + idleInterval;
        // compareAndSet lets only one caller win the right to publish for this interval.
        if (idleLongEnough && alertDue
                && SimpleMessageListenerContainer.this.lastNoMessageAlert.compareAndSet(previousAlert, currentTime)) {
            publishIdleContainerEvent(currentTime - lastReceived);
        }
    } catch (ListenerExecutionFailedException listenerFailure) {
        // Exceptions raised by the listener method are caught here and not rethrown,
        // except when the listener itself is invalid.
        if (listenerFailure.getCause() instanceof NoSuchMethodException) {
            throw new FatalListenerExecutionException("Invalid listener", listenerFailure);
        }
    } catch (AmqpRejectAndDontRequeueException ignored) {
        // Caught and deliberately not rethrown.
    }
}
该方法使用了MQ的事务包装。
// Receives and processes messages for the given consumer (excerpt).
// The visible part simply delegates to doReceiveAndExecute(consumer) and returns its result.
private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONAR
...// transaction handling, elided because it is not the focus here
return doReceiveAndExecute(consumer);
}
自动ACK和UNACK分别依赖consumer.commitIfNecessary(isChannelLocallyTransacted())和consumer.rollbackOnExceptionIfNecessary(ex)方法实现。
// Takes up to txSize messages from the consumer, runs the listener on each one,
// and finally returns the result of consumer.commitIfNecessary(...).
// Excerpt: the transaction-handling branch is elided ("..."), so the snippet does not compile as shown.
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR
Channel channel = consumer.getChannel();
for (int i = 0; i < this.txSize; i++) {
logger.trace("Waiting for message from consumer.");
// Fetch the next message from the local queue, waiting at most receiveTimeout.
Message message = consumer.nextMessage(this.receiveTimeout);
if (message == null) {
// Nothing arrived within the timeout: stop collecting and go on to the commit step.
break;
}
try {
// Run the listener's business logic.
executeListener(channel, message);
} catch(ImmediateAcknowledgeAmqpException e) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("User requested ack for failed delivery '" + e.getMessage() + "': " + message.getMessageProperties().getDeliveryTag());
}
// Leave the loop without rethrowing, so the commit step below still runs.
break;
} catch(Exception ex) {
if (causeChainHasImmediateAcknowledgeAmqpException(ex)) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("User requested ack for failed delivery: " + message.getMessageProperties().getDeliveryTag());
}
break;
}
// Transaction handling (elided in this excerpt).
...
else {
// On a listener exception: roll back / reject (nack) if necessary, then rethrow.
consumer.rollbackOnExceptionIfNecessary(ex);
throw ex;
}
}
}
// On success: commit / acknowledge if necessary.
return consumer.commitIfNecessary(isChannelLocallyTransacted());
}
该方法执行UNACK(调用channel.basicNack),消息是否重回队列由basicNack的requeue参数决定。
public void rollbackOnExceptionIfNecessary(Throwable ex) {
//注:isAutoAck()=NONE,既不是NONE模式也不是MANUAL模式,返回true
boolean ackRequired = !this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual();
try {
if (this.transactional) {
if (logger.isDebugEnabled()) {
logger.debug("Initiating transaction rollback on application exception: " + ex);
}
RabbitUtils.rollbackIfNecessary(this.channel);
}
if (ackRequired) {
OptionalLong deliveryTag = this.deliveryTags.stream().mapToLong(l - >l).max();
if (deliveryTag.isPresent()) {
//发送UNACK,是否重回队列由ContainerUtils.shouldRequeue(this.defaultRequeueRejected, ex, logger)决定。
this.channel.basicNack(deliveryTag.getAsLong(), true, ContainerUtils.shouldRequeue(this.defaultRequeueRejected, ex, logger));
}
if (this.transactional) {
// Need to commit the reject (=nack)
RabbitUtils.commitIfNecessary(this.channel);
}
}
} catch(Exception e) {
logger.error("Application exception overridden by rollback exception", ex);
throw RabbitExceptionTranslator.convertRabbitAccessException(e); // NOSONAR stack trace loss
} finally {
this.deliveryTags.clear();
}
}
若业务方法抛出的是AmqpRejectAndDontRequeueException异常,那么在自动ACK的模式下,消息不会重回队列,将被丢弃(若队列配置了死信交换机,则会转入死信队列)。
/**
 * Decides whether a rejected message should be put back on the queue.
 *
 * <p>Requeueing starts out enabled when {@code defaultRequeueRejected} is set, or when
 * {@code throwable} is a {@code MessageRejectedWhileStoppingException} or an
 * {@code ImmediateRequeueAmqpException}. It is then vetoed if an
 * {@code AmqpRejectAndDontRequeueException} appears anywhere in the cause chain.
 *
 * @param defaultRequeueRejected the user-configurable default
 * @param throwable the failure raised by the listener; may be {@code null}
 * @param logger receives a debug line with the final decision
 * @return {@code true} if the message should be requeued
 */
public static boolean shouldRequeue(boolean defaultRequeueRejected, Throwable throwable, Log logger) {
    // defaultRequeueRejected is user-configurable; the other two checks look for special exception types.
    boolean requeue = defaultRequeueRejected
            || throwable instanceof MessageRejectedWhileStoppingException
            || throwable instanceof ImmediateRequeueAmqpException;

    if (requeue) {
        // Walk the cause chain; an AmqpRejectAndDontRequeueException anywhere in it vetoes the requeue.
        for (Throwable cause = throwable; cause != null; cause = cause.getCause()) {
            if (cause instanceof AmqpRejectAndDontRequeueException) {
                requeue = false;
                break;
            }
        }
    }

    if (logger.isDebugEnabled()) {
        logger.debug("Rejecting messages (requeue=" + requeue + ")");
    }
    return requeue;
}
执行业务逻辑:
/**
 * Runs the listener for one message.
 *
 * <p>If the container is no longer running, the message is rejected with a
 * {@code MessageRejectedWhileStoppingException}. A {@code RuntimeException} from the
 * listener is passed to {@code handleListenerException} and rethrown, unless the message
 * is flagged as "final retry for message with no id"; in that case a fatal or a
 * reject-and-don't-requeue exception is thrown instead.
 *
 * @param channel the channel the message was received on
 * @param messageIn the received message
 */
protected void executeListener(Channel channel, Message messageIn) {
    if (!isRunning()) {
        if (logger.isWarnEnabled()) {
            logger.warn("Rejecting received message because the listener container has been stopped: " + messageIn);
        }
        throw new MessageRejectedWhileStoppingException();
    }

    try {
        // Invoke the business method.
        doExecuteListener(channel, messageIn);
    } catch (RuntimeException failure) {
        final boolean finalRetryWithoutId = messageIn.getMessageProperties().isFinalRetryForMessageWithNoId();
        if (!finalRetryWithoutId) {
            // Regular path: let the error handling see the exception, then propagate it.
            handleListenerException(failure);
            throw failure;
        }
        if (this.statefulRetryFatalWithNullMessageId) {
            throw new FatalListenerExecutionException("Illegal null id in message. Failed to manage retry for message: " + messageIn, failure);
        }
        throw new ListenerExecutionFailedException("Cannot retry message more than once without an ID",
                new AmqpRejectAndDontRequeueException("Not retryable; rejecting and not requeuing", failure),
                messageIn);
    }
}
异常是如何被处理的呢:默认使用org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler#handleError处理器。也可以自定义配置errorHandler,对出现的异常进行全局统一处理。
// Applies the after-receive post processors to the message and then invokes the listener.
// Excerpt: the batch-handling "if" branch is elided, so the "else" below has no visible "if".
private void doExecuteListener(Channel channel, Message messageIn) {
Message message = messageIn;
if (this.afterReceivePostProcessors != null) {
for (MessagePostProcessor processor: this.afterReceivePostProcessors) {
message = processor.postProcessMessage(message);
if (message == null) {
// A post processor returning null means "discard": signal an immediate acknowledge.
throw new ImmediateAcknowledgeAmqpException("Message Post Processor returned 'null', discarding message");
}
}
}
Object batchFormat = message.getMessageProperties().getHeaders().get(MessageProperties.SPRING_BATCH_FORMAT);
// Batch handling (elided in this excerpt)
else {
// Invoke the listener method.
invokeListener(channel, message);
}
}
// Invokes the listener through this.proxy rather than directly, so the call is
// actually executed by the proxy object.
protected void invokeListener(Channel channel, Message message) {
this.proxy.invokeListener(channel, message);
}
创建代理对象:
/**
 * Initializes the proxy object that listener invocations go through.
 *
 * <p>Does nothing when the advice chain is empty. Otherwise builds a proxy that
 * implements {@code ContainerDelegate}, targets {@code delegate}, and applies every
 * advice of the chain in order.
 *
 * @param delegate the object the proxy forwards to
 */
protected void initializeProxy(Object delegate) {
    final boolean noAdvice = getAdviceChain().length == 0;
    if (noAdvice) {
        return;
    }

    ProxyFactory proxyFactory = new ProxyFactory();
    // Listener retry works by adding a spring-retry advice to this chain.
    for (Advice each : getAdviceChain()) {
        proxyFactory.addAdvisor(new DefaultPointcutAdvisor(each));
    }
    proxyFactory.addInterface(ContainerDelegate.class);
    proxyFactory.setTarget(delegate);

    ClassLoader delegateLoader = ContainerDelegate.class.getClassLoader();
    this.proxy = (ContainerDelegate) proxyFactory.getProxy(delegateLoader);
}
实际执行的目标方法:
/**
 * Dispatches the message to the configured listener according to its type.
 *
 * <p>A {@code ChannelAwareMessageListener} receives both message and channel. A plain
 * {@code MessageListener} receives only the message; when the listener channel is
 * exposed and the channel is locally transacted, the channel is bound as a transaction
 * resource for the duration of the call. Any other listener, or none at all, is a
 * fatal configuration error.
 *
 * @param channel the channel the message arrived on
 * @param message the message to deliver
 */
protected void actualInvokeListener(Channel channel, Message message) {
    final Object target = getMessageListener();

    // The configuration discussed here takes this branch.
    if (target instanceof ChannelAwareMessageListener) {
        doInvokeListener((ChannelAwareMessageListener) target, channel, message);
        return;
    }

    if (!(target instanceof MessageListener)) {
        if (target == null) {
            throw new FatalListenerExecutionException("No message listener specified - see property 'messageListener'");
        }
        throw new FatalListenerExecutionException("Only MessageListener and SessionAwareMessageListener supported: " + target);
    }

    final boolean mustBind = isExposeListenerChannel() && isChannelLocallyTransacted();
    if (mustBind) {
        RabbitResourceHolder holder = new RabbitResourceHolder(channel, false);
        holder.setSynchronizedWithTransaction(true);
        TransactionSynchronizationManager.bindResource(this.getConnectionFactory(), holder);
    }
    try {
        doInvokeListener((MessageListener) target, message);
    } finally {
        if (mustBind) {
            // unbind if we bound
            TransactionSynchronizationManager.unbindResource(this.getConnectionFactory());
        }
    }
}
// Invokes a channel-aware listener and always runs cleanUpAfterInvoke afterwards.
// Excerpt: the channel/resource set-up is elided ("...").
protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Message message) {
RabbitResourceHolder resourceHolder = null;
Channel channelToUse = channel;
boolean boundHere = false;
try {
...
// This is the call that actually runs the listener.
try {
listener.onMessage(message, channelToUse);
} catch(Exception e) {
// Wrap the failure (if needed) so callers see a ListenerExecutionFailedException.
throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);
}
} finally {
cleanUpAfterInvoke(resourceHolder, channelToUse, boundHere);
}
}
@RabbitListener注解使用的listener类为:org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter#onMessage。在解析注解时,缓存了bean和method对象,后续通过反射调用业务逻辑。
我们也可以在自定义的SimpleMessageListenerContainer中通过setMessageListener配置监听器:
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new MySimpleMessageListenerContainer(connectionFactory);
//同时监听多个队列
container.setQueues(new Queue("kinson2"));
//设置当前的消费者数量
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(2);
//设置是否重回队列
container.setDefaultRequeueRejected(false);
//设置自动签收
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//设置监听外露
container.setExposeListenerChannel(true);
//设置线程池
container.setTaskExecutor(taskExecutor());
container.setErrorHandler((ex) - >{
//抛出异常后的后置处理器
});
container.setAdviceChain();
// container.setMessageConverter();
//设置消费端标签策略
// container.setConsumerTagStrategy(new ConsumerTagStrategy() {
// @Override
// public String createConsumerTag(String queue) {
// return queue + "_" + UUID.randomUUID().toString();
// }
// });
// //设置消息监听(手动的设置,最终依赖反射调用)
//传入的是策略,子类选择特定的子类来运行(监听容器配置的是他,某个队列被运行时,实际上运行的是对应监听容器的配置。)
container.setMessageListener(new ChannelAwareMessageListener() {@Override public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody(), "utf-8");
log.info("队列2—消费消息:" + msg);
}
});
return container;
}