SpringRabbitmq-MessageListenerContainer

2018-09-02  本文已影响0人  lazyguy

MessageListenerContainer

定义2个方法:

void setupMessageListener(Object messageListener); 设置messageListener
MessageConverter getMessageConverter(); 获取MessageConverter(用于转换接收到的Message)

AbstractMessageListenerContainer

    
    static final int DEFAULT_FAILED_DECLARATION_RETRY_INTERVAL = 5000;
        
    public static final boolean DEFAULT_DEBATCHING_ENABLED = true;

    public static final int DEFAULT_PREFETCH_COUNT = 250;

    /**
     * The default recovery interval: 5000 ms = 5 seconds.
     */
    public static final long DEFAULT_RECOVERY_INTERVAL = 5000;

    public static final long DEFAULT_SHUTDOWN_TIMEOUT = 5000;

    private final ContainerDelegate delegate = this::actualInvokeListener;

    protected final Object consumersMonitor = new Object(); //NOSONAR

    private final Map<String, Object> consumerArgs = new HashMap<String, Object>();

    private ContainerDelegate proxy = this.delegate;

    private long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;

    private ApplicationEventPublisher applicationEventPublisher;

    private PlatformTransactionManager transactionManager;

    private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute();

    private String beanName;

    private Executor taskExecutor = new SimpleAsyncTaskExecutor();

    private boolean taskExecutorSet;

    private BackOff recoveryBackOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS);

    private MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();

    private RabbitAdmin rabbitAdmin;

    private boolean missingQueuesFatal = true;

    private boolean missingQueuesFatalSet;

    private boolean possibleAuthenticationFailureFatal = true;

    private boolean possibleAuthenticationFailureFatalSet;

    private boolean autoDeclare = true;

    private boolean mismatchedQueuesFatal = false;

    private long failedDeclarationRetryInterval = DEFAULT_FAILED_DECLARATION_RETRY_INTERVAL;

    private boolean autoStartup = true;

    private int phase = Integer.MAX_VALUE;

    private volatile boolean active = false;

    private volatile boolean running = false;

    private final Object lifecycleMonitor = new Object();

    private volatile List<String> queueNames = new CopyOnWriteArrayList<String>();

    private ErrorHandler errorHandler = new ConditionalRejectingErrorHandler();

    private MessageConverter messageConverter;
        是否暴露channel的listener给已经注册的ChannelAwareMessageListener?
    private boolean exposeListenerChannel = true;

    private volatile Object messageListener;

    private volatile AcknowledgeMode acknowledgeMode = AcknowledgeMode.AUTO;

    private volatile boolean deBatchingEnabled = DEFAULT_DEBATCHING_ENABLED;

    private volatile boolean initialized;

    private Collection<MessagePostProcessor> afterReceivePostProcessors;

    private volatile ApplicationContext applicationContext;

    private String listenerId;

    private Advice[] adviceChain = new Advice[0];

    private ConsumerTagStrategy consumerTagStrategy;

    private volatile boolean exclusive;

    private volatile boolean noLocal;

    private volatile boolean defaultRequeueRejected = true;

    private volatile int prefetchCount = DEFAULT_PREFETCH_COUNT;

    private long idleEventInterval;

    private volatile long lastReceive = System.currentTimeMillis();

    private boolean statefulRetryFatalWithNullMessageId = true;

    private ConditionalExceptionLogger exclusiveConsumerExceptionLogger = new DefaultExclusiveConsumerLogger();

    private boolean alwaysRequeueWithTxManagerRollback;

    private String lookupKeyQualifier = "";

    private boolean forceCloseChannel = true

方法

checkMessageListener 检查messageListener的类型必须是MessageListener或ChannelAwareMessageListener
@Override
    public final void afterPropertiesSet() {
        super.afterPropertiesSet();父类检查ConnectionFactory存在
        Assert.state(
                this.exposeListenerChannel || !getAcknowledgeMode().isManual(),
                "You cannot acknowledge messages manually if the channel is not exposed to the listener "
                        + "(please check your configuration and set exposeListenerChannel=true or acknowledgeMode!=MANUAL)");
检查
        Assert.state(
                !(getAcknowledgeMode().isAutoAck() && isChannelTransacted()),
                "The acknowledgeMode is NONE (autoack in Rabbit terms) which is not consistent with having a "
                        + "transactional channel. Either use a different AcknowledgeMode or make sure channelTransacted=false");
        validateConfiguration();
        initialize();
    }
    // -------------------------------------------------------------------------
    // Lifecycle methods for starting and stopping the container
    // -------------------------------------------------------------------------

    /**
     * Initialize this container.
     * <p>
     * Notifies waiters on the lifecycle monitor, hands the delegate to
     * {@code initializeProxy}, runs the two property-driven checks, calls
     * {@link #doInitialize()}, and finally settles the task-executor and
     * channel-transaction defaults. Any exception raised along the way is
     * rethrown through {@code convertRabbitAccessException}.
     */
    public void initialize() {
        try {
            // Take the lifecycle lock and wake every thread waiting on it.
            synchronized (this.lifecycleMonitor) {
                this.lifecycleMonitor.notifyAll();
            }

            // Per the article's note: builds a proxy from the delegate and the configured advice chain.
            initializeProxy(this.delegate);

            // NOTE(review): the article leaves these two checks unexplained; judging by their names
            // they read the missingQueuesFatal / possibleAuthenticationFailureFatal settings
            // from a property source - confirm against the helper implementations.
            checkMissingQueuesFatalFromProperty();
            checkPossibleAuthenticationFailureFatalFromProperty();

            // Extension hook for subclasses.
            doInitialize();

            final boolean hasTransactionManager = this.transactionManager != null;

            // exposeListenerChannel=false has no effect once a transaction manager is configured.
            if (!this.isExposeListenerChannel() && hasTransactionManager) {
                logger.warn("exposeListenerChannel=false is ignored when using a TransactionManager");
            }

            // No executor installed yet and a bean name is available: install a
            // SimpleAsyncTaskExecutor whose thread names are prefixed with the bean name.
            if (!this.taskExecutorSet && StringUtils.hasText(this.getBeanName())) {
                this.taskExecutor = new SimpleAsyncTaskExecutor(this.getBeanName() + "-");
                this.taskExecutorSet = true;
            }

            // A transaction manager implies a transacted channel.
            if (hasTransactionManager && !isChannelTransacted()) {
                logger.debug("The 'channelTransacted' is coerced to 'true', when 'transactionManager' is provided");
                setChannelTransacted(true);
            }
        }
        catch (Exception e) {
            throw convertRabbitAccessException(e);
        }
    }
shutdown方法,设置标志位。doShutdown方法留给子类
    /**
     * Starts the container unless it is already running.
     * <p>
     * On first use {@code afterPropertiesSet()} is invoked once, guarded by the
     * lifecycle monitor, before {@code configureAdminIfNeeded()},
     * {@code checkMismatchedQueues()} and {@code doStart()} run. Exceptions from
     * those three calls are rethrown through {@code convertRabbitAccessException}.
     */
    @Override
    public void start() {
        if (!isRunning()) {
            // Run afterPropertiesSet() only if initialisation has not happened yet;
            // the flag is re-checked once the lifecycle lock is held.
            if (!this.initialized) {
                synchronized (this.lifecycleMonitor) {
                    if (!this.initialized) {
                        afterPropertiesSet();
                        this.initialized = true;
                    }
                }
            }

            try {
                if (logger.isDebugEnabled()) {
                    logger.debug("Starting Rabbit listener container.");
                }
                configureAdminIfNeeded();
                checkMismatchedQueues();
                doStart();
            }
            catch (Exception e) {
                throw convertRabbitAccessException(e);
            }
        }
    }

SimpleMessageListenerContainer

    /**
     * Re-initializes this container's Rabbit message consumers, if not initialized already.
     * Then submits each consumer to this container's task executor.
     * @throws Exception Any Exception.
     */
    @Override
    protected void doStart() throws Exception {
        // Step 1: when the listener declares the queues it expects, the container's queues
        // must have the same count and each of them must be one of the expected names.
        if (getMessageListener() instanceof ListenerContainerAware) {
            Collection<String> expectedQueueNames = ((ListenerContainerAware) getMessageListener()).expectedQueueNames();
            if (expectedQueueNames != null) {
                String[] actualQueues = getQueueNames();
                Assert.state(expectedQueueNames.size() == actualQueues.length,
                        "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
                                + Arrays.asList(actualQueues));
                // True only for a non-empty queue list whose every entry is expected;
                // evaluation stops at the first unexpected name.
                boolean everyQueueExpected = actualQueues.length > 0
                        && Arrays.stream(actualQueues).allMatch(expectedQueueNames::contains);
                Assert.state(everyQueueExpected, () -> "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
                        + Arrays.asList(actualQueues));
            }
        }

        // Step 2: let the superclass do its part.
        super.doStart();

        // Step 3: under the consumers monitor, create the BlockingQueueConsumers, wrap each in an
        // AsyncMessageProcessingConsumer, hand it to the executor, and afterwards ask every
        // processor for its startup exception.
        synchronized (this.consumersMonitor) {
            if (this.consumers != null) {
                throw new IllegalStateException("A stopped container should not have consumers");
            }

            int created = initializeConsumers();

            if (this.consumers == null) {
                logger.info("Consumers were initialized and then cleared " +
                        "(presumably the container was stopped concurrently)");
                return;
            }
            if (created <= 0) {
                if (logger.isInfoEnabled()) {
                    logger.info("Consumers are already running");
                }
                return;
            }

            Set<AsyncMessageProcessingConsumer> launched = new HashSet<>();
            for (BlockingQueueConsumer queueConsumer : this.consumers) {
                AsyncMessageProcessingConsumer task = new AsyncMessageProcessingConsumer(queueConsumer);
                launched.add(task);
                getTaskExecutor().execute(task);
                if (getApplicationEventPublisher() != null) {
                    getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, queueConsumer));
                }
            }

            // Only after everything has been submitted: surface the first fatal startup failure.
            for (AsyncMessageProcessingConsumer task : launched) {
                FatalListenerStartupException failure = task.getStartupException();
                if (failure != null) {
                    throw new AmqpIllegalStateException("Fatal exception on listener startup", failure);
                }
            }
        }
    }

BlockingQueueConsumer

    /**
     * Creates a consumer for the given queues.
     * <p>
     * The queue-name array is copied, any supplied consumer arguments are copied into
     * this consumer's own argument map, and the internal delivery queue is bounded by
     * {@code prefetchCount}.
     *
     * @param connectionFactory the connection factory stored for later use
     * @param messagePropertiesConverter the message properties converter
     * @param activeObjectCounter the counter this consumer is tracked with
     * @param acknowledgeMode the acknowledge mode
     * @param transactional whether the channel is transactional
     * @param prefetchCount the prefetch count; also the capacity of the internal delivery queue
     * @param defaultRequeueRejected the default requeue-rejected setting
     * @param consumerArgs optional consumer arguments; may be {@code null} or empty
     * @param noLocal the noLocal flag
     * @param exclusive the exclusive flag
     * @param queues the names of the queues to consume from
     */
    public BlockingQueueConsumer(ConnectionFactory connectionFactory,
            MessagePropertiesConverter messagePropertiesConverter,
            ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
            boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
            Map<String, Object> consumerArgs, boolean noLocal, boolean exclusive, String... queues) {

        // Collaborators.
        this.connectionFactory = connectionFactory;
        this.messagePropertiesConverter = messagePropertiesConverter;
        this.activeObjectCounter = activeObjectCounter;

        // Consumption settings.
        this.acknowledgeMode = acknowledgeMode;
        this.transactional = transactional;
        this.prefetchCount = prefetchCount;
        this.defaultRequeueRejected = defaultRequeueRejected;
        this.noLocal = noLocal;
        this.exclusive = exclusive;

        if (consumerArgs != null && !consumerArgs.isEmpty()) {
            this.consumerArgs.putAll(consumerArgs);
        }

        // Defensive copy so later changes to the caller's array are not observed.
        this.queues = queues.clone();
        this.queue = new LinkedBlockingQueue<>(prefetchCount);
    }
    /**
     * Starts this consumer on the calling thread.
     * <p>
     * Steps, in order:
     * <ol>
     * <li>obtain the resource holder and its channel, and call {@code addRecoveryListener()};</li>
     * <li>create the internal consumer, clear the delivery tags and register with the
     * active-object counter;</li>
     * <li>passively declare the queues, retrying up to {@code declarationRetries} times and
     * sleeping {@code failedDeclarationRetryInterval} between attempts;</li>
     * <li>unless auto-ack is in use, apply {@code basicQos(prefetchCount)};</li>
     * <li>consume from every configured queue that is not recorded as missing.</li>
     * </ol>
     * The pause between declaration retries happens regardless of the logger's level;
     * previously it was skipped whenever WARN logging was disabled, which made the
     * retries run back-to-back.
     *
     * @throws AmqpException declared by this method; the body throws
     * {@code FatalListenerStartupException} on an authentication failure,
     * {@code QueuesNotAvailableException} when none of the queues could be declared,
     * {@code AmqpIOException} when {@code basicQos} fails, and translated exceptions
     * for an interrupted retry sleep or an I/O failure while consuming
     */
    public void start() throws AmqpException {
        if (logger.isDebugEnabled()) {
            logger.debug("Starting consumer " + this);
        }

        this.thread = Thread.currentThread();

        try {
            // Step 1: get the resource holder and its channel. Per the article's note,
            // addRecoveryListener() registers this object as the recovery listener when
            // the channel is an auto-recovering one.
            this.resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(this.connectionFactory,
                    this.transactional);
            this.channel = this.resourceHolder.getChannel();
            addRecoveryListener();
        }
        catch (AmqpAuthenticationException e) {
            throw new FatalListenerStartupException("Authentication failure", e);
        }

        // Step 2: fresh internal consumer and bookkeeping for this start attempt.
        this.consumer = new InternalConsumer(this.channel);
        this.deliveryTags.clear();
        this.activeObjectCounter.add(this);

        // mirrored queue might be being moved
        int passiveDeclareRetries = this.declarationRetries;
        this.declaring = true;
        do {
            if (cancelled()) {
                break;
            }
            try {
                attemptPassiveDeclarations();
                if (passiveDeclareRetries < this.declarationRetries && logger.isInfoEnabled()) {
                    logger.info("Queue declaration succeeded after retrying");
                }
                // Success: zero the counter so the loop condition ends the retries.
                passiveDeclareRetries = 0;
            }
            catch (DeclarationException e) {
                if (passiveDeclareRetries > 0 && this.channel.isOpen()) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Queue declaration failed; retries left=" + (passiveDeclareRetries), e);
                    }
                    // Back off before the next attempt. This must not depend on the log level.
                    try {
                        Thread.sleep(this.failedDeclarationRetryInterval);
                    }
                    catch (InterruptedException e1) {
                        this.declaring = false;
                        Thread.currentThread().interrupt();
                        this.activeObjectCounter.release(this);
                        throw RabbitExceptionTranslator.convertRabbitAccessException(e1);
                    }
                }
                else if (e.getFailedQueues().size() < this.queues.length) {
                    // Out of retries (or channel closed) but at least one queue was declared:
                    // carry on with the available queues and remember the missing ones.
                    if (logger.isWarnEnabled()) {
                        logger.warn("Not all queues are available; only listening on those that are - configured: "
                                + Arrays.asList(this.queues) + "; not available: " + e.getFailedQueues());
                    }
                    this.missingQueues.addAll(e.getFailedQueues());
                    this.lastRetryDeclaration = System.currentTimeMillis();
                }
                else {
                    // No queue could be declared at all: give up.
                    this.declaring = false;
                    this.activeObjectCounter.release(this);
                    throw new QueuesNotAvailableException("Cannot prepare queue for listener. "
                            + "Either the queue doesn't exist or the broker will not allow us to use it.", e);
                }
            }
        }
        while (passiveDeclareRetries-- > 0 && !cancelled());
        this.declaring = false;

        if (!this.acknowledgeMode.isAutoAck() && !cancelled()) {
            // Set basicQos before calling basicConsume (otherwise if we are not acking the broker
            // will send blocks of 100 messages)
            try {
                this.channel.basicQos(this.prefetchCount);
            }
            catch (IOException e) {
                this.activeObjectCounter.release(this);
                throw new AmqpIOException(e);
            }
        }


        try {
            if (!cancelled()) {
                // Subscribe only to queues that were not recorded as missing.
                for (String queueName : this.queues) {
                    if (!this.missingQueues.contains(queueName)) {
                        consumeFromQueue(queueName);
                    }
                }
            }
        }
        catch (IOException e) {
            throw RabbitExceptionTranslator.convertRabbitAccessException(e);
        }
    }
上一篇下一篇

猜你喜欢

热点阅读