Mybatis数据库连接池解析
一、为什么要学习DB的连接池?
最近线上,在QPS突增突降的时候总会报异常,看日志是因为同时创建连接数过多导致,但是用的是公司自己的orm框架。并没有对内开源所以并不清楚是什么问题,但是这种问题开源框架会不会有呢?
通常我们常用的DB连接池主要有:mybatis原生、C3p0、dbcp以及阿里爸爸的druid。这几种都是比较专业的连接池,对比了一下,Mybatis原生连接池实现的最为简单,所以从github上clone下来,简要的看了一下。
Mybatis的代码目录很整洁,看上去 也很清晰。
Paste_Image.png我们也可以很容易的直奔主题,直接看看datasource的实现。下面就是全部的代码:
Paste_Image.png反正我第一眼看是懵逼的,竟然这么点代码就搞定了....
反观数据库连接池,主要有两种Pooled和Unpooled。字面上也能理解,一种是用数据库连接池的一种是不用连接池的。先看看Unpooled的:
public class UnpooledDataSource implements DataSource { private ClassLoader driverClassLoader; private Properties driverProperties; private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<String, Driver>(); private String driver; private String url; private String username; private String password; private Boolean autoCommit; private Integer defaultTransactionIsolationLevel; static { Enumeration<Driver> drivers = DriverManager.getDrivers(); while (drivers.hasMoreElements()) { Driver driver = drivers.nextElement(); registeredDrivers.put(driver.getClass().getName(), driver); } } public UnpooledDataSource() { }public class UnpooledDataSource implements DataSource {
private ClassLoader driverClassLoader;
private Properties driverProperties;
private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<String, Driver>();
private String driver;
private String url;
private String username;
private String password;
private Boolean autoCommit;
private Integer defaultTransactionIsolationLevel;
static {
Enumeration<Driver> drivers = DriverManager.getDrivers();
while (drivers.hasMoreElements()) {
Driver driver = drivers.nextElement();
registeredDrivers.put(driver.getClass().getName(), driver);
}
}
public UnpooledDataSource() {
}
UnpooledDataSource 继承自DataSource ,同时里面定义了一些我们常用的链接数据库的配置。
主要看getConnection()
private Connection doGetConnection(String username, String password) throws SQLException {
Properties props = new Properties();
if (driverProperties != null) {
props.putAll(driverProperties);
}
if (username != null) {
props.setProperty("user", username);
}
if (password != null) {
props.setProperty("password", password);
}
return doGetConnection(props);
}
private Connection doGetConnection(Properties properties) throws SQLException {
initializeDriver();
Connection connection = DriverManager.getConnection(url, properties);
configureConnection(connection);
return connection;
}
很明显,每调用一次都会创建一个连接,当并发量上来的时候那么数据库一定会来一个大爆炸,哈哈哈。
所以在使用mybatis一定要使用连接池,要不然你的数据库可就惨了。
再看看pooledDataSource:
private PooledConnection popConnection(String username, String password) throws SQLException {
boolean countedWait = false;
PooledConnection conn = null;
long t = System.currentTimeMillis();
int localBadConnectionCount = 0;
while (conn == null) {
synchronized (state) {//用Staye做同步,空闲的链接集合和活跃的链接集合都在state中
if (!state.idleConnections.isEmpty()) {//先查看当前的空闲连接池是否为空
// Pool has available connection
conn = state.idleConnections.remove(0);//不为空我就从空闲的连接池取咯
if (log.isDebugEnabled()) {
log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
}
} else {
// Pool does not have available connection
if (state.activeConnections.size() < poolMaximumActiveConnections) {//现在没有空闲链接了,那看一下活跃队列,如果当前的链接结合还没满。
// Can create new connection
conn = new PooledConnection(dataSource.getConnection(), this);//创建链接!
if (log.isDebugEnabled()) {
log.debug("Created connection " + conn.getRealHashCode() + ".");
}
} else {//如果活跃连接满了,链接都很活跃,那证明并发很高了
// Cannot create new connection
PooledConnection oldestActiveConnection = state.activeConnections.get(0);
long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
if (longestCheckoutTime > poolMaximumCheckoutTime) {//既然现在满了,那就检查一下看看能不能有可以抢占的链接,先看一下这个连接被用了多久了,有米有超过我的限制。
// Can claim overdue connection //如果超限了,那么我此时应该把它抢占
state.claimedOverdueConnectionCount++;
state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
state.accumulatedCheckoutTime += longestCheckoutTime;
state.activeConnections.remove(oldestActiveConnection);//移除这个连接
if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
try {
oldestActiveConnection.getRealConnection().rollback();
} catch (SQLException e) {
log.debug("Bad connection. Could not roll back");
}
}
conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);//用原来老的Connection创建一个新的PooledConnection
conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
oldestActiveConnection.invalidate();
if (log.isDebugEnabled()) {
log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
}
} else { //如果此时我没有可以抢占的连接池,那我怎么办?显然不能反回,那我还是在这等一会儿吧。wait一会儿
// Must wait
try {
if (!countedWait) {
state.hadToWaitCount++;
countedWait = true;
}
if (log.isDebugEnabled()) {
log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
}
long wt = System.currentTimeMillis();
state.wait(poolTimeToWait); //wait唤醒之后继续while 重复上述步骤。
state.accumulatedWaitTime += System.currentTimeMillis() - wt;
} catch (InterruptedException e) {
break;
}
}
}
}
if (conn != null) { //如果我现在拿到了链接,不管是从哪拿的吧
if (conn.isValid()) {//先判断一下我的链接是否还有效,有效的话我就初始化一下。那么此时默认下一次循环会退出while
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
conn.setCheckoutTimestamp(System.currentTimeMillis());
conn.setLastUsedTimestamp(System.currentTimeMillis());
state.activeConnections.add(conn);
state.requestCount++;
state.accumulatedRequestTime += System.currentTimeMillis() - t;
} else {
if (log.isDebugEnabled()) {
log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
}
state.badConnectionCount++;
localBadConnectionCount++;
conn = null;
if (localBadConnectionCount > (poolMaximumIdleConnections + 3)) {
if (log.isDebugEnabled()) {
log.debug("PooledDataSource: Could not get a good connection to the database.");
}
throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
}
}
}
}
}
if (conn == null) {
if (log.isDebugEnabled()) {
log.debug("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
}
throw new SQLException("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
}
return conn;
}
上面的代码虽然是一大坨,但是条理十分清楚(具体步骤可以看代码注释)。
这里我们能看到的比较醒目的一个synchronized (state) ,这就表明这个方法是同步方法,但是这样也就降低了popConnection的性能,因为多线程需要竞争state的锁。
对于弹出的连接,mybatis需要检查连接的有效性,对于有效性的检测,Mybatis做的是十分保守的,具体的实现在isValid()方法中:
/*
* Method to see if the connection is usable
*
* @return True if the connection is usable
*/
public boolean isValid() {
return valid && realConnection != null && dataSource.pingConnection(this);
}
/*
* Method to check to see if a connection is still usable
*
* @param conn - the connection to check
* @return True if the connection is still usable
*/
protected boolean pingConnection(PooledConnection conn) {
boolean result = true;
try {
result = !conn.getRealConnection().isClosed();
} catch (SQLException e) {
if (log.isDebugEnabled()) {
log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
}
result = false;
}
if (result) {
if (poolPingEnabled) {
if (poolPingConnectionsNotUsedFor >= 0 && conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) {
try {
if (log.isDebugEnabled()) {
log.debug("Testing connection " + conn.getRealHashCode() + " ...");
}
Connection realConn = conn.getRealConnection();
Statement statement = realConn.createStatement();
ResultSet rs = statement.executeQuery(poolPingQuery);
rs.close();
statement.close();
if (!realConn.getAutoCommit()) {
realConn.rollback();
}
result = true;
if (log.isDebugEnabled()) {
log.debug("Connection " + conn.getRealHashCode() + " is GOOD!");
}
} catch (Exception e) {
log.warn("Execution of ping query '" + poolPingQuery + "' failed: " + e.getMessage());
try {
conn.getRealConnection().close();
} catch (Exception e2) {
//ignore
}
result = false;
if (log.isDebugEnabled()) {
log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
}
}
}
}
}
return result;
}
上面的实现很简单,其实就是在获取配置中的poolPinQuery语句。一般我们用Select now | | select 1。
但是这种策略会严重降低sql的查询效率,如果开启此功能,每一次Query相当于两次Query。
而其他的数据库连接池,则是有单独的线程去check,像C3P0。所以在线程池这里,Mybatis原生做的并不是很好。
连接取出来之后,在Query之后需要将连接放回连接池中,那么这又是怎么进行的呢?
其实仅仅是调用了Connection的Close,但是这个Connection不是jdbc的Connection,而是PooledConnection。它包装了realConnection(Db的Connection)和一个proxyConnection(Connection的代理)。
lass PooledConnection implements InvocationHandler {
private static final String CLOSE = "close";
private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };
private int hashCode = 0;
private PooledDataSource dataSource;
private Connection realConnection;
private Connection proxyConnection;
private long checkoutTimestamp;
private long createdTimestamp;
private long lastUsedTimestamp;
private int connectionTypeCode;
private boolean valid;
/*
* Required for InvocationHandler implementation.
*
* @param proxy - not used
* @param method - the method to be executed
* @param args - the parameters to be passed to the method
* @see java.lang.reflect.InvocationHandler#invoke(Object, java.lang.reflect.Method, Object[])
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
dataSource.pushConnection(this);
return null;
} else {
try {
if (!Object.class.equals(method.getDeclaringClass())) {
// issue #579 toString() should never fail
// throw an SQLException instead of a Runtime
checkConnection();
}
return method.invoke(realConnection, args);
} catch (Throwable t) {
throw ExceptionUtil.unwrapThrowable(t);
}
}
}
首先,它实现了InvocationHandler ,那么我们基本上知道了它是一个JDK原生代理的实现。接下来的invoke方法里面,对Close方法做了处理。处理的方式是将Connection push到连接池中。
protected void pushConnection(PooledConnection conn) throws SQLException {
synchronized (state) { //先同步
state.activeConnections.remove(conn);//链接从活跃连接池移除
if (conn.isValid()) {//再次判断链接的有效性
//如果空闲连接池不满,那么将链接返回连接池,如果满了,那么Close掉链接
if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
state.accumulatedCheckoutTime += conn.getCheckoutTime();
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
state.idleConnections.add(newConn);
newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
conn.invalidate();
if (log.isDebugEnabled()) {
log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
}
state.notifyAll();//之前等待获取连接的pop方法就被唤醒了,重新去争抢线程
} else {
state.accumulatedCheckoutTime += conn.getCheckoutTime();
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
conn.getRealConnection().close();
if (log.isDebugEnabled()) {
log.debug("Closed connection " + conn.getRealHashCode() + ".");
}
conn.invalidate();
}
} else {
if (log.isDebugEnabled()) {
log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
}
state.badConnectionCount++;
}
}
}
Push的逻辑也比较简单。连接在取出,使用,最后到返回连接池。Mybatis做的都十分的简单清晰,但是性能确实可能受到影响,猜测可能开源的优秀连接池太多Mybatis不想重新造轮子了吧。