RabbitMQ连接池——CachingConnectionFa
因为上一个项目中使用了RabbitMQ,但是当时没有考虑过性能的问题,今天觉得好像不对劲,大量的重复建立连接,造成了很大的性能浪费,于是我就找呀找,发现Spring提供了一种RabbitMQ连接池,所以今天我们来看一下它是如何设计的。
AMQP(version~0-9-1)
什么是AMQP?
-
首先它是一种高级消息队列协议,是使用 TCP 提供可靠投递的应用层协议。与HTTP不同的是,它定义了Server端的服务和行为(也就是AMQP模型。今天这里暂时不说),同时AMQP也是长连接的(此处不对比1.1)。简单的来理解(当然AMQP协议包含了很多东西),AMQP是一种RPC的传输机制,定义了很多消息类和方法,这些类和方法组成了AMQP命令,就像你所看到的RPC代码一样,也就是说我们可以通过调用这些方法来完成我们的任务。我给大家放一张图:
image.png - Connection、Channel
大家可以看到上面的Class,他们是做什么用的呢?我们简单来说一下:- Connection
是一个socket连接,它封装了socket协议相关部分逻辑。你可以认为一个Connection就是一个Tcp连接。 - Channel
有些应用需要与 AMQP 代理建立多个连接。无论怎样,同时开启多个 TCP 连接都是不合适的,因为这样做会消耗掉过多的系统资源。AMQP 0-9-1 提供了通道(channels)来处理多连接,可以把通道理解成共享一个 TCP 连接的多个轻量化连接(通常每个thread创建单独的channel进行通讯)。
下面给大家一段详细的资料,大家可以理解一下他们的不同。
- Connection
A Connection represents a real TCP connection to the message broker, where as a Channel
is a virtual connection (AMPQ connection) inside it. This way you can use as many (virtual)
connections as you want inside your application without overloading the broker with TCP
connections.You can use one Channel for everything. However, if you have multiple threads,
it's suggested to use a different Channel for each thread.There is no direct relation between Channel and Queue. A Channel is used to send AMQP commands to the broker. This can be the creation of a queue or similar, but these concepts are not tied together.Consumerruns in its own thread allocated from the consumer thread pool. If multiple Consumers are subscribed to the same Queue, the broker uses round-robin to distribute the messages between them equally. It is also possible to attach the same Consumer to multiple Queues. You can understand Consumers as callbacks. These are called everytime a message arrives on a Queue the Consumer is bound to. For the case of the Java Client, each Consumers has a methodhandleDelivery(...), which represents the callback method. What you typically do is, subclassDefaultConsumer and override handleDelivery(...). Note: If you attach the same Consumer instance to multiple queues, this method will be called by different threads. So take care of synchronization if necessary.
- AMQP建立连接
大致了解一下之后,今天的重点,如何建立连接?
Connection连接过程大致可以认为是三个同步的RPC调用,Connection.Start ---> Connection.StartOk ---> Connection.Open。 也就是说我们每次建立一个Connection的消耗是很大的,所以有没有一种方式来解决这个问题?
我想大家可能都想到了,那就是建立连接池。 -
连接池
但是在这里有一个问题,我是应该缓存Channel还是Connection?这里给大家看两种方案:
image.png- 方案一:只缓存Connection
- 方案二:Connection和Channel都缓存
这两种方案留给大家去思考。
本来打算手写一个连接池,但是发现Spring提供了现成的,不用想,肯定比我写得好,
- CachingConnectionFactory
CachingConnectionFactory为我们提供了两种缓存的模式:- CHANNEL模式:这也是CachingConnectionFactory的默认模式,在这种模式下,所有的createConnection()方法实际上返回的都是同一个Connection,同样的Connection.close()方法是没用的,因为就一个,默认情况下,Connection中只缓存了一个Channel,在并发量不大的时候这种模式是完全够用的,当并发量较高的时候,我们可以setChannelCacheSize()来增加Connection中缓存的Channel的数量。
- CONNECTION模式:在CONNECTION模式下,每一次调用createConnection()方法都会新建一个或者从缓存中获取,根据你设置的ConnectionCacheSize的大小,当小于的时候会采用新建的策略,当大于等于的时候会采用从缓存中获取的策略,与CHANNEL模式不同的是,CONNECTION模式对Connection和Channel都进行了缓存,最新版本的client中已经将Channel的缓存数量从1增加到了25,但是在并发量不是特别大的情况下,作用并不是特别明显。
使用CachingConnectionFactory需要注意的一点是:所有你获取的Channel对象必须要显式的关闭,所以finally中一定不要忘记释放资源,如果忘记释放,则可能造成连接池中没有资源可用。
好了,我们来看一下创建Connection源码的实现:
- createConnection()
synchronized (this.connectionMonitor) {
// CHANNEL模式下,这里的connection是ChannelCachingConnectionProxy 代理对象
//这样做的目的是为Channel提供临时的存储空间(也就是缓存Channel),以便其他客户端调用
if (this.cacheMode == CacheMode.CHANNEL) {
//确保Connection对象不为null,target是真实的连接
if (this.connection.target == null) {
//第一次调用 createConnection 方法时 connection.target 值为 null,因此会调用 createBareConnection 方法创建出 SimpleConnection 赋值给 connection.target
//SimpleConnection 中delegate属性是真正的RabbitMQ 连接(AMQConnection)
this.connection.target = super.createBareConnection();
// invoke the listener *after* this.connection is assigned
if (!this.checkoutPermits.containsKey(this.connection)) {
// Map<Connection, Semaphore> checkoutPermits 中存放了信道的许可数量,也就是默认的25,通过信号量来同步资源
this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize));
}
this.connection.closeNotified.set(false);
//向所有 ConnectionListener 发布 onCreate 事件
getConnectionListener().onCreate(this.connection);
}
return this.connection;
}
else if (this.cacheMode == CacheMode.CONNECTION) {
//直接从缓存中获取
return connectionFromCache();
}
}
创建Channel的源码实现:
- createChannel()
Semaphore permits = null;
//大于0的情况下才会通过 Semaphore 限制当前连接下可用的信道数量
if (this.channelCheckoutTimeout > 0) {
//获取许可
permits = obtainPermits(connection);
}
//获取当前Connection的Channel代理集合
LinkedList<ChannelProxy> channelList = determineChannelList(connection, transactional);
ChannelProxy channel = null;
if (connection.isOpen()) {
//这里主要是从缓存中获取,在同步块中,先判断 channelList 是否为空,若不为空,则返回队列头部缓存的 ChannelProxy(要从队列中移除)。
//如果没有可用的缓存信道,则通过 getCachedChannelProxy 方法创建新的 ChannelProxy。
channel = findOpenChannel(channelList, channel);
if (channel != null) {
if (logger.isTraceEnabled()) {
logger.trace("Found cached Rabbit Channel: " + channel.toString());
}
}
}
if (channel == null) {
try {
//创建新Channel 的过程
channel = getCachedChannelProxy(connection, channelList, transactional);
}
catch (RuntimeException e) {
if (permits != null) {
permits.release();
if (logger.isDebugEnabled()) {
logger.debug("Could not get channel; released permit for " + connection + ", remaining:"
+ permits.availablePermits());
}
}
throw e;
}
}
return channel;
}
private ChannelProxy getCachedChannelProxy(ChannelCachingConnectionProxy connection,
LinkedList<ChannelProxy> channelList, boolean transactional) { //NOSONAR LinkedList for addLast()
//通过Connection中delegate创建Channel对象
Channel targetChannel = createBareChannel(connection, transactional);
if (logger.isDebugEnabled()) {
logger.debug("Creating cached Rabbit Channel from " + targetChannel);
}
//向所有 ChannelListener 发布 onCreate 事件
getChannelListener().onCreate(targetChannel, transactional);
Class<?>[] interfaces;
//通过 Proxy.newProxyInstance创建一个实现了ChannelProxy接口的动态代理对象。
//所有对该实例的方法调用都会转交给CachedChannelInvocationHandler 的 invoke 方法处理
if (this.publisherConfirms || this.publisherReturns) {
interfaces = new Class<?>[] { ChannelProxy.class, PublisherCallbackChannel.class };
}
else {
interfaces = new Class<?>[] { ChannelProxy.class };
}
return (ChannelProxy) Proxy.newProxyInstance(ChannelProxy.class.getClassLoader(),
interfaces, new CachedChannelInvocationHandler(connection, targetChannel, channelList,
transactional));
}