RabbitMQrabbitMQ

RabbitMQ连接池——CachingConnectionFa

2018-12-16  本文已影响0人  黄金矿工00七

因为上一个项目中使用了RabbitMQ,但是当时没有考虑过性能的问题,今天觉得好像不对劲,大量的重复建立连接,造成了很大的性能浪费,于是我就找呀找,发现Spring提供了一种RabbitMQ连接池,所以今天我们来看一下它是如何设计的。

AMQP(version~0-9-1)

什么是AMQP?

A Connection represents a real TCP connection to the message broker, where as a Channel
is a virtual connection (AMPQ connection) inside it. This way you can use as many (virtual)
connections as you want inside your application without overloading the broker with TCP
connections.You can use one Channel for everything. However, if you have multiple threads,
it's suggested to use a different Channel for each thread.There is no direct relation between Channel and Queue. A Channel is used to send AMQP commands to the broker. This can be the creation of a queue or similar, but these concepts are not tied together.Consumerruns in its own thread allocated from the consumer thread pool. If multiple Consumers are subscribed to the same Queue, the broker uses round-robin to distribute the messages between them equally. It is also possible to attach the same Consumer to multiple Queues. You can understand Consumers as callbacks. These are called everytime a message arrives on a Queue the Consumer is bound to. For the case of the Java Client, each Consumers has a methodhandleDelivery(...), which represents the callback method. What you typically do is, subclassDefaultConsumer and override handleDelivery(...). Note: If you attach the same Consumer instance to multiple queues, this method will be called by different threads. So take care of synchronization if necessary.


    synchronized (this.connectionMonitor) {
// CHANNEL模式下,这里的connection是ChannelCachingConnectionProxy 代理对象
//这样做的目的是为Channel提供临时的存储空间(也就是缓存Channel),以便其他客户端调用 
            if (this.cacheMode == CacheMode.CHANNEL) {
//确保Connection对象不为null,target是真实的连接
                if (this.connection.target == null) {
//第一次调用 createConnection 方法时 connection.target 值为 null,因此会调用 createBareConnection 方法创建出 SimpleConnection 赋值给 connection.target
//SimpleConnection 中delegate属性是真正的RabbitMQ 连接(AMQConnection)
                    this.connection.target = super.createBareConnection();
                    // invoke the listener *after* this.connection is assigned
                    if (!this.checkoutPermits.containsKey(this.connection)) {
// Map<Connection, Semaphore> checkoutPermits 中存放了信道的许可数量,也就是默认的25,通过信号量来同步资源
                        this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize));
                    }
                    this.connection.closeNotified.set(false);
//向所有 ConnectionListener 发布 onCreate 事件
                    getConnectionListener().onCreate(this.connection);
                }
                return this.connection;
            }
            else if (this.cacheMode == CacheMode.CONNECTION) {
//直接从缓存中获取
                return connectionFromCache();
            }
        }

创建Channel的源码实现:

        Semaphore permits = null;
//大于0的情况下才会通过 Semaphore 限制当前连接下可用的信道数量
        if (this.channelCheckoutTimeout > 0) {
//获取许可
            permits = obtainPermits(connection);
        }
//获取当前Connection的Channel代理集合
        LinkedList<ChannelProxy> channelList = determineChannelList(connection, transactional);
        ChannelProxy channel = null;
        if (connection.isOpen()) {
//这里主要是从缓存中获取,在同步块中,先判断 channelList 是否为空,若不为空,则返回队列头部缓存的 ChannelProxy(要从队列中移除)。
//如果没有可用的缓存信道,则通过 getCachedChannelProxy 方法创建新的 ChannelProxy。
            channel = findOpenChannel(channelList, channel);
            if (channel != null) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Found cached Rabbit Channel: " + channel.toString());
                }
            }
        }
        if (channel == null) {
            try {
//创建新Channel 的过程
                channel = getCachedChannelProxy(connection, channelList, transactional);
            }
            catch (RuntimeException e) {
                if (permits != null) {
                    permits.release();
                    if (logger.isDebugEnabled()) {
                        logger.debug("Could not get channel; released permit for " + connection + ", remaining:"
                                + permits.availablePermits());
                    }
                }
                throw e;
            }
        }
        return channel;
    }
    private ChannelProxy getCachedChannelProxy(ChannelCachingConnectionProxy connection,
            LinkedList<ChannelProxy> channelList, boolean transactional) { //NOSONAR LinkedList for addLast()
//通过Connection中delegate创建Channel对象
        Channel targetChannel = createBareChannel(connection, transactional);
        if (logger.isDebugEnabled()) {
            logger.debug("Creating cached Rabbit Channel from " + targetChannel);
        }
//向所有 ChannelListener 发布 onCreate 事件
        getChannelListener().onCreate(targetChannel, transactional);
        Class<?>[] interfaces;
//通过 Proxy.newProxyInstance创建一个实现了ChannelProxy接口的动态代理对象。
//所有对该实例的方法调用都会转交给CachedChannelInvocationHandler 的 invoke 方法处理
        if (this.publisherConfirms || this.publisherReturns) {
            interfaces = new Class<?>[] { ChannelProxy.class, PublisherCallbackChannel.class };
        }
        else {
            interfaces = new Class<?>[] { ChannelProxy.class };
        }
        return (ChannelProxy) Proxy.newProxyInstance(ChannelProxy.class.getClassLoader(),
                interfaces, new CachedChannelInvocationHandler(connection, targetChannel, channelList,
                        transactional));
    }
上一篇下一篇

猜你喜欢

热点阅读