[druid 源码解析] 5 归还连接
2021-11-11 本文已影响0人
AndyWei123
我们在创建链接的时候会发现,返回给 Mybatis
的并不是一个简单的 connection
而是一个 DruidPooledConnection
这里是一个我们需要注意点的点,会到正题,我们拿到一个链接,假如使用完成后我们最重要就是 close 掉这个链接,那么 Mybatis
也是这样做的。我们可以看一下 SqlSessionInterceptor
这个类的 Invoke
方法,所有 Dao 都是通过这个拦截器生成代理对象。
private class SqlSessionInterceptor implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
SqlSession sqlSession = getSqlSession(SqlSessionTemplate.this.sqlSessionFactory,
SqlSessionTemplate.this.executorType, SqlSessionTemplate.this.exceptionTranslator);
try {
Object result = method.invoke(sqlSession, args);
if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
// force commit even on non-dirty sessions because some databases require
// a commit/rollback before calling close()
sqlSession.commit(true);
}
return result;
} catch (Throwable t) {
Throwable unwrapped = unwrapThrowable(t);
if (SqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {
// release the connection to avoid a deadlock if the translator is no loaded. See issue #22
closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
sqlSession = null;
Throwable translated = SqlSessionTemplate.this.exceptionTranslator
.translateExceptionIfPossible((PersistenceException) unwrapped);
if (translated != null) {
unwrapped = translated;
}
}
throw unwrapped;
} finally {
if (sqlSession != null) {
closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
}
}
}
}
我们看 finally
方法最终调用了 closeSqlSession
方法,我们根据最后调用链路,最终跟踪到了DruidPooledConnection
的 recycle 方法,调用栈如下:
我们先看一下他的 recycle 方法。
public void recycle() throws SQLException {
if (this.disable) {
return;
}
DruidConnectionHolder holder = this.holder;
if (holder == null) {
if (dupCloseLogEnable) {
LOG.error("dup close");
}
return;
}
if (!this.abandoned) {
DruidAbstractDataSource dataSource = holder.getDataSource();
dataSource.recycle(this);
}
this.holder = null;
conn = null;
transactionInfo = null;
closed = true;
}
我们看一下主要的逻辑,就是调用 DruiddataSource
的 recycle
方法, 兜兜转转终于回到了我们的 DruiddataSource
。
protected void recycle(DruidPooledConnection pooledConnection) throws SQLException {
final DruidConnectionHolder holder = pooledConnection.holder;
if (holder == null) {
LOG.warn("connectionHolder is null");
return;
}
// 假如当前线程和获取链接不是同一个线程,这里会 warn 报错, 这里会涉及到事务相关的东西,我们后面详解。
if (logDifferentThread //
&& (!isAsyncCloseConnectionEnable()) //
&& pooledConnection.ownerThread != Thread.currentThread()//
) {
LOG.warn("get/close not same thread");
}
final Connection physicalConnection = holder.conn;
// 从 activeConnections 中移除当前 connection
if (pooledConnection.traceEnable) {
Object oldInfo = null;
activeConnectionLock.lock();
try {
if (pooledConnection.traceEnable) {
oldInfo = activeConnections.remove(pooledConnection);
pooledConnection.traceEnable = false;
}
} finally {
activeConnectionLock.unlock();
}
if (oldInfo == null) {
if (LOG.isWarnEnabled()) {
LOG.warn("remove abandonded failed. activeConnections.size " + activeConnections.size());
}
}
}
final boolean isAutoCommit = holder.underlyingAutoCommit;
final boolean isReadOnly = holder.underlyingReadOnly;
final boolean testOnReturn = this.testOnReturn;
try {
// check need to rollback?
if ((!isAutoCommit) && (!isReadOnly)) {
pooledConnection.rollback();
}
// 假如不是同个线程,需要开启锁后才能rest,rest 主要是做一些关于重置 holder setting 的事情。
// reset holder, restore default settings, clear warnings
boolean isSameThread = pooledConnection.ownerThread == Thread.currentThread();
if (!isSameThread) {
final ReentrantLock lock = pooledConnection.lock;
lock.lock();
try {
holder.reset();
} finally {
lock.unlock();
}
} else {
holder.reset();
}
if (holder.discard) {
return;
}
// j检查是否大于最大连接数,是的话清除
if (phyMaxUseCount > 0 && holder.useCount >= phyMaxUseCount) {
discardConnection(holder);
return;
}
// 检查线程是否关闭
if (physicalConnection.isClosed()) {
lock.lock();
try {
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
} finally {
lock.unlock();
}
return;
}
// 检查线程是否存活
if (testOnReturn) {
boolean validate = testConnectionInternal(holder, physicalConnection);
if (!validate) {
JdbcUtils.close(physicalConnection);
destroyCountUpdater.incrementAndGet(this);
lock.lock();
try {
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
} finally {
lock.unlock();
}
return;
}
}
if (holder.initSchema != null) {
holder.conn.setSchema(holder.initSchema);
holder.initSchema = null;
}
if (!enable) {
discardConnection(holder);
return;
}
boolean result;
final long currentTimeMillis = System.currentTimeMillis();
// 检查是否超过醉倒时长
if (phyTimeoutMillis > 0) {
long phyConnectTimeMillis = currentTimeMillis - holder.connectTimeMillis;
if (phyConnectTimeMillis > phyTimeoutMillis) {
discardConnection(holder);
return;
}
}
// 获取锁,最重要的逻辑开始了
lock.lock();
try {
// 计时器操作
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
// 将连接放回 connections
result = putLast(holder, currentTimeMillis);
recycleCount++;
} finally {
lock.unlock();
}
if (!result) {
JdbcUtils.close(holder.conn);
LOG.info("connection recyle failed.");
}
} catch (Throwable e) {
holder.clearStatementCache();
if (!holder.discard) {
discardConnection(holder);
holder.discard = true;
}
LOG.error("recyle error", e);
recycleErrorCountUpdater.incrementAndGet(this);
}
}
这里的逻辑比较简单,主要如下:
- 检查当前线程和获取链接的线程是否同一个,这里可能涉及到并发问题和事务问题,我们后续进行讲解。
- 件当前线程从活跃线程组
activeConnections
中移除,主要是方便后面的丢弃或者回收的工作。 - 然后检查是否需要进行回滚,不需要继续往下走。
-
reset
当前connection
的holder
的相关配置。 - 接下来是对各项信息进行检查,主要是看连接是否还可以重用。
- 锁住然后进行真正的回收工作,这里回收交给了
putLast
方法。
最后我们来看一下putLast
方法。
boolean putLast(DruidConnectionHolder e, long lastActiveTimeMillis) {
if (poolingCount >= maxActive || e.discard || this.closed) {
return false;
}
e.lastActiveTimeMillis = lastActiveTimeMillis;
connections[poolingCount] = e;
incrementPoolingCount();
if (poolingCount > poolingPeak) {
poolingPeak = poolingCount;
poolingPeakTime = lastActiveTimeMillis;
}
notEmpty.signal();
notEmptySignalCount++;
return true;
}
这里主要是将连接放回到 connections
空闲连接数组的最后面,这里我们还记得,获取链从头开始取,然后释放就放回到最后一位。最后发送信号,通知等待线程获取连接。