Spring资料整理MQ(消息队列)资料整合程序员

Spring整合rabbitmq实践(三):源码

2018-10-18  本文已影响0人  jinchaolv

Spring整合rabbitmq实践(一):基础
Spring整合rabbitmq实践(二):扩展

4. 源码解析

4.1. 通过RabbitTemplate获取消息

从RabbitTemplate中只有queueName入参的方法开始:

    @Override
    public Message receive(String queueName) {
        if (this.receiveTimeout == 0) {
            return doReceiveNoWait(queueName);
        }
        else {
            return receive(queueName, this.receiveTimeout);
        }
    }

receiveTimeOut参数为0,直接获取消息,不等待,获取不到返回null;否则会等待一段时间。

进入带有receiveTimeout的receive方法:


    @Override
    public Message receive(final String queueName, final long timeoutMillis) {
        Message message = execute(channel -> {
            Delivery delivery = consumeDelivery(channel, queueName, timeoutMillis);
            if (delivery == null) {
                return null;
            }
            else {
                if (isChannelLocallyTransacted(channel)) {
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    channel.txCommit();
                }
                else if (isChannelTransacted()) {
                    ConnectionFactoryUtils.registerDeliveryTag(getConnectionFactory(), channel,
                            delivery.getEnvelope().getDeliveryTag());
                }
                else {
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
                return buildMessageFromDelivery(delivery);
            }
        });
        logReceived(message);
        return message;
    }

看到Message是通过调用execute方法得到的,进到execute方法:


    @Override
    public <T> T execute(ChannelCallback<T> action) {
        return execute(action, getConnectionFactory());
    }

    @SuppressWarnings("unchecked")
    private <T> T execute(final ChannelCallback<T> action, final ConnectionFactory connectionFactory) {
        if (this.retryTemplate != null) {
            try {
                return this.retryTemplate.execute(
                        (RetryCallback<T, Exception>) context -> doExecute(action, connectionFactory),
                        (RecoveryCallback<T>) this.recoveryCallback);
            }
            catch (Exception e) {
                if (e instanceof RuntimeException) {
                    throw (RuntimeException) e;
                }
                throw RabbitExceptionTranslator.convertRabbitAccessException(e);
            }
        }
        else {
            return doExecute(action, connectionFactory);
        }
    }

这里能看到配置RetryTemplate的作用,具体就不管了,找到doExecute方法,Message是从这里得到的:


   private <T> T doExecute(ChannelCallback<T> action, ConnectionFactory connectionFactory) {
    
        ...
        
        if (channel == null) {
            if (isChannelTransacted()) {
                resourceHolder = ConnectionFactoryUtils.
                    getTransactionalResourceHolder(connectionFactory, true, this.usePublisherConnection);
                channel = resourceHolder.getChannel();
                if (channel == null) {
                    ConnectionFactoryUtils.releaseResources(resourceHolder);
                    throw new IllegalStateException("Resource holder returned a null channel");
                }
            }
            else {
                connection = ConnectionFactoryUtils.createConnection(connectionFactory,
                        this.usePublisherConnection); // NOSONAR - RabbitUtils closes
                if (connection == null) {
                    throw new IllegalStateException("Connection factory returned a null connection");
                }
                try {
                    channel = connection.createChannel(false);
                    if (channel == null) {
                        throw new IllegalStateException("Connection returned a null channel");
                    }
                }
                catch (RuntimeException e) {
                    RabbitUtils.closeConnection(connection);
                    throw e;
                }
            }
        }
        
        ...
        
        try {
            ...
            return action.doInRabbit(channel);
        }
        catch (Exception ex) {
            ...
        }
        finally {
            if (!invokeScope) {
                if (resourceHolder != null) {
                    ConnectionFactoryUtils.releaseResources(resourceHolder);
                }
                else {
                    RabbitUtils.closeChannel(channel);
                    RabbitUtils.closeConnection(connection);
                }
            }
        }
    }

这个方法比较长,大体可以了解到,在这个方法里创建了Connection和Channel,执行action.doInRabbit()方法得到Message,关闭Channel和Connection。

当然,这里Connection和Channel的创建和关闭都不一定是真的创建和关闭,与具体的实现有关,比如CachingConnectionFactory,它的实现就是有缓存的,后面详述。

action.doInRabbit()方法的实现逻辑就要再回到上面的receive方法,这里的action就是在那个receive方法传入的一个ChannelCallback的匿名内部实现类。

可以看到最后返回的消息是从Delivery中得到的,那么看下Delivery是怎么来的:


    private Delivery consumeDelivery(Channel channel, String queueName, long timeoutMillis) throws Exception {
        Delivery delivery = null;
        RuntimeException exception = null;
        CompletableFuture<Delivery> future = new CompletableFuture<>();
        DefaultConsumer consumer = createConsumer(queueName, channel, future,
                timeoutMillis < 0 ? DEFAULT_CONSUME_TIMEOUT : timeoutMillis);
        try {
            if (timeoutMillis < 0) {
                delivery = future.get();
            }
            else {
                delivery = future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            }
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            this.logger.error("Consumer failed to receive message: " + consumer, cause);
            exception = RabbitExceptionTranslator.convertRabbitAccessException(cause);
            throw exception;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        catch (TimeoutException e) {
            // no result in time
        }
        finally {
            if (exception == null || !(exception instanceof ConsumerCancelledException)) {
                cancelConsumerQuietly(channel, consumer);
            }
        }
        return delivery;
    }

看到future.get(),显然这是一个阻塞式的等待返回结果,receive方法中传入的receiveTimeout参数也正是在这里用到的。那么future数据自然是在createConsumer()方法中产生的:


    private DefaultConsumer createConsumer(final String queueName, Channel channel,
            CompletableFuture<Delivery> future, long timeoutMillis) throws Exception {
        channel.basicQos(1);
        final CountDownLatch latch = new CountDownLatch(1);
        DefaultConsumer consumer = new TemplateConsumer(channel) {

            @Override
            public void handleCancel(String consumerTag) throws IOException {
                future.completeExceptionally(new ConsumerCancelledException());
            }

            @Override
            public void handleConsumeOk(String consumerTag) {
                super.handleConsumeOk(consumerTag);
                latch.countDown();
            }

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                future.complete(new Delivery(consumerTag, envelope, properties, body));
            }

        };
        channel.basicConsume(queueName, consumer);
        if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            if (channel instanceof ChannelProxy) {
                ((ChannelProxy) channel).getTargetChannel().close();
            }
            future.completeExceptionally(
                    new ConsumeOkNotReceivedException("Blocking receive, consumer failed to consume: " + consumer));
        }
        return consumer;
    }

channel.basicConsume(queueName, consumer)是rabbitmq的api。

4.2. @RabbitListener注解的作用过程


    @RabbitListener(queues = "test_queue_delay")

    public void listen(Message message){

        ...

    }

我们可以通过注解的方式方便地接收消息,那么为什么一个普通的方法加上@RabbitListener注解就能接收消息了呢?下面进入源码看一下大体流程。

4.2.1. 实现流程

进入@RabbitListener注解源码,有一段注释说明了这个注解是怎么被处理的:

Processing of {@code @RabbitListener} annotations is performed by registering a
{@link RabbitListenerAnnotationBeanPostProcessor}. This can be done manually or, more
conveniently, through the {@code <rabbit:annotation-driven/>} element or
{@link EnableRabbit} annotation.

于是找到RabbitListenerAnnotationBeanPostProcessor,


    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        Class<?> targetClass = AopUtils.getTargetClass(bean);
        final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
        for (ListenerMethod lm : metadata.listenerMethods) {
            for (RabbitListener rabbitListener : lm.annotations) {
                processAmqpListener(rabbitListener, lm.method, bean, beanName);
            }
        }
        if (metadata.handlerMethods.length > 0) {
            processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
        }
        return bean;
    }

它实现了postProcessAfterInitialization方法,当bean初始化完成后,在这里会获取到这个bean的类用户自己定义的所有添加了@RabbitListener注解的方法,然后调用processAmqpListener()方法对这些方法进行处理,实际上是对方法上的@RabbitListener进行处理,一个方法上可以有多个@RabbitListener,会处理多次。

获取@RabbitListener注解方法的具体过程看buildMetadata()方法:


    private TypeMetadata buildMetadata(Class<?> targetClass) {
        Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);
        final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
        final List<ListenerMethod> methods = new ArrayList<>();
        final List<Method> multiMethods = new ArrayList<>();
        ReflectionUtils.doWithMethods(targetClass, method -> {
            Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method);
            if (listenerAnnotations.size() > 0) {
                methods.add(new ListenerMethod(method,
                        listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));
            }
            if (hasClassLevelListeners) {
                RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);
                if (rabbitHandler != null) {
                    multiMethods.add(method);
                }
            }
        }, ReflectionUtils.USER_DECLARED_METHODS);
        if (methods.isEmpty() && multiMethods.isEmpty()) {
            return TypeMetadata.EMPTY;
        }
        return new TypeMetadata(
                methods.toArray(new ListenerMethod[methods.size()]),
                multiMethods.toArray(new Method[multiMethods.size()]),
                classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()]));
    }

在这个方法里面,找出了所有加了@RabbitListener注解的方法。

其实通过这个方法可以看到@RabbitListener的另一种使用方式,可以在类上加@RabbitListener注解,然后在方法上加@RabbitHandler注解,如果采用这种方式会processMultiMethodListeners()方法来处理这些方法。

这里我们只看processAmqpListener()方法,看它是怎么处理上面找到的这些方法的,


   protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
        Method methodToUse = checkProxy(method, bean);
        MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
        endpoint.setMethod(methodToUse);
        processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
    }

将每一个加@RabbitListener注解的方法构造一个MethodRabbitListenerEndpoint,然后调用processListener()方法:


    protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
            Object adminTarget, String beanName) {
        ...
        
        RabbitListenerContainerFactory<?> factory = null;
        String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
        if (StringUtils.hasText(containerFactoryBeanName)) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            try {
                factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
            }
            catch (NoSuchBeanDefinitionException ex) {
                throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
                        adminTarget + "] for bean " + beanName + ", no " + RabbitListenerContainerFactory.class.getSimpleName() + " with id '" +
                        containerFactoryBeanName + "' was found in the application context", ex);
            }
        }

        this.registrar.registerEndpoint(endpoint, factory);

这个方法很长,前面省略的部分是读取@RabbitListener注解中的值,设置到endpoint中去。直接看最后一段,endpoint的属性都设置完了之后,获取我们配置的RabbitListenerContainerFactory bean,然后调用RabbitListenerEndpointRegistrar类的registerEndpoint()方法:


    public void registerEndpoint(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
        Assert.notNull(endpoint, "Endpoint must be set");
        Assert.hasText(endpoint.getId(), "Endpoint id must be set");
        // Factory may be null, we defer the resolution right before actually creating the container
        AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
        synchronized (this.endpointDescriptors) {
            if (this.startImmediately) { // Register and start immediately
                this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                        resolveContainerFactory(descriptor), true);
            }
            else {
                this.endpointDescriptors.add(descriptor);
            }
        }
    }

这里根据startImmediately看是否需要立刻注册endpoint,或者先将其添加到一个List,稍后统一注册。

对于统一注册的实现,RabbitListenerAnnotationBeanPostProcessor类除了实现BeanPostProcessor以外,还实现了SmartInitializingSingleton接口,所以当RabbitListenerAnnotationBeanPostProcessor这个bean实例化完成之后会调用它的afterSingletonsInstantiated()方法


    @Override
    public void afterSingletonsInstantiated() {
        
        ...

        // Actually register all listeners
        this.registrar.afterPropertiesSet();

        ...
    }

因为之前已经将所有的endpoint添加到了RabbitListenerEndpointRegistrar类中的一个List中了,所以这里调用RabbitListenerEndpointRegistrar类的afterPropertiesSet()方法进行统一注册:

    @Override
    public void afterPropertiesSet() {
        registerAllEndpoints();
    }

不管是单独注册endpoint还是统一注册,调用的是同样的方法,所以看统一注册的registerAllEndpoints()方法就行了:

    protected void registerAllEndpoints() {
        synchronized (this.endpointDescriptors) {
            for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
                this.endpointRegistry.registerListenerContainer(
                        descriptor.endpoint, resolveContainerFactory(descriptor));
            }
            this.startImmediately = true;  // trigger immediate startup
        }
    }

这里就是一个简单的for循环,一个一个注册,具体是怎么注册的,再跟踪registerListenerContainer()方法,直到进入下面这个方法:


    public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
                                          boolean startImmediately) {
        Assert.notNull(endpoint, "Endpoint must not be null");
        Assert.notNull(factory, "Factory must not be null");

        String id = endpoint.getId();
        Assert.hasText(id, "Endpoint id must not be empty");
        synchronized (this.listenerContainers) {
            Assert.state(!this.listenerContainers.containsKey(id),
                    "Another endpoint is already registered with id '" + id + "'");
            MessageListenerContainer container = createListenerContainer(endpoint, factory);
            this.listenerContainers.put(id, container);
            if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
                List<MessageListenerContainer> containerGroup;
                if (this.applicationContext.containsBean(endpoint.getGroup())) {
                    containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
                }
                else {
                    containerGroup = new ArrayList<MessageListenerContainer>();
                    this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
                }
                containerGroup.add(container);
            }
            if (startImmediately) {
                startIfNecessary(container);
            }
        }
    }

可见,注册endpoint,实际上就是RabbitListenerContainerFactory将每一个endpoint都创建成MessageListenerContainer(具体创建过程,由RabbitListenerContainerFactory类自己去完成),然后根据startImmediately参数判断是否调用startIfNecessary()方法立即启动MessageListenerContainer。

实际接收消息是由这个MessageListenerContainer来做的,而MessageListenerContainer接口中有一个接口方法来设置MessageListener,


    /**
     * Setup the message listener to use. Throws an {@link IllegalArgumentException}
     * if that message listener type is not supported.
     * @param messageListener the {@code object} to wrapped to the {@code MessageListener}.
     */
    void setupMessageListener(Object messageListener);

MessageListener将会调用我们加了@RabbitListener注解的方法处理消息,


/**
 * Listener interface to receive asynchronous delivery of Amqp Messages.
 *
 * @author Mark Pollack
 * @author Gary Russell
 */
@FunctionalInterface
public interface MessageListener {

    void onMessage(Message message);

}

或者是ChannelAwareMessageListener接口类来调用我们的方法,


/**
 * A message listener that is aware of the Channel on which the message was received.
 *
 * @author Mark Pollack
 * @author Gary Russell
 */
@FunctionalInterface
public interface ChannelAwareMessageListener {

    /**
     * Callback for processing a received Rabbit message.
     * <p>Implementors are supposed to process the given Message,
     * typically sending reply messages through the given Session.
     * @param message the received AMQP message (never <code>null</code>)
     * @param channel the underlying Rabbit Channel (never <code>null</code>)
     * @throws Exception Any.
     */
    void onMessage(Message message, Channel channel) throws Exception;
}

这样接收并处理消息的所有工作就完成了。

如果不立即启动MessageListenerContainer,RabbitListenerEndpointRegistry也实现了SmartLifecycle接口,所以在spring context refresh的最后一步会去调用start()方法:

    @Override
    public void start() {
        for (MessageListenerContainer listenerContainer : getListenerContainers()) {
            startIfNecessary(listenerContainer);
        }
    }

可以看到在这里统一启动了所有的MessageListenerContainer,

    private void startIfNecessary(MessageListenerContainer listenerContainer) {
        if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
            listenerContainer.start();
        }
    }

所谓启动MessageListenerContainer其实就是调用MessageListenerContainer的start()方法。这也是SmartLifecycle的一个接口方法,它的实现必须保证调用了这个start()方法之后MessageListenerContainer将能够接受到消息。

所以对@RabbitListener注解的整个处理流程就是这样。

总结一下整个实现流程:

(1).@RabbitListener注解的方法所在的类首先是一个bean,因此,实现BeanPostProcessor接口对每一个初始化完成的bean进行处理。

(2).遍历bean中由用户自己定义的所有的方法,找出其中添加了@RabbitListener注解的方法(也可以是@RabbitHandler注解,上面已经讲了,不再赘述)。

(3).读取上面找出的所有方法上@RabbitListener注解中的值,并为每一个方法创建一个RabbitListenerEndpoint,保存在RabbitListenerEndpointRegistrar类中。

(4).在所有的bean都初始化完成,即所有@RabbitListener注解的方法都创建了endpoint之后,由我们配置的RabbitListenerContainerFactory将每个endpoint创建MessageListenerContainer。

(5).最后启动上面创建的MessageListenerContainer。

(6).至此,全部完成,MessageListenerContainer启动后将能够接受到消息,再将消息交给它的MessageListener处理消息。

下面还剩下几件事情才能真正实现上面的步骤:

(1).RabbitListenerContainerFactory只是个接口,它不会自己创建MessageListenerContainer,所以需要一个RabbitListenerContainerFactory实现类,它必须能创建MessageListenerContainer。

(2).MessageListenerContainer也只是一个接口,它不会自己接收消息,所以需要一个MessageListenerContainer实现类,它必须做到在启动后能够接收消息,同时它必须能设置MessageListener,用以处理消息。

(3).MessageListener(或ChannelAwareMessageListener)也只是一个接口,所以还需要一个MessageListener实现类,它必须能调用我们加了@RabbitListener注解的方法。

4.2.2. SimpleRabbitListenerContainerFactory的实现

对应上面的那三个步骤,看它是怎么实现的。

MessageListenerContainer的创建

首先看AbstractRabbitListenerContainerFactory抽象类的下面这个方法:


    @Override
    public C createListenerContainer(RabbitListenerEndpoint endpoint) {
        C instance = createContainerInstance();
        ...
        
        endpoint.setupListenerContainer(instance);
        ...
        
        initializeContainer(instance, endpoint);

        return instance;
    }

注意里面两个方法,后面这个方法里面SimpleRabbitListenerContainerFactory会做一些它独有的属性设置,前一个方法执行结束,MessageListener就设置到MessageListenerContainer里面去了,可以跟踪这个方法,一直到AbstractRabbitListenerEndpoint类的下面这个方法:


    private void setupMessageListener(MessageListenerContainer container) {
        MessageListener messageListener = createMessageListener(container);
        Assert.state(messageListener != null, () -> "Endpoint [" + this + "] must provide a non null message listener");
        container.setupMessageListener(messageListener);
    }

可以看到在这个方法里创建了MessageListener,并将其设置到MessageListenerContainer里面去。

createMessageListener()方法有两个实现,实际调用的是MethodRabbitListenerEndpoint类里面的实现:


    @Override
    protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {
        ...
        
        MessagingMessageListenerAdapter messageListener = createMessageListenerInstance();
        messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
        ...
        
        return messageListener;
    }

看到setHandlerMethod(configureListenerAdapter(messageListener))这一行,这里创建并设置了一个HandlerAdapter,这个HandlerAdapter能够调用我们加了@RabbitListener注解的方法。

SimpleMessageListenerContainer接收消息的实现

SimpleRabbitListenerContainerFactory创建的MessageListenerContainer是SimpleMessageListenerContainer类,下面看它是怎么在启动后就能接收消息的。

上面讲过RabbitListenerEndpointRegistry类通过调用MessageListenerContainer的start()方法类启动这个MessageListenerContainer。

SimpleMessageListenerContainer类本身并没有实现start()方法,在它继承的抽象父类里面。进入AbstractMessageListenerContainer抽象类找到start()方法的实现


    @Override
    public void start() {
        if (isRunning()) {
            return;
        }
        if (!this.initialized) {
            synchronized (this.lifecycleMonitor) {
                if (!this.initialized) {
                    afterPropertiesSet();
                    this.initialized = true;
                }
            }
        }
        try {
            if (logger.isDebugEnabled()) {
                logger.debug("Starting Rabbit listener container.");
            }
            configureAdminIfNeeded();
            checkMismatchedQueues();
            doStart();
        }
        catch (Exception ex) {
            throw convertRabbitAccessException(ex);
        }
    }

真正的启动方法是doStart(),所以去SimpleMessageListenerContainer类中找这个类的doStart()实现:


   @Override
    protected void doStart() throws Exception {
        ...
        
            int newConsumers = initializeConsumers();
        
            ...
        
            for (BlockingQueueConsumer consumer : this.consumers) {
                AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
                processors.add(processor);
                getTaskExecutor().execute(processor);
                if (getApplicationEventPublisher() != null) {
                    getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
                }
            }
        ... 
    }

这个方法很长,细节就不去深究了,这里注意两个方法,一个是initializeConsumers(),


    protected int initializeConsumers() {
        int count = 0;
        synchronized (this.consumersMonitor) {
            if (this.consumers == null) {
                this.cancellationLock.reset();
                this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);
                for (int i = 0; i < this.concurrentConsumers; i++) {
                    BlockingQueueConsumer consumer = createBlockingQueueConsumer();
                    this.consumers.add(consumer);
                    count++;
                }
            }
        }
        return count;
    }

这个方法创建了BlockingQueueConsumer,数量等于concurrentConsumers参数的配置。

另一个方法是getTaskExecutor().execute(processor),前面用BlockingQueueConsumer创建了AsyncMessageProcessingConsumer(实现了Runnable接口),这里获取到Executor来执行,每一个MessageListenerContainer都有各自的Executor。

在AsyncMessageProcessingConsumer类的run()方法里面可以找到下面这段代码:


    while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
        try {
            boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
            if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
                if (receivedOk) {
                    if (isActive(this.consumer)) {
                        consecutiveIdles = 0;
                        if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
                            considerAddingAConsumer();
                            consecutiveMessages = 0;
                        }
                    }
                }
                
                ...
            }
            
            ...
        }
        
        ...
    }

可以看到里面有个receiveAndExecute()方法,所以SimpleMessageListenerContainer接收消息的实现方案是:向Executor提交执行Runnable,Runnable中循环接收消息。

MessageListener调用@RabbitListener注解方法处理消息的实现

上面的receiveAndExecute()方法接收消息的同时也将其处理了,继续跟踪,直到进入下面这个方法:


    protected void executeListener(Channel channel, Message messageIn) throws Exception {

        ...
    
        invokeListener(channel, message);
    
        ...
    }

在这个方法里面可以看到invokeListener()方法,


    protected void invokeListener(Channel channel, Message message) throws Exception {
        this.proxy.invokeListener(channel, message);
    }

这里有个proxy,这个proxy是由下面这个方法创建的匿名类,

    protected void actualInvokeListener(Channel channel, Message message) throws Exception {
        Object listener = getMessageListener();
        if (listener instanceof ChannelAwareMessageListener) {
            doInvokeListener((ChannelAwareMessageListener) listener, channel, message);
        }
        else if (listener instanceof MessageListener) {
            boolean bindChannel = isExposeListenerChannel() && isChannelLocallyTransacted();
            if (bindChannel) {
                RabbitResourceHolder resourceHolder = new RabbitResourceHolder(channel, false);
                resourceHolder.setSynchronizedWithTransaction(true);
                TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
                        resourceHolder);
            }
            try {
                doInvokeListener((MessageListener) listener, message);
            }
            finally {
                if (bindChannel) {
                    // unbind if we bound
                    TransactionSynchronizationManager.unbindResource(this.getConnectionFactory());
                }
            }
        }
        else if (listener != null) {
            throw new FatalListenerExecutionException("Only MessageListener and SessionAwareMessageListener supported: "
                    + listener);
        }
        else {
            throw new FatalListenerExecutionException("No message listener specified - see property 'messageListener'");
        }
    }

这个方法里可以看到doInvokeListener()方法,已经差不多接近我们的@RabbitListener注解的方法了,继续跟踪,


    protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Message message)
            throws Exception {
            
        ...
        
        try {
            listener.onMessage(message, channelToUse);
        }
        catch (Exception e) {
            throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);
        }
        
        ...
    }

跟踪listener.onMessage()方法,直到进入MessagingMessageListenerAdapter类的onMessage()方法,


    @Override
    public void onMessage(org.springframework.amqp.core.Message amqpMessage, Channel channel) throws Exception {
        Message<?> message = toMessagingMessage(amqpMessage);
        if (logger.isDebugEnabled()) {
            logger.debug("Processing [" + message + "]");
        }
        try {
            Object result = invokeHandler(amqpMessage, channel, message);
            if (result != null) {
                handleResult(result, amqpMessage, channel, message);
            }
            else {
                logger.trace("No result object given - no result to handle");
            }
        }
        catch (ListenerExecutionFailedException e) {
            if (this.errorHandler != null) {
                try {
                    Object result = this.errorHandler.handleError(amqpMessage, message, e);
                    if (result != null) {
                        handleResult(result, amqpMessage, channel, message);
                    }
                    else {
                        logger.trace("Error handler returned no result");
                    }
                }
                catch (Exception ex) {
                    returnOrThrow(amqpMessage, channel, message, ex, ex);
                }
            }
            else {
                returnOrThrow(amqpMessage, channel, message, e.getCause(), e);
            }
        }
    }

这里通过invokHandler()方法消费获取到的message,然后在catch里面处理异常,进入invokeHandler()方法:


private Object invokeHandler(org.springframework.amqp.core.Message amqpMessage, Channel channel,
            Message<?> message) {
        try {
            return this.handlerMethod.invoke(message, amqpMessage, channel);
        }
        catch (MessagingException ex) {
            throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +
                    "be invoked with the incoming message", message.getPayload()), ex, amqpMessage);
        }
        catch (Exception ex) {
            throw new ListenerExecutionFailedException("Listener method '" +
                    this.handlerMethod.getMethodAsString(message.getPayload()) + "' threw exception", ex, amqpMessage);
        }
    }

在这里可以看到catch了所有的异常,也就是说只要是我们消费消息的方法里面抛出来的异常全都会被包装成ListenerExecutionFailedException,并且这个Exception里面把消息也放进去了。

这里的this.handlerMethod其实就是上面提到的HandlerAdapter,跟踪它的invoke()方法,看它是怎么调用我们@RabbitListener注解的方法的。

最后我们跟踪到InvocableHandlerMethod类的下面这个方法:


    @Nullable
    protected Object doInvoke(Object... args) throws Exception {
        ReflectionUtils.makeAccessible(getBridgedMethod());
        try {
            return getBridgedMethod().invoke(getBean(), args);
        }
        ...
    }

这里通过getBridgedMethod()方法拿到的就是@RabbitListener注解的方法了,这是在刚开始处理@RabbitListener注解时就已经保存下来的,然后就可以利用反射来调用这个方法,这样就完成了接收并处理消息的整个流程。

上一篇下一篇

猜你喜欢

热点阅读